]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // Copyright (C) 2014 Ian Forbed |
2 | // Copyright (C) 2014 Vicente J. Botet Escriba | |
3 | // | |
4 | // Distributed under the Boost Software License, Version 1.0. (See accompanying | |
5 | // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) | |
6 | // | |
7 | ||
8 | #include <boost/config.hpp> | |
9 | #if ! defined BOOST_NO_CXX11_DECLTYPE | |
10 | #define BOOST_RESULT_OF_USE_DECLTYPE | |
11 | #endif | |
12 | ||
13 | #define BOOST_THREAD_VERSION 4 | |
14 | #define BOOST_THREAD_PROVIDES_EXECUTORS | |
15 | ||
16 | #include <exception> | |
17 | ||
18 | #include <boost/thread/thread.hpp> | |
19 | #include <boost/thread/barrier.hpp> | |
20 | #include <boost/thread/concurrent_queues/sync_priority_queue.hpp> | |
21 | ||
22 | #include <boost/core/lightweight_test.hpp> | |
23 | ||
b32b8144 FG |
24 | #ifdef BOOST_MSVC |
25 | #pragma warning(disable: 4127) // conditional expression is constant | |
26 | #endif | |
27 | ||
7c673cae FG |
28 | typedef boost::concurrent::sync_priority_queue<int> sync_pq; |
29 | ||
30 | int call_pull(sync_pq* q, boost::barrier* go) | |
31 | { | |
32 | go->wait(); | |
33 | return q->pull(); | |
34 | ||
35 | } | |
36 | ||
37 | void call_push(sync_pq* q, boost::barrier* go, int val) | |
38 | { | |
39 | go->wait(); | |
40 | q->push(val); | |
41 | } | |
42 | ||
43 | void test_pull(const int n) | |
44 | { | |
45 | sync_pq pq; | |
46 | BOOST_TEST(pq.empty()); | |
47 | for(int i = 0; i < n; i++) | |
48 | { | |
49 | pq.push(i); | |
50 | } | |
51 | BOOST_TEST(!pq.empty()); | |
52 | BOOST_TEST_EQ(pq.size(), std::size_t(n)); | |
53 | pq.close(); | |
54 | BOOST_TEST(pq.closed()); | |
55 | boost::barrier b(n); | |
56 | boost::thread_group tg; | |
57 | for(int i = 0; i < n; i++) | |
58 | { | |
59 | tg.create_thread(boost::bind(call_pull, &pq, &b)); | |
60 | } | |
61 | tg.join_all(); | |
62 | BOOST_TEST(pq.empty()); | |
63 | } | |
64 | ||
65 | void test_push(const int n) | |
66 | { | |
67 | sync_pq pq; | |
68 | BOOST_TEST(pq.empty()); | |
69 | ||
70 | boost::barrier b(n); | |
71 | boost::thread_group tg; | |
72 | for(int i = 0; i < n; i++) | |
73 | { | |
74 | tg.create_thread(boost::bind(call_push, &pq, &b, i)); | |
75 | } | |
76 | tg.join_all(); | |
77 | BOOST_TEST(!pq.empty()); | |
78 | BOOST_TEST_EQ(pq.size(), std::size_t(n)); | |
79 | } | |
80 | ||
81 | void test_both(const int n) | |
82 | { | |
83 | sync_pq pq; | |
84 | BOOST_TEST(pq.empty()); | |
85 | ||
86 | boost::barrier b(2*n); | |
87 | boost::thread_group tg; | |
88 | for(int i = 0; i < n; i++) | |
89 | { | |
90 | tg.create_thread(boost::bind(call_pull, &pq, &b)); | |
91 | tg.create_thread(boost::bind(call_push, &pq, &b, i)); | |
92 | } | |
93 | tg.join_all(); | |
94 | BOOST_TEST(pq.empty()); | |
95 | BOOST_TEST_EQ(pq.size(), std::size_t(0)); | |
96 | } | |
97 | ||
98 | void push_range(sync_pq* q, const int begin, const int end) | |
99 | { | |
100 | for(int i = begin; i < end; i++) | |
101 | q->push(i); | |
102 | } | |
103 | ||
104 | void atomic_pull(sync_pq* q, boost::atomic<int>* sum) | |
105 | { | |
106 | while(1) | |
107 | { | |
108 | try{ | |
109 | const int val = q->pull(); | |
110 | sum->fetch_add(val); | |
111 | } | |
112 | catch(std::exception& ){ | |
113 | break; | |
114 | } | |
115 | } | |
116 | } | |
117 | ||
118 | /** | |
119 | * This test computes the sum of the first N integers upto $limit using | |
120 | * $n threads for the push operation and $n threads for the pull and count | |
121 | * operation. The push operation push a range of numbers on the queue while | |
122 | * the pull operation pull from the queue and increments an atomic int. | |
123 | * At the end of execution the value of atomic<int> $sum should be the same | |
124 | * as n*(n+1)/2 as this is the closed form solution to this problem. | |
125 | */ | |
126 | void compute_sum(const int n) | |
127 | { | |
128 | const int limit = 1000; | |
129 | sync_pq pq; | |
130 | BOOST_TEST(pq.empty()); | |
131 | boost::atomic<int> sum(0); | |
132 | boost::thread_group tg1; | |
133 | boost::thread_group tg2; | |
134 | for(int i = 0; i < n; i++) | |
135 | { | |
136 | tg1.create_thread(boost::bind(push_range, &pq, i*(limit/n)+1, (i+1)*(limit/n)+1)); | |
137 | tg2.create_thread(boost::bind(atomic_pull, &pq, &sum)); | |
138 | } | |
139 | tg1.join_all(); | |
140 | pq.close(); //Wait until all enqueuing is done before closing. | |
141 | BOOST_TEST(pq.closed()); | |
142 | tg2.join_all(); | |
143 | BOOST_TEST(pq.empty()); | |
144 | BOOST_TEST_EQ(sum.load(), limit*(limit+1)/2); | |
145 | } | |
146 | ||
147 | void move_between_queues(sync_pq* q1, sync_pq* q2) | |
148 | { | |
149 | while(1){ | |
150 | try{ | |
151 | const int val = q1->pull(); | |
152 | q2->push(val); | |
153 | } | |
154 | catch(std::exception& ){ | |
155 | break; | |
156 | } | |
157 | } | |
158 | } | |
159 | ||
160 | /** | |
161 | * This test computes the sum of the first N integers upto $limit by moving | |
162 | * numbers between 2 sync_priority_queues. A range of numbers are pushed onto | |
163 | * one queue by $n threads while $n threads pull from this queue and push onto | |
164 | * another sync_pq. At the end the main thread ensures the the values in the | |
165 | * second queue are in proper order and then sums all the values from this | |
166 | * queue. The sum should match n*(n+1)/2, the closed form solution to this | |
167 | * problem. | |
168 | */ | |
169 | void sum_with_moving(const int n) | |
170 | { | |
171 | const int limit = 1000; | |
172 | sync_pq pq1; | |
173 | sync_pq pq2; | |
174 | BOOST_TEST(pq1.empty()); | |
175 | BOOST_TEST(pq2.empty()); | |
176 | boost::thread_group tg1; | |
177 | boost::thread_group tg2; | |
178 | for(int i = 0; i < n; i++) | |
179 | { | |
180 | tg1.create_thread(boost::bind(push_range, &pq1, i*(limit/n)+1, (i+1)*(limit/n)+1)); | |
181 | tg2.create_thread(boost::bind(move_between_queues, &pq1, &pq2)); | |
182 | } | |
183 | tg1.join_all(); | |
184 | pq1.close(); //Wait until all enqueuing is done before closing. | |
185 | BOOST_TEST(pq1.closed()); | |
186 | tg2.join_all(); | |
187 | BOOST_TEST(pq1.empty()); | |
188 | BOOST_TEST(!pq2.empty()); | |
189 | int sum = 0; | |
190 | for(int i = 1000; i > 0; i--){ | |
191 | const int val = pq2.pull(); | |
192 | BOOST_TEST_EQ(i,val); | |
193 | sum += val; | |
194 | } | |
195 | BOOST_TEST(pq2.empty()); | |
196 | BOOST_TEST_EQ(sum, limit*(limit+1)/2); | |
197 | } | |
198 | ||
199 | int main() | |
200 | { | |
201 | for(int i = 1; i <= 64; i *= 2) | |
202 | { | |
203 | test_pull(i); | |
204 | test_push(i); | |
205 | test_both(i); | |
206 | } | |
207 | //These numbers must divide 1000 | |
208 | compute_sum(1); | |
209 | compute_sum(4); | |
210 | compute_sum(10); | |
211 | compute_sum(25); | |
212 | compute_sum(50); | |
213 | sum_with_moving(1); | |
214 | sum_with_moving(4); | |
215 | sum_with_moving(10); | |
216 | sum_with_moving(25); | |
217 | sum_with_moving(50); | |
218 | return boost::report_errors(); | |
219 | } |