]> git.proxmox.com Git - ceph.git/blob - ceph/src/boost/libs/thread/test/sync/mutual_exclusion/sync_pq/pq_multi_thread_pass.cpp
add subtree-ish sources for 12.0.3
[ceph.git] / ceph / src / boost / libs / thread / test / sync / mutual_exclusion / sync_pq / pq_multi_thread_pass.cpp
1 // Copyright (C) 2014 Ian Forbed
2 // Copyright (C) 2014 Vicente J. Botet Escriba
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7
8 #include <boost/config.hpp>
9 #if ! defined BOOST_NO_CXX11_DECLTYPE
10 #define BOOST_RESULT_OF_USE_DECLTYPE
11 #endif
12
13 #define BOOST_THREAD_VERSION 4
14 #define BOOST_THREAD_PROVIDES_EXECUTORS
15
16 #include <exception>
17
18 #include <boost/thread/thread.hpp>
19 #include <boost/thread/barrier.hpp>
20 #include <boost/thread/concurrent_queues/sync_priority_queue.hpp>
21
22 #include <boost/core/lightweight_test.hpp>
23
24 typedef boost::concurrent::sync_priority_queue<int> sync_pq;
25
26 int call_pull(sync_pq* q, boost::barrier* go)
27 {
28 go->wait();
29 return q->pull();
30
31 }
32
33 void call_push(sync_pq* q, boost::barrier* go, int val)
34 {
35 go->wait();
36 q->push(val);
37 }
38
39 void test_pull(const int n)
40 {
41 sync_pq pq;
42 BOOST_TEST(pq.empty());
43 for(int i = 0; i < n; i++)
44 {
45 pq.push(i);
46 }
47 BOOST_TEST(!pq.empty());
48 BOOST_TEST_EQ(pq.size(), std::size_t(n));
49 pq.close();
50 BOOST_TEST(pq.closed());
51 boost::barrier b(n);
52 boost::thread_group tg;
53 for(int i = 0; i < n; i++)
54 {
55 tg.create_thread(boost::bind(call_pull, &pq, &b));
56 }
57 tg.join_all();
58 BOOST_TEST(pq.empty());
59 }
60
61 void test_push(const int n)
62 {
63 sync_pq pq;
64 BOOST_TEST(pq.empty());
65
66 boost::barrier b(n);
67 boost::thread_group tg;
68 for(int i = 0; i < n; i++)
69 {
70 tg.create_thread(boost::bind(call_push, &pq, &b, i));
71 }
72 tg.join_all();
73 BOOST_TEST(!pq.empty());
74 BOOST_TEST_EQ(pq.size(), std::size_t(n));
75 }
76
77 void test_both(const int n)
78 {
79 sync_pq pq;
80 BOOST_TEST(pq.empty());
81
82 boost::barrier b(2*n);
83 boost::thread_group tg;
84 for(int i = 0; i < n; i++)
85 {
86 tg.create_thread(boost::bind(call_pull, &pq, &b));
87 tg.create_thread(boost::bind(call_push, &pq, &b, i));
88 }
89 tg.join_all();
90 BOOST_TEST(pq.empty());
91 BOOST_TEST_EQ(pq.size(), std::size_t(0));
92 }
93
94 void push_range(sync_pq* q, const int begin, const int end)
95 {
96 for(int i = begin; i < end; i++)
97 q->push(i);
98 }
99
100 void atomic_pull(sync_pq* q, boost::atomic<int>* sum)
101 {
102 while(1)
103 {
104 try{
105 const int val = q->pull();
106 sum->fetch_add(val);
107 }
108 catch(std::exception& ){
109 break;
110 }
111 }
112 }
113
114 /**
115 * This test computes the sum of the first N integers upto $limit using
116 * $n threads for the push operation and $n threads for the pull and count
117 * operation. The push operation push a range of numbers on the queue while
118 * the pull operation pull from the queue and increments an atomic int.
119 * At the end of execution the value of atomic<int> $sum should be the same
120 * as n*(n+1)/2 as this is the closed form solution to this problem.
121 */
122 void compute_sum(const int n)
123 {
124 const int limit = 1000;
125 sync_pq pq;
126 BOOST_TEST(pq.empty());
127 boost::atomic<int> sum(0);
128 boost::thread_group tg1;
129 boost::thread_group tg2;
130 for(int i = 0; i < n; i++)
131 {
132 tg1.create_thread(boost::bind(push_range, &pq, i*(limit/n)+1, (i+1)*(limit/n)+1));
133 tg2.create_thread(boost::bind(atomic_pull, &pq, &sum));
134 }
135 tg1.join_all();
136 pq.close(); //Wait until all enqueuing is done before closing.
137 BOOST_TEST(pq.closed());
138 tg2.join_all();
139 BOOST_TEST(pq.empty());
140 BOOST_TEST_EQ(sum.load(), limit*(limit+1)/2);
141 }
142
143 void move_between_queues(sync_pq* q1, sync_pq* q2)
144 {
145 while(1){
146 try{
147 const int val = q1->pull();
148 q2->push(val);
149 }
150 catch(std::exception& ){
151 break;
152 }
153 }
154 }
155
156 /**
157 * This test computes the sum of the first N integers upto $limit by moving
158 * numbers between 2 sync_priority_queues. A range of numbers are pushed onto
159 * one queue by $n threads while $n threads pull from this queue and push onto
160 * another sync_pq. At the end the main thread ensures the the values in the
161 * second queue are in proper order and then sums all the values from this
162 * queue. The sum should match n*(n+1)/2, the closed form solution to this
163 * problem.
164 */
165 void sum_with_moving(const int n)
166 {
167 const int limit = 1000;
168 sync_pq pq1;
169 sync_pq pq2;
170 BOOST_TEST(pq1.empty());
171 BOOST_TEST(pq2.empty());
172 boost::thread_group tg1;
173 boost::thread_group tg2;
174 for(int i = 0; i < n; i++)
175 {
176 tg1.create_thread(boost::bind(push_range, &pq1, i*(limit/n)+1, (i+1)*(limit/n)+1));
177 tg2.create_thread(boost::bind(move_between_queues, &pq1, &pq2));
178 }
179 tg1.join_all();
180 pq1.close(); //Wait until all enqueuing is done before closing.
181 BOOST_TEST(pq1.closed());
182 tg2.join_all();
183 BOOST_TEST(pq1.empty());
184 BOOST_TEST(!pq2.empty());
185 int sum = 0;
186 for(int i = 1000; i > 0; i--){
187 const int val = pq2.pull();
188 BOOST_TEST_EQ(i,val);
189 sum += val;
190 }
191 BOOST_TEST(pq2.empty());
192 BOOST_TEST_EQ(sum, limit*(limit+1)/2);
193 }
194
195 int main()
196 {
197 for(int i = 1; i <= 64; i *= 2)
198 {
199 test_pull(i);
200 test_push(i);
201 test_both(i);
202 }
203 //These numbers must divide 1000
204 compute_sum(1);
205 compute_sum(4);
206 compute_sum(10);
207 compute_sum(25);
208 compute_sum(50);
209 sum_with_moving(1);
210 sum_with_moving(4);
211 sum_with_moving(10);
212 sum_with_moving(25);
213 sum_with_moving(50);
214 return boost::report_errors();
215 }