]> git.proxmox.com Git - ceph.git/blame - ceph/src/boost/libs/thread/test/sync/mutual_exclusion/sync_pq/pq_multi_thread_pass.cpp
update sources to v12.2.3
[ceph.git] / ceph / src / boost / libs / thread / test / sync / mutual_exclusion / sync_pq / pq_multi_thread_pass.cpp
CommitLineData
7c673cae
FG
1// Copyright (C) 2014 Ian Forbed
2// Copyright (C) 2014 Vicente J. Botet Escriba
3//
4// Distributed under the Boost Software License, Version 1.0. (See accompanying
5// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6//
7
8#include <boost/config.hpp>
9#if ! defined BOOST_NO_CXX11_DECLTYPE
10#define BOOST_RESULT_OF_USE_DECLTYPE
11#endif
12
13#define BOOST_THREAD_VERSION 4
14#define BOOST_THREAD_PROVIDES_EXECUTORS
15
16#include <exception>
17
18#include <boost/thread/thread.hpp>
19#include <boost/thread/barrier.hpp>
20#include <boost/thread/concurrent_queues/sync_priority_queue.hpp>
21
22#include <boost/core/lightweight_test.hpp>
23
b32b8144
FG
24#ifdef BOOST_MSVC
25#pragma warning(disable: 4127) // conditional expression is constant
26#endif
27
7c673cae
FG
28typedef boost::concurrent::sync_priority_queue<int> sync_pq;
29
30int call_pull(sync_pq* q, boost::barrier* go)
31{
32 go->wait();
33 return q->pull();
34
35}
36
37void call_push(sync_pq* q, boost::barrier* go, int val)
38{
39 go->wait();
40 q->push(val);
41}
42
43void test_pull(const int n)
44{
45 sync_pq pq;
46 BOOST_TEST(pq.empty());
47 for(int i = 0; i < n; i++)
48 {
49 pq.push(i);
50 }
51 BOOST_TEST(!pq.empty());
52 BOOST_TEST_EQ(pq.size(), std::size_t(n));
53 pq.close();
54 BOOST_TEST(pq.closed());
55 boost::barrier b(n);
56 boost::thread_group tg;
57 for(int i = 0; i < n; i++)
58 {
59 tg.create_thread(boost::bind(call_pull, &pq, &b));
60 }
61 tg.join_all();
62 BOOST_TEST(pq.empty());
63}
64
65void test_push(const int n)
66{
67 sync_pq pq;
68 BOOST_TEST(pq.empty());
69
70 boost::barrier b(n);
71 boost::thread_group tg;
72 for(int i = 0; i < n; i++)
73 {
74 tg.create_thread(boost::bind(call_push, &pq, &b, i));
75 }
76 tg.join_all();
77 BOOST_TEST(!pq.empty());
78 BOOST_TEST_EQ(pq.size(), std::size_t(n));
79}
80
81void test_both(const int n)
82{
83 sync_pq pq;
84 BOOST_TEST(pq.empty());
85
86 boost::barrier b(2*n);
87 boost::thread_group tg;
88 for(int i = 0; i < n; i++)
89 {
90 tg.create_thread(boost::bind(call_pull, &pq, &b));
91 tg.create_thread(boost::bind(call_push, &pq, &b, i));
92 }
93 tg.join_all();
94 BOOST_TEST(pq.empty());
95 BOOST_TEST_EQ(pq.size(), std::size_t(0));
96}
97
98void push_range(sync_pq* q, const int begin, const int end)
99{
100 for(int i = begin; i < end; i++)
101 q->push(i);
102}
103
104void atomic_pull(sync_pq* q, boost::atomic<int>* sum)
105{
106 while(1)
107 {
108 try{
109 const int val = q->pull();
110 sum->fetch_add(val);
111 }
112 catch(std::exception& ){
113 break;
114 }
115 }
116}
117
118/**
119 * This test computes the sum of the first N integers upto $limit using
120 * $n threads for the push operation and $n threads for the pull and count
121 * operation. The push operation push a range of numbers on the queue while
122 * the pull operation pull from the queue and increments an atomic int.
123 * At the end of execution the value of atomic<int> $sum should be the same
124 * as n*(n+1)/2 as this is the closed form solution to this problem.
125 */
126void compute_sum(const int n)
127{
128 const int limit = 1000;
129 sync_pq pq;
130 BOOST_TEST(pq.empty());
131 boost::atomic<int> sum(0);
132 boost::thread_group tg1;
133 boost::thread_group tg2;
134 for(int i = 0; i < n; i++)
135 {
136 tg1.create_thread(boost::bind(push_range, &pq, i*(limit/n)+1, (i+1)*(limit/n)+1));
137 tg2.create_thread(boost::bind(atomic_pull, &pq, &sum));
138 }
139 tg1.join_all();
140 pq.close(); //Wait until all enqueuing is done before closing.
141 BOOST_TEST(pq.closed());
142 tg2.join_all();
143 BOOST_TEST(pq.empty());
144 BOOST_TEST_EQ(sum.load(), limit*(limit+1)/2);
145}
146
147void move_between_queues(sync_pq* q1, sync_pq* q2)
148{
149 while(1){
150 try{
151 const int val = q1->pull();
152 q2->push(val);
153 }
154 catch(std::exception& ){
155 break;
156 }
157 }
158}
159
160/**
161 * This test computes the sum of the first N integers upto $limit by moving
162 * numbers between 2 sync_priority_queues. A range of numbers are pushed onto
163 * one queue by $n threads while $n threads pull from this queue and push onto
164 * another sync_pq. At the end the main thread ensures the the values in the
165 * second queue are in proper order and then sums all the values from this
166 * queue. The sum should match n*(n+1)/2, the closed form solution to this
167 * problem.
168 */
169void sum_with_moving(const int n)
170{
171 const int limit = 1000;
172 sync_pq pq1;
173 sync_pq pq2;
174 BOOST_TEST(pq1.empty());
175 BOOST_TEST(pq2.empty());
176 boost::thread_group tg1;
177 boost::thread_group tg2;
178 for(int i = 0; i < n; i++)
179 {
180 tg1.create_thread(boost::bind(push_range, &pq1, i*(limit/n)+1, (i+1)*(limit/n)+1));
181 tg2.create_thread(boost::bind(move_between_queues, &pq1, &pq2));
182 }
183 tg1.join_all();
184 pq1.close(); //Wait until all enqueuing is done before closing.
185 BOOST_TEST(pq1.closed());
186 tg2.join_all();
187 BOOST_TEST(pq1.empty());
188 BOOST_TEST(!pq2.empty());
189 int sum = 0;
190 for(int i = 1000; i > 0; i--){
191 const int val = pq2.pull();
192 BOOST_TEST_EQ(i,val);
193 sum += val;
194 }
195 BOOST_TEST(pq2.empty());
196 BOOST_TEST_EQ(sum, limit*(limit+1)/2);
197}
198
199int main()
200{
201 for(int i = 1; i <= 64; i *= 2)
202 {
203 test_pull(i);
204 test_push(i);
205 test_both(i);
206 }
207 //These numbers must divide 1000
208 compute_sum(1);
209 compute_sum(4);
210 compute_sum(10);
211 compute_sum(25);
212 compute_sum(50);
213 sum_with_moving(1);
214 sum_with_moving(4);
215 sum_with_moving(10);
216 sum_with_moving(25);
217 sum_with_moving(50);
218 return boost::report_errors();
219}