1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
7 * Copyright (c) 2016-present, Facebook, Inc.
10 * This source code is licensed under both the BSD-style license (found in the
11 * LICENSE file in the root directory of this source tree) and the GPLv2 (found
12 * in the COPYING file in the root directory of this source tree).
14 #include "util/work_queue.h"
16 #include <gtest/gtest.h>
23 namespace ROCKSDB_NAMESPACE
{
25 // Unit test for work_queue.h.
27 // This file is an excerpt from Facebook's zstd repo at
28 // https://github.com/facebook/zstd/. The relevant file is
29 // contrib/pzstd/utils/test/WorkQueueTest.cpp.
32 WorkQueue
<int>* queue
;
38 while (queue
->pop(result
)) {
39 std::lock_guard
<std::mutex
> lock(*mutex
);
40 results
[result
] = result
;
45 TEST(WorkQueue
, SingleThreaded
) {
50 EXPECT_TRUE(queue
.pop(result
));
55 EXPECT_TRUE(queue
.pop(result
));
57 EXPECT_TRUE(queue
.pop(result
));
63 EXPECT_TRUE(queue
.pop(result
));
65 EXPECT_TRUE(queue
.pop(result
));
67 EXPECT_FALSE(queue
.pop(result
));
69 queue
.waitUntilFinished();
72 TEST(WorkQueue
, SPSC
) {
76 for (int i
= 0; i
< 10; ++i
) {
80 std::thread
thread([&queue
, max
] {
82 for (int i
= 0;; ++i
) {
83 if (!queue
.pop(result
)) {
91 std::this_thread::yield();
92 for (int i
= 10; i
< max
; ++i
) {
100 TEST(WorkQueue
, SPMC
) {
101 WorkQueue
<int> queue
;
102 std::vector
<int> results(50, -1);
104 std::vector
<std::thread
> threads
;
105 for (int i
= 0; i
< 5; ++i
) {
106 threads
.emplace_back(Popper
{&queue
, results
.data(), &mutex
});
109 for (int i
= 0; i
< 50; ++i
) {
114 for (auto& thread
: threads
) {
118 for (int i
= 0; i
< 50; ++i
) {
119 EXPECT_EQ(i
, results
[i
]);
123 TEST(WorkQueue
, MPMC
) {
124 WorkQueue
<int> queue
;
125 std::vector
<int> results(100, -1);
127 std::vector
<std::thread
> popperThreads
;
128 for (int i
= 0; i
< 4; ++i
) {
129 popperThreads
.emplace_back(Popper
{&queue
, results
.data(), &mutex
});
132 std::vector
<std::thread
> pusherThreads
;
133 for (int i
= 0; i
< 2; ++i
) {
135 auto max
= (i
+ 1) * 50;
136 pusherThreads
.emplace_back([&queue
, min
, max
] {
137 for (int j
= min
; j
< max
; ++j
) {
143 for (auto& thread
: pusherThreads
) {
148 for (auto& thread
: popperThreads
) {
152 for (int i
= 0; i
< 100; ++i
) {
153 EXPECT_EQ(i
, results
[i
]);
157 TEST(WorkQueue
, BoundedSizeWorks
) {
158 WorkQueue
<int> queue(1);
167 EXPECT_EQ(5, result
);
170 TEST(WorkQueue
, BoundedSizePushAfterFinish
) {
171 WorkQueue
<int> queue(1);
174 std::thread
pusher([&queue
] { queue
.push(6); });
175 // Dirtily try and make sure that pusher has run.
176 std::this_thread::sleep_for(std::chrono::seconds(1));
178 EXPECT_TRUE(queue
.pop(result
));
179 EXPECT_EQ(5, result
);
180 EXPECT_FALSE(queue
.pop(result
));
185 TEST(WorkQueue
, SetMaxSize
) {
186 WorkQueue
<int> queue(2);
191 std::thread
pusher([&queue
] { queue
.push(7); });
192 // Dirtily try and make sure that pusher has run.
193 std::this_thread::sleep_for(std::chrono::seconds(1));
195 EXPECT_TRUE(queue
.pop(result
));
196 EXPECT_EQ(5, result
);
197 EXPECT_TRUE(queue
.pop(result
));
198 EXPECT_EQ(6, result
);
199 EXPECT_FALSE(queue
.pop(result
));
204 TEST(WorkQueue
, BoundedSizeMPMC
) {
205 WorkQueue
<int> queue(10);
206 std::vector
<int> results(200, -1);
208 std::cerr
<< "Creating popperThreads" << std::endl
;
209 std::vector
<std::thread
> popperThreads
;
210 for (int i
= 0; i
< 4; ++i
) {
211 popperThreads
.emplace_back(Popper
{&queue
, results
.data(), &mutex
});
214 std::cerr
<< "Creating pusherThreads" << std::endl
;
215 std::vector
<std::thread
> pusherThreads
;
216 for (int i
= 0; i
< 2; ++i
) {
218 auto max
= (i
+ 1) * 100;
219 pusherThreads
.emplace_back([&queue
, min
, max
] {
220 for (int j
= min
; j
< max
; ++j
) {
226 std::cerr
<< "Joining pusherThreads" << std::endl
;
227 for (auto& thread
: pusherThreads
) {
230 std::cerr
<< "Finishing queue" << std::endl
;
233 std::cerr
<< "Joining popperThreads" << std::endl
;
234 for (auto& thread
: popperThreads
) {
238 std::cerr
<< "Inspecting results" << std::endl
;
239 for (int i
= 0; i
< 200; ++i
) {
240 EXPECT_EQ(i
, results
[i
]);
244 TEST(WorkQueue
, FailedPush
) {
245 WorkQueue
<int> queue
;
246 EXPECT_TRUE(queue
.push(1));
248 EXPECT_FALSE(queue
.push(1));
251 TEST(WorkQueue
, FailedPop
) {
252 WorkQueue
<int> queue
;
254 EXPECT_TRUE(queue
.push(x
));
257 EXPECT_TRUE(queue
.pop(x
));
259 EXPECT_FALSE(queue
.pop(x
));
263 } // namespace ROCKSDB_NAMESPACE
265 int main(int argc
, char** argv
) {
266 ::testing::InitGoogleTest(&argc
, argv
);
267 return RUN_ALL_TESTS();