// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include <folly/synchronization/DistributedMutex.h>
#include <folly/ScopeGuard.h>
#include <folly/container/Array.h>
#include <folly/synchronization/Baton.h>

#include <gtest/gtest.h>

#include <array>
#include <atomic>
#include <chrono>
#include <cstdint>
#include <cstdio>
#include <exception>
#include <string>
#include <thread>
#include <vector>

#if !defined(ROCKSDB_LITE) && !defined(__ARM_ARCH) && \
    !defined(ROCKSDB_VALGRIND_RUN)
namespace folly {

template <template <typename> class Atomic>
using TestDistributedMutex =
    folly::detail::distributed_mutex::DistributedMutex<Atomic, false>;

constexpr auto kStressFactor = 1000;
constexpr auto kStressTestSeconds = 2;
constexpr auto kForever = std::chrono::hours{100};

int sum(int n) {
  return (n * (n + 1)) / 2;
}
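// The stress helpers below all check mutual exclusion the same way: a shared
// "barrier" counter is bumped on entry to the critical section and dropped on
// exit, and every fetch_add()/fetch_sub() asserts the exact value it expects
// to observe.  If two threads were ever inside a critical section protected
// by the same mutex at the same time, at least one of these EXPECT_EQ checks
// would see an unexpected count and fail.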
template <template <typename> class Atom = std::atomic>
void basicNThreads(int numThreads, int iterations = kStressFactor) {
  auto&& mutex = folly::detail::distributed_mutex::DistributedMutex<Atom>{};
  auto&& barrier = std::atomic<int>{0};
  auto&& threads = std::vector<std::thread>{};
  auto&& result = std::vector<int>{};

  auto&& function = [&](int id) {
    return [&, id] {
      for (auto j = 0; j < iterations; ++j) {
        auto lck = std::unique_lock<_t<std::decay<decltype(mutex)>>>{mutex};
        EXPECT_EQ(barrier.fetch_add(1, std::memory_order_relaxed), 0);
        std::this_thread::yield();
        result.push_back(id);
        EXPECT_EQ(barrier.fetch_sub(1, std::memory_order_relaxed), 1);
      }
    };
  };

  for (auto i = 1; i <= numThreads; ++i) {
    threads.push_back(std::thread(function(i)));
  }
  for (auto& thread : threads) {
    thread.join();
  }

  auto total = 0;
  for (auto value : result) {
    total += value;
  }
  EXPECT_EQ(total, sum(numThreads) * iterations);
}
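// Spins three groups of threads on a single mutex, acquiring it with plain
// lock()/unlock(), opportunistic try_lock(), and try_lock_for() with an
// effectively infinite timeout, to check that the different acquisition
// paths cooperate without breaking mutual exclusion.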
template <template <typename> class Atom = std::atomic>
void lockWithTryAndTimedNThreads(
    int numThreads,
    std::chrono::seconds duration) {
  auto&& mutex = folly::detail::distributed_mutex::DistributedMutex<Atom>{};
  auto&& barrier = std::atomic<int>{0};
  auto&& threads = std::vector<std::thread>{};
  auto&& stop = std::atomic<bool>{false};

  auto&& lockUnlockFunction = [&]() {
    while (!stop.load()) {
      auto lck = std::unique_lock<_t<std::decay<decltype(mutex)>>>{mutex};
      EXPECT_EQ(barrier.fetch_add(1, std::memory_order_relaxed), 0);
      std::this_thread::yield();
      EXPECT_EQ(barrier.fetch_sub(1, std::memory_order_relaxed), 1);
    }
  };

  auto tryLockFunction = [&]() {
    while (!stop.load()) {
      using Mutex = _t<std::decay<decltype(mutex)>>;
      auto lck = std::unique_lock<Mutex>{mutex, std::defer_lock};
      if (lck.try_lock()) {
        EXPECT_EQ(barrier.fetch_add(1, std::memory_order_relaxed), 0);
        std::this_thread::yield();
        EXPECT_EQ(barrier.fetch_sub(1, std::memory_order_relaxed), 1);
      }
    }
  };

  auto timedLockFunction = [&]() {
    while (!stop.load()) {
      using Mutex = _t<std::decay<decltype(mutex)>>;
      auto lck = std::unique_lock<Mutex>{mutex, std::defer_lock};
      if (lck.try_lock_for(kForever)) {
        EXPECT_EQ(barrier.fetch_add(1, std::memory_order_relaxed), 0);
        std::this_thread::yield();
        EXPECT_EQ(barrier.fetch_sub(1, std::memory_order_relaxed), 1);
      }
    }
  };

  for (auto i = 0; i < (numThreads / 3); ++i) {
    threads.push_back(std::thread(lockUnlockFunction));
  }
  for (auto i = 0; i < (numThreads / 3); ++i) {
    threads.push_back(std::thread(tryLockFunction));
  }
  for (auto i = 0; i < (numThreads / 3); ++i) {
    threads.push_back(std::thread(timedLockFunction));
  }

  /* sleep override */
  std::this_thread::sleep_for(duration);
  stop.store(true);
  for (auto& thread : threads) {
    thread.join();
  }
}
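// combineNThreads exercises lock_combine(), which runs the given critical
// section under the mutex and passes its return value back to the caller;
// under contention the critical section of a waiting thread may be executed
// by the thread that currently holds the lock (the "combined" path referred
// to in the comments below).  Each thread checks that its critical sections
// ran exactly once each and that the returned counts come back in order.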
template <template <typename> class Atom = std::atomic>
void combineNThreads(int numThreads, std::chrono::seconds duration) {
  auto&& mutex = folly::detail::distributed_mutex::DistributedMutex<Atom>{};
  auto&& barrier = std::atomic<int>{0};
  auto&& threads = std::vector<std::thread>{};
  auto&& stop = std::atomic<bool>{false};

  auto&& function = [&]() {
    return [&] {
      auto&& expected = std::uint64_t{0};
      auto&& local = std::atomic<std::uint64_t>{0};
      auto&& result = std::atomic<std::uint64_t>{0};
      while (!stop.load()) {
        ++expected;
        auto current = mutex.lock_combine([&]() {
          result.fetch_add(1);
          EXPECT_EQ(barrier.fetch_add(1, std::memory_order_relaxed), 0);
          EXPECT_EQ(barrier.fetch_add(1, std::memory_order_relaxed), 1);
          std::this_thread::yield();
          SCOPE_EXIT {
            EXPECT_EQ(barrier.fetch_sub(1, std::memory_order_relaxed), 1);
          };
          EXPECT_EQ(barrier.fetch_sub(1, std::memory_order_relaxed), 2);
          return local.fetch_add(1);
        });
        EXPECT_EQ(current, expected - 1);
      }

      EXPECT_EQ(expected, result.load());
    };
  };

  for (auto i = 1; i <= numThreads; ++i) {
    threads.push_back(std::thread(function()));
  }

  /* sleep override */
  std::this_thread::sleep_for(duration);
  stop.store(true);
  for (auto& thread : threads) {
    thread.join();
  }
}
template <template <typename> class Atom = std::atomic>
void combineWithLockNThreads(int numThreads, std::chrono::seconds duration) {
  auto&& mutex = folly::detail::distributed_mutex::DistributedMutex<Atom>{};
  auto&& barrier = std::atomic<int>{0};
  auto&& threads = std::vector<std::thread>{};
  auto&& stop = std::atomic<bool>{false};

  auto&& lockUnlockFunction = [&]() {
    while (!stop.load()) {
      auto lck = std::unique_lock<_t<std::decay<decltype(mutex)>>>{mutex};
      EXPECT_EQ(barrier.fetch_add(1, std::memory_order_relaxed), 0);
      std::this_thread::yield();
      EXPECT_EQ(barrier.fetch_sub(1, std::memory_order_relaxed), 1);
    }
  };

  auto&& combineFunction = [&]() {
    auto&& expected = std::uint64_t{0};
    auto&& total = std::atomic<std::uint64_t>{0};

    while (!stop.load()) {
      ++expected;
      auto current = mutex.lock_combine([&]() {
        auto iteration = total.fetch_add(1);
        EXPECT_EQ(barrier.fetch_add(1, std::memory_order_relaxed), 0);
        EXPECT_EQ(barrier.fetch_add(1, std::memory_order_relaxed), 1);
        std::this_thread::yield();
        SCOPE_EXIT {
          EXPECT_EQ(barrier.fetch_sub(1, std::memory_order_relaxed), 1);
        };
        EXPECT_EQ(barrier.fetch_sub(1, std::memory_order_relaxed), 2);
        return iteration;
      });
      EXPECT_EQ(expected, current + 1);
    }

    EXPECT_EQ(expected, total.load());
  };

  for (auto i = 1; i < (numThreads / 2); ++i) {
    threads.push_back(std::thread(combineFunction));
  }
  for (auto i = 0; i < (numThreads / 2); ++i) {
    threads.push_back(std::thread(lockUnlockFunction));
  }

  /* sleep override */
  std::this_thread::sleep_for(duration);
  stop.store(true);
  for (auto& thread : threads) {
    thread.join();
  }
}
template <template <typename> class Atom = std::atomic>
void combineWithTryLockNThreads(int numThreads, std::chrono::seconds duration) {
  auto&& mutex = folly::detail::distributed_mutex::DistributedMutex<Atom>{};
  auto&& barrier = std::atomic<int>{0};
  auto&& threads = std::vector<std::thread>{};
  auto&& stop = std::atomic<bool>{false};

  auto&& lockUnlockFunction = [&]() {
    while (!stop.load()) {
      auto lck = std::unique_lock<_t<std::decay<decltype(mutex)>>>{mutex};
      EXPECT_EQ(barrier.fetch_add(1, std::memory_order_relaxed), 0);
      std::this_thread::yield();
      EXPECT_EQ(barrier.fetch_sub(1, std::memory_order_relaxed), 1);
    }
  };

  auto&& combineFunction = [&]() {
    auto&& expected = std::uint64_t{0};
    auto&& total = std::atomic<std::uint64_t>{0};

    while (!stop.load()) {
      ++expected;
      auto current = mutex.lock_combine([&]() {
        auto iteration = total.fetch_add(1);
        EXPECT_EQ(barrier.fetch_add(1, std::memory_order_relaxed), 0);
        EXPECT_EQ(barrier.fetch_add(1, std::memory_order_relaxed), 1);
        std::this_thread::yield();
        SCOPE_EXIT {
          EXPECT_EQ(barrier.fetch_sub(1, std::memory_order_relaxed), 1);
        };
        EXPECT_EQ(barrier.fetch_sub(1, std::memory_order_relaxed), 2);
        return iteration;
      });
      EXPECT_EQ(expected, current + 1);
    }

    EXPECT_EQ(expected, total.load());
  };

  auto tryLockFunction = [&]() {
    while (!stop.load()) {
      using Mutex = _t<std::decay<decltype(mutex)>>;
      auto lck = std::unique_lock<Mutex>{mutex, std::defer_lock};
      if (lck.try_lock()) {
        EXPECT_EQ(barrier.fetch_add(1, std::memory_order_relaxed), 0);
        std::this_thread::yield();
        EXPECT_EQ(barrier.fetch_sub(1, std::memory_order_relaxed), 1);
      }
    }
  };

  for (auto i = 0; i < (numThreads / 3); ++i) {
    threads.push_back(std::thread(lockUnlockFunction));
  }
  for (auto i = 0; i < (numThreads / 3); ++i) {
    threads.push_back(std::thread(combineFunction));
  }
  for (auto i = 0; i < (numThreads / 3); ++i) {
    threads.push_back(std::thread(tryLockFunction));
  }

  /* sleep override */
  std::this_thread::sleep_for(duration);
  stop.store(true);
  for (auto& thread : threads) {
    thread.join();
  }
}
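// combineWithLockTryAndTimedNThreads mixes all four acquisition modes on one
// mutex.  Its combined critical section returns a six-element array so that
// the result fills the storage used to coalesce return values, exercising
// the codepath where lock_combine() has to move a larger return value back
// to the calling thread.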
template <template <typename> class Atom = std::atomic>
void combineWithLockTryAndTimedNThreads(
    int numThreads,
    std::chrono::seconds duration) {
  auto&& mutex = folly::detail::distributed_mutex::DistributedMutex<Atom>{};
  auto&& barrier = std::atomic<int>{0};
  auto&& threads = std::vector<std::thread>{};
  auto&& stop = std::atomic<bool>{false};

  auto&& lockUnlockFunction = [&]() {
    while (!stop.load()) {
      auto lck = std::unique_lock<_t<std::decay<decltype(mutex)>>>{mutex};
      EXPECT_EQ(barrier.fetch_add(1, std::memory_order_relaxed), 0);
      std::this_thread::yield();
      EXPECT_EQ(barrier.fetch_sub(1, std::memory_order_relaxed), 1);
    }
  };

  auto&& combineFunction = [&]() {
    auto&& expected = std::uint64_t{0};
    auto&& total = std::atomic<std::uint64_t>{0};

    while (!stop.load()) {
      ++expected;
      auto current = mutex.lock_combine([&]() {
        auto iteration = total.fetch_add(1);
        EXPECT_EQ(barrier.fetch_add(1, std::memory_order_relaxed), 0);
        EXPECT_EQ(barrier.fetch_add(1, std::memory_order_relaxed), 1);
        std::this_thread::yield();
        SCOPE_EXIT {
          EXPECT_EQ(barrier.fetch_sub(1, std::memory_order_relaxed), 1);
        };
        EXPECT_EQ(barrier.fetch_sub(1, std::memory_order_relaxed), 2);

        // return a non-trivially-copyable object that occupies all the
        // storage we use to coalesce returns to test that codepath
        return folly::make_array(
            iteration,
            iteration + 1,
            iteration + 2,
            iteration + 3,
            iteration + 4,
            iteration + 5);
      });
      EXPECT_EQ(expected, current[0] + 1);
      EXPECT_EQ(expected, current[1]);
      EXPECT_EQ(expected, current[2] - 1);
      EXPECT_EQ(expected, current[3] - 2);
      EXPECT_EQ(expected, current[4] - 3);
      EXPECT_EQ(expected, current[5] - 4);
    }

    EXPECT_EQ(expected, total.load());
  };

  auto tryLockFunction = [&]() {
    while (!stop.load()) {
      using Mutex = _t<std::decay<decltype(mutex)>>;
      auto lck = std::unique_lock<Mutex>{mutex, std::defer_lock};
      if (lck.try_lock()) {
        EXPECT_EQ(barrier.fetch_add(1, std::memory_order_relaxed), 0);
        std::this_thread::yield();
        EXPECT_EQ(barrier.fetch_sub(1, std::memory_order_relaxed), 1);
      }
    }
  };

  auto timedLockFunction = [&]() {
    while (!stop.load()) {
      using Mutex = _t<std::decay<decltype(mutex)>>;
      auto lck = std::unique_lock<Mutex>{mutex, std::defer_lock};
      if (lck.try_lock_for(kForever)) {
        EXPECT_EQ(barrier.fetch_add(1, std::memory_order_relaxed), 0);
        std::this_thread::yield();
        EXPECT_EQ(barrier.fetch_sub(1, std::memory_order_relaxed), 1);
      }
    }
  };

  for (auto i = 0; i < (numThreads / 4); ++i) {
    threads.push_back(std::thread(lockUnlockFunction));
  }
  for (auto i = 0; i < (numThreads / 4); ++i) {
    threads.push_back(std::thread(combineFunction));
  }
  for (auto i = 0; i < (numThreads / 4); ++i) {
    threads.push_back(std::thread(tryLockFunction));
  }
  for (auto i = 0; i < (numThreads / 4); ++i) {
    threads.push_back(std::thread(timedLockFunction));
  }

  /* sleep override */
  std::this_thread::sleep_for(duration);
  stop.store(true);
  for (auto& thread : threads) {
    thread.join();
  }
}
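// extractPtr() is expected to recover the original address from a word whose
// low bits may have been used as tag bits; the second check below sets the
// lowest bit and still expects the same pointer back.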
TEST(DistributedMutex, InternalDetailTestOne) {
  auto value = 0;
  auto ptr = reinterpret_cast<std::uintptr_t>(&value);
  EXPECT_EQ(folly::detail::distributed_mutex::extractPtr<int>(ptr), &value);

  ptr = ptr | 0b1;
  EXPECT_EQ(folly::detail::distributed_mutex::extractPtr<int>(ptr), &value);
}
TEST(DistributedMutex, Basic) {
  auto&& mutex = folly::DistributedMutex{};
  auto state = mutex.lock();
  mutex.unlock(std::move(state));
}

TEST(DistributedMutex, BasicTryLock) {
  auto&& mutex = folly::DistributedMutex{};

  while (true) {
    auto state = mutex.try_lock();
    if (state) {
      mutex.unlock(std::move(state));
      break;
    }
  }
}
TEST(DistributedMutex, StressTwoThreads) {
  basicNThreads(2);
}
TEST(DistributedMutex, StressThreeThreads) {
  basicNThreads(3);
}
TEST(DistributedMutex, StressFourThreads) {
  basicNThreads(4);
}
TEST(DistributedMutex, StressFiveThreads) {
  basicNThreads(5);
}
TEST(DistributedMutex, StressSixThreads) {
  basicNThreads(6);
}
TEST(DistributedMutex, StressSevenThreads) {
  basicNThreads(7);
}
TEST(DistributedMutex, StressEightThreads) {
  basicNThreads(8);
}
TEST(DistributedMutex, StressSixteenThreads) {
  basicNThreads(16);
}
TEST(DistributedMutex, StressThirtyTwoThreads) {
  basicNThreads(32);
}
TEST(DistributedMutex, StressSixtyFourThreads) {
  basicNThreads(64);
}
TEST(DistributedMutex, StressHundredThreads) {
  basicNThreads(100);
}
TEST(DistributedMutex, StressHardwareConcurrencyThreads) {
  basicNThreads(std::thread::hardware_concurrency());
}
TEST(DistributedMutex, StressThreeThreadsLockTryAndTimed) {
  lockWithTryAndTimedNThreads(3, std::chrono::seconds{kStressTestSeconds});
}
TEST(DistributedMutex, StressSixThreadsLockTryAndTimed) {
  lockWithTryAndTimedNThreads(6, std::chrono::seconds{kStressTestSeconds});
}
TEST(DistributedMutex, StressTwelveThreadsLockTryAndTimed) {
  lockWithTryAndTimedNThreads(12, std::chrono::seconds{kStressTestSeconds});
}
TEST(DistributedMutex, StressTwentyFourThreadsLockTryAndTimed) {
  lockWithTryAndTimedNThreads(24, std::chrono::seconds{kStressTestSeconds});
}
TEST(DistributedMutex, StressFourtyEightThreadsLockTryAndTimed) {
  lockWithTryAndTimedNThreads(48, std::chrono::seconds{kStressTestSeconds});
}
TEST(DistributedMutex, StressSixtyFourThreadsLockTryAndTimed) {
  lockWithTryAndTimedNThreads(64, std::chrono::seconds{kStressTestSeconds});
}
TEST(DistributedMutex, StressHwConcThreadsLockTryAndTimed) {
  lockWithTryAndTimedNThreads(
      std::thread::hardware_concurrency(),
      std::chrono::seconds{kStressTestSeconds});
}
TEST(DistributedMutex, StressTwoThreadsCombine) {
  combineNThreads(2, std::chrono::seconds{kStressTestSeconds});
}
TEST(DistributedMutex, StressThreeThreadsCombine) {
  combineNThreads(3, std::chrono::seconds{kStressTestSeconds});
}
TEST(DistributedMutex, StressFourThreadsCombine) {
  combineNThreads(4, std::chrono::seconds{kStressTestSeconds});
}
TEST(DistributedMutex, StressFiveThreadsCombine) {
  combineNThreads(5, std::chrono::seconds{kStressTestSeconds});
}
TEST(DistributedMutex, StressSixThreadsCombine) {
  combineNThreads(6, std::chrono::seconds{kStressTestSeconds});
}
TEST(DistributedMutex, StressSevenThreadsCombine) {
  combineNThreads(7, std::chrono::seconds{kStressTestSeconds});
}
TEST(DistributedMutex, StressEightThreadsCombine) {
  combineNThreads(8, std::chrono::seconds{kStressTestSeconds});
}
TEST(DistributedMutex, StressSixteenThreadsCombine) {
  combineNThreads(16, std::chrono::seconds{kStressTestSeconds});
}
TEST(DistributedMutex, StressThirtyTwoThreadsCombine) {
  combineNThreads(32, std::chrono::seconds{kStressTestSeconds});
}
TEST(DistributedMutex, StressSixtyFourThreadsCombine) {
  combineNThreads(64, std::chrono::seconds{kStressTestSeconds});
}
TEST(DistributedMutex, StressHundredThreadsCombine) {
  combineNThreads(100, std::chrono::seconds{kStressTestSeconds});
}
TEST(DistributedMutex, StressHardwareConcurrencyThreadsCombine) {
  combineNThreads(
      std::thread::hardware_concurrency(),
      std::chrono::seconds{kStressTestSeconds});
}
TEST(DistributedMutex, StressTwoThreadsCombineAndLock) {
  combineWithLockNThreads(2, std::chrono::seconds{kStressTestSeconds});
}
TEST(DistributedMutex, StressFourThreadsCombineAndLock) {
  combineWithLockNThreads(4, std::chrono::seconds{kStressTestSeconds});
}
TEST(DistributedMutex, StressEightThreadsCombineAndLock) {
  combineWithLockNThreads(8, std::chrono::seconds{kStressTestSeconds});
}
TEST(DistributedMutex, StressSixteenThreadsCombineAndLock) {
  combineWithLockNThreads(16, std::chrono::seconds{kStressTestSeconds});
}
TEST(DistributedMutex, StressThirtyTwoThreadsCombineAndLock) {
  combineWithLockNThreads(32, std::chrono::seconds{kStressTestSeconds});
}
TEST(DistributedMutex, StressSixtyFourThreadsCombineAndLock) {
  combineWithLockNThreads(64, std::chrono::seconds{kStressTestSeconds});
}
TEST(DistributedMutex, StressHardwareConcurrencyThreadsCombineAndLock) {
  combineWithLockNThreads(
      std::thread::hardware_concurrency(),
      std::chrono::seconds{kStressTestSeconds});
}
TEST(DistributedMutex, StressThreeThreadsCombineTryLockAndLock) {
  combineWithTryLockNThreads(3, std::chrono::seconds{kStressTestSeconds});
}
TEST(DistributedMutex, StressSixThreadsCombineTryLockAndLock) {
  combineWithTryLockNThreads(6, std::chrono::seconds{kStressTestSeconds});
}
TEST(DistributedMutex, StressTwelveThreadsCombineTryLockAndLock) {
  combineWithTryLockNThreads(12, std::chrono::seconds{kStressTestSeconds});
}
TEST(DistributedMutex, StressTwentyFourThreadsCombineTryLockAndLock) {
  combineWithTryLockNThreads(24, std::chrono::seconds{kStressTestSeconds});
}
TEST(DistributedMutex, StressFourtyEightThreadsCombineTryLockAndLock) {
  combineWithTryLockNThreads(48, std::chrono::seconds{kStressTestSeconds});
}
TEST(DistributedMutex, StressSixtyFourThreadsCombineTryLockAndLock) {
  combineWithTryLockNThreads(64, std::chrono::seconds{kStressTestSeconds});
}
TEST(DistributedMutex, StressHardwareConcurrencyThreadsCombineTryLockAndLock) {
  combineWithTryLockNThreads(
      std::thread::hardware_concurrency(),
      std::chrono::seconds{kStressTestSeconds});
}
TEST(DistributedMutex, StressThreeThreadsCombineTryLockLockAndTimed) {
  combineWithLockTryAndTimedNThreads(
      3, std::chrono::seconds{kStressTestSeconds});
}
TEST(DistributedMutex, StressSixThreadsCombineTryLockLockAndTimed) {
  combineWithLockTryAndTimedNThreads(
      6, std::chrono::seconds{kStressTestSeconds});
}
TEST(DistributedMutex, StressTwelveThreadsCombineTryLockLockAndTimed) {
  combineWithLockTryAndTimedNThreads(
      12, std::chrono::seconds{kStressTestSeconds});
}
TEST(DistributedMutex, StressTwentyFourThreadsCombineTryLockLockAndTimed) {
  combineWithLockTryAndTimedNThreads(
      24, std::chrono::seconds{kStressTestSeconds});
}
TEST(DistributedMutex, StressFourtyEightThreadsCombineTryLockLockAndTimed) {
  combineWithLockTryAndTimedNThreads(
      48, std::chrono::seconds{kStressTestSeconds});
}
TEST(DistributedMutex, StressSixtyFourThreadsCombineTryLockLockAndTimed) {
  combineWithLockTryAndTimedNThreads(
      64, std::chrono::seconds{kStressTestSeconds});
}
TEST(DistributedMutex, StressHwConcurrencyThreadsCombineTryLockLockAndTimed) {
  combineWithLockTryAndTimedNThreads(
      std::thread::hardware_concurrency(),
      std::chrono::seconds{kStressTestSeconds});
}
TEST(DistributedMutex, StressTryLock) {
  auto&& mutex = folly::DistributedMutex{};

  for (auto i = 0; i < kStressFactor; ++i) {
    while (true) {
      auto state = mutex.try_lock();
      if (state) {
        mutex.unlock(std::move(state));
        break;
      }
    }
  }
}
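// The two timed-lock tests below use folly::Baton handshakes to sequence the
// threads: in TimedLockTimeout the helper thread holds the lock until it is
// told to release it, so the main thread's 10ms try_lock_for() must time
// out; in TimedLockAcquireAfterUnlock the helper releases the lock after a
// short sleep, so a try_lock_for() with a very long timeout should succeed.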
TEST(DistributedMutex, TimedLockTimeout) {
  auto&& mutex = folly::DistributedMutex{};
  auto&& start = folly::Baton<>{};
  auto&& done = folly::Baton<>{};

  auto thread = std::thread{[&]() {
    auto state = mutex.lock();
    start.post();
    done.wait();
    mutex.unlock(std::move(state));
  }};

  start.wait();
  auto result = mutex.try_lock_for(std::chrono::milliseconds{10});
  EXPECT_FALSE(result);
  done.post();
  thread.join();
}
TEST(DistributedMutex, TimedLockAcquireAfterUnlock) {
  auto&& mutex = folly::DistributedMutex{};
  auto&& start = folly::Baton<>{};

  auto thread = std::thread{[&]() {
    auto state = mutex.lock();
    start.post();
    /* sleep override */
    std::this_thread::sleep_for(std::chrono::milliseconds{10});
    mutex.unlock(std::move(state));
  }};

  start.wait();
  auto result = mutex.try_lock_for(kForever);
  EXPECT_TRUE(result);
  mutex.unlock(std::move(result));
  thread.join();
}
template <template <typename> class Atom = std::atomic>
void stressTryLockWithConcurrentLocks(
    int numThreads,
    int iterations = kStressFactor) {
  auto&& threads = std::vector<std::thread>{};
  auto&& mutex = folly::detail::distributed_mutex::DistributedMutex<Atom>{};
  auto&& atomic = std::atomic<std::uint64_t>{0};

  for (auto i = 0; i < numThreads; ++i) {
    threads.push_back(std::thread([&] {
      for (auto j = 0; j < iterations; ++j) {
        auto state = mutex.lock();
        EXPECT_EQ(atomic.fetch_add(1, std::memory_order_relaxed), 0);
        EXPECT_EQ(atomic.fetch_sub(1, std::memory_order_relaxed), 1);
        mutex.unlock(std::move(state));
      }
    }));
  }

  for (auto i = 0; i < iterations; ++i) {
    if (auto state = mutex.try_lock()) {
      EXPECT_EQ(atomic.fetch_add(1, std::memory_order_relaxed), 0);
      EXPECT_EQ(atomic.fetch_sub(1, std::memory_order_relaxed), 1);
      mutex.unlock(std::move(state));
    }
  }

  for (auto& thread : threads) {
    thread.join();
  }
}
TEST(DistributedMutex, StressTryLockWithConcurrentLocksTwoThreads) {
  stressTryLockWithConcurrentLocks(2);
}
TEST(DistributedMutex, StressTryLockWithConcurrentLocksFourThreads) {
  stressTryLockWithConcurrentLocks(4);
}
TEST(DistributedMutex, StressTryLockWithConcurrentLocksEightThreads) {
  stressTryLockWithConcurrentLocks(8);
}
TEST(DistributedMutex, StressTryLockWithConcurrentLocksSixteenThreads) {
  stressTryLockWithConcurrentLocks(16);
}
TEST(DistributedMutex, StressTryLockWithConcurrentLocksThirtyTwoThreads) {
  stressTryLockWithConcurrentLocks(32);
}
TEST(DistributedMutex, StressTryLockWithConcurrentLocksSixtyFourThreads) {
  stressTryLockWithConcurrentLocks(64);
}
template <template <typename> class Atom = std::atomic>
void concurrentTryLocks(int numThreads, int iterations = kStressFactor) {
  auto&& threads = std::vector<std::thread>{};
  auto&& mutex = folly::detail::distributed_mutex::DistributedMutex<Atom>{};
  auto&& atomic = std::atomic<std::uint64_t>{0};

  for (auto i = 0; i < numThreads; ++i) {
    threads.push_back(std::thread([&] {
      for (auto j = 0; j < iterations; ++j) {
        if (auto state = mutex.try_lock()) {
          EXPECT_EQ(atomic.fetch_add(1, std::memory_order_relaxed), 0);
          EXPECT_EQ(atomic.fetch_sub(1, std::memory_order_relaxed), 1);
          mutex.unlock(std::move(state));
        }
      }
    }));
  }

  for (auto& thread : threads) {
    thread.join();
  }
}
TEST(DistributedMutex, StressTryLockWithTwoThreads) {
  concurrentTryLocks(2);
}
TEST(DistributedMutex, StressTryLockFourThreads) {
  concurrentTryLocks(4);
}
TEST(DistributedMutex, StressTryLockEightThreads) {
  concurrentTryLocks(8);
}
TEST(DistributedMutex, StressTryLockSixteenThreads) {
  concurrentTryLocks(16);
}
TEST(DistributedMutex, StressTryLockThirtyTwoThreads) {
  concurrentTryLocks(32);
}
TEST(DistributedMutex, StressTryLockSixtyFourThreads) {
  concurrentTryLocks(64);
}
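// TestConstruction counts every special member function call in static
// atomic counters so the tests below can assert exactly how many times
// lock_combine() default-constructs, moves, copies, and destroys the value
// returned from a critical section.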
class TestConstruction {
 public:
  TestConstruction() = delete;
  explicit TestConstruction(int) {
    defaultConstructs().fetch_add(1, std::memory_order_relaxed);
  }
  TestConstruction(TestConstruction&&) noexcept {
    moveConstructs().fetch_add(1, std::memory_order_relaxed);
  }
  TestConstruction(const TestConstruction&) {
    copyConstructs().fetch_add(1, std::memory_order_relaxed);
  }
  TestConstruction& operator=(const TestConstruction&) {
    copyAssigns().fetch_add(1, std::memory_order_relaxed);
    return *this;
  }
  TestConstruction& operator=(TestConstruction&&) {
    moveAssigns().fetch_add(1, std::memory_order_relaxed);
    return *this;
  }
  ~TestConstruction() {
    destructs().fetch_add(1, std::memory_order_relaxed);
  }

  static std::atomic<std::uint64_t>& defaultConstructs() {
    static auto&& atomic = std::atomic<std::uint64_t>{0};
    return atomic;
  }
  static std::atomic<std::uint64_t>& moveConstructs() {
    static auto&& atomic = std::atomic<std::uint64_t>{0};
    return atomic;
  }
  static std::atomic<std::uint64_t>& copyConstructs() {
    static auto&& atomic = std::atomic<std::uint64_t>{0};
    return atomic;
  }
  static std::atomic<std::uint64_t>& moveAssigns() {
    static auto&& atomic = std::atomic<std::uint64_t>{0};
    return atomic;
  }
  static std::atomic<std::uint64_t>& copyAssigns() {
    static auto&& atomic = std::atomic<std::uint64_t>{0};
    return atomic;
  }
  static std::atomic<std::uint64_t>& destructs() {
    static auto&& atomic = std::atomic<std::uint64_t>{0};
    return atomic;
  }

  static void reset() {
    defaultConstructs().store(0);
    moveConstructs().store(0);
    copyConstructs().store(0);
    copyAssigns().store(0);
    destructs().store(0);
  }
};
TEST(DistributedMutex, TestAppropriateDestructionAndConstructionWithCombine) {
  auto&& mutex = folly::DistributedMutex{};
  auto&& stop = std::atomic<bool>{false};

  // test the simple return path to make sure that in the absence of
  // contention, we get the right number of constructs and destructs
  mutex.lock_combine([]() { return TestConstruction{1}; });
  auto moves = TestConstruction::moveConstructs().load();
  auto defaults = TestConstruction::defaultConstructs().load();
  EXPECT_EQ(TestConstruction::defaultConstructs().load(), 1);
  EXPECT_TRUE(moves == 0 || moves == 1);
  EXPECT_EQ(TestConstruction::destructs().load(), moves + defaults);

  // loop and make sure we were able to test the path where the critical
  // section of the thread gets combined, and assert that we see the expected
  // number of constructions and destructions
  //
  // this implements a timed backoff to test the combined path, so we use the
  // smallest possible delay in tests
  auto thread = std::thread{[&]() {
    auto&& duration = std::chrono::milliseconds{10};
    while (!stop.load()) {
      TestConstruction::reset();
      auto&& ready = folly::Baton<>{};
      auto&& release = folly::Baton<>{};

      // make one thread start its critical section, signal and wait for
      // another thread to enqueue, to test the combined path
      auto innerThread = std::thread{[&]() {
        mutex.lock_combine([&]() {
          ready.post();
          release.wait();
          /* sleep override */
          std::this_thread::sleep_for(duration);
        });
      }};

      // wait for the thread to get in its critical section, then tell it to go
      ready.wait();
      release.post();
      mutex.lock_combine([&]() { return TestConstruction{1}; });
      innerThread.join();

      // at this point we should have only one default construct, either 3
      // or 4 move constructs, and the same number of destructions as
      // constructions
      auto innerDefaults = TestConstruction::defaultConstructs().load();
      auto innerMoves = TestConstruction::moveConstructs().load();
      auto destructs = TestConstruction::destructs().load();
      EXPECT_EQ(innerDefaults, 1);
      EXPECT_TRUE(innerMoves == 3 || innerMoves == 4 || innerMoves == 1);
      EXPECT_EQ(destructs, innerMoves + innerDefaults);
      EXPECT_EQ(TestConstruction::moveAssigns().load(), 0);
      EXPECT_EQ(TestConstruction::copyAssigns().load(), 0);

      // increase duration by 100ms each iteration
      duration = duration + std::chrono::milliseconds{100};
    }
  }};

  /* sleep override */
  std::this_thread::sleep_for(std::chrono::seconds{kStressTestSeconds});
  stop.store(true);
  thread.join();
}
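// Rotates each thread across a small pool of mutexes so every mutex keeps
// changing owners and waiters, checking that combining still produces
// strictly ordered results when threads bounce between locks.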
template <template <typename> class Atom = std::atomic>
void concurrentLocksManyMutexes(int numThreads, std::chrono::seconds duration) {
  using DMutex = folly::detail::distributed_mutex::DistributedMutex<Atom>;
  const auto&& kNumMutexes = 10;
  auto&& threads = std::vector<std::thread>{};
  auto&& mutexes = std::vector<DMutex>(kNumMutexes);
  auto&& barriers = std::vector<std::atomic<std::uint64_t>>(kNumMutexes);
  auto&& stop = std::atomic<bool>{false};

  for (auto i = 0; i < numThreads; ++i) {
    threads.push_back(std::thread([&] {
      auto&& total = std::atomic<std::uint64_t>{0};
      auto&& expected = std::uint64_t{0};

      for (auto j = 0; !stop.load(std::memory_order_relaxed); ++j) {
        auto& mutex = mutexes[j % kNumMutexes];
        auto& barrier = barriers[j % kNumMutexes];

        ++expected;
        auto result = mutex.lock_combine([&]() {
          EXPECT_EQ(barrier.fetch_add(1, std::memory_order_relaxed), 0);
          EXPECT_EQ(barrier.fetch_add(1, std::memory_order_relaxed), 1);
          std::this_thread::yield();
          SCOPE_EXIT {
            EXPECT_EQ(barrier.fetch_sub(1, std::memory_order_relaxed), 1);
          };
          EXPECT_EQ(barrier.fetch_sub(1, std::memory_order_relaxed), 2);
          return total.fetch_add(1, std::memory_order_relaxed);
        });
        EXPECT_EQ(result, expected - 1);
      }

      EXPECT_EQ(total.load(), expected);
    }));
  }

  /* sleep override */
  std::this_thread::sleep_for(duration);
  stop.store(true);
  for (auto& thread : threads) {
    thread.join();
  }
}
TEST(DistributedMutex, StressWithManyMutexesAlternatingTwoThreads) {
  concurrentLocksManyMutexes(2, std::chrono::seconds{kStressTestSeconds});
}
TEST(DistributedMutex, StressWithManyMutexesAlternatingFourThreads) {
  concurrentLocksManyMutexes(4, std::chrono::seconds{kStressTestSeconds});
}
TEST(DistributedMutex, StressWithManyMutexesAlternatingEightThreads) {
  concurrentLocksManyMutexes(8, std::chrono::seconds{kStressTestSeconds});
}
TEST(DistributedMutex, StressWithManyMutexesAlternatingSixteenThreads) {
  concurrentLocksManyMutexes(16, std::chrono::seconds{kStressTestSeconds});
}
TEST(DistributedMutex, StressWithManyMutexesAlternatingThirtyTwoThreads) {
  concurrentLocksManyMutexes(32, std::chrono::seconds{kStressTestSeconds});
}
TEST(DistributedMutex, StressWithManyMutexesAlternatingSixtyFourThreads) {
  concurrentLocksManyMutexes(64, std::chrono::seconds{kStressTestSeconds});
}
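// ExceptionWithConstructionTrack encodes its integer id in what() and embeds
// a TestConstruction member, so the exception tests can verify both that the
// right exception propagates out of lock_combine() and that constructions
// and destructions stay balanced along the way.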
class ExceptionWithConstructionTrack : public std::exception {
 public:
  explicit ExceptionWithConstructionTrack(int id)
      : id_{std::to_string(id)}, constructionTrack_{id} {}

  const char* what() const noexcept override {
    return id_.c_str();
  }

 private:
  std::string id_;
  TestConstruction constructionTrack_;
};
TEST(DistributedMutex, TestExceptionPropagationUncontended) {
  TestConstruction::reset();
  auto&& mutex = folly::DistributedMutex{};
  auto&& thread = std::thread{[&]() {
    try {
      mutex.lock_combine([&]() { throw ExceptionWithConstructionTrack{46}; });
    } catch (std::exception& exc) {
      auto integer = std::stoi(exc.what());
      EXPECT_EQ(integer, 46);
      EXPECT_GT(TestConstruction::defaultConstructs(), 0);
    }
    EXPECT_EQ(
        TestConstruction::defaultConstructs(), TestConstruction::destructs());
  }};
  thread.join();
}
template <template <typename> class Atom = std::atomic>
void concurrentExceptionPropagationStress(
    int numThreads,
    std::chrono::milliseconds t) {
  // this test fails with a false negative under older versions of TSAN for
  // some reason, so disable it when TSAN is enabled
  if (folly::kIsSanitizeThread) {
    return;
  }

  TestConstruction::reset();
  auto&& mutex = folly::detail::distributed_mutex::DistributedMutex<Atom>{};
  auto&& threads = std::vector<std::thread>{};
  auto&& stop = std::atomic<bool>{false};
  auto&& barrier = std::atomic<std::uint64_t>{0};

  for (auto i = 0; i < numThreads; ++i) {
    threads.push_back(std::thread([&]() {
      for (auto j = 0; !stop.load(); ++j) {
        auto value = int{0};
        try {
          value = mutex.lock_combine([&]() {
            EXPECT_EQ(barrier.fetch_add(1, std::memory_order_relaxed), 0);
            EXPECT_EQ(barrier.fetch_add(1, std::memory_order_relaxed), 1);
            std::this_thread::yield();
            SCOPE_EXIT {
              EXPECT_EQ(barrier.fetch_sub(1, std::memory_order_relaxed), 1);
            };
            EXPECT_EQ(barrier.fetch_sub(1, std::memory_order_relaxed), 2);

            // we only throw an exception once every 3 times
            if (!(j % 3)) {
              throw ExceptionWithConstructionTrack{j};
            }

            return j;
          });
        } catch (std::exception& exc) {
          value = std::stoi(exc.what());
        }

        EXPECT_EQ(value, j);
      }
    }));
  }

  /* sleep override */
  std::this_thread::sleep_for(t);
  stop.store(true);
  for (auto& thread : threads) {
    thread.join();
  }
}
TEST(DistributedMutex, TestExceptionPropagationStressTwoThreads) {
  concurrentExceptionPropagationStress(
      2, std::chrono::seconds{kStressTestSeconds});
}
TEST(DistributedMutex, TestExceptionPropagationStressFourThreads) {
  concurrentExceptionPropagationStress(
      4, std::chrono::seconds{kStressTestSeconds});
}
TEST(DistributedMutex, TestExceptionPropagationStressEightThreads) {
  concurrentExceptionPropagationStress(
      8, std::chrono::seconds{kStressTestSeconds});
}
TEST(DistributedMutex, TestExceptionPropagationStressSixteenThreads) {
  concurrentExceptionPropagationStress(
      16, std::chrono::seconds{kStressTestSeconds});
}
TEST(DistributedMutex, TestExceptionPropagationStressThirtyTwoThreads) {
  concurrentExceptionPropagationStress(
      32, std::chrono::seconds{kStressTestSeconds});
}
TEST(DistributedMutex, TestExceptionPropagationStressSixtyFourThreads) {
  concurrentExceptionPropagationStress(
      64, std::chrono::seconds{kStressTestSeconds});
}
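// makeMonotonicArray builds an 8-element (cacheline-sized) array of
// consecutive values; concurrentBigValueReturnStress returns one of these
// from every combined critical section to stress returning values that are
// much larger than a register from lock_combine().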
std::array<std::uint64_t, 8> makeMonotonicArray(int start) {
  auto array = std::array<std::uint64_t, 8>{};
  for (auto& element : array) { element = start++; }
  return array;
}
template <template <typename> class Atom = std::atomic>
void concurrentBigValueReturnStress(
    int numThreads,
    std::chrono::milliseconds t) {
  auto&& mutex = folly::detail::distributed_mutex::DistributedMutex<Atom>{};
  auto&& threads = std::vector<std::thread>{};
  auto&& stop = std::atomic<bool>{false};
  auto&& barrier = std::atomic<std::uint64_t>{0};

  for (auto i = 0; i < numThreads; ++i) {
    threads.push_back(std::thread([&]() {
      auto&& value = std::atomic<std::uint64_t>{0};

      for (auto j = 0; !stop.load(); ++j) {
        auto returned = mutex.lock_combine([&]() {
          EXPECT_EQ(barrier.fetch_add(1, std::memory_order_relaxed), 0);
          EXPECT_EQ(barrier.fetch_add(1, std::memory_order_relaxed), 1);
          std::this_thread::yield();
          // return an entire cacheline worth of data
          auto current = value.fetch_add(1, std::memory_order_relaxed);
          SCOPE_EXIT {
            EXPECT_EQ(barrier.fetch_sub(1, std::memory_order_relaxed), 1);
          };
          EXPECT_EQ(barrier.fetch_sub(1, std::memory_order_relaxed), 2);
          return makeMonotonicArray(static_cast<int>(current));
        });

        auto expected = value.load() - 1;
        for (auto& element : returned) {
          EXPECT_EQ(element, expected++);
        }
      }
    }));
  }

  /* sleep override */
  std::this_thread::sleep_for(t);
  stop.store(true);
  for (auto& thread : threads) {
    thread.join();
  }
}
TEST(DistributedMutex, StressBigValueReturnTwoThreads) {
  concurrentBigValueReturnStress(2, std::chrono::seconds{kStressTestSeconds});
}
TEST(DistributedMutex, StressBigValueReturnFourThreads) {
  concurrentBigValueReturnStress(4, std::chrono::seconds{kStressTestSeconds});
}
TEST(DistributedMutex, StressBigValueReturnEightThreads) {
  concurrentBigValueReturnStress(8, std::chrono::seconds{kStressTestSeconds});
}
TEST(DistributedMutex, StressBigValueReturnSixteenThreads) {
  concurrentBigValueReturnStress(16, std::chrono::seconds{kStressTestSeconds});
}
TEST(DistributedMutex, StressBigValueReturnThirtyTwoThreads) {
  concurrentBigValueReturnStress(32, std::chrono::seconds{kStressTestSeconds});
}
TEST(DistributedMutex, StressBigValueReturnSixtyFourThreads) {
  concurrentBigValueReturnStress(64, std::chrono::seconds{kStressTestSeconds});
}
} // namespace folly

int main(int argc, char** argv) {
  ::testing::InitGoogleTest(&argc, argv);
  return RUN_ALL_TESTS();
}

#else
int main(int /*argc*/, char** /*argv*/) {
  printf(
      "DistributedMutex is not supported in ROCKSDB_LITE, on ARM, or in "
      "valgrind_test runs\n");
  return 0;
}
#endif // !ROCKSDB_LITE && !__ARM_ARCH && !ROCKSDB_VALGRIND_RUN