git.proxmox.com Git - ceph.git/blob - ceph/src/boost/libs/lockfree/test/spsc_queue_stress_test.cpp
add subtree-ish sources for 12.0.3
[ceph.git] / ceph / src / boost / libs / lockfree / test / spsc_queue_stress_test.cpp
1 // Copyright (C) 2011-2013 Tim Blechmann
2 //
3 // Distributed under the Boost Software License, Version 1.0. (See
4 // accompanying file LICENSE_1_0.txt or copy at
5 // http://www.boost.org/LICENSE_1_0.txt)
6
7 #include <boost/lockfree/spsc_queue.hpp>
8 #include <boost/thread.hpp>
9
10 #define BOOST_TEST_MAIN
11 #ifdef BOOST_LOCKFREE_INCLUDE_TESTS
12 #include <boost/test/included/unit_test.hpp>
13 #else
14 #include <boost/test/unit_test.hpp>
15 #endif
16
17 #include <iostream>
18 #include <memory>
19
20 #include "test_helpers.hpp"
21 #include "test_common.hpp"
22
23 using namespace boost;
24 using namespace boost::lockfree;
25 using namespace std;
26
27 #ifndef BOOST_LOCKFREE_STRESS_TEST
28 static const boost::uint32_t nodes_per_thread = 100000;
29 #else
30 static const boost::uint32_t nodes_per_thread = 100000000;
31 #endif
32
33 struct spsc_queue_tester
34 {
35 spsc_queue<int, capacity<128> > sf;
36
37 boost::lockfree::detail::atomic<long> spsc_queue_cnt, received_nodes;
38
39 static_hashed_set<int, 1<<16 > working_set;
40
41 spsc_queue_tester(void):
42 spsc_queue_cnt(0), received_nodes(0)
43 {}
44
45 void add(void)
46 {
47 for (boost::uint32_t i = 0; i != nodes_per_thread; ++i) {
48 int id = generate_id<int>();
49 working_set.insert(id);
50
51 while (sf.push(id) == false)
52 {}
53
54 ++spsc_queue_cnt;
55 }
56 running = false;
57 }
58
59 bool get_element(void)
60 {
61 int data;
62 bool success = sf.pop(data);
63
64 if (success) {
65 ++received_nodes;
66 --spsc_queue_cnt;
67 bool erased = working_set.erase(data);
68 assert(erased);
69 return true;
70 } else
71 return false;
72 }
73
74 boost::lockfree::detail::atomic<bool> running;
75
76 void get(void)
77 {
78 for(;;) {
79 bool success = get_element();
80 if (!running && !success)
81 break;
82 }
83
84 while ( get_element() );
85 }
86
87 void run(void)
88 {
89 running = true;
90
91 BOOST_REQUIRE(sf.empty());
92
93 boost::thread reader(boost::bind(&spsc_queue_tester::get, this));
94 boost::thread writer(boost::bind(&spsc_queue_tester::add, this));
95 cout << "reader and writer threads created" << endl;
96
97 writer.join();
98 cout << "writer threads joined. waiting for readers to finish" << endl;
99
100 reader.join();
101
102 BOOST_REQUIRE_EQUAL(received_nodes, nodes_per_thread);
103 BOOST_REQUIRE_EQUAL(spsc_queue_cnt, 0);
104 BOOST_REQUIRE(sf.empty());
105 BOOST_REQUIRE(working_set.count_nodes() == 0);
106 }
107 };
108
109 BOOST_AUTO_TEST_CASE( spsc_queue_test_caching )
110 {
111 boost::shared_ptr<spsc_queue_tester> test1(new spsc_queue_tester);
112 test1->run();
113 }
114
115 struct spsc_queue_tester_buffering
116 {
117 spsc_queue<int, capacity<128> > sf;
118
119 boost::lockfree::detail::atomic<long> spsc_queue_cnt;
120
121 static_hashed_set<int, 1<<16 > working_set;
122 boost::lockfree::detail::atomic<size_t> received_nodes;
123
124 spsc_queue_tester_buffering(void):
125 spsc_queue_cnt(0), received_nodes(0)
126 {}
127
128 static const size_t buf_size = 5;
129
130 void add(void)
131 {
132 boost::array<int, buf_size> input_buffer;
133 for (boost::uint32_t i = 0; i != nodes_per_thread; i+=buf_size) {
134 for (size_t i = 0; i != buf_size; ++i) {
135 int id = generate_id<int>();
136 working_set.insert(id);
137 input_buffer[i] = id;
138 }
139
140 size_t pushed = 0;
141
142 do {
143 pushed += sf.push(input_buffer.c_array() + pushed,
144 input_buffer.size() - pushed);
145 } while (pushed != buf_size);
146
147 spsc_queue_cnt+=buf_size;
148 }
149 running = false;
150 }
151
152 bool get_elements(void)
153 {
154 boost::array<int, buf_size> output_buffer;
155
156 size_t popd = sf.pop(output_buffer.c_array(), output_buffer.size());
157
158 if (popd) {
159 received_nodes += popd;
160 spsc_queue_cnt -= popd;
161
162 for (size_t i = 0; i != popd; ++i) {
163 bool erased = working_set.erase(output_buffer[i]);
164 assert(erased);
165 }
166
167 return true;
168 } else
169 return false;
170 }
171
172 boost::lockfree::detail::atomic<bool> running;
173
174 void get(void)
175 {
176 for(;;) {
177 bool success = get_elements();
178 if (!running && !success)
179 break;
180 }
181
182 while ( get_elements() );
183 }
184
185 void run(void)
186 {
187 running = true;
188
189 boost::thread reader(boost::bind(&spsc_queue_tester_buffering::get, this));
190 boost::thread writer(boost::bind(&spsc_queue_tester_buffering::add, this));
191 cout << "reader and writer threads created" << endl;
192
193 writer.join();
194 cout << "writer threads joined. waiting for readers to finish" << endl;
195
196 reader.join();
197
198 BOOST_REQUIRE_EQUAL(received_nodes, nodes_per_thread);
199 BOOST_REQUIRE_EQUAL(spsc_queue_cnt, 0);
200 BOOST_REQUIRE(sf.empty());
201 BOOST_REQUIRE(working_set.count_nodes() == 0);
202 }
203 };
204
205
206 BOOST_AUTO_TEST_CASE( spsc_queue_test_buffering )
207 {
208 boost::shared_ptr<spsc_queue_tester_buffering> test1(new spsc_queue_tester_buffering);
209 test1->run();
210 }
211