]> git.proxmox.com Git - ceph.git/blame - ceph/src/boost/libs/graph_parallel/include/boost/graph/distributed/detail/queue.ipp
bump version to 12.2.2-pve1
[ceph.git] / ceph / src / boost / libs / graph_parallel / include / boost / graph / distributed / detail / queue.ipp
CommitLineData
7c673cae
FG
1// Copyright (C) 2004-2006 The Trustees of Indiana University.
2
3// Use, modification and distribution is subject to the Boost Software
4// License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at
5// http://www.boost.org/LICENSE_1_0.txt)
6
7// Authors: Douglas Gregor
8// Andrew Lumsdaine
9#include <boost/optional.hpp>
10#include <cassert>
11#include <boost/graph/parallel/algorithm.hpp>
12#include <boost/graph/parallel/process_group.hpp>
13#include <functional>
14#include <algorithm>
15#include <boost/graph/parallel/simple_trigger.hpp>
16
17#ifndef BOOST_GRAPH_USE_MPI
18#error "Parallel BGL files should not be included unless <boost/graph/use_mpi.hpp> has been included"
19#endif
20
21namespace boost { namespace graph { namespace distributed {
22
23template<BOOST_DISTRIBUTED_QUEUE_PARMS>
24BOOST_DISTRIBUTED_QUEUE_TYPE::
25distributed_queue(const ProcessGroup& process_group, const OwnerMap& owner,
26 const Buffer& buffer, bool polling)
27 : process_group(process_group, attach_distributed_object()),
28 owner(owner),
29 buffer(buffer),
30 polling(polling)
31{
32 if (!polling)
33 outgoing_buffers.reset(
34 new outgoing_buffers_t(num_processes(process_group)));
35
36 setup_triggers();
37}
38
39template<BOOST_DISTRIBUTED_QUEUE_PARMS>
40BOOST_DISTRIBUTED_QUEUE_TYPE::
41distributed_queue(const ProcessGroup& process_group, const OwnerMap& owner,
42 const Buffer& buffer, const UnaryPredicate& pred,
43 bool polling)
44 : process_group(process_group, attach_distributed_object()),
45 owner(owner),
46 buffer(buffer),
47 pred(pred),
48 polling(polling)
49{
50 if (!polling)
51 outgoing_buffers.reset(
52 new outgoing_buffers_t(num_processes(process_group)));
53
54 setup_triggers();
55}
56
57template<BOOST_DISTRIBUTED_QUEUE_PARMS>
58BOOST_DISTRIBUTED_QUEUE_TYPE::
59distributed_queue(const ProcessGroup& process_group, const OwnerMap& owner,
60 const UnaryPredicate& pred, bool polling)
61 : process_group(process_group, attach_distributed_object()),
62 owner(owner),
63 pred(pred),
64 polling(polling)
65{
66 if (!polling)
67 outgoing_buffers.reset(
68 new outgoing_buffers_t(num_processes(process_group)));
69
70 setup_triggers();
71}
72
73template<BOOST_DISTRIBUTED_QUEUE_PARMS>
74void
75BOOST_DISTRIBUTED_QUEUE_TYPE::push(const value_type& x)
76{
77 typename ProcessGroup::process_id_type dest = get(owner, x);
78 if (outgoing_buffers)
79 outgoing_buffers->at(dest).push_back(x);
80 else if (dest == process_id(process_group))
81 buffer.push(x);
82 else
83 send(process_group, get(owner, x), msg_push, x);
84}
85
86template<BOOST_DISTRIBUTED_QUEUE_PARMS>
87bool
88BOOST_DISTRIBUTED_QUEUE_TYPE::empty() const
89{
90 /* Processes will stay here until the buffer is nonempty or
91 synchronization with the other processes indicates that all local
92 buffers are empty (and no messages are in transit).
93 */
94 while (buffer.empty() && !do_synchronize()) ;
95
96 return buffer.empty();
97}
98
99template<BOOST_DISTRIBUTED_QUEUE_PARMS>
100typename BOOST_DISTRIBUTED_QUEUE_TYPE::size_type
101BOOST_DISTRIBUTED_QUEUE_TYPE::size() const
102{
103 empty();
104 return buffer.size();
105}
106
107template<BOOST_DISTRIBUTED_QUEUE_PARMS>
108void BOOST_DISTRIBUTED_QUEUE_TYPE::setup_triggers()
109{
110 using boost::graph::parallel::simple_trigger;
111
112 simple_trigger(process_group, msg_push, this,
113 &distributed_queue::handle_push);
114 simple_trigger(process_group, msg_multipush, this,
115 &distributed_queue::handle_multipush);
116}
117
118template<BOOST_DISTRIBUTED_QUEUE_PARMS>
119void
120BOOST_DISTRIBUTED_QUEUE_TYPE::
121handle_push(int /*source*/, int /*tag*/, const value_type& value,
122 trigger_receive_context)
123{
124 if (pred(value)) buffer.push(value);
125}
126
127template<BOOST_DISTRIBUTED_QUEUE_PARMS>
128void
129BOOST_DISTRIBUTED_QUEUE_TYPE::
130handle_multipush(int /*source*/, int /*tag*/,
131 const std::vector<value_type>& values,
132 trigger_receive_context)
133{
134 for (std::size_t i = 0; i < values.size(); ++i)
135 if (pred(values[i])) buffer.push(values[i]);
136}
137
138template<BOOST_DISTRIBUTED_QUEUE_PARMS>
139bool
140BOOST_DISTRIBUTED_QUEUE_TYPE::do_synchronize() const
141{
142#ifdef PBGL_ACCOUNTING
143 ++num_synchronizations;
144#endif
145
146 using boost::parallel::all_reduce;
147 using std::swap;
148
149 typedef typename ProcessGroup::process_id_type process_id_type;
150
151 if (outgoing_buffers) {
152 // Transfer all of the push requests
153 process_id_type id = process_id(process_group);
154 process_id_type np = num_processes(process_group);
155 for (process_id_type dest = 0; dest < np; ++dest) {
156 outgoing_buffer_t& outgoing = outgoing_buffers->at(dest);
157 std::size_t size = outgoing.size();
158 if (size != 0) {
159 if (dest != id) {
160 send(process_group, dest, msg_multipush, outgoing);
161 } else {
162 for (std::size_t i = 0; i < size; ++i)
163 buffer.push(outgoing[i]);
164 }
165 outgoing.clear();
166 }
167 }
168 }
169 synchronize(process_group);
170
171 unsigned local_size = buffer.size();
172 unsigned global_size =
173 all_reduce(process_group, local_size, std::plus<unsigned>());
174 return global_size == 0;
175}
176
177} } } // end namespace boost::graph::distributed