]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/utilities/transactions/lock/point/point_lock_manager.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / rocksdb / utilities / transactions / lock / point / point_lock_manager.cc
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5
6 #ifndef ROCKSDB_LITE
7
8 #include "utilities/transactions/lock/point/point_lock_manager.h"
9
10 #include <algorithm>
11 #include <cinttypes>
12 #include <mutex>
13
14 #include "monitoring/perf_context_imp.h"
15 #include "rocksdb/slice.h"
16 #include "rocksdb/utilities/transaction_db_mutex.h"
17 #include "test_util/sync_point.h"
18 #include "util/cast_util.h"
19 #include "util/hash.h"
20 #include "util/thread_local.h"
21 #include "utilities/transactions/pessimistic_transaction_db.h"
22 #include "utilities/transactions/transaction_db_mutex_impl.h"
23
24 namespace ROCKSDB_NAMESPACE {
25
26 struct LockInfo {
27 bool exclusive;
28 autovector<TransactionID> txn_ids;
29
30 // Transaction locks are not valid after this time in us
31 uint64_t expiration_time;
32
33 LockInfo(TransactionID id, uint64_t time, bool ex)
34 : exclusive(ex), expiration_time(time) {
35 txn_ids.push_back(id);
36 }
37 LockInfo(const LockInfo& lock_info)
38 : exclusive(lock_info.exclusive),
39 txn_ids(lock_info.txn_ids),
40 expiration_time(lock_info.expiration_time) {}
41 void operator=(const LockInfo& lock_info) {
42 exclusive = lock_info.exclusive;
43 txn_ids = lock_info.txn_ids;
44 expiration_time = lock_info.expiration_time;
45 }
46 };
47
48 struct LockMapStripe {
49 explicit LockMapStripe(std::shared_ptr<TransactionDBMutexFactory> factory) {
50 stripe_mutex = factory->AllocateMutex();
51 stripe_cv = factory->AllocateCondVar();
52 assert(stripe_mutex);
53 assert(stripe_cv);
54 }
55
56 // Mutex must be held before modifying keys map
57 std::shared_ptr<TransactionDBMutex> stripe_mutex;
58
59 // Condition Variable per stripe for waiting on a lock
60 std::shared_ptr<TransactionDBCondVar> stripe_cv;
61
62 // Locked keys mapped to the info about the transactions that locked them.
63 // TODO(agiardullo): Explore performance of other data structures.
64 std::unordered_map<std::string, LockInfo> keys;
65 };
66
67 // Map of #num_stripes LockMapStripes
68 struct LockMap {
69 explicit LockMap(size_t num_stripes,
70 std::shared_ptr<TransactionDBMutexFactory> factory)
71 : num_stripes_(num_stripes) {
72 lock_map_stripes_.reserve(num_stripes);
73 for (size_t i = 0; i < num_stripes; i++) {
74 LockMapStripe* stripe = new LockMapStripe(factory);
75 lock_map_stripes_.push_back(stripe);
76 }
77 }
78
79 ~LockMap() {
80 for (auto stripe : lock_map_stripes_) {
81 delete stripe;
82 }
83 }
84
85 // Number of sepearate LockMapStripes to create, each with their own Mutex
86 const size_t num_stripes_;
87
88 // Count of keys that are currently locked in this column family.
89 // (Only maintained if PointLockManager::max_num_locks_ is positive.)
90 std::atomic<int64_t> lock_cnt{0};
91
92 std::vector<LockMapStripe*> lock_map_stripes_;
93
94 size_t GetStripe(const std::string& key) const;
95 };
96
97 void DeadlockInfoBuffer::AddNewPath(DeadlockPath path) {
98 std::lock_guard<std::mutex> lock(paths_buffer_mutex_);
99
100 if (paths_buffer_.empty()) {
101 return;
102 }
103
104 paths_buffer_[buffer_idx_] = std::move(path);
105 buffer_idx_ = (buffer_idx_ + 1) % paths_buffer_.size();
106 }
107
108 void DeadlockInfoBuffer::Resize(uint32_t target_size) {
109 std::lock_guard<std::mutex> lock(paths_buffer_mutex_);
110
111 paths_buffer_ = Normalize();
112
113 // Drop the deadlocks that will no longer be needed ater the normalize
114 if (target_size < paths_buffer_.size()) {
115 paths_buffer_.erase(
116 paths_buffer_.begin(),
117 paths_buffer_.begin() + (paths_buffer_.size() - target_size));
118 buffer_idx_ = 0;
119 }
120 // Resize the buffer to the target size and restore the buffer's idx
121 else {
122 auto prev_size = paths_buffer_.size();
123 paths_buffer_.resize(target_size);
124 buffer_idx_ = (uint32_t)prev_size;
125 }
126 }
127
128 std::vector<DeadlockPath> DeadlockInfoBuffer::Normalize() {
129 auto working = paths_buffer_;
130
131 if (working.empty()) {
132 return working;
133 }
134
135 // Next write occurs at a nonexistent path's slot
136 if (paths_buffer_[buffer_idx_].empty()) {
137 working.resize(buffer_idx_);
138 } else {
139 std::rotate(working.begin(), working.begin() + buffer_idx_, working.end());
140 }
141
142 return working;
143 }
144
145 std::vector<DeadlockPath> DeadlockInfoBuffer::PrepareBuffer() {
146 std::lock_guard<std::mutex> lock(paths_buffer_mutex_);
147
148 // Reversing the normalized vector returns the latest deadlocks first
149 auto working = Normalize();
150 std::reverse(working.begin(), working.end());
151
152 return working;
153 }
154
155 namespace {
156 void UnrefLockMapsCache(void* ptr) {
157 // Called when a thread exits or a ThreadLocalPtr gets destroyed.
158 auto lock_maps_cache =
159 static_cast<std::unordered_map<uint32_t, std::shared_ptr<LockMap>>*>(ptr);
160 delete lock_maps_cache;
161 }
162 } // anonymous namespace
163
164 PointLockManager::PointLockManager(PessimisticTransactionDB* txn_db,
165 const TransactionDBOptions& opt)
166 : txn_db_impl_(txn_db),
167 default_num_stripes_(opt.num_stripes),
168 max_num_locks_(opt.max_num_locks),
169 lock_maps_cache_(new ThreadLocalPtr(&UnrefLockMapsCache)),
170 dlock_buffer_(opt.max_num_deadlocks),
171 mutex_factory_(opt.custom_mutex_factory
172 ? opt.custom_mutex_factory
173 : std::make_shared<TransactionDBMutexFactoryImpl>()) {}
174
175 PointLockManager::~PointLockManager() {}
176
177 size_t LockMap::GetStripe(const std::string& key) const {
178 assert(num_stripes_ > 0);
179 return FastRange64(GetSliceNPHash64(key), num_stripes_);
180 }
181
182 void PointLockManager::AddColumnFamily(const ColumnFamilyHandle* cf) {
183 InstrumentedMutexLock l(&lock_map_mutex_);
184
185 if (lock_maps_.find(cf->GetID()) == lock_maps_.end()) {
186 lock_maps_.emplace(cf->GetID(), std::make_shared<LockMap>(
187 default_num_stripes_, mutex_factory_));
188 } else {
189 // column_family already exists in lock map
190 assert(false);
191 }
192 }
193
194 void PointLockManager::RemoveColumnFamily(const ColumnFamilyHandle* cf) {
195 // Remove lock_map for this column family. Since the lock map is stored
196 // as a shared ptr, concurrent transactions can still keep using it
197 // until they release their references to it.
198 {
199 InstrumentedMutexLock l(&lock_map_mutex_);
200
201 auto lock_maps_iter = lock_maps_.find(cf->GetID());
202 if (lock_maps_iter == lock_maps_.end()) {
203 return;
204 }
205
206 lock_maps_.erase(lock_maps_iter);
207 } // lock_map_mutex_
208
209 // Clear all thread-local caches
210 autovector<void*> local_caches;
211 lock_maps_cache_->Scrape(&local_caches, nullptr);
212 for (auto cache : local_caches) {
213 delete static_cast<LockMaps*>(cache);
214 }
215 }
216
217 // Look up the LockMap std::shared_ptr for a given column_family_id.
218 // Note: The LockMap is only valid as long as the caller is still holding on
219 // to the returned std::shared_ptr.
220 std::shared_ptr<LockMap> PointLockManager::GetLockMap(
221 ColumnFamilyId column_family_id) {
222 // First check thread-local cache
223 if (lock_maps_cache_->Get() == nullptr) {
224 lock_maps_cache_->Reset(new LockMaps());
225 }
226
227 auto lock_maps_cache = static_cast<LockMaps*>(lock_maps_cache_->Get());
228
229 auto lock_map_iter = lock_maps_cache->find(column_family_id);
230 if (lock_map_iter != lock_maps_cache->end()) {
231 // Found lock map for this column family.
232 return lock_map_iter->second;
233 }
234
235 // Not found in local cache, grab mutex and check shared LockMaps
236 InstrumentedMutexLock l(&lock_map_mutex_);
237
238 lock_map_iter = lock_maps_.find(column_family_id);
239 if (lock_map_iter == lock_maps_.end()) {
240 return std::shared_ptr<LockMap>(nullptr);
241 } else {
242 // Found lock map. Store in thread-local cache and return.
243 std::shared_ptr<LockMap>& lock_map = lock_map_iter->second;
244 lock_maps_cache->insert({column_family_id, lock_map});
245
246 return lock_map;
247 }
248 }
249
250 // Returns true if this lock has expired and can be acquired by another
251 // transaction.
252 // If false, sets *expire_time to the expiration time of the lock according
253 // to Env->GetMicros() or 0 if no expiration.
254 bool PointLockManager::IsLockExpired(TransactionID txn_id,
255 const LockInfo& lock_info, Env* env,
256 uint64_t* expire_time) {
257 if (lock_info.expiration_time == 0) {
258 *expire_time = 0;
259 return false;
260 }
261
262 auto now = env->NowMicros();
263 bool expired = lock_info.expiration_time <= now;
264 if (!expired) {
265 // return how many microseconds until lock will be expired
266 *expire_time = lock_info.expiration_time;
267 } else {
268 for (auto id : lock_info.txn_ids) {
269 if (txn_id == id) {
270 continue;
271 }
272
273 bool success = txn_db_impl_->TryStealingExpiredTransactionLocks(id);
274 if (!success) {
275 expired = false;
276 *expire_time = 0;
277 break;
278 }
279 }
280 }
281
282 return expired;
283 }
284
285 Status PointLockManager::TryLock(PessimisticTransaction* txn,
286 ColumnFamilyId column_family_id,
287 const std::string& key, Env* env,
288 bool exclusive) {
289 // Lookup lock map for this column family id
290 std::shared_ptr<LockMap> lock_map_ptr = GetLockMap(column_family_id);
291 LockMap* lock_map = lock_map_ptr.get();
292 if (lock_map == nullptr) {
293 char msg[255];
294 snprintf(msg, sizeof(msg), "Column family id not found: %" PRIu32,
295 column_family_id);
296
297 return Status::InvalidArgument(msg);
298 }
299
300 // Need to lock the mutex for the stripe that this key hashes to
301 size_t stripe_num = lock_map->GetStripe(key);
302 assert(lock_map->lock_map_stripes_.size() > stripe_num);
303 LockMapStripe* stripe = lock_map->lock_map_stripes_.at(stripe_num);
304
305 LockInfo lock_info(txn->GetID(), txn->GetExpirationTime(), exclusive);
306 int64_t timeout = txn->GetLockTimeout();
307
308 return AcquireWithTimeout(txn, lock_map, stripe, column_family_id, key, env,
309 timeout, std::move(lock_info));
310 }
311
312 // Helper function for TryLock().
313 Status PointLockManager::AcquireWithTimeout(
314 PessimisticTransaction* txn, LockMap* lock_map, LockMapStripe* stripe,
315 ColumnFamilyId column_family_id, const std::string& key, Env* env,
316 int64_t timeout, LockInfo&& lock_info) {
317 Status result;
318 uint64_t end_time = 0;
319
320 if (timeout > 0) {
321 uint64_t start_time = env->NowMicros();
322 end_time = start_time + timeout;
323 }
324
325 if (timeout < 0) {
326 // If timeout is negative, we wait indefinitely to acquire the lock
327 result = stripe->stripe_mutex->Lock();
328 } else {
329 result = stripe->stripe_mutex->TryLockFor(timeout);
330 }
331
332 if (!result.ok()) {
333 // failed to acquire mutex
334 return result;
335 }
336
337 // Acquire lock if we are able to
338 uint64_t expire_time_hint = 0;
339 autovector<TransactionID> wait_ids;
340 result = AcquireLocked(lock_map, stripe, key, env, std::move(lock_info),
341 &expire_time_hint, &wait_ids);
342
343 if (!result.ok() && timeout != 0) {
344 PERF_TIMER_GUARD(key_lock_wait_time);
345 PERF_COUNTER_ADD(key_lock_wait_count, 1);
346 // If we weren't able to acquire the lock, we will keep retrying as long
347 // as the timeout allows.
348 bool timed_out = false;
349 do {
350 // Decide how long to wait
351 int64_t cv_end_time = -1;
352 if (expire_time_hint > 0 && end_time > 0) {
353 cv_end_time = std::min(expire_time_hint, end_time);
354 } else if (expire_time_hint > 0) {
355 cv_end_time = expire_time_hint;
356 } else if (end_time > 0) {
357 cv_end_time = end_time;
358 }
359
360 assert(result.IsBusy() || wait_ids.size() != 0);
361
362 // We are dependent on a transaction to finish, so perform deadlock
363 // detection.
364 if (wait_ids.size() != 0) {
365 if (txn->IsDeadlockDetect()) {
366 if (IncrementWaiters(txn, wait_ids, key, column_family_id,
367 lock_info.exclusive, env)) {
368 result = Status::Busy(Status::SubCode::kDeadlock);
369 stripe->stripe_mutex->UnLock();
370 return result;
371 }
372 }
373 txn->SetWaitingTxn(wait_ids, column_family_id, &key);
374 }
375
376 TEST_SYNC_POINT("PointLockManager::AcquireWithTimeout:WaitingTxn");
377 if (cv_end_time < 0) {
378 // Wait indefinitely
379 result = stripe->stripe_cv->Wait(stripe->stripe_mutex);
380 } else {
381 uint64_t now = env->NowMicros();
382 if (static_cast<uint64_t>(cv_end_time) > now) {
383 result = stripe->stripe_cv->WaitFor(stripe->stripe_mutex,
384 cv_end_time - now);
385 }
386 }
387
388 if (wait_ids.size() != 0) {
389 txn->ClearWaitingTxn();
390 if (txn->IsDeadlockDetect()) {
391 DecrementWaiters(txn, wait_ids);
392 }
393 }
394
395 if (result.IsTimedOut()) {
396 timed_out = true;
397 // Even though we timed out, we will still make one more attempt to
398 // acquire lock below (it is possible the lock expired and we
399 // were never signaled).
400 }
401
402 if (result.ok() || result.IsTimedOut()) {
403 result = AcquireLocked(lock_map, stripe, key, env, std::move(lock_info),
404 &expire_time_hint, &wait_ids);
405 }
406 } while (!result.ok() && !timed_out);
407 }
408
409 stripe->stripe_mutex->UnLock();
410
411 return result;
412 }
413
414 void PointLockManager::DecrementWaiters(
415 const PessimisticTransaction* txn,
416 const autovector<TransactionID>& wait_ids) {
417 std::lock_guard<std::mutex> lock(wait_txn_map_mutex_);
418 DecrementWaitersImpl(txn, wait_ids);
419 }
420
421 void PointLockManager::DecrementWaitersImpl(
422 const PessimisticTransaction* txn,
423 const autovector<TransactionID>& wait_ids) {
424 auto id = txn->GetID();
425 assert(wait_txn_map_.Contains(id));
426 wait_txn_map_.Delete(id);
427
428 for (auto wait_id : wait_ids) {
429 rev_wait_txn_map_.Get(wait_id)--;
430 if (rev_wait_txn_map_.Get(wait_id) == 0) {
431 rev_wait_txn_map_.Delete(wait_id);
432 }
433 }
434 }
435
436 bool PointLockManager::IncrementWaiters(
437 const PessimisticTransaction* txn,
438 const autovector<TransactionID>& wait_ids, const std::string& key,
439 const uint32_t& cf_id, const bool& exclusive, Env* const env) {
440 auto id = txn->GetID();
441 std::vector<int> queue_parents(static_cast<size_t>(txn->GetDeadlockDetectDepth()));
442 std::vector<TransactionID> queue_values(static_cast<size_t>(txn->GetDeadlockDetectDepth()));
443 std::lock_guard<std::mutex> lock(wait_txn_map_mutex_);
444 assert(!wait_txn_map_.Contains(id));
445
446 wait_txn_map_.Insert(id, {wait_ids, cf_id, exclusive, key});
447
448 for (auto wait_id : wait_ids) {
449 if (rev_wait_txn_map_.Contains(wait_id)) {
450 rev_wait_txn_map_.Get(wait_id)++;
451 } else {
452 rev_wait_txn_map_.Insert(wait_id, 1);
453 }
454 }
455
456 // No deadlock if nobody is waiting on self.
457 if (!rev_wait_txn_map_.Contains(id)) {
458 return false;
459 }
460
461 const auto* next_ids = &wait_ids;
462 int parent = -1;
463 int64_t deadlock_time = 0;
464 for (int tail = 0, head = 0; head < txn->GetDeadlockDetectDepth(); head++) {
465 int i = 0;
466 if (next_ids) {
467 for (; i < static_cast<int>(next_ids->size()) &&
468 tail + i < txn->GetDeadlockDetectDepth();
469 i++) {
470 queue_values[tail + i] = (*next_ids)[i];
471 queue_parents[tail + i] = parent;
472 }
473 tail += i;
474 }
475
476 // No more items in the list, meaning no deadlock.
477 if (tail == head) {
478 return false;
479 }
480
481 auto next = queue_values[head];
482 if (next == id) {
483 std::vector<DeadlockInfo> path;
484 while (head != -1) {
485 assert(wait_txn_map_.Contains(queue_values[head]));
486
487 auto extracted_info = wait_txn_map_.Get(queue_values[head]);
488 path.push_back({queue_values[head], extracted_info.m_cf_id,
489 extracted_info.m_exclusive,
490 extracted_info.m_waiting_key});
491 head = queue_parents[head];
492 }
493 env->GetCurrentTime(&deadlock_time);
494 std::reverse(path.begin(), path.end());
495 dlock_buffer_.AddNewPath(DeadlockPath(path, deadlock_time));
496 deadlock_time = 0;
497 DecrementWaitersImpl(txn, wait_ids);
498 return true;
499 } else if (!wait_txn_map_.Contains(next)) {
500 next_ids = nullptr;
501 continue;
502 } else {
503 parent = head;
504 next_ids = &(wait_txn_map_.Get(next).m_neighbors);
505 }
506 }
507
508 // Wait cycle too big, just assume deadlock.
509 env->GetCurrentTime(&deadlock_time);
510 dlock_buffer_.AddNewPath(DeadlockPath(deadlock_time, true));
511 DecrementWaitersImpl(txn, wait_ids);
512 return true;
513 }
514
515 // Try to lock this key after we have acquired the mutex.
516 // Sets *expire_time to the expiration time in microseconds
517 // or 0 if no expiration.
518 // REQUIRED: Stripe mutex must be held.
519 Status PointLockManager::AcquireLocked(LockMap* lock_map, LockMapStripe* stripe,
520 const std::string& key, Env* env,
521 LockInfo&& txn_lock_info,
522 uint64_t* expire_time,
523 autovector<TransactionID>* txn_ids) {
524 assert(txn_lock_info.txn_ids.size() == 1);
525
526 Status result;
527 // Check if this key is already locked
528 auto stripe_iter = stripe->keys.find(key);
529 if (stripe_iter != stripe->keys.end()) {
530 // Lock already held
531 LockInfo& lock_info = stripe_iter->second;
532 assert(lock_info.txn_ids.size() == 1 || !lock_info.exclusive);
533
534 if (lock_info.exclusive || txn_lock_info.exclusive) {
535 if (lock_info.txn_ids.size() == 1 &&
536 lock_info.txn_ids[0] == txn_lock_info.txn_ids[0]) {
537 // The list contains one txn and we're it, so just take it.
538 lock_info.exclusive = txn_lock_info.exclusive;
539 lock_info.expiration_time = txn_lock_info.expiration_time;
540 } else {
541 // Check if it's expired. Skips over txn_lock_info.txn_ids[0] in case
542 // it's there for a shared lock with multiple holders which was not
543 // caught in the first case.
544 if (IsLockExpired(txn_lock_info.txn_ids[0], lock_info, env,
545 expire_time)) {
546 // lock is expired, can steal it
547 lock_info.txn_ids = txn_lock_info.txn_ids;
548 lock_info.exclusive = txn_lock_info.exclusive;
549 lock_info.expiration_time = txn_lock_info.expiration_time;
550 // lock_cnt does not change
551 } else {
552 result = Status::TimedOut(Status::SubCode::kLockTimeout);
553 *txn_ids = lock_info.txn_ids;
554 }
555 }
556 } else {
557 // We are requesting shared access to a shared lock, so just grant it.
558 lock_info.txn_ids.push_back(txn_lock_info.txn_ids[0]);
559 // Using std::max means that expiration time never goes down even when
560 // a transaction is removed from the list. The correct solution would be
561 // to track expiry for every transaction, but this would also work for
562 // now.
563 lock_info.expiration_time =
564 std::max(lock_info.expiration_time, txn_lock_info.expiration_time);
565 }
566 } else { // Lock not held.
567 // Check lock limit
568 if (max_num_locks_ > 0 &&
569 lock_map->lock_cnt.load(std::memory_order_acquire) >= max_num_locks_) {
570 result = Status::Busy(Status::SubCode::kLockLimit);
571 } else {
572 // acquire lock
573 stripe->keys.emplace(key, std::move(txn_lock_info));
574
575 // Maintain lock count if there is a limit on the number of locks
576 if (max_num_locks_) {
577 lock_map->lock_cnt++;
578 }
579 }
580 }
581
582 return result;
583 }
584
585 void PointLockManager::UnLockKey(PessimisticTransaction* txn,
586 const std::string& key, LockMapStripe* stripe,
587 LockMap* lock_map, Env* env) {
588 #ifdef NDEBUG
589 (void)env;
590 #endif
591 TransactionID txn_id = txn->GetID();
592
593 auto stripe_iter = stripe->keys.find(key);
594 if (stripe_iter != stripe->keys.end()) {
595 auto& txns = stripe_iter->second.txn_ids;
596 auto txn_it = std::find(txns.begin(), txns.end(), txn_id);
597 // Found the key we locked. unlock it.
598 if (txn_it != txns.end()) {
599 if (txns.size() == 1) {
600 stripe->keys.erase(stripe_iter);
601 } else {
602 auto last_it = txns.end() - 1;
603 if (txn_it != last_it) {
604 *txn_it = *last_it;
605 }
606 txns.pop_back();
607 }
608
609 if (max_num_locks_ > 0) {
610 // Maintain lock count if there is a limit on the number of locks.
611 assert(lock_map->lock_cnt.load(std::memory_order_relaxed) > 0);
612 lock_map->lock_cnt--;
613 }
614 }
615 } else {
616 // This key is either not locked or locked by someone else. This should
617 // only happen if the unlocking transaction has expired.
618 assert(txn->GetExpirationTime() > 0 &&
619 txn->GetExpirationTime() < env->NowMicros());
620 }
621 }
622
623 void PointLockManager::UnLock(PessimisticTransaction* txn,
624 ColumnFamilyId column_family_id,
625 const std::string& key, Env* env) {
626 std::shared_ptr<LockMap> lock_map_ptr = GetLockMap(column_family_id);
627 LockMap* lock_map = lock_map_ptr.get();
628 if (lock_map == nullptr) {
629 // Column Family must have been dropped.
630 return;
631 }
632
633 // Lock the mutex for the stripe that this key hashes to
634 size_t stripe_num = lock_map->GetStripe(key);
635 assert(lock_map->lock_map_stripes_.size() > stripe_num);
636 LockMapStripe* stripe = lock_map->lock_map_stripes_.at(stripe_num);
637
638 stripe->stripe_mutex->Lock().PermitUncheckedError();
639 UnLockKey(txn, key, stripe, lock_map, env);
640 stripe->stripe_mutex->UnLock();
641
642 // Signal waiting threads to retry locking
643 stripe->stripe_cv->NotifyAll();
644 }
645
646 void PointLockManager::UnLock(PessimisticTransaction* txn,
647 const LockTracker& tracker, Env* env) {
648 std::unique_ptr<LockTracker::ColumnFamilyIterator> cf_it(
649 tracker.GetColumnFamilyIterator());
650 assert(cf_it != nullptr);
651 while (cf_it->HasNext()) {
652 ColumnFamilyId cf = cf_it->Next();
653 std::shared_ptr<LockMap> lock_map_ptr = GetLockMap(cf);
654 LockMap* lock_map = lock_map_ptr.get();
655 if (!lock_map) {
656 // Column Family must have been dropped.
657 return;
658 }
659
660 // Bucket keys by lock_map_ stripe
661 std::unordered_map<size_t, std::vector<const std::string*>> keys_by_stripe(
662 lock_map->num_stripes_);
663 std::unique_ptr<LockTracker::KeyIterator> key_it(
664 tracker.GetKeyIterator(cf));
665 assert(key_it != nullptr);
666 while (key_it->HasNext()) {
667 const std::string& key = key_it->Next();
668 size_t stripe_num = lock_map->GetStripe(key);
669 keys_by_stripe[stripe_num].push_back(&key);
670 }
671
672 // For each stripe, grab the stripe mutex and unlock all keys in this stripe
673 for (auto& stripe_iter : keys_by_stripe) {
674 size_t stripe_num = stripe_iter.first;
675 auto& stripe_keys = stripe_iter.second;
676
677 assert(lock_map->lock_map_stripes_.size() > stripe_num);
678 LockMapStripe* stripe = lock_map->lock_map_stripes_.at(stripe_num);
679
680 stripe->stripe_mutex->Lock().PermitUncheckedError();
681
682 for (const std::string* key : stripe_keys) {
683 UnLockKey(txn, *key, stripe, lock_map, env);
684 }
685
686 stripe->stripe_mutex->UnLock();
687
688 // Signal waiting threads to retry locking
689 stripe->stripe_cv->NotifyAll();
690 }
691 }
692 }
693
694 PointLockManager::PointLockStatus PointLockManager::GetPointLockStatus() {
695 PointLockStatus data;
696 // Lock order here is important. The correct order is lock_map_mutex_, then
697 // for every column family ID in ascending order lock every stripe in
698 // ascending order.
699 InstrumentedMutexLock l(&lock_map_mutex_);
700
701 std::vector<uint32_t> cf_ids;
702 for (const auto& map : lock_maps_) {
703 cf_ids.push_back(map.first);
704 }
705 std::sort(cf_ids.begin(), cf_ids.end());
706
707 for (auto i : cf_ids) {
708 const auto& stripes = lock_maps_[i]->lock_map_stripes_;
709 // Iterate and lock all stripes in ascending order.
710 for (const auto& j : stripes) {
711 j->stripe_mutex->Lock().PermitUncheckedError();
712 for (const auto& it : j->keys) {
713 struct KeyLockInfo info;
714 info.exclusive = it.second.exclusive;
715 info.key = it.first;
716 for (const auto& id : it.second.txn_ids) {
717 info.ids.push_back(id);
718 }
719 data.insert({i, info});
720 }
721 }
722 }
723
724 // Unlock everything. Unlocking order is not important.
725 for (auto i : cf_ids) {
726 const auto& stripes = lock_maps_[i]->lock_map_stripes_;
727 for (const auto& j : stripes) {
728 j->stripe_mutex->UnLock();
729 }
730 }
731
732 return data;
733 }
734
735 std::vector<DeadlockPath> PointLockManager::GetDeadlockInfoBuffer() {
736 return dlock_buffer_.PrepareBuffer();
737 }
738
739 void PointLockManager::Resize(uint32_t target_size) {
740 dlock_buffer_.Resize(target_size);
741 }
742
743 PointLockManager::RangeLockStatus PointLockManager::GetRangeLockStatus() {
744 return {};
745 }
746
747 Status PointLockManager::TryLock(PessimisticTransaction* /* txn */,
748 ColumnFamilyId /* cf_id */,
749 const Endpoint& /* start */,
750 const Endpoint& /* end */, Env* /* env */,
751 bool /* exclusive */) {
752 return Status::NotSupported(
753 "PointLockManager does not support range locking");
754 }
755
756 void PointLockManager::UnLock(PessimisticTransaction* /* txn */,
757 ColumnFamilyId /* cf_id */,
758 const Endpoint& /* start */,
759 const Endpoint& /* end */, Env* /* env */) {
760 // no-op
761 }
762
763 } // namespace ROCKSDB_NAMESPACE
764 #endif // ROCKSDB_LITE