]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/third-party/folly/folly/synchronization/test/DistributedMutexTest.cpp
import quincy beta 17.1.0
[ceph.git] / ceph / src / rocksdb / third-party / folly / folly / synchronization / test / DistributedMutexTest.cpp
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5
6 #include <folly/synchronization/DistributedMutex.h>
7 #include <folly/container/Array.h>
8 #include <folly/synchronization/Baton.h>
9
10 #ifdef OS_AIX
11 #include "gtest/gtest.h"
12 #else
13 #include <gtest/gtest.h>
14 #endif
15
16 #if !defined(ROCKSDB_LITE) && !defined(__ARM_ARCH) && \
17 !defined(ROCKSDB_VALGRIND_RUN)
18
19 #include <chrono>
20 #include <cmath>
21 #include <thread>
22
23 namespace folly {
24 namespace test {
25 template <template <typename> class Atomic>
26 using TestDistributedMutex =
27 folly::detail::distributed_mutex::DistributedMutex<Atomic, false>;
28 } // namespace test
29
30 namespace {
// Default iteration count for the fixed-iteration stress helpers.
constexpr int kStressFactor{1000};
// Length, in seconds, of every duration-driven stress test.
constexpr int kStressTestSeconds{2};
// Timeout used where a timed acquisition should effectively never expire.
constexpr std::chrono::hours kForever{100};
34
// Returns 1 + 2 + ... + n, computed with the closed-form n * (n + 1) / 2.
int sum(int n) {
  const int successor = n + 1;
  const int product = n * successor;
  return product / 2;
}
38
39 template <template <typename> class Atom = std::atomic>
40 void basicNThreads(int numThreads, int iterations = kStressFactor) {
41 auto&& mutex = folly::detail::distributed_mutex::DistributedMutex<Atom>{};
42 auto&& barrier = std::atomic<int>{0};
43 auto&& threads = std::vector<std::thread>{};
44 auto&& result = std::vector<int>{};
45
46 auto&& function = [&](int id) {
47 return [&, id] {
48 for (auto j = 0; j < iterations; ++j) {
49 auto lck = std::unique_lock<_t<std::decay<decltype(mutex)>>>{mutex};
50 EXPECT_EQ(barrier.fetch_add(1, std::memory_order_relaxed), 0);
51 std::this_thread::yield();
52 result.push_back(id);
53 EXPECT_EQ(barrier.fetch_sub(1, std::memory_order_relaxed), 1);
54 }
55 };
56 };
57
58 for (auto i = 1; i <= numThreads; ++i) {
59 threads.push_back(std::thread(function(i)));
60 }
61 for (auto& thread : threads) {
62 thread.join();
63 }
64
65 auto total = 0;
66 for (auto value : result) {
67 total += value;
68 }
69 EXPECT_EQ(total, sum(numThreads) * iterations);
70 }
71
72 template <template <typename> class Atom = std::atomic>
73 void lockWithTryAndTimedNThreads(
74 int numThreads,
75 std::chrono::seconds duration) {
76 auto&& mutex = folly::detail::distributed_mutex::DistributedMutex<Atom>{};
77 auto&& barrier = std::atomic<int>{0};
78 auto&& threads = std::vector<std::thread>{};
79 auto&& stop = std::atomic<bool>{false};
80
81 auto&& lockUnlockFunction = [&]() {
82 while (!stop.load()) {
83 auto lck = std::unique_lock<_t<std::decay<decltype(mutex)>>>{mutex};
84 EXPECT_EQ(barrier.fetch_add(1, std::memory_order_relaxed), 0);
85 std::this_thread::yield();
86 EXPECT_EQ(barrier.fetch_sub(1, std::memory_order_relaxed), 1);
87 }
88 };
89
90 auto tryLockFunction = [&]() {
91 while (!stop.load()) {
92 using Mutex = _t<std::decay<decltype(mutex)>>;
93 auto lck = std::unique_lock<Mutex>{mutex, std::defer_lock};
94 if (lck.try_lock()) {
95 EXPECT_EQ(barrier.fetch_add(1, std::memory_order_relaxed), 0);
96 std::this_thread::yield();
97 EXPECT_EQ(barrier.fetch_sub(1, std::memory_order_relaxed), 1);
98 }
99 }
100 };
101
102 auto timedLockFunction = [&]() {
103 while (!stop.load()) {
104 using Mutex = _t<std::decay<decltype(mutex)>>;
105 auto lck = std::unique_lock<Mutex>{mutex, std::defer_lock};
106 if (lck.try_lock_for(kForever)) {
107 EXPECT_EQ(barrier.fetch_add(1, std::memory_order_relaxed), 0);
108 std::this_thread::yield();
109 EXPECT_EQ(barrier.fetch_sub(1, std::memory_order_relaxed), 1);
110 }
111 }
112 };
113
114 for (auto i = 0; i < (numThreads / 3); ++i) {
115 threads.push_back(std::thread(lockUnlockFunction));
116 }
117 for (auto i = 0; i < (numThreads / 3); ++i) {
118 threads.push_back(std::thread(tryLockFunction));
119 }
120 for (auto i = 0; i < (numThreads / 3); ++i) {
121 threads.push_back(std::thread(timedLockFunction));
122 }
123
124 /* sleep override */
125 std::this_thread::sleep_for(duration);
126 stop.store(true);
127 for (auto& thread : threads) {
128 thread.join();
129 }
130 }
131
132 template <template <typename> class Atom = std::atomic>
133 void combineNThreads(int numThreads, std::chrono::seconds duration) {
134 auto&& mutex = folly::detail::distributed_mutex::DistributedMutex<Atom>{};
135 auto&& barrier = std::atomic<int>{0};
136 auto&& threads = std::vector<std::thread>{};
137 auto&& stop = std::atomic<bool>{false};
138
139 auto&& function = [&]() {
140 return [&] {
141 auto&& expected = std::uint64_t{0};
142 auto&& local = std::atomic<std::uint64_t>{0};
143 auto&& result = std::atomic<std::uint64_t>{0};
144 while (!stop.load()) {
145 ++expected;
146 auto current = mutex.lock_combine([&]() {
147 result.fetch_add(1);
148 EXPECT_EQ(barrier.fetch_add(1, std::memory_order_relaxed), 0);
149 EXPECT_EQ(barrier.fetch_add(1, std::memory_order_relaxed), 1);
150 std::this_thread::yield();
151 SCOPE_EXIT {
152 EXPECT_EQ(barrier.fetch_sub(1, std::memory_order_relaxed), 1);
153 };
154 EXPECT_EQ(barrier.fetch_sub(1, std::memory_order_relaxed), 2);
155 return local.fetch_add(1);
156 });
157 EXPECT_EQ(current, expected - 1);
158 }
159
160 EXPECT_EQ(expected, result.load());
161 };
162 };
163
164 for (auto i = 1; i <= numThreads; ++i) {
165 threads.push_back(std::thread(function()));
166 }
167
168 /* sleep override */
169 std::this_thread::sleep_for(duration);
170 stop.store(true);
171 for (auto& thread : threads) {
172 thread.join();
173 }
174 }
175
176 template <template <typename> class Atom = std::atomic>
177 void combineWithLockNThreads(int numThreads, std::chrono::seconds duration) {
178 auto&& mutex = folly::detail::distributed_mutex::DistributedMutex<Atom>{};
179 auto&& barrier = std::atomic<int>{0};
180 auto&& threads = std::vector<std::thread>{};
181 auto&& stop = std::atomic<bool>{false};
182
183 auto&& lockUnlockFunction = [&]() {
184 while (!stop.load()) {
185 auto lck = std::unique_lock<_t<std::decay<decltype(mutex)>>>{mutex};
186 EXPECT_EQ(barrier.fetch_add(1, std::memory_order_relaxed), 0);
187 std::this_thread::yield();
188 EXPECT_EQ(barrier.fetch_sub(1, std::memory_order_relaxed), 1);
189 }
190 };
191
192 auto&& combineFunction = [&]() {
193 auto&& expected = std::uint64_t{0};
194 auto&& total = std::atomic<std::uint64_t>{0};
195
196 while (!stop.load()) {
197 ++expected;
198 auto current = mutex.lock_combine([&]() {
199 auto iteration = total.fetch_add(1);
200 EXPECT_EQ(barrier.fetch_add(1, std::memory_order_relaxed), 0);
201 EXPECT_EQ(barrier.fetch_add(1, std::memory_order_relaxed), 1);
202 std::this_thread::yield();
203 SCOPE_EXIT {
204 EXPECT_EQ(barrier.fetch_sub(1, std::memory_order_relaxed), 1);
205 };
206 EXPECT_EQ(barrier.fetch_sub(1, std::memory_order_relaxed), 2);
207 return iteration;
208 });
209
210 EXPECT_EQ(expected, current + 1);
211 }
212
213 EXPECT_EQ(expected, total.load());
214 };
215
216 for (auto i = 1; i < (numThreads / 2); ++i) {
217 threads.push_back(std::thread(combineFunction));
218 }
219 for (auto i = 0; i < (numThreads / 2); ++i) {
220 threads.push_back(std::thread(lockUnlockFunction));
221 }
222
223 /* sleep override */
224 std::this_thread::sleep_for(duration);
225 stop.store(true);
226 for (auto& thread : threads) {
227 thread.join();
228 }
229 }
230
231 template <template <typename> class Atom = std::atomic>
232 void combineWithTryLockNThreads(int numThreads, std::chrono::seconds duration) {
233 auto&& mutex = folly::detail::distributed_mutex::DistributedMutex<Atom>{};
234 auto&& barrier = std::atomic<int>{0};
235 auto&& threads = std::vector<std::thread>{};
236 auto&& stop = std::atomic<bool>{false};
237
238 auto&& lockUnlockFunction = [&]() {
239 while (!stop.load()) {
240 auto lck = std::unique_lock<_t<std::decay<decltype(mutex)>>>{mutex};
241 EXPECT_EQ(barrier.fetch_add(1, std::memory_order_relaxed), 0);
242 std::this_thread::yield();
243 EXPECT_EQ(barrier.fetch_sub(1, std::memory_order_relaxed), 1);
244 }
245 };
246
247 auto&& combineFunction = [&]() {
248 auto&& expected = std::uint64_t{0};
249 auto&& total = std::atomic<std::uint64_t>{0};
250
251 while (!stop.load()) {
252 ++expected;
253 auto current = mutex.lock_combine([&]() {
254 auto iteration = total.fetch_add(1);
255 EXPECT_EQ(barrier.fetch_add(1, std::memory_order_relaxed), 0);
256 EXPECT_EQ(barrier.fetch_add(1, std::memory_order_relaxed), 1);
257 std::this_thread::yield();
258 SCOPE_EXIT {
259 EXPECT_EQ(barrier.fetch_sub(1, std::memory_order_relaxed), 1);
260 };
261 EXPECT_EQ(barrier.fetch_sub(1, std::memory_order_relaxed), 2);
262 return iteration;
263 });
264
265 EXPECT_EQ(expected, current + 1);
266 }
267
268 EXPECT_EQ(expected, total.load());
269 };
270
271 auto tryLockFunction = [&]() {
272 while (!stop.load()) {
273 using Mutex = _t<std::decay<decltype(mutex)>>;
274 auto lck = std::unique_lock<Mutex>{mutex, std::defer_lock};
275 if (lck.try_lock()) {
276 EXPECT_EQ(barrier.fetch_add(1, std::memory_order_relaxed), 0);
277 std::this_thread::yield();
278 EXPECT_EQ(barrier.fetch_sub(1, std::memory_order_relaxed), 1);
279 }
280 }
281 };
282
283 for (auto i = 0; i < (numThreads / 3); ++i) {
284 threads.push_back(std::thread(lockUnlockFunction));
285 }
286 for (auto i = 0; i < (numThreads / 3); ++i) {
287 threads.push_back(std::thread(combineFunction));
288 }
289 for (auto i = 0; i < (numThreads / 3); ++i) {
290 threads.push_back(std::thread(tryLockFunction));
291 }
292
293 /* sleep override */
294 std::this_thread::sleep_for(duration);
295 stop.store(true);
296 for (auto& thread : threads) {
297 thread.join();
298 }
299 }
300
301 template <template <typename> class Atom = std::atomic>
302 void combineWithLockTryAndTimedNThreads(
303 int numThreads,
304 std::chrono::seconds duration) {
305 auto&& mutex = folly::detail::distributed_mutex::DistributedMutex<Atom>{};
306 auto&& barrier = std::atomic<int>{0};
307 auto&& threads = std::vector<std::thread>{};
308 auto&& stop = std::atomic<bool>{false};
309
310 auto&& lockUnlockFunction = [&]() {
311 while (!stop.load()) {
312 auto lck = std::unique_lock<_t<std::decay<decltype(mutex)>>>{mutex};
313 EXPECT_EQ(barrier.fetch_add(1, std::memory_order_relaxed), 0);
314 std::this_thread::yield();
315 EXPECT_EQ(barrier.fetch_sub(1, std::memory_order_relaxed), 1);
316 }
317 };
318
319 auto&& combineFunction = [&]() {
320 auto&& expected = std::uint64_t{0};
321 auto&& total = std::atomic<std::uint64_t>{0};
322
323 while (!stop.load()) {
324 ++expected;
325 auto current = mutex.lock_combine([&]() {
326 auto iteration = total.fetch_add(1);
327 EXPECT_EQ(barrier.fetch_add(1, std::memory_order_relaxed), 0);
328 EXPECT_EQ(barrier.fetch_add(1, std::memory_order_relaxed), 1);
329 std::this_thread::yield();
330 SCOPE_EXIT {
331 EXPECT_EQ(barrier.fetch_sub(1, std::memory_order_relaxed), 1);
332 };
333 EXPECT_EQ(barrier.fetch_sub(1, std::memory_order_relaxed), 2);
334
335 // return a non-trivially-copyable object that occupies all the
336 // storage we use to coalesce returns to test that codepath
337 return folly::make_array(
338 iteration,
339 iteration + 1,
340 iteration + 2,
341 iteration + 3,
342 iteration + 4,
343 iteration + 5);
344 });
345
346 EXPECT_EQ(expected, current[0] + 1);
347 EXPECT_EQ(expected, current[1]);
348 EXPECT_EQ(expected, current[2] - 1);
349 EXPECT_EQ(expected, current[3] - 2);
350 EXPECT_EQ(expected, current[4] - 3);
351 EXPECT_EQ(expected, current[5] - 4);
352 }
353
354 EXPECT_EQ(expected, total.load());
355 };
356
357 auto tryLockFunction = [&]() {
358 while (!stop.load()) {
359 using Mutex = _t<std::decay<decltype(mutex)>>;
360 auto lck = std::unique_lock<Mutex>{mutex, std::defer_lock};
361 if (lck.try_lock()) {
362 EXPECT_EQ(barrier.fetch_add(1, std::memory_order_relaxed), 0);
363 std::this_thread::yield();
364 EXPECT_EQ(barrier.fetch_sub(1, std::memory_order_relaxed), 1);
365 }
366 }
367 };
368
369 auto timedLockFunction = [&]() {
370 while (!stop.load()) {
371 using Mutex = _t<std::decay<decltype(mutex)>>;
372 auto lck = std::unique_lock<Mutex>{mutex, std::defer_lock};
373 if (lck.try_lock_for(kForever)) {
374 EXPECT_EQ(barrier.fetch_add(1, std::memory_order_relaxed), 0);
375 std::this_thread::yield();
376 EXPECT_EQ(barrier.fetch_sub(1, std::memory_order_relaxed), 1);
377 }
378 }
379 };
380
381 for (auto i = 0; i < (numThreads / 4); ++i) {
382 threads.push_back(std::thread(lockUnlockFunction));
383 }
384 for (auto i = 0; i < (numThreads / 4); ++i) {
385 threads.push_back(std::thread(combineFunction));
386 }
387 for (auto i = 0; i < (numThreads / 4); ++i) {
388 threads.push_back(std::thread(tryLockFunction));
389 }
390 for (auto i = 0; i < (numThreads / 4); ++i) {
391 threads.push_back(std::thread(timedLockFunction));
392 }
393
394 /* sleep override */
395 std::this_thread::sleep_for(duration);
396 stop.store(true);
397 for (auto& thread : threads) {
398 thread.join();
399 }
400 }
401 } // namespace
402
403 TEST(DistributedMutex, InternalDetailTestOne) {
404 auto value = 0;
405 auto ptr = reinterpret_cast<std::uintptr_t>(&value);
406 EXPECT_EQ(folly::detail::distributed_mutex::extractPtr<int>(ptr), &value);
407 ptr = ptr | 0b1;
408 EXPECT_EQ(folly::detail::distributed_mutex::extractPtr<int>(ptr), &value);
409 }
410
411 TEST(DistributedMutex, Basic) {
412 auto&& mutex = folly::DistributedMutex{};
413 auto state = mutex.lock();
414 mutex.unlock(std::move(state));
415 }
416
417 TEST(DistributedMutex, BasicTryLock) {
418 auto&& mutex = folly::DistributedMutex{};
419
420 while (true) {
421 auto state = mutex.try_lock();
422 if (state) {
423 mutex.unlock(std::move(state));
424 break;
425 }
426 }
427 }
428
429 TEST(DistributedMutex, StressTwoThreads) {
430 basicNThreads(2);
431 }
432 TEST(DistributedMutex, StressThreeThreads) {
433 basicNThreads(3);
434 }
435 TEST(DistributedMutex, StressFourThreads) {
436 basicNThreads(4);
437 }
438 TEST(DistributedMutex, StressFiveThreads) {
439 basicNThreads(5);
440 }
441 TEST(DistributedMutex, StressSixThreads) {
442 basicNThreads(6);
443 }
444 TEST(DistributedMutex, StressSevenThreads) {
445 basicNThreads(7);
446 }
447 TEST(DistributedMutex, StressEightThreads) {
448 basicNThreads(8);
449 }
450 TEST(DistributedMutex, StressSixteenThreads) {
451 basicNThreads(16);
452 }
453 TEST(DistributedMutex, StressThirtyTwoThreads) {
454 basicNThreads(32);
455 }
456 TEST(DistributedMutex, StressSixtyFourThreads) {
457 basicNThreads(64);
458 }
459 TEST(DistributedMutex, StressHundredThreads) {
460 basicNThreads(100);
461 }
462 TEST(DistributedMutex, StressHardwareConcurrencyThreads) {
463 basicNThreads(std::thread::hardware_concurrency());
464 }
465
466 TEST(DistributedMutex, StressThreeThreadsLockTryAndTimed) {
467 lockWithTryAndTimedNThreads(3, std::chrono::seconds{kStressTestSeconds});
468 }
469 TEST(DistributedMutex, StressSixThreadsLockTryAndTimed) {
470 lockWithTryAndTimedNThreads(6, std::chrono::seconds{kStressTestSeconds});
471 }
472 TEST(DistributedMutex, StressTwelveThreadsLockTryAndTimed) {
473 lockWithTryAndTimedNThreads(12, std::chrono::seconds{kStressTestSeconds});
474 }
475 TEST(DistributedMutex, StressTwentyFourThreadsLockTryAndTimed) {
476 lockWithTryAndTimedNThreads(24, std::chrono::seconds{kStressTestSeconds});
477 }
478 TEST(DistributedMutex, StressFourtyEightThreadsLockTryAndTimed) {
479 lockWithTryAndTimedNThreads(48, std::chrono::seconds{kStressTestSeconds});
480 }
481 TEST(DistributedMutex, StressSixtyFourThreadsLockTryAndTimed) {
482 lockWithTryAndTimedNThreads(64, std::chrono::seconds{kStressTestSeconds});
483 }
484 TEST(DistributedMutex, StressHwConcThreadsLockTryAndTimed) {
485 lockWithTryAndTimedNThreads(
486 std::thread::hardware_concurrency(),
487 std::chrono::seconds{kStressTestSeconds});
488 }
489
490 TEST(DistributedMutex, StressTwoThreadsCombine) {
491 combineNThreads(2, std::chrono::seconds{kStressTestSeconds});
492 }
493 TEST(DistributedMutex, StressThreeThreadsCombine) {
494 combineNThreads(3, std::chrono::seconds{kStressTestSeconds});
495 }
496 TEST(DistributedMutex, StressFourThreadsCombine) {
497 combineNThreads(4, std::chrono::seconds{kStressTestSeconds});
498 }
499 TEST(DistributedMutex, StressFiveThreadsCombine) {
500 combineNThreads(5, std::chrono::seconds{kStressTestSeconds});
501 }
502 TEST(DistributedMutex, StressSixThreadsCombine) {
503 combineNThreads(6, std::chrono::seconds{kStressTestSeconds});
504 }
505 TEST(DistributedMutex, StressSevenThreadsCombine) {
506 combineNThreads(7, std::chrono::seconds{kStressTestSeconds});
507 }
508 TEST(DistributedMutex, StressEightThreadsCombine) {
509 combineNThreads(8, std::chrono::seconds{kStressTestSeconds});
510 }
511 TEST(DistributedMutex, StressSixteenThreadsCombine) {
512 combineNThreads(16, std::chrono::seconds{kStressTestSeconds});
513 }
514 TEST(DistributedMutex, StressThirtyTwoThreadsCombine) {
515 combineNThreads(32, std::chrono::seconds{kStressTestSeconds});
516 }
517 TEST(DistributedMutex, StressSixtyFourThreadsCombine) {
518 combineNThreads(64, std::chrono::seconds{kStressTestSeconds});
519 }
520 TEST(DistributedMutex, StressHundredThreadsCombine) {
521 combineNThreads(100, std::chrono::seconds{kStressTestSeconds});
522 }
523 TEST(DistributedMutex, StressHardwareConcurrencyThreadsCombine) {
524 combineNThreads(
525 std::thread::hardware_concurrency(),
526 std::chrono::seconds{kStressTestSeconds});
527 }
528
529 TEST(DistributedMutex, StressTwoThreadsCombineAndLock) {
530 combineWithLockNThreads(2, std::chrono::seconds{kStressTestSeconds});
531 }
532 TEST(DistributedMutex, StressFourThreadsCombineAndLock) {
533 combineWithLockNThreads(4, std::chrono::seconds{kStressTestSeconds});
534 }
535 TEST(DistributedMutex, StressEightThreadsCombineAndLock) {
536 combineWithLockNThreads(8, std::chrono::seconds{kStressTestSeconds});
537 }
538 TEST(DistributedMutex, StressSixteenThreadsCombineAndLock) {
539 combineWithLockNThreads(16, std::chrono::seconds{kStressTestSeconds});
540 }
541 TEST(DistributedMutex, StressThirtyTwoThreadsCombineAndLock) {
542 combineWithLockNThreads(32, std::chrono::seconds{kStressTestSeconds});
543 }
544 TEST(DistributedMutex, StressSixtyFourThreadsCombineAndLock) {
545 combineWithLockNThreads(64, std::chrono::seconds{kStressTestSeconds});
546 }
547 TEST(DistributedMutex, StressHardwareConcurrencyThreadsCombineAndLock) {
548 combineWithLockNThreads(
549 std::thread::hardware_concurrency(),
550 std::chrono::seconds{kStressTestSeconds});
551 }
552
553 TEST(DistributedMutex, StressThreeThreadsCombineTryLockAndLock) {
554 combineWithTryLockNThreads(3, std::chrono::seconds{kStressTestSeconds});
555 }
556 TEST(DistributedMutex, StressSixThreadsCombineTryLockAndLock) {
557 combineWithTryLockNThreads(6, std::chrono::seconds{kStressTestSeconds});
558 }
559 TEST(DistributedMutex, StressTwelveThreadsCombineTryLockAndLock) {
560 combineWithTryLockNThreads(12, std::chrono::seconds{kStressTestSeconds});
561 }
562 TEST(DistributedMutex, StressTwentyFourThreadsCombineTryLockAndLock) {
563 combineWithTryLockNThreads(24, std::chrono::seconds{kStressTestSeconds});
564 }
565 TEST(DistributedMutex, StressFourtyEightThreadsCombineTryLockAndLock) {
566 combineWithTryLockNThreads(48, std::chrono::seconds{kStressTestSeconds});
567 }
568 TEST(DistributedMutex, StressSixtyFourThreadsCombineTryLockAndLock) {
569 combineWithTryLockNThreads(64, std::chrono::seconds{kStressTestSeconds});
570 }
571 TEST(DistributedMutex, StressHardwareConcurrencyThreadsCombineTryLockAndLock) {
572 combineWithTryLockNThreads(
573 std::thread::hardware_concurrency(),
574 std::chrono::seconds{kStressTestSeconds});
575 }
576
577 TEST(DistributedMutex, StressThreeThreadsCombineTryLockLockAndTimed) {
578 combineWithLockTryAndTimedNThreads(
579 3, std::chrono::seconds{kStressTestSeconds});
580 }
581 TEST(DistributedMutex, StressSixThreadsCombineTryLockLockAndTimed) {
582 combineWithLockTryAndTimedNThreads(
583 6, std::chrono::seconds{kStressTestSeconds});
584 }
585 TEST(DistributedMutex, StressTwelveThreadsCombineTryLockLockAndTimed) {
586 combineWithLockTryAndTimedNThreads(
587 12, std::chrono::seconds{kStressTestSeconds});
588 }
589 TEST(DistributedMutex, StressTwentyFourThreadsCombineTryLockLockAndTimed) {
590 combineWithLockTryAndTimedNThreads(
591 24, std::chrono::seconds{kStressTestSeconds});
592 }
593 TEST(DistributedMutex, StressFourtyEightThreadsCombineTryLockLockAndTimed) {
594 combineWithLockTryAndTimedNThreads(
595 48, std::chrono::seconds{kStressTestSeconds});
596 }
597 TEST(DistributedMutex, StressSixtyFourThreadsCombineTryLockLockAndTimed) {
598 combineWithLockTryAndTimedNThreads(
599 64, std::chrono::seconds{kStressTestSeconds});
600 }
601 TEST(DistributedMutex, StressHwConcurrencyThreadsCombineTryLockLockAndTimed) {
602 combineWithLockTryAndTimedNThreads(
603 std::thread::hardware_concurrency(),
604 std::chrono::seconds{kStressTestSeconds});
605 }
606
607 TEST(DistributedMutex, StressTryLock) {
608 auto&& mutex = folly::DistributedMutex{};
609
610 for (auto i = 0; i < kStressFactor; ++i) {
611 while (true) {
612 auto state = mutex.try_lock();
613 if (state) {
614 mutex.unlock(std::move(state));
615 break;
616 }
617 }
618 }
619 }
620
621 TEST(DistributedMutex, TimedLockTimeout) {
622 auto&& mutex = folly::DistributedMutex{};
623 auto&& start = folly::Baton<>{};
624 auto&& done = folly::Baton<>{};
625
626 auto thread = std::thread{[&]() {
627 auto state = mutex.lock();
628 start.post();
629 done.wait();
630 mutex.unlock(std::move(state));
631 }};
632
633 start.wait();
634 auto result = mutex.try_lock_for(std::chrono::milliseconds{10});
635 EXPECT_FALSE(result);
636 done.post();
637 thread.join();
638 }
639
640 TEST(DistributedMutex, TimedLockAcquireAfterUnlock) {
641 auto&& mutex = folly::DistributedMutex{};
642 auto&& start = folly::Baton<>{};
643
644 auto thread = std::thread{[&]() {
645 auto state = mutex.lock();
646 start.post();
647 /* sleep override */
648 std::this_thread::sleep_for(std::chrono::milliseconds{10});
649 mutex.unlock(std::move(state));
650 }};
651
652 start.wait();
653 auto result = mutex.try_lock_for(kForever);
654 EXPECT_TRUE(result);
655 thread.join();
656 }
657
658 namespace {
659 template <template <typename> class Atom = std::atomic>
660 void stressTryLockWithConcurrentLocks(
661 int numThreads,
662 int iterations = kStressFactor) {
663 auto&& threads = std::vector<std::thread>{};
664 auto&& mutex = folly::detail::distributed_mutex::DistributedMutex<Atom>{};
665 auto&& atomic = std::atomic<std::uint64_t>{0};
666
667 for (auto i = 0; i < numThreads; ++i) {
668 threads.push_back(std::thread([&] {
669 for (auto j = 0; j < iterations; ++j) {
670 auto state = mutex.lock();
671 EXPECT_EQ(atomic.fetch_add(1, std::memory_order_relaxed), 0);
672 EXPECT_EQ(atomic.fetch_sub(1, std::memory_order_relaxed), 1);
673 mutex.unlock(std::move(state));
674 }
675 }));
676 }
677
678 for (auto i = 0; i < iterations; ++i) {
679 if (auto state = mutex.try_lock()) {
680 EXPECT_EQ(atomic.fetch_add(1, std::memory_order_relaxed), 0);
681 EXPECT_EQ(atomic.fetch_sub(1, std::memory_order_relaxed), 1);
682 mutex.unlock(std::move(state));
683 }
684 }
685
686 for (auto& thread : threads) {
687 thread.join();
688 }
689 }
690 } // namespace
691
692 TEST(DistributedMutex, StressTryLockWithConcurrentLocksTwoThreads) {
693 stressTryLockWithConcurrentLocks(2);
694 }
695 TEST(DistributedMutex, StressTryLockWithConcurrentLocksFourThreads) {
696 stressTryLockWithConcurrentLocks(4);
697 }
698 TEST(DistributedMutex, StressTryLockWithConcurrentLocksEightThreads) {
699 stressTryLockWithConcurrentLocks(8);
700 }
701 TEST(DistributedMutex, StressTryLockWithConcurrentLocksSixteenThreads) {
702 stressTryLockWithConcurrentLocks(16);
703 }
704 TEST(DistributedMutex, StressTryLockWithConcurrentLocksThirtyTwoThreads) {
705 stressTryLockWithConcurrentLocks(32);
706 }
707 TEST(DistributedMutex, StressTryLockWithConcurrentLocksSixtyFourThreads) {
708 stressTryLockWithConcurrentLocks(64);
709 }
710
711 namespace {
712 template <template <typename> class Atom = std::atomic>
713 void concurrentTryLocks(int numThreads, int iterations = kStressFactor) {
714 auto&& threads = std::vector<std::thread>{};
715 auto&& mutex = folly::detail::distributed_mutex::DistributedMutex<Atom>{};
716 auto&& atomic = std::atomic<std::uint64_t>{0};
717
718 for (auto i = 0; i < numThreads; ++i) {
719 threads.push_back(std::thread([&] {
720 for (auto j = 0; j < iterations; ++j) {
721 if (auto state = mutex.try_lock()) {
722 EXPECT_EQ(atomic.fetch_add(1, std::memory_order_relaxed), 0);
723 EXPECT_EQ(atomic.fetch_sub(1, std::memory_order_relaxed), 1);
724 mutex.unlock(std::move(state));
725 }
726 }
727 }));
728 }
729
730 for (auto& thread : threads) {
731 thread.join();
732 }
733 }
734 } // namespace
735
736 TEST(DistributedMutex, StressTryLockWithTwoThreads) {
737 concurrentTryLocks(2);
738 }
739 TEST(DistributedMutex, StressTryLockFourThreads) {
740 concurrentTryLocks(4);
741 }
742 TEST(DistributedMutex, StressTryLockEightThreads) {
743 concurrentTryLocks(8);
744 }
745 TEST(DistributedMutex, StressTryLockSixteenThreads) {
746 concurrentTryLocks(16);
747 }
748 TEST(DistributedMutex, StressTryLockThirtyTwoThreads) {
749 concurrentTryLocks(32);
750 }
751 TEST(DistributedMutex, StressTryLockSixtyFourThreads) {
752 concurrentTryLocks(64);
753 }
754
755 namespace {
// Instrumented type that counts, in process-wide atomic counters, how often
// each of its special member functions runs. Tests use it to check how many
// objects are constructed and destroyed along a code path.
class TestConstruction {
 public:
  TestConstruction() = delete;
  // Counted as a "default" construction; the argument value is ignored.
  explicit TestConstruction(int) {
    defaultConstructs().fetch_add(1, std::memory_order_relaxed);
  }
  TestConstruction(TestConstruction&&) noexcept {
    moveConstructs().fetch_add(1, std::memory_order_relaxed);
  }
  TestConstruction(const TestConstruction&) {
    copyConstructs().fetch_add(1, std::memory_order_relaxed);
  }
  TestConstruction& operator=(const TestConstruction&) {
    copyAssigns().fetch_add(1, std::memory_order_relaxed);
    return *this;
  }
  TestConstruction& operator=(TestConstruction&&) {
    moveAssigns().fetch_add(1, std::memory_order_relaxed);
    return *this;
  }
  ~TestConstruction() {
    destructs().fetch_add(1, std::memory_order_relaxed);
  }

  // Accessors for the counters; each one is a function-local static.
  static std::atomic<std::uint64_t>& defaultConstructs() {
    static std::atomic<std::uint64_t> counter{0};
    return counter;
  }
  static std::atomic<std::uint64_t>& moveConstructs() {
    static std::atomic<std::uint64_t> counter{0};
    return counter;
  }
  static std::atomic<std::uint64_t>& copyConstructs() {
    static std::atomic<std::uint64_t> counter{0};
    return counter;
  }
  static std::atomic<std::uint64_t>& moveAssigns() {
    static std::atomic<std::uint64_t> counter{0};
    return counter;
  }
  static std::atomic<std::uint64_t>& copyAssigns() {
    static std::atomic<std::uint64_t> counter{0};
    return counter;
  }
  static std::atomic<std::uint64_t>& destructs() {
    static std::atomic<std::uint64_t> counter{0};
    return counter;
  }

  // Zeroes every counter. moveAssigns() used to be skipped here, so move
  // assignments counted in one test leaked into the next.
  static void reset() {
    defaultConstructs().store(0);
    moveConstructs().store(0);
    copyConstructs().store(0);
    moveAssigns().store(0);
    copyAssigns().store(0);
    destructs().store(0);
  }
};
813 } // namespace
814
815 TEST(DistributedMutex, TestAppropriateDestructionAndConstructionWithCombine) {
816 auto&& mutex = folly::DistributedMutex{};
817 auto&& stop = std::atomic<bool>{false};
818
819 // test the simple return path to make sure that in the absence of
820 // contention, we get the right number of constructs and destructs
821 mutex.lock_combine([]() { return TestConstruction{1}; });
822 auto moves = TestConstruction::moveConstructs().load();
823 auto defaults = TestConstruction::defaultConstructs().load();
824 EXPECT_EQ(TestConstruction::defaultConstructs().load(), 1);
825 EXPECT_TRUE(moves == 0 || moves == 1);
826 EXPECT_EQ(TestConstruction::destructs().load(), moves + defaults);
827
828 // loop and make sure we were able to test the path where the critical
829 // section of the thread gets combined, and assert that we see the expected
830 // number of constructions and destructions
831 //
832 // this implements a timed backoff to test the combined path, so we use the
833 // smallest possible delay in tests
834 auto thread = std::thread{[&]() {
835 auto&& duration = std::chrono::milliseconds{10};
836 while (!stop.load()) {
837 TestConstruction::reset();
838 auto&& ready = folly::Baton<>{};
839 auto&& release = folly::Baton<>{};
840
841 // make one thread start it's critical section, signal and wait for
842 // another thread to enqueue, to test the
843 auto innerThread = std::thread{[&]() {
844 mutex.lock_combine([&]() {
845 ready.post();
846 release.wait();
847 /* sleep override */
848 std::this_thread::sleep_for(duration);
849 });
850 }};
851
852 // wait for the thread to get in its critical section, then tell it to go
853 ready.wait();
854 release.post();
855 mutex.lock_combine([&]() { return TestConstruction{1}; });
856
857 innerThread.join();
858
859 // at this point we should have only one default construct, either 3
860 // or 4 move constructs the same number of destructions as
861 // constructions
862 auto innerDefaults = TestConstruction::defaultConstructs().load();
863 auto innerMoves = TestConstruction::moveConstructs().load();
864 auto destructs = TestConstruction::destructs().load();
865 EXPECT_EQ(innerDefaults, 1);
866 EXPECT_TRUE(innerMoves == 3 || innerMoves == 4 || innerMoves == 1);
867 EXPECT_EQ(destructs, innerMoves + innerDefaults);
868 EXPECT_EQ(TestConstruction::moveAssigns().load(), 0);
869 EXPECT_EQ(TestConstruction::copyAssigns().load(), 0);
870
871 // increase duration by 100ms each iteration
872 duration = duration + std::chrono::milliseconds{100};
873 }
874 }};
875
876 /* sleep override */
877 std::this_thread::sleep_for(std::chrono::seconds{kStressTestSeconds});
878 stop.store(true);
879 thread.join();
880 }
881
882 namespace {
883 template <template <typename> class Atom = std::atomic>
884 void concurrentLocksManyMutexes(int numThreads, std::chrono::seconds duration) {
885 using DMutex = folly::detail::distributed_mutex::DistributedMutex<Atom>;
886 const auto&& kNumMutexes = 10;
887 auto&& threads = std::vector<std::thread>{};
888 auto&& mutexes = std::vector<DMutex>(kNumMutexes);
889 auto&& barriers = std::vector<std::atomic<std::uint64_t>>(kNumMutexes);
890 auto&& stop = std::atomic<bool>{false};
891
892 for (auto i = 0; i < numThreads; ++i) {
893 threads.push_back(std::thread([&] {
894 auto&& total = std::atomic<std::uint64_t>{0};
895 auto&& expected = std::uint64_t{0};
896
897 for (auto j = 0; !stop.load(std::memory_order_relaxed); ++j) {
898 auto& mutex = mutexes[j % kNumMutexes];
899 auto& barrier = barriers[j % kNumMutexes];
900
901 ++expected;
902 auto result = mutex.lock_combine([&]() {
903 EXPECT_EQ(barrier.fetch_add(1, std::memory_order_relaxed), 0);
904 EXPECT_EQ(barrier.fetch_add(1, std::memory_order_relaxed), 1);
905 std::this_thread::yield();
906 SCOPE_EXIT {
907 EXPECT_EQ(barrier.fetch_sub(1, std::memory_order_relaxed), 1);
908 };
909 EXPECT_EQ(barrier.fetch_sub(1, std::memory_order_relaxed), 2);
910 return total.fetch_add(1, std::memory_order_relaxed);
911 });
912 EXPECT_EQ(result, expected - 1);
913 }
914
915 EXPECT_EQ(total.load(), expected);
916 }));
917 }
918
919 /* sleep override */
920 std::this_thread::sleep_for(duration);
921 stop.store(true);
922 for (auto& thread : threads) {
923 thread.join();
924 }
925 }
926 } // namespace
927
928 TEST(DistributedMutex, StressWithManyMutexesAlternatingTwoThreads) {
929 concurrentLocksManyMutexes(2, std::chrono::seconds{kStressTestSeconds});
930 }
931 TEST(DistributedMutex, StressWithManyMutexesAlternatingFourThreads) {
932 concurrentLocksManyMutexes(4, std::chrono::seconds{kStressTestSeconds});
933 }
934 TEST(DistributedMutex, StressWithManyMutexesAlternatingEightThreads) {
935 concurrentLocksManyMutexes(8, std::chrono::seconds{kStressTestSeconds});
936 }
937 TEST(DistributedMutex, StressWithManyMutexesAlternatingSixteenThreads) {
938 concurrentLocksManyMutexes(16, std::chrono::seconds{kStressTestSeconds});
939 }
940 TEST(DistributedMutex, StressWithManyMutexesAlternatingThirtyTwoThreads) {
941 concurrentLocksManyMutexes(32, std::chrono::seconds{kStressTestSeconds});
942 }
943 TEST(DistributedMutex, StressWithManyMutexesAlternatingSixtyFourThreads) {
944 concurrentLocksManyMutexes(64, std::chrono::seconds{kStressTestSeconds});
945 }
946
947 namespace {
948 class ExceptionWithConstructionTrack : public std::exception {
949 public:
950 explicit ExceptionWithConstructionTrack(int id)
951 : id_{std::to_string(id)}, constructionTrack_{id} {}
952
953 const char* what() const noexcept override {
954 return id_.c_str();
955 }
956
957 private:
958 std::string id_;
959 TestConstruction constructionTrack_;
960 };
961 } // namespace
962
963 TEST(DistributedMutex, TestExceptionPropagationUncontended) {
964 TestConstruction::reset();
965 auto&& mutex = folly::DistributedMutex{};
966 auto&& thread = std::thread{[&]() {
967 try {
968 mutex.lock_combine([&]() { throw ExceptionWithConstructionTrack{46}; });
969 } catch (std::exception& exc) {
970 auto integer = std::stoi(exc.what());
971 EXPECT_EQ(integer, 46);
972 EXPECT_GT(TestConstruction::defaultConstructs(), 0);
973 }
974 EXPECT_EQ(
975 TestConstruction::defaultConstructs(), TestConstruction::destructs());
976 }};
977 thread.join();
978 }
979
980 namespace {
981 template <template <typename> class Atom = std::atomic>
982 void concurrentExceptionPropagationStress(
983 int numThreads,
984 std::chrono::milliseconds t) {
985 // this test fails under with a false negative under older versions of TSAN
986 // for some reason so disable it when TSAN is enabled
987 if (folly::kIsSanitizeThread) {
988 return;
989 }
990
991 TestConstruction::reset();
992 auto&& mutex = folly::detail::distributed_mutex::DistributedMutex<Atom>{};
993 auto&& threads = std::vector<std::thread>{};
994 auto&& stop = std::atomic<bool>{false};
995 auto&& barrier = std::atomic<std::uint64_t>{0};
996
997 for (auto i = 0; i < numThreads; ++i) {
998 threads.push_back(std::thread([&]() {
999 for (auto j = 0; !stop.load(); ++j) {
1000 auto value = int{0};
1001 try {
1002 value = mutex.lock_combine([&]() {
1003 EXPECT_EQ(barrier.fetch_add(1, std::memory_order_relaxed), 0);
1004 EXPECT_EQ(barrier.fetch_add(1, std::memory_order_relaxed), 1);
1005 std::this_thread::yield();
1006 SCOPE_EXIT {
1007 EXPECT_EQ(barrier.fetch_sub(1, std::memory_order_relaxed), 1);
1008 };
1009 EXPECT_EQ(barrier.fetch_sub(1, std::memory_order_relaxed), 2);
1010
1011 // we only throw an exception once every 3 times
1012 if (!(j % 3)) {
1013 throw ExceptionWithConstructionTrack{j};
1014 }
1015
1016 return j;
1017 });
1018 } catch (std::exception& exc) {
1019 value = std::stoi(exc.what());
1020 }
1021
1022 EXPECT_EQ(value, j);
1023 }
1024 }));
1025 }
1026
1027 /* sleep override */
1028 std::this_thread::sleep_for(t);
1029 stop.store(true);
1030 for (auto& thread : threads) {
1031 thread.join();
1032 }
1033 }
1034 } // namespace
1035
1036 TEST(DistributedMutex, TestExceptionPropagationStressTwoThreads) {
1037 concurrentExceptionPropagationStress(
1038 2, std::chrono::seconds{kStressTestSeconds});
1039 }
1040 TEST(DistributedMutex, TestExceptionPropagationStressFourThreads) {
1041 concurrentExceptionPropagationStress(
1042 4, std::chrono::seconds{kStressTestSeconds});
1043 }
1044 TEST(DistributedMutex, TestExceptionPropagationStressEightThreads) {
1045 concurrentExceptionPropagationStress(
1046 8, std::chrono::seconds{kStressTestSeconds});
1047 }
1048 TEST(DistributedMutex, TestExceptionPropagationStressSixteenThreads) {
1049 concurrentExceptionPropagationStress(
1050 16, std::chrono::seconds{kStressTestSeconds});
1051 }
1052 TEST(DistributedMutex, TestExceptionPropagationStressThirtyTwoThreads) {
1053 concurrentExceptionPropagationStress(
1054 32, std::chrono::seconds{kStressTestSeconds});
1055 }
1056 TEST(DistributedMutex, TestExceptionPropagationStressSixtyFourThreads) {
1057 concurrentExceptionPropagationStress(
1058 64, std::chrono::seconds{kStressTestSeconds});
1059 }
1060
1061 namespace {
// Builds an 8-element array holding start, start + 1, ..., start + 7, each
// converted to std::uint64_t.
std::array<std::uint64_t, 8> makeMonotonicArray(int start) {
  std::array<std::uint64_t, 8> values{};
  for (std::size_t index = 0; index < values.size(); ++index) {
    values[index] = start;
    ++start;
  }
  return values;
}
1067
1068 template <template <typename> class Atom = std::atomic>
1069 void concurrentBigValueReturnStress(
1070 int numThreads,
1071 std::chrono::milliseconds t) {
1072 auto&& mutex = folly::detail::distributed_mutex::DistributedMutex<Atom>{};
1073 auto&& threads = std::vector<std::thread>{};
1074 auto&& stop = std::atomic<bool>{false};
1075 auto&& barrier = std::atomic<std::uint64_t>{0};
1076
1077 for (auto i = 0; i < numThreads; ++i) {
1078 threads.push_back(std::thread([&]() {
1079 auto&& value = std::atomic<std::uint64_t>{0};
1080
1081 for (auto j = 0; !stop.load(); ++j) {
1082 auto returned = mutex.lock_combine([&]() {
1083 EXPECT_EQ(barrier.fetch_add(1, std::memory_order_relaxed), 0);
1084 EXPECT_EQ(barrier.fetch_add(1, std::memory_order_relaxed), 1);
1085 std::this_thread::yield();
1086 // return an entire cacheline worth of data
1087 auto current = value.fetch_add(1, std::memory_order_relaxed);
1088 SCOPE_EXIT {
1089 EXPECT_EQ(barrier.fetch_sub(1, std::memory_order_relaxed), 1);
1090 };
1091 EXPECT_EQ(barrier.fetch_sub(1, std::memory_order_relaxed), 2);
1092 return makeMonotonicArray(static_cast<int>(current));
1093 });
1094
1095 auto expected = value.load() - 1;
1096 for (auto& element : returned) {
1097 EXPECT_EQ(element, expected++);
1098 }
1099 }
1100 }));
1101 }
1102
1103 /* sleep override */
1104 std::this_thread::sleep_for(t);
1105 stop.store(true);
1106 for (auto& thread : threads) {
1107 thread.join();
1108 }
1109 }
1110 } // namespace
1111
1112 TEST(DistributedMutex, StressBigValueReturnTwoThreads) {
1113 concurrentBigValueReturnStress(2, std::chrono::seconds{kStressTestSeconds});
1114 }
1115 TEST(DistributedMutex, StressBigValueReturnFourThreads) {
1116 concurrentBigValueReturnStress(4, std::chrono::seconds{kStressTestSeconds});
1117 }
1118 TEST(DistributedMutex, StressBigValueReturnEightThreads) {
1119 concurrentBigValueReturnStress(8, std::chrono::seconds{kStressTestSeconds});
1120 }
1121 TEST(DistributedMutex, StressBigValueReturnSixteenThreads) {
1122 concurrentBigValueReturnStress(16, std::chrono::seconds{kStressTestSeconds});
1123 }
1124 TEST(DistributedMutex, StressBigValueReturnThirtyTwoThreads) {
1125 concurrentBigValueReturnStress(32, std::chrono::seconds{kStressTestSeconds});
1126 }
1127 TEST(DistributedMutex, StressBigValueReturnSixtyFourThreads) {
1128 concurrentBigValueReturnStress(64, std::chrono::seconds{kStressTestSeconds});
1129 }
1130
1131 } // namespace folly
1132
1133 int main(int argc, char** argv) {
1134 ::testing::InitGoogleTest(&argc, argv);
1135 return RUN_ALL_TESTS();
1136 }
1137
1138 #else
// Entry point for configurations where the tests are compiled out: prints a
// note explaining why nothing ran and reports success.
int main(int /*argc*/, char** /*argv*/) {
  static const char kNotSupported[] =
      "DistributedMutex is not supported in ROCKSDB_LITE, "
      "on ARM, or in valgrind_test runs\n";
  printf("%s", kNotSupported);
  return 0;
}
1145 #endif // !ROCKSDB_LITE && !__ARM_ARCH && !ROCKSDB_VALGRIND_RUN