2 * Copyright (c) 2016-present, Facebook, Inc.
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree. An additional grant
7 * of patent rights can be found in the PATENTS file in the same directory.
9 #include "utils/Buffer.h"
10 #include "utils/WorkQueue.h"
12 #include <gtest/gtest.h>
18 using namespace pzstd
;
22 WorkQueue
<int>* queue
;
28 while (queue
->pop(result
)) {
29 std::lock_guard
<std::mutex
> lock(*mutex
);
30 results
[result
] = result
;
36 TEST(WorkQueue
, SingleThreaded
) {
41 EXPECT_TRUE(queue
.pop(result
));
46 EXPECT_TRUE(queue
.pop(result
));
48 EXPECT_TRUE(queue
.pop(result
));
54 EXPECT_TRUE(queue
.pop(result
));
56 EXPECT_TRUE(queue
.pop(result
));
58 EXPECT_FALSE(queue
.pop(result
));
60 queue
.waitUntilFinished();
63 TEST(WorkQueue
, SPSC
) {
67 for (int i
= 0; i
< 10; ++i
) {
71 std::thread
thread([ &queue
, max
] {
73 for (int i
= 0;; ++i
) {
74 if (!queue
.pop(result
)) {
82 std::this_thread::yield();
83 for (int i
= 10; i
< max
; ++i
) {
91 TEST(WorkQueue
, SPMC
) {
93 std::vector
<int> results(50, -1);
95 std::vector
<std::thread
> threads
;
96 for (int i
= 0; i
< 5; ++i
) {
97 threads
.emplace_back(Popper
{&queue
, results
.data(), &mutex
});
100 for (int i
= 0; i
< 50; ++i
) {
105 for (auto& thread
: threads
) {
109 for (int i
= 0; i
< 50; ++i
) {
110 EXPECT_EQ(i
, results
[i
]);
114 TEST(WorkQueue
, MPMC
) {
115 WorkQueue
<int> queue
;
116 std::vector
<int> results(100, -1);
118 std::vector
<std::thread
> popperThreads
;
119 for (int i
= 0; i
< 4; ++i
) {
120 popperThreads
.emplace_back(Popper
{&queue
, results
.data(), &mutex
});
123 std::vector
<std::thread
> pusherThreads
;
124 for (int i
= 0; i
< 2; ++i
) {
126 auto max
= (i
+ 1) * 50;
127 pusherThreads
.emplace_back(
128 [ &queue
, min
, max
] {
129 for (int i
= min
; i
< max
; ++i
) {
135 for (auto& thread
: pusherThreads
) {
140 for (auto& thread
: popperThreads
) {
144 for (int i
= 0; i
< 100; ++i
) {
145 EXPECT_EQ(i
, results
[i
]);
149 TEST(WorkQueue
, BoundedSizeWorks
) {
150 WorkQueue
<int> queue(1);
159 EXPECT_EQ(5, result
);
162 TEST(WorkQueue
, BoundedSizePushAfterFinish
) {
163 WorkQueue
<int> queue(1);
166 std::thread
pusher([&queue
] {
169 // Dirtily try and make sure that pusher has run.
170 std::this_thread::sleep_for(std::chrono::seconds(1));
172 EXPECT_TRUE(queue
.pop(result
));
173 EXPECT_EQ(5, result
);
174 EXPECT_FALSE(queue
.pop(result
));
179 TEST(WorkQueue
, SetMaxSize
) {
180 WorkQueue
<int> queue(2);
185 std::thread
pusher([&queue
] {
188 // Dirtily try and make sure that pusher has run.
189 std::this_thread::sleep_for(std::chrono::seconds(1));
191 EXPECT_TRUE(queue
.pop(result
));
192 EXPECT_EQ(5, result
);
193 EXPECT_TRUE(queue
.pop(result
));
194 EXPECT_EQ(6, result
);
195 EXPECT_FALSE(queue
.pop(result
));
200 TEST(WorkQueue
, BoundedSizeMPMC
) {
201 WorkQueue
<int> queue(10);
202 std::vector
<int> results(200, -1);
204 std::vector
<std::thread
> popperThreads
;
205 for (int i
= 0; i
< 4; ++i
) {
206 popperThreads
.emplace_back(Popper
{&queue
, results
.data(), &mutex
});
209 std::vector
<std::thread
> pusherThreads
;
210 for (int i
= 0; i
< 2; ++i
) {
212 auto max
= (i
+ 1) * 100;
213 pusherThreads
.emplace_back(
214 [ &queue
, min
, max
] {
215 for (int i
= min
; i
< max
; ++i
) {
221 for (auto& thread
: pusherThreads
) {
226 for (auto& thread
: popperThreads
) {
230 for (int i
= 0; i
< 200; ++i
) {
231 EXPECT_EQ(i
, results
[i
]);
235 TEST(WorkQueue
, FailedPush
) {
236 WorkQueue
<std::unique_ptr
<int>> queue
;
237 std::unique_ptr
<int> x(new int{5});
238 EXPECT_TRUE(queue
.push(std::move(x
)));
239 EXPECT_EQ(nullptr, x
);
242 EXPECT_FALSE(queue
.push(std::move(x
)));
243 EXPECT_NE(nullptr, x
);
247 TEST(BufferWorkQueue
, SizeCalculatedCorrectly
) {
249 BufferWorkQueue queue
;
251 EXPECT_EQ(0, queue
.size());
254 BufferWorkQueue queue
;
255 queue
.push(Buffer(10));
257 EXPECT_EQ(10, queue
.size());
260 BufferWorkQueue queue
;
261 queue
.push(Buffer(10));
262 queue
.push(Buffer(5));
264 EXPECT_EQ(15, queue
.size());
267 BufferWorkQueue queue
;
268 queue
.push(Buffer(10));
269 queue
.push(Buffer(5));
273 EXPECT_EQ(5, queue
.size());