]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/util/work_queue_test.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / rocksdb / util / work_queue_test.cc
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5
6 /*
7 * Copyright (c) 2016-present, Facebook, Inc.
8 * All rights reserved.
9 *
10 * This source code is licensed under both the BSD-style license (found in the
11 * LICENSE file in the root directory of this source tree) and the GPLv2 (found
12 * in the COPYING file in the root directory of this source tree).
13 */
14 #include "util/work_queue.h"
15
16 #include <gtest/gtest.h>
17 #include <iostream>
18 #include <memory>
19 #include <mutex>
20 #include <thread>
21 #include <vector>
22
23 namespace ROCKSDB_NAMESPACE {
24
25 // Unit test for work_queue.h.
26 //
27 // This file is an excerpt from Facebook's zstd repo at
28 // https://github.com/facebook/zstd/. The relevant file is
29 // contrib/pzstd/utils/test/WorkQueueTest.cpp.
30
31 struct Popper {
32 WorkQueue<int>* queue;
33 int* results;
34 std::mutex* mutex;
35
36 void operator()() {
37 int result;
38 while (queue->pop(result)) {
39 std::lock_guard<std::mutex> lock(*mutex);
40 results[result] = result;
41 }
42 }
43 };
44
45 TEST(WorkQueue, SingleThreaded) {
46 WorkQueue<int> queue;
47 int result;
48
49 queue.push(5);
50 EXPECT_TRUE(queue.pop(result));
51 EXPECT_EQ(5, result);
52
53 queue.push(1);
54 queue.push(2);
55 EXPECT_TRUE(queue.pop(result));
56 EXPECT_EQ(1, result);
57 EXPECT_TRUE(queue.pop(result));
58 EXPECT_EQ(2, result);
59
60 queue.push(1);
61 queue.push(2);
62 queue.finish();
63 EXPECT_TRUE(queue.pop(result));
64 EXPECT_EQ(1, result);
65 EXPECT_TRUE(queue.pop(result));
66 EXPECT_EQ(2, result);
67 EXPECT_FALSE(queue.pop(result));
68
69 queue.waitUntilFinished();
70 }
71
72 TEST(WorkQueue, SPSC) {
73 WorkQueue<int> queue;
74 const int max = 100;
75
76 for (int i = 0; i < 10; ++i) {
77 queue.push(i);
78 }
79
80 std::thread thread([&queue, max] {
81 int result;
82 for (int i = 0;; ++i) {
83 if (!queue.pop(result)) {
84 EXPECT_EQ(i, max);
85 break;
86 }
87 EXPECT_EQ(i, result);
88 }
89 });
90
91 std::this_thread::yield();
92 for (int i = 10; i < max; ++i) {
93 queue.push(i);
94 }
95 queue.finish();
96
97 thread.join();
98 }
99
100 TEST(WorkQueue, SPMC) {
101 WorkQueue<int> queue;
102 std::vector<int> results(50, -1);
103 std::mutex mutex;
104 std::vector<std::thread> threads;
105 for (int i = 0; i < 5; ++i) {
106 threads.emplace_back(Popper{&queue, results.data(), &mutex});
107 }
108
109 for (int i = 0; i < 50; ++i) {
110 queue.push(i);
111 }
112 queue.finish();
113
114 for (auto& thread : threads) {
115 thread.join();
116 }
117
118 for (int i = 0; i < 50; ++i) {
119 EXPECT_EQ(i, results[i]);
120 }
121 }
122
123 TEST(WorkQueue, MPMC) {
124 WorkQueue<int> queue;
125 std::vector<int> results(100, -1);
126 std::mutex mutex;
127 std::vector<std::thread> popperThreads;
128 for (int i = 0; i < 4; ++i) {
129 popperThreads.emplace_back(Popper{&queue, results.data(), &mutex});
130 }
131
132 std::vector<std::thread> pusherThreads;
133 for (int i = 0; i < 2; ++i) {
134 auto min = i * 50;
135 auto max = (i + 1) * 50;
136 pusherThreads.emplace_back([&queue, min, max] {
137 for (int j = min; j < max; ++j) {
138 queue.push(j);
139 }
140 });
141 }
142
143 for (auto& thread : pusherThreads) {
144 thread.join();
145 }
146 queue.finish();
147
148 for (auto& thread : popperThreads) {
149 thread.join();
150 }
151
152 for (int i = 0; i < 100; ++i) {
153 EXPECT_EQ(i, results[i]);
154 }
155 }
156
157 TEST(WorkQueue, BoundedSizeWorks) {
158 WorkQueue<int> queue(1);
159 int result;
160 queue.push(5);
161 queue.pop(result);
162 queue.push(5);
163 queue.pop(result);
164 queue.push(5);
165 queue.finish();
166 queue.pop(result);
167 EXPECT_EQ(5, result);
168 }
169
170 TEST(WorkQueue, BoundedSizePushAfterFinish) {
171 WorkQueue<int> queue(1);
172 int result;
173 queue.push(5);
174 std::thread pusher([&queue] { queue.push(6); });
175 // Dirtily try and make sure that pusher has run.
176 std::this_thread::sleep_for(std::chrono::seconds(1));
177 queue.finish();
178 EXPECT_TRUE(queue.pop(result));
179 EXPECT_EQ(5, result);
180 EXPECT_FALSE(queue.pop(result));
181
182 pusher.join();
183 }
184
185 TEST(WorkQueue, SetMaxSize) {
186 WorkQueue<int> queue(2);
187 int result;
188 queue.push(5);
189 queue.push(6);
190 queue.setMaxSize(1);
191 std::thread pusher([&queue] { queue.push(7); });
192 // Dirtily try and make sure that pusher has run.
193 std::this_thread::sleep_for(std::chrono::seconds(1));
194 queue.finish();
195 EXPECT_TRUE(queue.pop(result));
196 EXPECT_EQ(5, result);
197 EXPECT_TRUE(queue.pop(result));
198 EXPECT_EQ(6, result);
199 EXPECT_FALSE(queue.pop(result));
200
201 pusher.join();
202 }
203
204 TEST(WorkQueue, BoundedSizeMPMC) {
205 WorkQueue<int> queue(10);
206 std::vector<int> results(200, -1);
207 std::mutex mutex;
208 std::cerr << "Creating popperThreads" << std::endl;
209 std::vector<std::thread> popperThreads;
210 for (int i = 0; i < 4; ++i) {
211 popperThreads.emplace_back(Popper{&queue, results.data(), &mutex});
212 }
213
214 std::cerr << "Creating pusherThreads" << std::endl;
215 std::vector<std::thread> pusherThreads;
216 for (int i = 0; i < 2; ++i) {
217 auto min = i * 100;
218 auto max = (i + 1) * 100;
219 pusherThreads.emplace_back([&queue, min, max] {
220 for (int j = min; j < max; ++j) {
221 queue.push(j);
222 }
223 });
224 }
225
226 std::cerr << "Joining pusherThreads" << std::endl;
227 for (auto& thread : pusherThreads) {
228 thread.join();
229 }
230 std::cerr << "Finishing queue" << std::endl;
231 queue.finish();
232
233 std::cerr << "Joining popperThreads" << std::endl;
234 for (auto& thread : popperThreads) {
235 thread.join();
236 }
237
238 std::cerr << "Inspecting results" << std::endl;
239 for (int i = 0; i < 200; ++i) {
240 EXPECT_EQ(i, results[i]);
241 }
242 }
243
244 TEST(WorkQueue, FailedPush) {
245 WorkQueue<int> queue;
246 EXPECT_TRUE(queue.push(1));
247 queue.finish();
248 EXPECT_FALSE(queue.push(1));
249 }
250
251 TEST(WorkQueue, FailedPop) {
252 WorkQueue<int> queue;
253 int x = 5;
254 EXPECT_TRUE(queue.push(x));
255 queue.finish();
256 x = 0;
257 EXPECT_TRUE(queue.pop(x));
258 EXPECT_EQ(5, x);
259 EXPECT_FALSE(queue.pop(x));
260 EXPECT_EQ(5, x);
261 }
262
263 } // namespace ROCKSDB_NAMESPACE
264
265 int main(int argc, char** argv) {
266 ::testing::InitGoogleTest(&argc, argv);
267 return RUN_ALL_TESTS();
268 }