]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/utilities/transactions/lock/point/point_lock_manager_test.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / rocksdb / utilities / transactions / lock / point / point_lock_manager_test.cc
1 // Copyright (c) 2020-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5
6 #ifndef ROCKSDB_LITE
7
8 #include "utilities/transactions/lock/point/point_lock_manager.h"
9
10 #include "file/file_util.h"
11 #include "port/port.h"
12 #include "port/stack_trace.h"
13 #include "rocksdb/utilities/transaction_db.h"
14 #include "test_util/testharness.h"
15 #include "test_util/testutil.h"
16 #include "utilities/transactions/pessimistic_transaction_db.h"
17 #include "utilities/transactions/transaction_db_mutex_impl.h"
18
19 namespace ROCKSDB_NAMESPACE {
20
21 class MockColumnFamilyHandle : public ColumnFamilyHandle {
22 public:
23 explicit MockColumnFamilyHandle(ColumnFamilyId cf_id) : cf_id_(cf_id) {}
24
25 ~MockColumnFamilyHandle() override {}
26
27 const std::string& GetName() const override { return name_; }
28
29 ColumnFamilyId GetID() const override { return cf_id_; }
30
31 Status GetDescriptor(ColumnFamilyDescriptor*) override {
32 return Status::OK();
33 }
34
35 const Comparator* GetComparator() const override { return nullptr; }
36
37 private:
38 ColumnFamilyId cf_id_;
39 std::string name_ = "MockCF";
40 };
41
42 class PointLockManagerTest : public testing::Test {
43 public:
44 void SetUp() override {
45 env_ = Env::Default();
46 db_dir_ = test::PerThreadDBPath("point_lock_manager_test");
47 ASSERT_OK(env_->CreateDir(db_dir_));
48 mutex_factory_ = std::make_shared<TransactionDBMutexFactoryImpl>();
49
50 Options opt;
51 opt.create_if_missing = true;
52 TransactionDBOptions txn_opt;
53 txn_opt.transaction_lock_timeout = 0;
54 txn_opt.custom_mutex_factory = mutex_factory_;
55 ASSERT_OK(TransactionDB::Open(opt, txn_opt, db_dir_, &db_));
56
57 locker_.reset(new PointLockManager(
58 static_cast<PessimisticTransactionDB*>(db_), txn_opt));
59 }
60
61 void TearDown() override {
62 delete db_;
63 EXPECT_OK(DestroyDir(env_, db_dir_));
64 }
65
66 PessimisticTransaction* NewTxn(
67 TransactionOptions txn_opt = TransactionOptions()) {
68 Transaction* txn = db_->BeginTransaction(WriteOptions(), txn_opt);
69 return reinterpret_cast<PessimisticTransaction*>(txn);
70 }
71
72 protected:
73 Env* env_;
74 std::unique_ptr<PointLockManager> locker_;
75
76 private:
77 std::string db_dir_;
78 std::shared_ptr<TransactionDBMutexFactory> mutex_factory_;
79 TransactionDB* db_;
80 };
81
82 TEST_F(PointLockManagerTest, LockNonExistingColumnFamily) {
83 MockColumnFamilyHandle cf(1024);
84 locker_->RemoveColumnFamily(&cf);
85 auto txn = NewTxn();
86 auto s = locker_->TryLock(txn, 1024, "k", env_, true);
87 ASSERT_TRUE(s.IsInvalidArgument());
88 ASSERT_STREQ(s.getState(), "Column family id not found: 1024");
89 delete txn;
90 }
91
92 TEST_F(PointLockManagerTest, LockStatus) {
93 MockColumnFamilyHandle cf1(1024), cf2(2048);
94 locker_->AddColumnFamily(&cf1);
95 locker_->AddColumnFamily(&cf2);
96
97 auto txn1 = NewTxn();
98 ASSERT_OK(locker_->TryLock(txn1, 1024, "k1", env_, true));
99 ASSERT_OK(locker_->TryLock(txn1, 2048, "k1", env_, true));
100
101 auto txn2 = NewTxn();
102 ASSERT_OK(locker_->TryLock(txn2, 1024, "k2", env_, false));
103 ASSERT_OK(locker_->TryLock(txn2, 2048, "k2", env_, false));
104
105 auto s = locker_->GetPointLockStatus();
106 ASSERT_EQ(s.size(), 4u);
107 for (uint32_t cf_id : {1024, 2048}) {
108 ASSERT_EQ(s.count(cf_id), 2u);
109 auto range = s.equal_range(cf_id);
110 for (auto it = range.first; it != range.second; it++) {
111 ASSERT_TRUE(it->second.key == "k1" || it->second.key == "k2");
112 if (it->second.key == "k1") {
113 ASSERT_EQ(it->second.exclusive, true);
114 ASSERT_EQ(it->second.ids.size(), 1u);
115 ASSERT_EQ(it->second.ids[0], txn1->GetID());
116 } else if (it->second.key == "k2") {
117 ASSERT_EQ(it->second.exclusive, false);
118 ASSERT_EQ(it->second.ids.size(), 1u);
119 ASSERT_EQ(it->second.ids[0], txn2->GetID());
120 }
121 }
122 }
123
124 delete txn1;
125 delete txn2;
126 }
127
128 TEST_F(PointLockManagerTest, UnlockExclusive) {
129 MockColumnFamilyHandle cf(1);
130 locker_->AddColumnFamily(&cf);
131
132 auto txn1 = NewTxn();
133 ASSERT_OK(locker_->TryLock(txn1, 1, "k", env_, true));
134 locker_->UnLock(txn1, 1, "k", env_);
135
136 auto txn2 = NewTxn();
137 ASSERT_OK(locker_->TryLock(txn2, 1, "k", env_, true));
138
139 delete txn1;
140 delete txn2;
141 }
142
143 TEST_F(PointLockManagerTest, UnlockShared) {
144 MockColumnFamilyHandle cf(1);
145 locker_->AddColumnFamily(&cf);
146
147 auto txn1 = NewTxn();
148 ASSERT_OK(locker_->TryLock(txn1, 1, "k", env_, false));
149 locker_->UnLock(txn1, 1, "k", env_);
150
151 auto txn2 = NewTxn();
152 ASSERT_OK(locker_->TryLock(txn2, 1, "k", env_, true));
153
154 delete txn1;
155 delete txn2;
156 }
157
158 TEST_F(PointLockManagerTest, ReentrantExclusiveLock) {
159 // Tests that a txn can acquire exclusive lock on the same key repeatedly.
160 MockColumnFamilyHandle cf(1);
161 locker_->AddColumnFamily(&cf);
162 auto txn = NewTxn();
163 ASSERT_OK(locker_->TryLock(txn, 1, "k", env_, true));
164 ASSERT_OK(locker_->TryLock(txn, 1, "k", env_, true));
165 delete txn;
166 }
167
168 TEST_F(PointLockManagerTest, ReentrantSharedLock) {
169 // Tests that a txn can acquire shared lock on the same key repeatedly.
170 MockColumnFamilyHandle cf(1);
171 locker_->AddColumnFamily(&cf);
172 auto txn = NewTxn();
173 ASSERT_OK(locker_->TryLock(txn, 1, "k", env_, false));
174 ASSERT_OK(locker_->TryLock(txn, 1, "k", env_, false));
175 delete txn;
176 }
177
178 TEST_F(PointLockManagerTest, LockUpgrade) {
179 // Tests that a txn can upgrade from a shared lock to an exclusive lock.
180 MockColumnFamilyHandle cf(1);
181 locker_->AddColumnFamily(&cf);
182 auto txn = NewTxn();
183 ASSERT_OK(locker_->TryLock(txn, 1, "k", env_, false));
184 ASSERT_OK(locker_->TryLock(txn, 1, "k", env_, true));
185 delete txn;
186 }
187
188 TEST_F(PointLockManagerTest, LockDowngrade) {
189 // Tests that a txn can acquire a shared lock after acquiring an exclusive
190 // lock on the same key.
191 MockColumnFamilyHandle cf(1);
192 locker_->AddColumnFamily(&cf);
193 auto txn = NewTxn();
194 ASSERT_OK(locker_->TryLock(txn, 1, "k", env_, true));
195 ASSERT_OK(locker_->TryLock(txn, 1, "k", env_, false));
196 delete txn;
197 }
198
199 TEST_F(PointLockManagerTest, LockConflict) {
200 // Tests that lock conflicts lead to lock timeout.
201 MockColumnFamilyHandle cf(1);
202 locker_->AddColumnFamily(&cf);
203 auto txn1 = NewTxn();
204 auto txn2 = NewTxn();
205
206 {
207 // exclusive-exclusive conflict.
208 ASSERT_OK(locker_->TryLock(txn1, 1, "k1", env_, true));
209 auto s = locker_->TryLock(txn2, 1, "k1", env_, true);
210 ASSERT_TRUE(s.IsTimedOut());
211 }
212
213 {
214 // exclusive-shared conflict.
215 ASSERT_OK(locker_->TryLock(txn1, 1, "k2", env_, true));
216 auto s = locker_->TryLock(txn2, 1, "k2", env_, false);
217 ASSERT_TRUE(s.IsTimedOut());
218 }
219
220 {
221 // shared-exclusive conflict.
222 ASSERT_OK(locker_->TryLock(txn1, 1, "k2", env_, false));
223 auto s = locker_->TryLock(txn2, 1, "k2", env_, true);
224 ASSERT_TRUE(s.IsTimedOut());
225 }
226
227 delete txn1;
228 delete txn2;
229 }
230
231 port::Thread BlockUntilWaitingTxn(std::function<void()> f) {
232 std::atomic<bool> reached(false);
233 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
234 "PointLockManager::AcquireWithTimeout:WaitingTxn",
235 [&](void* /*arg*/) { reached.store(true); });
236 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
237
238 port::Thread t(f);
239
240 while (!reached.load()) {
241 std::this_thread::sleep_for(std::chrono::milliseconds(100));
242 }
243 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
244 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
245
246 return t;
247 }
248
249 TEST_F(PointLockManagerTest, SharedLocks) {
250 // Tests that shared locks can be concurrently held by multiple transactions.
251 MockColumnFamilyHandle cf(1);
252 locker_->AddColumnFamily(&cf);
253 auto txn1 = NewTxn();
254 auto txn2 = NewTxn();
255 ASSERT_OK(locker_->TryLock(txn1, 1, "k", env_, false));
256 ASSERT_OK(locker_->TryLock(txn2, 1, "k", env_, false));
257 delete txn1;
258 delete txn2;
259 }
260
261 TEST_F(PointLockManagerTest, Deadlock) {
262 // Tests that deadlock can be detected.
263 // Deadlock scenario:
264 // txn1 exclusively locks k1, and wants to lock k2;
265 // txn2 exclusively locks k2, and wants to lock k1.
266 MockColumnFamilyHandle cf(1);
267 locker_->AddColumnFamily(&cf);
268 TransactionOptions txn_opt;
269 txn_opt.deadlock_detect = true;
270 txn_opt.lock_timeout = 1000000;
271 auto txn1 = NewTxn(txn_opt);
272 auto txn2 = NewTxn(txn_opt);
273
274 ASSERT_OK(locker_->TryLock(txn1, 1, "k1", env_, true));
275 ASSERT_OK(locker_->TryLock(txn2, 1, "k2", env_, true));
276
277 // txn1 tries to lock k2, will block forever.
278 port::Thread t = BlockUntilWaitingTxn([&]() {
279 // block because txn2 is holding a lock on k2.
280 locker_->TryLock(txn1, 1, "k2", env_, true);
281 });
282
283 auto s = locker_->TryLock(txn2, 1, "k1", env_, true);
284 ASSERT_TRUE(s.IsBusy());
285 ASSERT_EQ(s.subcode(), Status::SubCode::kDeadlock);
286
287 std::vector<DeadlockPath> deadlock_paths = locker_->GetDeadlockInfoBuffer();
288 ASSERT_EQ(deadlock_paths.size(), 1u);
289 ASSERT_FALSE(deadlock_paths[0].limit_exceeded);
290
291 std::vector<DeadlockInfo> deadlocks = deadlock_paths[0].path;
292 ASSERT_EQ(deadlocks.size(), 2u);
293
294 ASSERT_EQ(deadlocks[0].m_txn_id, txn1->GetID());
295 ASSERT_EQ(deadlocks[0].m_cf_id, 1u);
296 ASSERT_TRUE(deadlocks[0].m_exclusive);
297 ASSERT_EQ(deadlocks[0].m_waiting_key, "k2");
298
299 ASSERT_EQ(deadlocks[1].m_txn_id, txn2->GetID());
300 ASSERT_EQ(deadlocks[1].m_cf_id, 1u);
301 ASSERT_TRUE(deadlocks[1].m_exclusive);
302 ASSERT_EQ(deadlocks[1].m_waiting_key, "k1");
303
304 locker_->UnLock(txn2, 1, "k2", env_);
305 t.join();
306
307 delete txn2;
308 delete txn1;
309 }
310
311 TEST_F(PointLockManagerTest, DeadlockDepthExceeded) {
312 // Tests that when detecting deadlock, if the detection depth is exceeded,
313 // it's also viewed as deadlock.
314 MockColumnFamilyHandle cf(1);
315 locker_->AddColumnFamily(&cf);
316 TransactionOptions txn_opt;
317 txn_opt.deadlock_detect = true;
318 txn_opt.deadlock_detect_depth = 1;
319 txn_opt.lock_timeout = 1000000;
320 auto txn1 = NewTxn(txn_opt);
321 auto txn2 = NewTxn(txn_opt);
322 auto txn3 = NewTxn(txn_opt);
323 auto txn4 = NewTxn(txn_opt);
324 // "a ->(k) b" means transaction a is waiting for transaction b to release
325 // the held lock on key k.
326 // txn4 ->(k3) -> txn3 ->(k2) txn2 ->(k1) txn1
327 // txn3's deadlock detection will exceed the detection depth 1,
328 // which will be viewed as a deadlock.
329 // NOTE:
330 // txn4 ->(k3) -> txn3 must be set up before
331 // txn3 ->(k2) -> txn2, because to trigger deadlock detection for txn3,
332 // it must have another txn waiting on it, which is txn4 in this case.
333 ASSERT_OK(locker_->TryLock(txn1, 1, "k1", env_, true));
334
335 port::Thread t1 = BlockUntilWaitingTxn([&]() {
336 ASSERT_OK(locker_->TryLock(txn2, 1, "k2", env_, true));
337 // block because txn1 is holding a lock on k1.
338 locker_->TryLock(txn2, 1, "k1", env_, true);
339 });
340
341 ASSERT_OK(locker_->TryLock(txn3, 1, "k3", env_, true));
342
343 port::Thread t2 = BlockUntilWaitingTxn([&]() {
344 // block because txn3 is holding a lock on k1.
345 locker_->TryLock(txn4, 1, "k3", env_, true);
346 });
347
348 auto s = locker_->TryLock(txn3, 1, "k2", env_, true);
349 ASSERT_TRUE(s.IsBusy());
350 ASSERT_EQ(s.subcode(), Status::SubCode::kDeadlock);
351
352 std::vector<DeadlockPath> deadlock_paths = locker_->GetDeadlockInfoBuffer();
353 ASSERT_EQ(deadlock_paths.size(), 1u);
354 ASSERT_TRUE(deadlock_paths[0].limit_exceeded);
355
356 locker_->UnLock(txn1, 1, "k1", env_);
357 locker_->UnLock(txn3, 1, "k3", env_);
358 t1.join();
359 t2.join();
360
361 delete txn4;
362 delete txn3;
363 delete txn2;
364 delete txn1;
365 }
366
367 } // namespace ROCKSDB_NAMESPACE
368
369 int main(int argc, char** argv) {
370 ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
371 ::testing::InitGoogleTest(&argc, argv);
372 return RUN_ALL_TESTS();
373 }
374
375 #else
376 #include <stdio.h>
377
// ROCKSDB_LITE build: nothing to test, so report the skip on stderr and exit
// successfully.
int main(int /*argc*/, char** /*argv*/) {
  fputs("SKIPPED because Transactions are not supported in ROCKSDB_LITE\n",
        stderr);
  return 0;
}
383
384 #endif // ROCKSDB_LITE