]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // Copyright (C) 2006 Douglas Gregor <doug.gregor@gmail.com> |
2 | ||
3 | // Use, modification and distribution is subject to the Boost Software | |
4 | // License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at | |
5 | // http://www.boost.org/LICENSE_1_0.txt) | |
6 | ||
7 | // An example using Boost.MPI's split() operation on communicators to | |
8 | // create separate data-generating processes and data-collecting | |
9 | // processes. | |
10 | #include <boost/mpi.hpp> | |
11 | #include <iostream> | |
12 | #include <cstdlib> | |
13 | #include <boost/serialization/vector.hpp> | |
14 | namespace mpi = boost::mpi; | |
15 | ||
16 | enum message_tags { msg_data_packet, msg_broadcast_data, msg_finished }; | |
17 | ||
18 | void generate_data(mpi::communicator local, mpi::communicator world) | |
19 | { | |
20 | using std::srand; | |
21 | using std::rand; | |
22 | ||
23 | // The rank of the collector within the world communicator | |
24 | int master_collector = local.size(); | |
25 | ||
26 | srand(time(0) + world.rank()); | |
27 | ||
28 | // Send out several blocks of random data to the collectors. | |
29 | int num_data_blocks = rand() % 3 + 1; | |
30 | for (int block = 0; block < num_data_blocks; ++block) { | |
31 | // Generate some random data | |
32 | int num_samples = rand() % 1000; | |
33 | std::vector<int> data; | |
34 | for (int i = 0; i < num_samples; ++i) { | |
35 | data.push_back(rand()); | |
36 | } | |
37 | ||
38 | // Send our data to the master collector process. | |
39 | std::cout << "Generator #" << local.rank() << " sends some data..." | |
40 | << std::endl; | |
41 | world.send(master_collector, msg_data_packet, data); | |
42 | } | |
43 | ||
44 | // Wait for all of the generators to complete | |
45 | (local.barrier)(); | |
46 | ||
47 | // The first generator will send the message to the master collector | |
48 | // indicating that we're done. | |
49 | if (local.rank() == 0) | |
50 | world.send(master_collector, msg_finished); | |
51 | } | |
52 | ||
53 | void collect_data(mpi::communicator local, mpi::communicator world) | |
54 | { | |
55 | // The rank of the collector within the world communicator | |
56 | int master_collector = world.size() - local.size(); | |
57 | ||
58 | if (world.rank() == master_collector) { | |
59 | while (true) { | |
60 | // Wait for a message | |
61 | mpi::status msg = world.probe(); | |
62 | if (msg.tag() == msg_data_packet) { | |
63 | // Receive the packet of data | |
64 | std::vector<int> data; | |
65 | world.recv(msg.source(), msg.tag(), data); | |
66 | ||
67 | // Tell each of the collectors that we'll be broadcasting some data | |
68 | for (int dest = 1; dest < local.size(); ++dest) | |
69 | local.send(dest, msg_broadcast_data, msg.source()); | |
70 | ||
71 | // Broadcast the actual data. | |
72 | broadcast(local, data, 0); | |
73 | } else if (msg.tag() == msg_finished) { | |
74 | // Receive the message | |
75 | world.recv(msg.source(), msg.tag()); | |
76 | ||
77 | // Tell each of the collectors that we're finished | |
78 | for (int dest = 1; dest < local.size(); ++dest) | |
79 | local.send(dest, msg_finished); | |
80 | ||
81 | break; | |
82 | } | |
83 | } | |
84 | } else { | |
85 | while (true) { | |
86 | // Wait for a message from the master collector | |
87 | mpi::status msg = local.probe(); | |
88 | if (msg.tag() == msg_broadcast_data) { | |
89 | // Receive the broadcast message | |
90 | int originator; | |
91 | local.recv(msg.source(), msg.tag(), originator); | |
92 | ||
93 | // Receive the data broadcasted from the master collector | |
94 | std::vector<int> data; | |
95 | broadcast(local, data, 0); | |
96 | ||
97 | std::cout << "Collector #" << local.rank() | |
98 | << " is processing data from generator #" << originator | |
99 | << "." << std::endl; | |
100 | } else if (msg.tag() == msg_finished) { | |
101 | // Receive the message | |
102 | local.recv(msg.source(), msg.tag()); | |
103 | ||
104 | break; | |
105 | } | |
106 | } | |
107 | } | |
108 | } | |
109 | ||
110 | int main(int argc, char* argv[]) | |
111 | { | |
112 | mpi::environment env(argc, argv); | |
113 | mpi::communicator world; | |
114 | ||
115 | if (world.size() < 3) { | |
116 | if (world.rank() == 0) { | |
117 | std::cerr << "Error: this example requires at least 3 processes." | |
118 | << std::endl; | |
119 | } | |
120 | env.abort(-1); | |
121 | } | |
122 | ||
123 | bool is_generator = world.rank() < 2 * world.size() / 3; | |
124 | mpi::communicator local = world.split(is_generator? 0 : 1); | |
125 | if (is_generator) generate_data(local, world); | |
126 | else collect_data(local, world); | |
127 | ||
128 | return 0; | |
129 | } |