]>
git.proxmox.com Git - ceph.git/blob - ceph/src/boost/libs/mpi/example/generate_collect.cpp
1 // Copyright (C) 2006 Douglas Gregor <doug.gregor@gmail.com>
3 // Use, modification and distribution is subject to the Boost Software
4 // License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at
5 // http://www.boost.org/LICENSE_1_0.txt)
7 // An example using Boost.MPI's split() operation on communicators to
8 // create separate data-generating processes and data-collecting
10 #include <boost/mpi.hpp>
13 #include <boost/serialization/vector.hpp>
14 namespace mpi
= boost::mpi
;
16 enum message_tags
{ msg_data_packet
, msg_broadcast_data
, msg_finished
};
18 void generate_data(mpi::communicator local
, mpi::communicator world
)
23 // The rank of the collector within the world communicator
24 int master_collector
= local
.size();
26 srand(time(0) + world
.rank());
28 // Send out several blocks of random data to the collectors.
29 int num_data_blocks
= rand() % 3 + 1;
30 for (int block
= 0; block
< num_data_blocks
; ++block
) {
31 // Generate some random data
32 int num_samples
= rand() % 1000;
33 std::vector
<int> data
;
34 for (int i
= 0; i
< num_samples
; ++i
) {
35 data
.push_back(rand());
38 // Send our data to the master collector process.
39 std::cout
<< "Generator #" << local
.rank() << " sends some data..."
41 world
.send(master_collector
, msg_data_packet
, data
);
44 // Wait for all of the generators to complete
47 // The first generator will send the message to the master collector
48 // indicating that we're done.
49 if (local
.rank() == 0)
50 world
.send(master_collector
, msg_finished
);
53 void collect_data(mpi::communicator local
, mpi::communicator world
)
55 // The rank of the collector within the world communicator
56 int master_collector
= world
.size() - local
.size();
58 if (world
.rank() == master_collector
) {
61 mpi::status msg
= world
.probe();
62 if (msg
.tag() == msg_data_packet
) {
63 // Receive the packet of data
64 std::vector
<int> data
;
65 world
.recv(msg
.source(), msg
.tag(), data
);
67 // Tell each of the collectors that we'll be broadcasting some data
68 for (int dest
= 1; dest
< local
.size(); ++dest
)
69 local
.send(dest
, msg_broadcast_data
, msg
.source());
71 // Broadcast the actual data.
72 broadcast(local
, data
, 0);
73 } else if (msg
.tag() == msg_finished
) {
74 // Receive the message
75 world
.recv(msg
.source(), msg
.tag());
77 // Tell each of the collectors that we're finished
78 for (int dest
= 1; dest
< local
.size(); ++dest
)
79 local
.send(dest
, msg_finished
);
86 // Wait for a message from the master collector
87 mpi::status msg
= local
.probe();
88 if (msg
.tag() == msg_broadcast_data
) {
89 // Receive the broadcast message
91 local
.recv(msg
.source(), msg
.tag(), originator
);
93 // Receive the data broadcasted from the master collector
94 std::vector
<int> data
;
95 broadcast(local
, data
, 0);
97 std::cout
<< "Collector #" << local
.rank()
98 << " is processing data from generator #" << originator
100 } else if (msg
.tag() == msg_finished
) {
101 // Receive the message
102 local
.recv(msg
.source(), msg
.tag());
110 int main(int argc
, char* argv
[])
112 mpi::environment
env(argc
, argv
);
113 mpi::communicator world
;
115 if (world
.size() < 3) {
116 if (world
.rank() == 0) {
117 std::cerr
<< "Error: this example requires at least 3 processes."
123 bool is_generator
= world
.rank() < 2 * world
.size() / 3;
124 mpi::communicator local
= world
.split(is_generator
? 0 : 1);
125 if (is_generator
) generate_data(local
, world
);
126 else collect_data(local
, world
);