1 // Copyright (C) 2011-2013 Tim Blechmann
3 // Distributed under the Boost Software License, Version 1.0. (See
4 // accompanying file LICENSE_1_0.txt or copy at
5 // http://www.boost.org/LICENSE_1_0.txt)
7 #include <boost/lockfree/spsc_queue.hpp>
8 #include <boost/thread.hpp>
10 #define BOOST_TEST_MAIN
11 #ifdef BOOST_LOCKFREE_INCLUDE_TESTS
12 #include <boost/test/included/unit_test.hpp>
14 #include <boost/test/unit_test.hpp>
20 #include "test_helpers.hpp"
21 #include "test_common.hpp"
23 using namespace boost
;
24 using namespace boost::lockfree
;
27 #ifndef BOOST_LOCKFREE_STRESS_TEST
28 static const boost::uint32_t nodes_per_thread
= 100000;
30 static const boost::uint32_t nodes_per_thread
= 100000000;
33 struct spsc_queue_tester
35 spsc_queue
<int, capacity
<128> > sf
;
37 boost::lockfree::detail::atomic
<long> spsc_queue_cnt
, received_nodes
;
39 static_hashed_set
<int, 1<<16 > working_set
;
41 spsc_queue_tester(void):
42 spsc_queue_cnt(0), received_nodes(0)
47 for (boost::uint32_t i
= 0; i
!= nodes_per_thread
; ++i
) {
48 int id
= generate_id
<int>();
49 working_set
.insert(id
);
51 while (sf
.push(id
) == false)
59 bool get_element(void)
62 bool success
= sf
.pop(data
);
67 bool erased
= working_set
.erase(data
);
74 boost::lockfree::detail::atomic
<bool> running
;
79 bool success
= get_element();
80 if (!running
&& !success
)
84 while ( get_element() );
91 BOOST_REQUIRE(sf
.empty());
93 boost::thread
reader(boost::bind(&spsc_queue_tester::get
, this));
94 boost::thread
writer(boost::bind(&spsc_queue_tester::add
, this));
95 cout
<< "reader and writer threads created" << endl
;
98 cout
<< "writer threads joined. waiting for readers to finish" << endl
;
102 BOOST_REQUIRE_EQUAL(received_nodes
, nodes_per_thread
);
103 BOOST_REQUIRE_EQUAL(spsc_queue_cnt
, 0);
104 BOOST_REQUIRE(sf
.empty());
105 BOOST_REQUIRE(working_set
.count_nodes() == 0);
109 BOOST_AUTO_TEST_CASE( spsc_queue_test_caching
)
111 boost::shared_ptr
<spsc_queue_tester
> test1(new spsc_queue_tester
);
115 struct spsc_queue_tester_buffering
117 spsc_queue
<int, capacity
<128> > sf
;
119 boost::lockfree::detail::atomic
<long> spsc_queue_cnt
;
121 static_hashed_set
<int, 1<<16 > working_set
;
122 boost::lockfree::detail::atomic
<size_t> received_nodes
;
124 spsc_queue_tester_buffering(void):
125 spsc_queue_cnt(0), received_nodes(0)
128 static const size_t buf_size
= 5;
132 boost::array
<int, buf_size
> input_buffer
;
133 for (boost::uint32_t i
= 0; i
!= nodes_per_thread
; i
+=buf_size
) {
134 for (size_t i
= 0; i
!= buf_size
; ++i
) {
135 int id
= generate_id
<int>();
136 working_set
.insert(id
);
137 input_buffer
[i
] = id
;
143 pushed
+= sf
.push(input_buffer
.c_array() + pushed
,
144 input_buffer
.size() - pushed
);
145 } while (pushed
!= buf_size
);
147 spsc_queue_cnt
+=buf_size
;
152 bool get_elements(void)
154 boost::array
<int, buf_size
> output_buffer
;
156 size_t popd
= sf
.pop(output_buffer
.c_array(), output_buffer
.size());
159 received_nodes
+= popd
;
160 spsc_queue_cnt
-= popd
;
162 for (size_t i
= 0; i
!= popd
; ++i
) {
163 bool erased
= working_set
.erase(output_buffer
[i
]);
172 boost::lockfree::detail::atomic
<bool> running
;
177 bool success
= get_elements();
178 if (!running
&& !success
)
182 while ( get_elements() );
189 boost::thread
reader(boost::bind(&spsc_queue_tester_buffering::get
, this));
190 boost::thread
writer(boost::bind(&spsc_queue_tester_buffering::add
, this));
191 cout
<< "reader and writer threads created" << endl
;
194 cout
<< "writer threads joined. waiting for readers to finish" << endl
;
198 BOOST_REQUIRE_EQUAL(received_nodes
, nodes_per_thread
);
199 BOOST_REQUIRE_EQUAL(spsc_queue_cnt
, 0);
200 BOOST_REQUIRE(sf
.empty());
201 BOOST_REQUIRE(working_set
.count_nodes() == 0);
206 BOOST_AUTO_TEST_CASE( spsc_queue_test_buffering
)
208 boost::shared_ptr
<spsc_queue_tester_buffering
> test1(new spsc_queue_tester_buffering
);