]> git.proxmox.com Git - ceph.git/blob - ceph/src/zstd/contrib/pzstd/utils/test/WorkQueueTest.cpp
add subtree-ish sources for 12.0.3
[ceph.git] / ceph / src / zstd / contrib / pzstd / utils / test / WorkQueueTest.cpp
1 /**
2 * Copyright (c) 2016-present, Facebook, Inc.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree. An additional grant
7 * of patent rights can be found in the PATENTS file in the same directory.
8 */
#include "utils/Buffer.h"
#include "utils/WorkQueue.h"

#include <gtest/gtest.h>
#include <chrono>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>
17
18 using namespace pzstd;
19
20 namespace {
21 struct Popper {
22 WorkQueue<int>* queue;
23 int* results;
24 std::mutex* mutex;
25
26 void operator()() {
27 int result;
28 while (queue->pop(result)) {
29 std::lock_guard<std::mutex> lock(*mutex);
30 results[result] = result;
31 }
32 }
33 };
34 }
35
36 TEST(WorkQueue, SingleThreaded) {
37 WorkQueue<int> queue;
38 int result;
39
40 queue.push(5);
41 EXPECT_TRUE(queue.pop(result));
42 EXPECT_EQ(5, result);
43
44 queue.push(1);
45 queue.push(2);
46 EXPECT_TRUE(queue.pop(result));
47 EXPECT_EQ(1, result);
48 EXPECT_TRUE(queue.pop(result));
49 EXPECT_EQ(2, result);
50
51 queue.push(1);
52 queue.push(2);
53 queue.finish();
54 EXPECT_TRUE(queue.pop(result));
55 EXPECT_EQ(1, result);
56 EXPECT_TRUE(queue.pop(result));
57 EXPECT_EQ(2, result);
58 EXPECT_FALSE(queue.pop(result));
59
60 queue.waitUntilFinished();
61 }
62
63 TEST(WorkQueue, SPSC) {
64 WorkQueue<int> queue;
65 const int max = 100;
66
67 for (int i = 0; i < 10; ++i) {
68 queue.push(int{i});
69 }
70
71 std::thread thread([ &queue, max ] {
72 int result;
73 for (int i = 0;; ++i) {
74 if (!queue.pop(result)) {
75 EXPECT_EQ(i, max);
76 break;
77 }
78 EXPECT_EQ(i, result);
79 }
80 });
81
82 std::this_thread::yield();
83 for (int i = 10; i < max; ++i) {
84 queue.push(int{i});
85 }
86 queue.finish();
87
88 thread.join();
89 }
90
91 TEST(WorkQueue, SPMC) {
92 WorkQueue<int> queue;
93 std::vector<int> results(50, -1);
94 std::mutex mutex;
95 std::vector<std::thread> threads;
96 for (int i = 0; i < 5; ++i) {
97 threads.emplace_back(Popper{&queue, results.data(), &mutex});
98 }
99
100 for (int i = 0; i < 50; ++i) {
101 queue.push(int{i});
102 }
103 queue.finish();
104
105 for (auto& thread : threads) {
106 thread.join();
107 }
108
109 for (int i = 0; i < 50; ++i) {
110 EXPECT_EQ(i, results[i]);
111 }
112 }
113
114 TEST(WorkQueue, MPMC) {
115 WorkQueue<int> queue;
116 std::vector<int> results(100, -1);
117 std::mutex mutex;
118 std::vector<std::thread> popperThreads;
119 for (int i = 0; i < 4; ++i) {
120 popperThreads.emplace_back(Popper{&queue, results.data(), &mutex});
121 }
122
123 std::vector<std::thread> pusherThreads;
124 for (int i = 0; i < 2; ++i) {
125 auto min = i * 50;
126 auto max = (i + 1) * 50;
127 pusherThreads.emplace_back(
128 [ &queue, min, max ] {
129 for (int i = min; i < max; ++i) {
130 queue.push(int{i});
131 }
132 });
133 }
134
135 for (auto& thread : pusherThreads) {
136 thread.join();
137 }
138 queue.finish();
139
140 for (auto& thread : popperThreads) {
141 thread.join();
142 }
143
144 for (int i = 0; i < 100; ++i) {
145 EXPECT_EQ(i, results[i]);
146 }
147 }
148
149 TEST(WorkQueue, BoundedSizeWorks) {
150 WorkQueue<int> queue(1);
151 int result;
152 queue.push(5);
153 queue.pop(result);
154 queue.push(5);
155 queue.pop(result);
156 queue.push(5);
157 queue.finish();
158 queue.pop(result);
159 EXPECT_EQ(5, result);
160 }
161
162 TEST(WorkQueue, BoundedSizePushAfterFinish) {
163 WorkQueue<int> queue(1);
164 int result;
165 queue.push(5);
166 std::thread pusher([&queue] {
167 queue.push(6);
168 });
169 // Dirtily try and make sure that pusher has run.
170 std::this_thread::sleep_for(std::chrono::seconds(1));
171 queue.finish();
172 EXPECT_TRUE(queue.pop(result));
173 EXPECT_EQ(5, result);
174 EXPECT_FALSE(queue.pop(result));
175
176 pusher.join();
177 }
178
179 TEST(WorkQueue, SetMaxSize) {
180 WorkQueue<int> queue(2);
181 int result;
182 queue.push(5);
183 queue.push(6);
184 queue.setMaxSize(1);
185 std::thread pusher([&queue] {
186 queue.push(7);
187 });
188 // Dirtily try and make sure that pusher has run.
189 std::this_thread::sleep_for(std::chrono::seconds(1));
190 queue.finish();
191 EXPECT_TRUE(queue.pop(result));
192 EXPECT_EQ(5, result);
193 EXPECT_TRUE(queue.pop(result));
194 EXPECT_EQ(6, result);
195 EXPECT_FALSE(queue.pop(result));
196
197 pusher.join();
198 }
199
200 TEST(WorkQueue, BoundedSizeMPMC) {
201 WorkQueue<int> queue(10);
202 std::vector<int> results(200, -1);
203 std::mutex mutex;
204 std::vector<std::thread> popperThreads;
205 for (int i = 0; i < 4; ++i) {
206 popperThreads.emplace_back(Popper{&queue, results.data(), &mutex});
207 }
208
209 std::vector<std::thread> pusherThreads;
210 for (int i = 0; i < 2; ++i) {
211 auto min = i * 100;
212 auto max = (i + 1) * 100;
213 pusherThreads.emplace_back(
214 [ &queue, min, max ] {
215 for (int i = min; i < max; ++i) {
216 queue.push(int{i});
217 }
218 });
219 }
220
221 for (auto& thread : pusherThreads) {
222 thread.join();
223 }
224 queue.finish();
225
226 for (auto& thread : popperThreads) {
227 thread.join();
228 }
229
230 for (int i = 0; i < 200; ++i) {
231 EXPECT_EQ(i, results[i]);
232 }
233 }
234
235 TEST(WorkQueue, FailedPush) {
236 WorkQueue<std::unique_ptr<int>> queue;
237 std::unique_ptr<int> x(new int{5});
238 EXPECT_TRUE(queue.push(std::move(x)));
239 EXPECT_EQ(nullptr, x);
240 queue.finish();
241 x.reset(new int{6});
242 EXPECT_FALSE(queue.push(std::move(x)));
243 EXPECT_NE(nullptr, x);
244 EXPECT_EQ(6, *x);
245 }
246
247 TEST(BufferWorkQueue, SizeCalculatedCorrectly) {
248 {
249 BufferWorkQueue queue;
250 queue.finish();
251 EXPECT_EQ(0, queue.size());
252 }
253 {
254 BufferWorkQueue queue;
255 queue.push(Buffer(10));
256 queue.finish();
257 EXPECT_EQ(10, queue.size());
258 }
259 {
260 BufferWorkQueue queue;
261 queue.push(Buffer(10));
262 queue.push(Buffer(5));
263 queue.finish();
264 EXPECT_EQ(15, queue.size());
265 }
266 {
267 BufferWorkQueue queue;
268 queue.push(Buffer(10));
269 queue.push(Buffer(5));
270 queue.finish();
271 Buffer buffer;
272 queue.pop(buffer);
273 EXPECT_EQ(5, queue.size());
274 }
275 }