// boost/graph/distributed/mpi_process_group.hpp
1 | // Copyright (C) 2004-2008 The Trustees of Indiana University. |
2 | // Copyright (C) 2007 Douglas Gregor | |
3 | // Copyright (C) 2007 Matthias Troyer <troyer@boost-consulting.com> | |
4 | ||
5 | // Use, modification and distribution is subject to the Boost Software | |
6 | // License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at | |
7 | // http://www.boost.org/LICENSE_1_0.txt) | |
8 | ||
9 | // Authors: Douglas Gregor | |
10 | // Matthias Troyer | |
11 | // Andrew Lumsdaine | |
12 | #ifndef BOOST_GRAPH_DISTRIBUTED_MPI_PROCESS_GROUP | |
13 | #define BOOST_GRAPH_DISTRIBUTED_MPI_PROCESS_GROUP | |
14 | ||
15 | #ifndef BOOST_GRAPH_USE_MPI | |
16 | #error "Parallel BGL files should not be included unless <boost/graph/use_mpi.hpp> has been included" | |
17 | #endif | |
18 | ||
19 | //#define NO_SPLIT_BATCHES | |
20 | #define SEND_OOB_BSEND | |
21 | ||
22 | #include <boost/optional.hpp> | |
23 | #include <boost/shared_ptr.hpp> | |
24 | #include <boost/weak_ptr.hpp> | |
25 | #include <utility> | |
26 | #include <memory> | |
27 | #include <boost/function/function1.hpp> | |
28 | #include <boost/function/function2.hpp> | |
29 | #include <boost/function/function0.hpp> | |
30 | #include <boost/mpi.hpp> | |
31 | #include <boost/property_map/parallel/process_group.hpp> | |
32 | #include <boost/serialization/vector.hpp> | |
33 | #include <boost/utility/enable_if.hpp> | |
34 | ||
35 | namespace boost { namespace graph { namespace distributed { | |
36 | ||
// Process group tags
/// Category tag identifying mpi_process_group within the process-group
/// taxonomy; refines boost::parallel::linear_process_group_tag.
struct mpi_process_group_tag : virtual boost::parallel::linear_process_group_tag { };
39 | ||
/**
 * A process group that implements the BSP (bulk-synchronous parallel)
 * communication model on top of MPI. Messages sent through the group are
 * batched and delivered at synchronization points; out-of-band messaging
 * and per-tag "triggers" are also supported. Copies of a process group
 * share the underlying communication state (@c impl_) but may own their
 * own block of message tags (see @c block_num).
 */
class mpi_process_group
{
  // Opaque shared implementation; defined in the accompanying .ipp file.
  struct impl;

 public:
  /// Number of tags available to each data structure.
  static const int max_tags = 256;

  /**
   * The type of a "receive" handler, that will be provided with
   * (source, tag) pairs when a message is received. Users can provide a
   * receive handler for a distributed data structure, for example, to
   * automatically pick up and respond to messages as needed.
   */
  typedef function<void(int source, int tag)> receiver_type;

  /**
   * The type of a handler for the on-synchronize event, which will be
   * executed at the beginning of synchronize().
   */
  typedef function0<void> on_synchronize_event_type;

  /// Used as a tag to help create an "empty" process group.
  struct create_empty {};

  /// The type used to buffer message data
  typedef boost::mpi::packed_oprimitive::buffer_type buffer_type;

  /// The type used to identify a process
  typedef int process_id_type;

  /// The type used to count the number of processes
  typedef int process_size_type;

  /// The type of communicator used to transmit data via MPI
  typedef boost::mpi::communicator communicator_type;

  /// Classification of the capabilities of this process group
  struct communication_category
    : virtual boost::parallel::bsp_process_group_tag,
      virtual mpi_process_group_tag { };

  // TBD: We can eliminate the "source" field and possibly the
  // "offset" field.
  /// Header describing one message stored inside a batch buffer.
  struct message_header {
    /// The process that sent the message
    process_id_type source;

    /// The message tag
    int tag;

    /// The offset of the message into the buffer
    std::size_t offset;

    /// The length of the message in the buffer, in bytes
    std::size_t bytes;

    // Serialized field-by-field; also declared bitwise-serializable and an
    // MPI datatype at the bottom of this header.
    template <class Archive>
    void serialize(Archive& ar, int)
    {
      ar & source & tag & offset & bytes;
    }
  };

  /**
   * Stores the outgoing messages for a particular processor.
   *
   * @todo Evaluate whether we should use a deque instance, which
   * would reduce could reduce the cost of "sending" messages but
   * increases the time spent in the synchronization step.
   */
  struct outgoing_messages {
        outgoing_messages() {}
        ~outgoing_messages() {}

    /// One header per queued message, indexing into @c buffer.
    std::vector<message_header> headers;
    /// Packed payload bytes for all queued messages.
    buffer_type buffer;

    template <class Archive>
    void serialize(Archive& ar, int)
    {
      ar & headers & buffer;
    }

    /// Constant-time exchange of the queued headers and payload.
    void swap(outgoing_messages& x)
    {
      headers.swap(x.headers);
      buffer.swap(x.buffer);
    }
  };

private:
  /**
   * Virtual base from which every trigger will be launched. See @c
   * trigger_launcher for more information.
   */
  class trigger_base : boost::noncopyable
  {
  public:
    explicit trigger_base(int tag) : tag_(tag) { }

    /// Retrieve the tag associated with this trigger
    int tag() const { return tag_; }

    virtual ~trigger_base() { }

    /**
     * Invoked to receive a message that matches a particular trigger.
     *
     * @param source the source of the message
     * @param tag the (local) tag of the message
     * @param context the context under which the trigger is being
     * invoked
     */
    virtual void
    receive(mpi_process_group const& pg, int source, int tag,
            trigger_receive_context context, int block=-1) const = 0;

  protected:
    // The message tag associated with this trigger
    int tag_;
  };

  /**
   * Launches a specific handler in response to a trigger. This
   * function object wraps up the handler function object and a buffer
   * for incoming data.
   */
  template<typename Type, typename Handler>
  class trigger_launcher : public trigger_base
  {
  public:
    explicit trigger_launcher(mpi_process_group& self, int tag,
                              const Handler& handler)
      : trigger_base(tag), self(self), handler(handler)
      {}

    void
    receive(mpi_process_group const& pg, int source, int tag,
            trigger_receive_context context, int block=-1) const;

  private:
    mpi_process_group& self;
    // mutable: receive() is const but may invoke a stateful handler.
    mutable Handler handler;
  };

  /**
   * Launches a specific handler with a message reply in response to a
   * trigger. This function object wraps up the handler function
   * object and a buffer for incoming data.
   */
  template<typename Type, typename Handler>
  class reply_trigger_launcher : public trigger_base
  {
  public:
    explicit reply_trigger_launcher(mpi_process_group& self, int tag,
                                    const Handler& handler)
      : trigger_base(tag), self(self), handler(handler)
      {}

    void
    receive(mpi_process_group const& pg, int source, int tag,
            trigger_receive_context context, int block=-1) const;

  private:
    mpi_process_group& self;
    mutable Handler handler;
  };

  /// Launches a handler for triggers installed globally (same tag on
  /// every process); unlike trigger_launcher it stores no back-reference
  /// to the owning process group.
  template<typename Type, typename Handler>
  class global_trigger_launcher : public trigger_base
  {
  public:
    explicit global_trigger_launcher(mpi_process_group& self, int tag,
                                     const Handler& handler)
      : trigger_base(tag), handler(handler)
      { }

    void
    receive(mpi_process_group const& pg, int source, int tag,
            trigger_receive_context context, int block=-1) const;

  private:
    mutable Handler handler;
    // TBD: do not forget to cancel any outstanding Irecv when deleted,
    // if we decide to use Irecv
  };

  /// Variant of global_trigger_launcher that posts a receive buffer of
  /// fixed size up front (see prepare_receive).
  template<typename Type, typename Handler>
  class global_irecv_trigger_launcher : public trigger_base
  {
  public:
    explicit global_irecv_trigger_launcher(mpi_process_group& self, int tag,
                                           const Handler& handler, int sz)
      : trigger_base(tag), handler(handler), buffer_size(sz)
      {
        // Post the initial receive as soon as the trigger is installed.
        prepare_receive(self,tag);
      }

    void
    receive(mpi_process_group const& pg, int source, int tag,
            trigger_receive_context context, int block=-1) const;

  private:
    /// (Re-)post a receive for this trigger's tag; @p force is used to
    /// re-arm the receive even when one may already be outstanding.
    void prepare_receive(mpi_process_group const& pg, int tag, bool force=false) const;
    Handler handler;
    int buffer_size;
    // TBD: do not forget to cancel any outstanding Irecv when deleted,
    // if we decide to use Irecv
  };

public:
  /**
   * Construct a new BSP process group from an MPI communicator. The
   * MPI communicator will be duplicated to create a new communicator
   * for this process group to use.
   */
  mpi_process_group(communicator_type parent_comm = communicator_type());

  /**
   * Construct a new BSP process group from an MPI communicator. The
   * MPI communicator will be duplicated to create a new communicator
   * for this process group to use. This constructor allows to tune the
   * size of message batches.
   *
   * @param num_headers The maximum number of headers in a message batch
   *
   * @param buffer_size The maximum size of the message buffer in a batch.
   *
   */
  mpi_process_group( std::size_t num_headers, std::size_t buffer_size,
                     communicator_type parent_comm = communicator_type());

  /**
   * Construct a copy of the BSP process group for a new distributed
   * data structure. This data structure will synchronize with all
   * other members of the process group's equivalence class (including
   * @p other), but will have its own set of tags.
   *
   * @param other The process group that this new process group will
   * be based on, using a different set of tags within the same
   * communication and synchronization space.
   *
   * @param handler A message handler that will be passed (source,
   * tag) pairs for each message received by this data
   * structure. The handler is expected to receive the messages
   * immediately. The handler can be changed after-the-fact by
   * calling @c replace_handler.
   *
   * @param out_of_band_receive An anachronism. TODO: remove this.
   */
  mpi_process_group(const mpi_process_group& other,
                    const receiver_type& handler,
                    bool out_of_band_receive = false);

  /**
   * Construct a copy of the BSP process group for a new distributed
   * data structure. This data structure will synchronize with all
   * other members of the process group's equivalence class (including
   * @p other), but will have its own set of tags.
   */
  mpi_process_group(const mpi_process_group& other,
                    attach_distributed_object,
                    bool out_of_band_receive = false);

  /**
   * Create an "empty" process group, with no information. This is an
   * internal routine that users should never need.
   */
  explicit mpi_process_group(create_empty) {}

  /**
   * Destroys this copy of the process group.
   */
  ~mpi_process_group();

  /**
   * Replace the current message handler with a new message handler.
   *
   * @param handler The new message handler.
   * @param out_of_band_receive An anachronism: remove this
   */
  void replace_handler(const receiver_type& handler,
                       bool out_of_band_receive = false);

  /**
   * Turns this process group into the process group for a new
   * distributed data structure or object, allocating its own tag
   * block.
   */
  void make_distributed_object();

  /**
   * Replace the handler to be invoked at the beginning of synchronize.
   */
  void
  replace_on_synchronize_handler(const on_synchronize_event_type& handler = 0);

  /**
   * Return the block number of the current data structure. A value of
   * 0 indicates that this particular instance of the process group is
   * not associated with any distributed data structure.
   */
  int my_block_number() const { return block_num? *block_num : 0; }

  /**
   * Encode a block number/tag pair into a single encoded tag for
   * transmission.
   */
  int encode_tag(int block_num, int tag) const
  { return block_num * max_tags + tag; }

  /**
   * Decode an encoded tag into a block number/tag pair.
   */
  std::pair<int, int> decode_tag(int encoded_tag) const
  { return std::make_pair(encoded_tag / max_tags, encoded_tag % max_tags); }

  // @todo Actually write up the friend declarations so these could be
  // private.

  // private:

  /** Allocate a block of tags for this instance. The block should not
   * have been allocated already, e.g., my_block_number() ==
   * 0. Returns the newly-allocated block number.
   */
  int allocate_block(bool out_of_band_receive = false);

  /** Potentially emit a receive event out of band. Returns true if an event
   * was actually sent, false otherwise.
   */
  bool maybe_emit_receive(int process, int encoded_tag) const;

  /** Emit a receive event. Returns true if an event was actually
   * sent, false otherwise.
   */
  bool emit_receive(int process, int encoded_tag) const;

  /** Emit an on-synchronize event to all block handlers. */
  void emit_on_synchronize() const;

  /** Retrieve a reference to the stored receiver in this block. */
  template<typename Receiver>
  Receiver* get_receiver();

  /// Send an MPI-datatype value (dispatch on is_mpi_datatype).
  template<typename T>
  void
  send_impl(int dest, int tag, const T& value,
            mpl::true_ /*is_mpi_datatype*/) const;

  /// Send a serialized (non-MPI-datatype) value.
  template<typename T>
  void
  send_impl(int dest, int tag, const T& value,
            mpl::false_ /*is_mpi_datatype*/) const;

  /// Send an array of serialized values; only enabled for
  /// non-MPI-datatype element types.
  template<typename T>
  typename disable_if<boost::mpi::is_mpi_datatype<T>, void>::type
  array_send_impl(int dest, int tag, const T values[], std::size_t n) const;

  /// Receive an MPI-datatype value; returns whether a message was received.
  template<typename T>
  bool
  receive_impl(int source, int tag, T& value,
               mpl::true_ /*is_mpi_datatype*/) const;

  /// Receive a serialized (non-MPI-datatype) value.
  template<typename T>
  bool
  receive_impl(int source, int tag, T& value,
               mpl::false_ /*is_mpi_datatype*/) const;

  // Receive an array of values
  template<typename T>
  typename disable_if<boost::mpi::is_mpi_datatype<T>, bool>::type
  array_receive_impl(int source, int tag, T* values, std::size_t& n) const;

  /// Non-blocking check for a pending message; returns the (source, tag)
  /// pair when one is available.
  optional<std::pair<mpi_process_group::process_id_type, int> > probe() const;

  /// Complete a BSP superstep: deliver batched messages and run handlers.
  void synchronize() const;

  /// True when this process group has been initialized (non-empty).
  // NOTE(review): non-explicit conversion; predates C++11 explicit operators.
  operator bool() { return bool(impl_); }

  /// Return the base process group (no associated data structure block).
  mpi_process_group base() const;

  /**
   * Create a new trigger for a specific message tag. Triggers handle
   * out-of-band messaging, and the handler itself will be called
   * whenever a message is available. The handler itself accepts four
   * arguments: the source of the message, the message tag (which will
   * be the same as @p tag), the message data (of type @c Type), and a
   * boolean flag that states whether the message was received
   * out-of-band. The last will be @c true for out-of-band receives,
   * or @c false for receives at the end of a synchronization step.
   */
  template<typename Type, typename Handler>
  void trigger(int tag, const Handler& handler);

  /**
   * Create a new trigger for a specific message tag, along with a way
   * to send a reply with data back to the sender. Triggers handle
   * out-of-band messaging, and the handler itself will be called
   * whenever a message is available. The handler itself accepts four
   * arguments: the source of the message, the message tag (which will
   * be the same as @p tag), the message data (of type @c Type), and a
   * boolean flag that states whether the message was received
   * out-of-band. The last will be @c true for out-of-band receives,
   * or @c false for receives at the end of a synchronization
   * step. The handler also returns a value, which will be routed back
   * to the sender.
   */
  template<typename Type, typename Handler>
  void trigger_with_reply(int tag, const Handler& handler);

  /// Install a trigger for @p tag on every process; when @p buffer_size
  /// is non-zero a pre-posted receive of that size is used.
  template<typename Type, typename Handler>
  void global_trigger(int tag, const Handler& handler, std::size_t buffer_size=0);



  /**
   * Poll for any out-of-band messages. This routine will check if any
   * out-of-band messages are available. Those that are available will
   * be handled immediately, if possible.
   *
   * @returns if an out-of-band message has been received, but we are
   * unable to actually receive the message, a (source, tag) pair will
   * be returned. Otherwise, returns an empty optional.
   *
   * @param wait When true, we should block until a message comes in.
   *
   * @param synchronizing whether we are currently synchronizing the
   * process group
   */
  optional<std::pair<int, int> >
  poll(bool wait = false, int block = -1, bool synchronizing = false) const;

  /**
   * Determines the context of the trigger currently executing. If
   * multiple triggers are executing (recursively), then the context
   * for the most deeply nested trigger will be returned. If no
   * triggers are executing, returns @c trc_none. This might be used,
   * for example, to determine whether a reply to a message should
   * itself be sent out-of-band or whether it can go via the normal,
   * slower communication route.
   */
  trigger_receive_context trigger_context() const;

  /// INTERNAL ONLY
  void receive_batch(process_id_type source, outgoing_messages& batch) const;

  /// INTERNAL ONLY
  ///
  /// Determine the actual communicator and tag will be used for a
  /// transmission with the given tag.
  std::pair<boost::mpi::communicator, int>
  actual_communicator_and_tag(int tag, int block) const;

  /// set the size of the message buffer used for buffered oob sends
  static void set_message_buffer_size(std::size_t s);

  /// get the size of the message buffer used for buffered oob sends
  static std::size_t message_buffer_size();

  // Saved state of any previously attached MPI buffer (see MPI_Buffer_attach).
  static int old_buffer_size;
  static void* old_buffer;
private:

  /// Register @p launcher to handle messages for (block, tag).
  void install_trigger(int tag, int block,
                       shared_ptr<trigger_base> const& launcher);

  /// Service outstanding pre-posted receives for the given block.
  void poll_requests(int block=-1) const;


  // send a batch if the buffer is full now or would get full
  void maybe_send_batch(process_id_type dest) const;

  // actually send a batch
  void send_batch(process_id_type dest, outgoing_messages& batch) const;
  void send_batch(process_id_type dest) const;

  /// Pack the accumulated headers into the outgoing buffers.
  void pack_headers() const;

  /**
   * Process a batch of incoming messages immediately.
   *
   * @param source the source of these messages
   */
  void process_batch(process_id_type source) const;
  void receive_batch(boost::mpi::status& status) const;

  //void free_finished_sends() const;

  /// Status messages used internally by the process group
  enum status_messages {
    /// the first of the reserved message tags
    msg_reserved_first = 126,
    /// Sent from a processor when sending batched messages
    msg_batch = 126,
    /// Sent from a processor when sending large batched messages, larger than
    /// the maximum buffer size for messages to be received by MPI_Irecv
    msg_large_batch = 127,
    /// Sent from a source processor to everyone else when that
    /// processor has entered the synchronize() function.
    msg_synchronizing = 128,
    /// the last of the reserved message tags
    msg_reserved_last = 128
  };

  /**
   * Description of a block of tags associated to a particular
   * distributed data structure. This structure will live as long as
   * the distributed data structure is around, and will be used to
   * help send messages to the data structure.
   */
  struct block_type
  {
    block_type() { }

    /// Handler for receive events
    receiver_type on_receive;

    /// Handler executed at the start of  synchronization
    on_synchronize_event_type on_synchronize;

    /// Individual message triggers. Note: at present, this vector is
    /// indexed by the (local) tag of the trigger.  Any tags that
    /// don't have triggers will have NULL pointers in that spot.
    std::vector<shared_ptr<trigger_base> > triggers;
  };

  /**
   * Data structure containing all of the blocks for the distributed
   * data structures attached to a process group.
   */
  typedef std::vector<block_type*> blocks_type;

  /// Iterator into @c blocks_type.
  typedef blocks_type::iterator block_iterator;

  /**
   * Deleter used to deallocate a block when its distributed data
   * structure is destroyed. This type will be used as the deleter for
   * @c block_num.
   */
  struct deallocate_block;

  /// Storage attached to MPI for buffered (MPI_Bsend) out-of-band sends.
  static std::vector<char> message_buffer;

public:
  /**
   * Data associated with the process group and all of its attached
   * distributed data structures.
   */
  shared_ptr<impl> impl_;

  /**
   * When non-null, indicates that this copy of the process group is
   * associated with a particular distributed data structure. The
   * integer value contains the block number (a value > 0) associated
   * with that data structure. The deleter for this @c shared_ptr is a
   * @c deallocate_block object that will deallocate the associated
   * block in @c impl_->blocks.
   */
  shared_ptr<int> block_num;

  /**
   * Rank of this process, to avoid having to call rank() repeatedly.
   */
  int rank;

  /**
   * Number of processes in this process group, to avoid having to
   * call communicator::size() repeatedly.
   */
  int size;
};
616 | ||
617 | ||
618 | ||
/// Retrieve the rank of the calling process within @p pg (cached in the
/// process group; no MPI call is made).
inline mpi_process_group::process_id_type
process_id(const mpi_process_group& pg)
{ return pg.rank; }
622 | ||
/// Retrieve the number of processes in @p pg (cached in the process group;
/// no MPI call is made).
inline mpi_process_group::process_size_type
num_processes(const mpi_process_group& pg)
{ return pg.size; }
626 | ||
/// Retrieve the MPI communicator underlying @p pg.
mpi_process_group::communicator_type communicator(const mpi_process_group& pg);

/// Send @p value to process @p dest with the given local @p tag.
template<typename T>
void
send(const mpi_process_group& pg, mpi_process_group::process_id_type dest,
     int tag, const T& value);

/// Send the sequence [first, last) to process @p dest with the given tag.
template<typename InputIterator>
void
send(const mpi_process_group& pg, mpi_process_group::process_id_type dest,
     int tag, InputIterator first, InputIterator last);

/// Pointer-range overload; forwards the element count (last - first).
template<typename T>
inline void
send(const mpi_process_group& pg, mpi_process_group::process_id_type dest,
     int tag, T* first, T* last)
{ send(pg, dest, tag, first, last - first); }

/// Const-pointer-range overload; forwards the element count (last - first).
template<typename T>
inline void
send(const mpi_process_group& pg, mpi_process_group::process_id_type dest,
     int tag, const T* first, const T* last)
{ send(pg, dest, tag, first, last - first); }

/// Receive a message with the given tag from any source; returns the id of
/// the process that sent it.
template<typename T>
mpi_process_group::process_id_type
receive(const mpi_process_group& pg, int tag, T& value);

/// Receive a message with the given tag from the given @p source process.
template<typename T>
mpi_process_group::process_id_type
receive(const mpi_process_group& pg,
        mpi_process_group::process_id_type source, int tag, T& value);

/// Check for an available message; returns its (source, tag) pair when
/// one is pending, an empty optional otherwise.
optional<std::pair<mpi_process_group::process_id_type, int> >
probe(const mpi_process_group& pg);

/// End the current BSP superstep for @p pg (collective).
void synchronize(const mpi_process_group& pg);

/// Element-wise all-reduce of [first, last) into @p out using @p bin_op;
/// returns the end of the output range (collective).
template<typename T, typename BinaryOperation>
T*
all_reduce(const mpi_process_group& pg, T* first, T* last, T* out,
           BinaryOperation bin_op);

/// Element-wise prefix scan of [first, last) into @p out using @p bin_op;
/// returns the end of the output range (collective).
template<typename T, typename BinaryOperation>
T*
scan(const mpi_process_group& pg, T* first, T* last, T* out,
     BinaryOperation bin_op);

/// Gather every process's [first, last) values into @p out on all
/// processes (collective).
template<typename InputIterator, typename T>
void
all_gather(const mpi_process_group& pg,
           InputIterator first, InputIterator last, std::vector<T>& out);

/// Build a process group containing only the processes in [first, last)
/// (collective).
template<typename InputIterator>
mpi_process_group
process_subgroup(const mpi_process_group& pg,
                 InputIterator first, InputIterator last);

/// Broadcast @p val from process @p root to all processes (collective).
template<typename T>
void
broadcast(const mpi_process_group& pg, T& val,
          mpi_process_group::process_id_type root);
690 | ||
691 | /******************************************************************* | |
692 | * Out-of-band communication * | |
693 | *******************************************************************/ | |
694 | ||
695 | template<typename T> | |
696 | typename enable_if<boost::mpi::is_mpi_datatype<T> >::type | |
697 | send_oob(const mpi_process_group& pg, mpi_process_group::process_id_type dest, | |
698 | int tag, const T& value, int block=-1) | |
699 | { | |
700 | using boost::mpi::get_mpi_datatype; | |
701 | ||
702 | // Determine the actual message tag we will use for the send, and which | |
703 | // communicator we will use. | |
704 | std::pair<boost::mpi::communicator, int> actual | |
705 | = pg.actual_communicator_and_tag(tag, block); | |
706 | ||
707 | #ifdef SEND_OOB_BSEND | |
708 | if (mpi_process_group::message_buffer_size()) { | |
709 | MPI_Bsend(const_cast<T*>(&value), 1, get_mpi_datatype<T>(value), dest, | |
710 | actual.second, actual.first); | |
711 | return; | |
712 | } | |
713 | #endif | |
714 | MPI_Request request; | |
715 | MPI_Isend(const_cast<T*>(&value), 1, get_mpi_datatype<T>(value), dest, | |
716 | actual.second, actual.first, &request); | |
717 | ||
718 | int done=0; | |
719 | do { | |
720 | pg.poll(); | |
721 | MPI_Test(&request,&done,MPI_STATUS_IGNORE); | |
722 | } while (!done); | |
723 | } | |
724 | ||
725 | template<typename T> | |
726 | typename disable_if<boost::mpi::is_mpi_datatype<T> >::type | |
727 | send_oob(const mpi_process_group& pg, mpi_process_group::process_id_type dest, | |
728 | int tag, const T& value, int block=-1) | |
729 | { | |
730 | using boost::mpi::packed_oarchive; | |
731 | ||
732 | // Determine the actual message tag we will use for the send, and which | |
733 | // communicator we will use. | |
734 | std::pair<boost::mpi::communicator, int> actual | |
735 | = pg.actual_communicator_and_tag(tag, block); | |
736 | ||
737 | // Serialize the data into a buffer | |
738 | packed_oarchive out(actual.first); | |
739 | out << value; | |
740 | std::size_t size = out.size(); | |
741 | ||
742 | // Send the actual message data | |
743 | #ifdef SEND_OOB_BSEND | |
744 | if (mpi_process_group::message_buffer_size()) { | |
745 | MPI_Bsend(const_cast<void*>(out.address()), size, MPI_PACKED, | |
746 | dest, actual.second, actual.first); | |
747 | return; | |
748 | } | |
749 | #endif | |
750 | MPI_Request request; | |
751 | MPI_Isend(const_cast<void*>(out.address()), size, MPI_PACKED, | |
752 | dest, actual.second, actual.first, &request); | |
753 | ||
754 | int done=0; | |
755 | do { | |
756 | pg.poll(); | |
757 | MPI_Test(&request,&done,MPI_STATUS_IGNORE); | |
758 | } while (!done); | |
759 | } | |
760 | ||
/// Receive an out-of-band message of MPI datatype @c T from @p source
/// with the given tag.
template<typename T>
typename enable_if<boost::mpi::is_mpi_datatype<T> >::type
receive_oob(const mpi_process_group& pg,
            mpi_process_group::process_id_type source, int tag, T& value, int block=-1);

/// Receive an out-of-band message of non-MPI-datatype @c T (transmitted in
/// serialized form) from @p source with the given tag.
template<typename T>
typename disable_if<boost::mpi::is_mpi_datatype<T> >::type
receive_oob(const mpi_process_group& pg,
            mpi_process_group::process_id_type source, int tag, T& value, int block=-1);

/// Send @p send_value out-of-band and wait for the receiver's reply, which
/// is stored in @p reply_value. Overload for MPI-datatype replies.
template<typename SendT, typename ReplyT>
typename enable_if<boost::mpi::is_mpi_datatype<ReplyT> >::type
send_oob_with_reply(const mpi_process_group& pg,
                    mpi_process_group::process_id_type dest,
                    int tag, const SendT& send_value, ReplyT& reply_value,
                    int block = -1);

/// Send @p send_value out-of-band and wait for the receiver's reply, which
/// is stored in @p reply_value. Overload for serialized (non-MPI-datatype)
/// replies.
template<typename SendT, typename ReplyT>
typename disable_if<boost::mpi::is_mpi_datatype<ReplyT> >::type
send_oob_with_reply(const mpi_process_group& pg,
                    mpi_process_group::process_id_type dest,
                    int tag, const SendT& send_value, ReplyT& reply_value,
                    int block = -1);
784 | ||
785 | } } } // end namespace boost::graph::distributed | |
786 | ||
// message_header is a POD of integers, so it may be serialized as raw bytes
// and transmitted directly as an MPI datatype.
BOOST_IS_BITWISE_SERIALIZABLE(boost::graph::distributed::mpi_process_group::message_header)
namespace boost { namespace mpi {
    template<>
    struct is_mpi_datatype<boost::graph::distributed::mpi_process_group::message_header> : mpl::true_ { };
} } // end namespace boost::mpi
792 | ||
namespace std {
/// optimized swap for outgoing messages
/// Delegates to the member swap, exchanging headers and buffers without
/// copying message data.
inline void
swap(boost::graph::distributed::mpi_process_group::outgoing_messages& x,
     boost::graph::distributed::mpi_process_group::outgoing_messages& y)
{
  x.swap(y);
}


}
804 | ||
// Serialize outgoing_messages without class/version information
// (object_serializable) and without object tracking (track_never).
BOOST_CLASS_IMPLEMENTATION(boost::graph::distributed::mpi_process_group::outgoing_messages,object_serializable)
BOOST_CLASS_TRACKING(boost::graph::distributed::mpi_process_group::outgoing_messages,track_never)

// Template and inline member implementations.
#include <boost/graph/distributed/detail/mpi_process_group.ipp>

#endif // BOOST_GRAPH_DISTRIBUTED_MPI_PROCESS_GROUP