1 // Copyright (c) 2020-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
8 #include "utilities/transactions/lock/point/point_lock_manager.h"
10 #include "file/file_util.h"
11 #include "port/port.h"
12 #include "port/stack_trace.h"
13 #include "rocksdb/utilities/transaction_db.h"
14 #include "test_util/testharness.h"
15 #include "test_util/testutil.h"
16 #include "utilities/transactions/pessimistic_transaction_db.h"
17 #include "utilities/transactions/transaction_db_mutex_impl.h"
19 namespace ROCKSDB_NAMESPACE
{
21 class MockColumnFamilyHandle
: public ColumnFamilyHandle
{
23 explicit MockColumnFamilyHandle(ColumnFamilyId cf_id
) : cf_id_(cf_id
) {}
25 ~MockColumnFamilyHandle() override
{}
27 const std::string
& GetName() const override
{ return name_
; }
29 ColumnFamilyId
GetID() const override
{ return cf_id_
; }
31 Status
GetDescriptor(ColumnFamilyDescriptor
*) override
{
35 const Comparator
* GetComparator() const override
{ return nullptr; }
38 ColumnFamilyId cf_id_
;
39 std::string name_
= "MockCF";
42 class PointLockManagerTest
: public testing::Test
{
44 void SetUp() override
{
45 env_
= Env::Default();
46 db_dir_
= test::PerThreadDBPath("point_lock_manager_test");
47 ASSERT_OK(env_
->CreateDir(db_dir_
));
48 mutex_factory_
= std::make_shared
<TransactionDBMutexFactoryImpl
>();
51 opt
.create_if_missing
= true;
52 TransactionDBOptions txn_opt
;
53 txn_opt
.transaction_lock_timeout
= 0;
54 txn_opt
.custom_mutex_factory
= mutex_factory_
;
55 ASSERT_OK(TransactionDB::Open(opt
, txn_opt
, db_dir_
, &db_
));
57 locker_
.reset(new PointLockManager(
58 static_cast<PessimisticTransactionDB
*>(db_
), txn_opt
));
61 void TearDown() override
{
63 EXPECT_OK(DestroyDir(env_
, db_dir_
));
66 PessimisticTransaction
* NewTxn(
67 TransactionOptions txn_opt
= TransactionOptions()) {
68 Transaction
* txn
= db_
->BeginTransaction(WriteOptions(), txn_opt
);
69 return reinterpret_cast<PessimisticTransaction
*>(txn
);
74 std::unique_ptr
<PointLockManager
> locker_
;
78 std::shared_ptr
<TransactionDBMutexFactory
> mutex_factory_
;
82 TEST_F(PointLockManagerTest
, LockNonExistingColumnFamily
) {
83 MockColumnFamilyHandle
cf(1024);
84 locker_
->RemoveColumnFamily(&cf
);
86 auto s
= locker_
->TryLock(txn
, 1024, "k", env_
, true);
87 ASSERT_TRUE(s
.IsInvalidArgument());
88 ASSERT_STREQ(s
.getState(), "Column family id not found: 1024");
92 TEST_F(PointLockManagerTest
, LockStatus
) {
93 MockColumnFamilyHandle
cf1(1024), cf2(2048);
94 locker_
->AddColumnFamily(&cf1
);
95 locker_
->AddColumnFamily(&cf2
);
98 ASSERT_OK(locker_
->TryLock(txn1
, 1024, "k1", env_
, true));
99 ASSERT_OK(locker_
->TryLock(txn1
, 2048, "k1", env_
, true));
101 auto txn2
= NewTxn();
102 ASSERT_OK(locker_
->TryLock(txn2
, 1024, "k2", env_
, false));
103 ASSERT_OK(locker_
->TryLock(txn2
, 2048, "k2", env_
, false));
105 auto s
= locker_
->GetPointLockStatus();
106 ASSERT_EQ(s
.size(), 4u);
107 for (uint32_t cf_id
: {1024, 2048}) {
108 ASSERT_EQ(s
.count(cf_id
), 2u);
109 auto range
= s
.equal_range(cf_id
);
110 for (auto it
= range
.first
; it
!= range
.second
; it
++) {
111 ASSERT_TRUE(it
->second
.key
== "k1" || it
->second
.key
== "k2");
112 if (it
->second
.key
== "k1") {
113 ASSERT_EQ(it
->second
.exclusive
, true);
114 ASSERT_EQ(it
->second
.ids
.size(), 1u);
115 ASSERT_EQ(it
->second
.ids
[0], txn1
->GetID());
116 } else if (it
->second
.key
== "k2") {
117 ASSERT_EQ(it
->second
.exclusive
, false);
118 ASSERT_EQ(it
->second
.ids
.size(), 1u);
119 ASSERT_EQ(it
->second
.ids
[0], txn2
->GetID());
128 TEST_F(PointLockManagerTest
, UnlockExclusive
) {
129 MockColumnFamilyHandle
cf(1);
130 locker_
->AddColumnFamily(&cf
);
132 auto txn1
= NewTxn();
133 ASSERT_OK(locker_
->TryLock(txn1
, 1, "k", env_
, true));
134 locker_
->UnLock(txn1
, 1, "k", env_
);
136 auto txn2
= NewTxn();
137 ASSERT_OK(locker_
->TryLock(txn2
, 1, "k", env_
, true));
143 TEST_F(PointLockManagerTest
, UnlockShared
) {
144 MockColumnFamilyHandle
cf(1);
145 locker_
->AddColumnFamily(&cf
);
147 auto txn1
= NewTxn();
148 ASSERT_OK(locker_
->TryLock(txn1
, 1, "k", env_
, false));
149 locker_
->UnLock(txn1
, 1, "k", env_
);
151 auto txn2
= NewTxn();
152 ASSERT_OK(locker_
->TryLock(txn2
, 1, "k", env_
, true));
158 TEST_F(PointLockManagerTest
, ReentrantExclusiveLock
) {
159 // Tests that a txn can acquire exclusive lock on the same key repeatedly.
160 MockColumnFamilyHandle
cf(1);
161 locker_
->AddColumnFamily(&cf
);
163 ASSERT_OK(locker_
->TryLock(txn
, 1, "k", env_
, true));
164 ASSERT_OK(locker_
->TryLock(txn
, 1, "k", env_
, true));
168 TEST_F(PointLockManagerTest
, ReentrantSharedLock
) {
169 // Tests that a txn can acquire shared lock on the same key repeatedly.
170 MockColumnFamilyHandle
cf(1);
171 locker_
->AddColumnFamily(&cf
);
173 ASSERT_OK(locker_
->TryLock(txn
, 1, "k", env_
, false));
174 ASSERT_OK(locker_
->TryLock(txn
, 1, "k", env_
, false));
178 TEST_F(PointLockManagerTest
, LockUpgrade
) {
179 // Tests that a txn can upgrade from a shared lock to an exclusive lock.
180 MockColumnFamilyHandle
cf(1);
181 locker_
->AddColumnFamily(&cf
);
183 ASSERT_OK(locker_
->TryLock(txn
, 1, "k", env_
, false));
184 ASSERT_OK(locker_
->TryLock(txn
, 1, "k", env_
, true));
188 TEST_F(PointLockManagerTest
, LockDowngrade
) {
189 // Tests that a txn can acquire a shared lock after acquiring an exclusive
190 // lock on the same key.
191 MockColumnFamilyHandle
cf(1);
192 locker_
->AddColumnFamily(&cf
);
194 ASSERT_OK(locker_
->TryLock(txn
, 1, "k", env_
, true));
195 ASSERT_OK(locker_
->TryLock(txn
, 1, "k", env_
, false));
199 TEST_F(PointLockManagerTest
, LockConflict
) {
200 // Tests that lock conflicts lead to lock timeout.
201 MockColumnFamilyHandle
cf(1);
202 locker_
->AddColumnFamily(&cf
);
203 auto txn1
= NewTxn();
204 auto txn2
= NewTxn();
207 // exclusive-exclusive conflict.
208 ASSERT_OK(locker_
->TryLock(txn1
, 1, "k1", env_
, true));
209 auto s
= locker_
->TryLock(txn2
, 1, "k1", env_
, true);
210 ASSERT_TRUE(s
.IsTimedOut());
214 // exclusive-shared conflict.
215 ASSERT_OK(locker_
->TryLock(txn1
, 1, "k2", env_
, true));
216 auto s
= locker_
->TryLock(txn2
, 1, "k2", env_
, false);
217 ASSERT_TRUE(s
.IsTimedOut());
221 // shared-exclusive conflict.
222 ASSERT_OK(locker_
->TryLock(txn1
, 1, "k2", env_
, false));
223 auto s
= locker_
->TryLock(txn2
, 1, "k2", env_
, true);
224 ASSERT_TRUE(s
.IsTimedOut());
231 port::Thread
BlockUntilWaitingTxn(std::function
<void()> f
) {
232 std::atomic
<bool> reached(false);
233 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
234 "PointLockManager::AcquireWithTimeout:WaitingTxn",
235 [&](void* /*arg*/) { reached
.store(true); });
236 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
240 while (!reached
.load()) {
241 std::this_thread::sleep_for(std::chrono::milliseconds(100));
243 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
244 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
249 TEST_F(PointLockManagerTest
, SharedLocks
) {
250 // Tests that shared locks can be concurrently held by multiple transactions.
251 MockColumnFamilyHandle
cf(1);
252 locker_
->AddColumnFamily(&cf
);
253 auto txn1
= NewTxn();
254 auto txn2
= NewTxn();
255 ASSERT_OK(locker_
->TryLock(txn1
, 1, "k", env_
, false));
256 ASSERT_OK(locker_
->TryLock(txn2
, 1, "k", env_
, false));
261 TEST_F(PointLockManagerTest
, Deadlock
) {
262 // Tests that deadlock can be detected.
263 // Deadlock scenario:
264 // txn1 exclusively locks k1, and wants to lock k2;
265 // txn2 exclusively locks k2, and wants to lock k1.
266 MockColumnFamilyHandle
cf(1);
267 locker_
->AddColumnFamily(&cf
);
268 TransactionOptions txn_opt
;
269 txn_opt
.deadlock_detect
= true;
270 txn_opt
.lock_timeout
= 1000000;
271 auto txn1
= NewTxn(txn_opt
);
272 auto txn2
= NewTxn(txn_opt
);
274 ASSERT_OK(locker_
->TryLock(txn1
, 1, "k1", env_
, true));
275 ASSERT_OK(locker_
->TryLock(txn2
, 1, "k2", env_
, true));
277 // txn1 tries to lock k2, will block forever.
278 port::Thread t
= BlockUntilWaitingTxn([&]() {
279 // block because txn2 is holding a lock on k2.
280 locker_
->TryLock(txn1
, 1, "k2", env_
, true);
283 auto s
= locker_
->TryLock(txn2
, 1, "k1", env_
, true);
284 ASSERT_TRUE(s
.IsBusy());
285 ASSERT_EQ(s
.subcode(), Status::SubCode::kDeadlock
);
287 std::vector
<DeadlockPath
> deadlock_paths
= locker_
->GetDeadlockInfoBuffer();
288 ASSERT_EQ(deadlock_paths
.size(), 1u);
289 ASSERT_FALSE(deadlock_paths
[0].limit_exceeded
);
291 std::vector
<DeadlockInfo
> deadlocks
= deadlock_paths
[0].path
;
292 ASSERT_EQ(deadlocks
.size(), 2u);
294 ASSERT_EQ(deadlocks
[0].m_txn_id
, txn1
->GetID());
295 ASSERT_EQ(deadlocks
[0].m_cf_id
, 1u);
296 ASSERT_TRUE(deadlocks
[0].m_exclusive
);
297 ASSERT_EQ(deadlocks
[0].m_waiting_key
, "k2");
299 ASSERT_EQ(deadlocks
[1].m_txn_id
, txn2
->GetID());
300 ASSERT_EQ(deadlocks
[1].m_cf_id
, 1u);
301 ASSERT_TRUE(deadlocks
[1].m_exclusive
);
302 ASSERT_EQ(deadlocks
[1].m_waiting_key
, "k1");
304 locker_
->UnLock(txn2
, 1, "k2", env_
);
311 TEST_F(PointLockManagerTest
, DeadlockDepthExceeded
) {
312 // Tests that when detecting deadlock, if the detection depth is exceeded,
313 // it's also viewed as deadlock.
314 MockColumnFamilyHandle
cf(1);
315 locker_
->AddColumnFamily(&cf
);
316 TransactionOptions txn_opt
;
317 txn_opt
.deadlock_detect
= true;
318 txn_opt
.deadlock_detect_depth
= 1;
319 txn_opt
.lock_timeout
= 1000000;
320 auto txn1
= NewTxn(txn_opt
);
321 auto txn2
= NewTxn(txn_opt
);
322 auto txn3
= NewTxn(txn_opt
);
323 auto txn4
= NewTxn(txn_opt
);
324 // "a ->(k) b" means transaction a is waiting for transaction b to release
325 // the held lock on key k.
326 // txn4 ->(k3) -> txn3 ->(k2) txn2 ->(k1) txn1
327 // txn3's deadlock detection will exceed the detection depth 1,
328 // which will be viewed as a deadlock.
330 // txn4 ->(k3) -> txn3 must be set up before
331 // txn3 ->(k2) -> txn2, because to trigger deadlock detection for txn3,
332 // it must have another txn waiting on it, which is txn4 in this case.
333 ASSERT_OK(locker_
->TryLock(txn1
, 1, "k1", env_
, true));
335 port::Thread t1
= BlockUntilWaitingTxn([&]() {
336 ASSERT_OK(locker_
->TryLock(txn2
, 1, "k2", env_
, true));
337 // block because txn1 is holding a lock on k1.
338 locker_
->TryLock(txn2
, 1, "k1", env_
, true);
341 ASSERT_OK(locker_
->TryLock(txn3
, 1, "k3", env_
, true));
343 port::Thread t2
= BlockUntilWaitingTxn([&]() {
344 // block because txn3 is holding a lock on k1.
345 locker_
->TryLock(txn4
, 1, "k3", env_
, true);
348 auto s
= locker_
->TryLock(txn3
, 1, "k2", env_
, true);
349 ASSERT_TRUE(s
.IsBusy());
350 ASSERT_EQ(s
.subcode(), Status::SubCode::kDeadlock
);
352 std::vector
<DeadlockPath
> deadlock_paths
= locker_
->GetDeadlockInfoBuffer();
353 ASSERT_EQ(deadlock_paths
.size(), 1u);
354 ASSERT_TRUE(deadlock_paths
[0].limit_exceeded
);
356 locker_
->UnLock(txn1
, 1, "k1", env_
);
357 locker_
->UnLock(txn3
, 1, "k3", env_
);
367 } // namespace ROCKSDB_NAMESPACE
369 int main(int argc
, char** argv
) {
370 ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
371 ::testing::InitGoogleTest(&argc
, argv
);
372 return RUN_ALL_TESTS();
378 int main(int /*argc*/, char** /*argv*/) {
380 "SKIPPED because Transactions are not supported in ROCKSDB_LITE\n");
384 #endif // ROCKSDB_LITE