// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#ifndef ROCKSDB_LITE

#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif

#include "utilities/transactions/transaction_lock_mgr.h"

#include <inttypes.h>

#include <algorithm>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <string>
#include <vector>

#include "monitoring/perf_context_imp.h"
#include "rocksdb/slice.h"
#include "rocksdb/utilities/transaction_db_mutex.h"
#include "util/cast_util.h"
#include "util/hash.h"
#include "util/sync_point.h"
#include "util/thread_local.h"
#include "utilities/transactions/pessimistic_transaction_db.h"

namespace rocksdb {

struct LockInfo {
  bool exclusive;
  autovector<TransactionID> txn_ids;

  // Transaction locks are not valid after this time in us
  uint64_t expiration_time;

  LockInfo(TransactionID id, uint64_t time, bool ex)
      : exclusive(ex), expiration_time(time) {
    txn_ids.push_back(id);
  }
  LockInfo(const LockInfo& lock_info)
      : exclusive(lock_info.exclusive),
        txn_ids(lock_info.txn_ids),
        expiration_time(lock_info.expiration_time) {}
};

struct LockMapStripe {
  explicit LockMapStripe(std::shared_ptr<TransactionDBMutexFactory> factory) {
    stripe_mutex = factory->AllocateMutex();
    stripe_cv = factory->AllocateCondVar();
    assert(stripe_mutex);
    assert(stripe_cv);
  }

  // Mutex must be held before modifying keys map
  std::shared_ptr<TransactionDBMutex> stripe_mutex;

  // Condition Variable per stripe for waiting on a lock
  std::shared_ptr<TransactionDBCondVar> stripe_cv;

  // Locked keys mapped to the info about the transactions that locked them.
  // TODO(agiardullo): Explore performance of other data structures.
  std::unordered_map<std::string, LockInfo> keys;
};

// Map of #num_stripes LockMapStripes
struct LockMap {
  explicit LockMap(size_t num_stripes,
                   std::shared_ptr<TransactionDBMutexFactory> factory)
      : num_stripes_(num_stripes) {
    lock_map_stripes_.reserve(num_stripes);
    for (size_t i = 0; i < num_stripes; i++) {
      LockMapStripe* stripe = new LockMapStripe(factory);
      lock_map_stripes_.push_back(stripe);
    }
  }

  ~LockMap() {
    for (auto stripe : lock_map_stripes_) {
      delete stripe;
    }
  }

  // Number of separate LockMapStripes to create, each with its own mutex
  const size_t num_stripes_;

  // Count of keys that are currently locked in this column family.
  // (Only maintained if TransactionLockMgr::max_num_locks_ is positive.)
  std::atomic<int64_t> lock_cnt{0};

  std::vector<LockMapStripe*> lock_map_stripes_;

  size_t GetStripe(const std::string& key) const;
};
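// Keys are partitioned across stripes by hash, so two transactions contend
// on the same stripe_mutex only if their keys hash to the same stripe.
// Illustrative sketch (hypothetical stripe indices, assuming 16 stripes):
//
//   LockMap map(16, mutex_factory);
//   size_t s1 = map.GetStripe("key-a");  // e.g. 5
//   size_t s2 = map.GetStripe("key-b");  // e.g. 12 -> independent mutexes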

void DeadlockInfoBuffer::AddNewPath(DeadlockPath path) {
  std::lock_guard<std::mutex> lock(paths_buffer_mutex_);

  if (paths_buffer_.empty()) {
    return;
  }

  paths_buffer_[buffer_idx_] = std::move(path);
  buffer_idx_ = (buffer_idx_ + 1) % paths_buffer_.size();
}

void DeadlockInfoBuffer::Resize(uint32_t target_size) {
  std::lock_guard<std::mutex> lock(paths_buffer_mutex_);

  paths_buffer_ = Normalize();

  // Drop the deadlocks that will no longer be needed after the normalize
  if (target_size < paths_buffer_.size()) {
    paths_buffer_.erase(
        paths_buffer_.begin(),
        paths_buffer_.begin() + (paths_buffer_.size() - target_size));
    buffer_idx_ = 0;
  }
  // Resize the buffer to the target size and restore the buffer's idx
  else {
    auto prev_size = paths_buffer_.size();
    paths_buffer_.resize(target_size);
    buffer_idx_ = (uint32_t)prev_size;
  }
}

std::vector<DeadlockPath> DeadlockInfoBuffer::Normalize() {
  auto working = paths_buffer_;

  if (working.empty()) {
    return working;
  }

  // Next write occurs at a nonexistent path's slot
  if (paths_buffer_[buffer_idx_].empty()) {
    working.resize(buffer_idx_);
  } else {
    std::rotate(working.begin(), working.begin() + buffer_idx_, working.end());
  }

  return working;
}
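// Worked example (illustrative): with a capacity-4 buffer holding
// [p4, p5, p2, p3] and buffer_idx_ == 2 (the next write would overwrite
// p2, the oldest entry), Normalize() rotates to [p2, p3, p4, p5], i.e.
// oldest-first. If the slot at buffer_idx_ is still empty, the buffer has
// not wrapped yet, so it is simply truncated to the entries written so far.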

std::vector<DeadlockPath> DeadlockInfoBuffer::PrepareBuffer() {
  std::lock_guard<std::mutex> lock(paths_buffer_mutex_);

  // Reversing the normalized vector returns the latest deadlocks first
  auto working = Normalize();
  std::reverse(working.begin(), working.end());

  return working;
}

namespace {
void UnrefLockMapsCache(void* ptr) {
  // Called when a thread exits or a ThreadLocalPtr gets destroyed.
  auto lock_maps_cache =
      static_cast<std::unordered_map<uint32_t, std::shared_ptr<LockMap>>*>(ptr);
  delete lock_maps_cache;
}
}  // anonymous namespace

TransactionLockMgr::TransactionLockMgr(
    TransactionDB* txn_db, size_t default_num_stripes, int64_t max_num_locks,
    uint32_t max_num_deadlocks,
    std::shared_ptr<TransactionDBMutexFactory> mutex_factory)
    : txn_db_impl_(nullptr),
      default_num_stripes_(default_num_stripes),
      max_num_locks_(max_num_locks),
      lock_maps_cache_(new ThreadLocalPtr(&UnrefLockMapsCache)),
      dlock_buffer_(max_num_deadlocks),
      mutex_factory_(mutex_factory) {
  assert(txn_db);
  txn_db_impl_ =
      static_cast_with_check<PessimisticTransactionDB, TransactionDB>(txn_db);
}

TransactionLockMgr::~TransactionLockMgr() {}

size_t LockMap::GetStripe(const std::string& key) const {
  assert(num_stripes_ > 0);
  size_t stripe = static_cast<size_t>(GetSliceNPHash64(key)) % num_stripes_;
  return stripe;
}

void TransactionLockMgr::AddColumnFamily(uint32_t column_family_id) {
  InstrumentedMutexLock l(&lock_map_mutex_);

  if (lock_maps_.find(column_family_id) == lock_maps_.end()) {
    lock_maps_.emplace(column_family_id,
                       std::shared_ptr<LockMap>(
                           new LockMap(default_num_stripes_, mutex_factory_)));
  } else {
    // column_family already exists in lock map
    assert(false);
  }
}

void TransactionLockMgr::RemoveColumnFamily(uint32_t column_family_id) {
  // Remove lock_map for this column family. Since the lock map is stored
  // as a shared ptr, concurrent transactions can still keep using it
  // until they release their references to it.
  {
    InstrumentedMutexLock l(&lock_map_mutex_);

    auto lock_maps_iter = lock_maps_.find(column_family_id);
    assert(lock_maps_iter != lock_maps_.end());

    lock_maps_.erase(lock_maps_iter);
  }  // lock_map_mutex_

  // Clear all thread-local caches
  autovector<void*> local_caches;
  lock_maps_cache_->Scrape(&local_caches, nullptr);
  for (auto cache : local_caches) {
    delete static_cast<LockMaps*>(cache);
  }
}

// Look up the LockMap std::shared_ptr for a given column_family_id.
// Note: The LockMap is only valid as long as the caller is still holding on
// to the returned std::shared_ptr.
std::shared_ptr<LockMap> TransactionLockMgr::GetLockMap(
    uint32_t column_family_id) {
  // First check thread-local cache
  if (lock_maps_cache_->Get() == nullptr) {
    lock_maps_cache_->Reset(new LockMaps());
  }

  auto lock_maps_cache = static_cast<LockMaps*>(lock_maps_cache_->Get());

  auto lock_map_iter = lock_maps_cache->find(column_family_id);
  if (lock_map_iter != lock_maps_cache->end()) {
    // Found lock map for this column family.
    return lock_map_iter->second;
  }

  // Not found in local cache, grab mutex and check shared LockMaps
  InstrumentedMutexLock l(&lock_map_mutex_);

  lock_map_iter = lock_maps_.find(column_family_id);
  if (lock_map_iter == lock_maps_.end()) {
    return std::shared_ptr<LockMap>(nullptr);
  } else {
    // Found lock map. Store in thread-local cache and return.
    std::shared_ptr<LockMap>& lock_map = lock_map_iter->second;
    lock_maps_cache->insert({column_family_id, lock_map});

    return lock_map;
  }
}
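// The thread-local cache above keeps lock_map_mutex_ off the hot path:
// each thread memoizes column_family_id -> std::shared_ptr<LockMap> in its
// own LockMaps instance, and RemoveColumnFamily() invalidates these by
// scraping and deleting every thread's cache.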

// Returns true if this lock has expired and can be acquired by another
// transaction.
// If false, sets *expire_time to the expiration time of the lock according
// to Env::NowMicros() or 0 if no expiration.
bool TransactionLockMgr::IsLockExpired(TransactionID txn_id,
                                       const LockInfo& lock_info, Env* env,
                                       uint64_t* expire_time) {
  auto now = env->NowMicros();

  bool expired =
      (lock_info.expiration_time > 0 && lock_info.expiration_time <= now);

  if (!expired && lock_info.expiration_time > 0) {
    // return how many microseconds until lock will be expired
    *expire_time = lock_info.expiration_time;
  } else {
    for (auto id : lock_info.txn_ids) {
      if (txn_id == id) {
        continue;
      }

      bool success = txn_db_impl_->TryStealingExpiredTransactionLocks(id);
      if (!success) {
        expired = false;
        break;
      }
      *expire_time = 0;
    }
  }

  return expired;
}
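// Note that even when a holder's expiration time has passed, the lock is
// reported as expired only if every other holding transaction can be
// rolled back via TryStealingExpiredTransactionLocks(); otherwise the
// caller keeps waiting.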

Status TransactionLockMgr::TryLock(PessimisticTransaction* txn,
                                   uint32_t column_family_id,
                                   const std::string& key, Env* env,
                                   bool exclusive) {
  // Lookup lock map for this column family id
  std::shared_ptr<LockMap> lock_map_ptr = GetLockMap(column_family_id);
  LockMap* lock_map = lock_map_ptr.get();
  if (lock_map == nullptr) {
    char msg[255];
    snprintf(msg, sizeof(msg), "Column family id not found: %" PRIu32,
             column_family_id);

    return Status::InvalidArgument(msg);
  }

  // Need to lock the mutex for the stripe that this key hashes to
  size_t stripe_num = lock_map->GetStripe(key);
  assert(lock_map->lock_map_stripes_.size() > stripe_num);
  LockMapStripe* stripe = lock_map->lock_map_stripes_.at(stripe_num);

  LockInfo lock_info(txn->GetID(), txn->GetExpirationTime(), exclusive);
  int64_t timeout = txn->GetLockTimeout();

  return AcquireWithTimeout(txn, lock_map, stripe, column_family_id, key, env,
                            timeout, lock_info);
}
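// Usage sketch (hypothetical caller code; pessimistic transactions reach
// TryLock() indirectly through the TransactionDB API rather than calling
// it themselves):
//
//   // TransactionDB* db = ...;
//   Transaction* txn = db->BeginTransaction(WriteOptions());
//   std::string value;
//   // GetForUpdate() acquires an exclusive lock on "key" via TryLock().
//   Status s = txn->GetForUpdate(ReadOptions(), "key", &value);
//   if (s.ok()) {
//     s = txn->Commit();  // locks are released on commit/rollback
//   }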

// Helper function for TryLock().
Status TransactionLockMgr::AcquireWithTimeout(
    PessimisticTransaction* txn, LockMap* lock_map, LockMapStripe* stripe,
    uint32_t column_family_id, const std::string& key, Env* env,
    int64_t timeout, const LockInfo& lock_info) {
  Status result;
  uint64_t end_time = 0;

  if (timeout > 0) {
    uint64_t start_time = env->NowMicros();
    end_time = start_time + timeout;
  }

  if (timeout < 0) {
    // If timeout is negative, we wait indefinitely to acquire the lock
    result = stripe->stripe_mutex->Lock();
  } else {
    result = stripe->stripe_mutex->TryLockFor(timeout);
  }

  if (!result.ok()) {
    // failed to acquire mutex
    return result;
  }

  // Acquire lock if we are able to
  uint64_t expire_time_hint = 0;
  autovector<TransactionID> wait_ids;
  result = AcquireLocked(lock_map, stripe, key, env, lock_info,
                         &expire_time_hint, &wait_ids);

  if (!result.ok() && timeout != 0) {
    PERF_TIMER_GUARD(key_lock_wait_time);
    PERF_COUNTER_ADD(key_lock_wait_count, 1);
    // If we weren't able to acquire the lock, we will keep retrying as long
    // as the timeout allows.
    bool timed_out = false;
    do {
      // Decide how long to wait
      int64_t cv_end_time = -1;

      // Check if held lock's expiration time is sooner than our timeout
      if (expire_time_hint > 0 &&
          (timeout < 0 || (timeout > 0 && expire_time_hint < end_time))) {
        // expiration time is sooner than our timeout
        cv_end_time = expire_time_hint;
      } else if (timeout >= 0) {
        cv_end_time = end_time;
      }

      assert(result.IsBusy() || wait_ids.size() != 0);

      // We are dependent on a transaction to finish, so perform deadlock
      // detection.
      if (wait_ids.size() != 0) {
        if (txn->IsDeadlockDetect()) {
          if (IncrementWaiters(txn, wait_ids, key, column_family_id,
                               lock_info.exclusive, env)) {
            result = Status::Busy(Status::SubCode::kDeadlock);
            stripe->stripe_mutex->UnLock();
            return result;
          }
        }
        txn->SetWaitingTxn(wait_ids, column_family_id, &key);
      }

      TEST_SYNC_POINT("TransactionLockMgr::AcquireWithTimeout:WaitingTxn");
      if (cv_end_time < 0) {
        // Wait indefinitely
        result = stripe->stripe_cv->Wait(stripe->stripe_mutex);
      } else {
        uint64_t now = env->NowMicros();
        if (static_cast<uint64_t>(cv_end_time) > now) {
          result = stripe->stripe_cv->WaitFor(stripe->stripe_mutex,
                                              cv_end_time - now);
        }
      }

      if (wait_ids.size() != 0) {
        txn->ClearWaitingTxn();
        if (txn->IsDeadlockDetect()) {
          DecrementWaiters(txn, wait_ids);
        }
      }

      if (result.IsTimedOut()) {
        timed_out = true;
        // Even though we timed out, we will still make one more attempt to
        // acquire lock below (it is possible the lock expired and we
        // were never signaled).
      }

      if (result.ok() || result.IsTimedOut()) {
        result = AcquireLocked(lock_map, stripe, key, env, lock_info,
                               &expire_time_hint, &wait_ids);
      }
    } while (!result.ok() && !timed_out);
  }

  stripe->stripe_mutex->UnLock();

  return result;
}

void TransactionLockMgr::DecrementWaiters(
    const PessimisticTransaction* txn,
    const autovector<TransactionID>& wait_ids) {
  std::lock_guard<std::mutex> lock(wait_txn_map_mutex_);
  DecrementWaitersImpl(txn, wait_ids);
}

void TransactionLockMgr::DecrementWaitersImpl(
    const PessimisticTransaction* txn,
    const autovector<TransactionID>& wait_ids) {
  auto id = txn->GetID();
  assert(wait_txn_map_.Contains(id));
  wait_txn_map_.Delete(id);

  for (auto wait_id : wait_ids) {
    rev_wait_txn_map_.Get(wait_id)--;
    if (rev_wait_txn_map_.Get(wait_id) == 0) {
      rev_wait_txn_map_.Delete(wait_id);
    }
  }
}

bool TransactionLockMgr::IncrementWaiters(
    const PessimisticTransaction* txn,
    const autovector<TransactionID>& wait_ids, const std::string& key,
    const uint32_t& cf_id, const bool& exclusive, Env* const env) {
  auto id = txn->GetID();
  std::vector<int> queue_parents(
      static_cast<size_t>(txn->GetDeadlockDetectDepth()));
  std::vector<TransactionID> queue_values(
      static_cast<size_t>(txn->GetDeadlockDetectDepth()));
  std::lock_guard<std::mutex> lock(wait_txn_map_mutex_);
  assert(!wait_txn_map_.Contains(id));

  wait_txn_map_.Insert(id, {wait_ids, cf_id, key, exclusive});

  for (auto wait_id : wait_ids) {
    if (rev_wait_txn_map_.Contains(wait_id)) {
      rev_wait_txn_map_.Get(wait_id)++;
    } else {
      rev_wait_txn_map_.Insert(wait_id, 1);
    }
  }

  // No deadlock if nobody is waiting on self.
  if (!rev_wait_txn_map_.Contains(id)) {
    return false;
  }

  const auto* next_ids = &wait_ids;
  int parent = -1;
  int64_t deadlock_time = 0;
  for (int tail = 0, head = 0; head < txn->GetDeadlockDetectDepth(); head++) {
    int i = 0;
    if (next_ids) {
      for (; i < static_cast<int>(next_ids->size()) &&
             tail + i < txn->GetDeadlockDetectDepth();
           i++) {
        queue_values[tail + i] = (*next_ids)[i];
        queue_parents[tail + i] = parent;
      }
      tail += i;
    }

    // No more items in the list, meaning no deadlock.
    if (tail == head) {
      return false;
    }

    auto next = queue_values[head];
    if (next == id) {
      std::vector<DeadlockInfo> path;
      while (head != -1) {
        assert(wait_txn_map_.Contains(queue_values[head]));

        auto extracted_info = wait_txn_map_.Get(queue_values[head]);
        path.push_back({queue_values[head], extracted_info.m_cf_id,
                        extracted_info.m_exclusive,
                        extracted_info.m_waiting_key});
        head = queue_parents[head];
      }
      env->GetCurrentTime(&deadlock_time);
      std::reverse(path.begin(), path.end());
      dlock_buffer_.AddNewPath(DeadlockPath(path, deadlock_time));
      deadlock_time = 0;
      DecrementWaitersImpl(txn, wait_ids);
      return true;
    } else if (!wait_txn_map_.Contains(next)) {
      next_ids = nullptr;
      continue;
    } else {
      parent = head;
      next_ids = &(wait_txn_map_.Get(next).m_neighbors);
    }
  }

  // Wait cycle too big, just assume deadlock.
  env->GetCurrentTime(&deadlock_time);
  dlock_buffer_.AddNewPath(DeadlockPath(deadlock_time, true));
  DecrementWaitersImpl(txn, wait_ids);
  return true;
}
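// The loop above is a breadth-first search of the wait-for graph, bounded
// by GetDeadlockDetectDepth(). queue_values holds the BFS frontier of
// transaction ids and queue_parents the BFS tree; when the search reaches
// `id` again, the deadlock path is reconstructed by walking the parent
// links back from `head` and reversing the result.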

// Try to lock this key after we have acquired the mutex.
// Sets *expire_time to the expiration time in microseconds
// or 0 if no expiration.
// REQUIRED: Stripe mutex must be held.
Status TransactionLockMgr::AcquireLocked(LockMap* lock_map,
                                         LockMapStripe* stripe,
                                         const std::string& key, Env* env,
                                         const LockInfo& txn_lock_info,
                                         uint64_t* expire_time,
                                         autovector<TransactionID>* txn_ids) {
  assert(txn_lock_info.txn_ids.size() == 1);

  Status result;
  // Check if this key is already locked
  auto stripe_iter = stripe->keys.find(key);
  if (stripe_iter != stripe->keys.end()) {
    // Lock already held
    LockInfo& lock_info = stripe_iter->second;
    assert(lock_info.txn_ids.size() == 1 || !lock_info.exclusive);

    if (lock_info.exclusive || txn_lock_info.exclusive) {
      if (lock_info.txn_ids.size() == 1 &&
          lock_info.txn_ids[0] == txn_lock_info.txn_ids[0]) {
        // The list contains one txn and we're it, so just take it.
        lock_info.exclusive = txn_lock_info.exclusive;
        lock_info.expiration_time = txn_lock_info.expiration_time;
      } else {
        // Check if it's expired. Skips over txn_lock_info.txn_ids[0] in case
        // it's there for a shared lock with multiple holders which was not
        // caught in the first case.
        if (IsLockExpired(txn_lock_info.txn_ids[0], lock_info, env,
                          expire_time)) {
          // lock is expired, can steal it
          lock_info.txn_ids = txn_lock_info.txn_ids;
          lock_info.exclusive = txn_lock_info.exclusive;
          lock_info.expiration_time = txn_lock_info.expiration_time;
          // lock_cnt does not change
        } else {
          result = Status::TimedOut(Status::SubCode::kLockTimeout);
          *txn_ids = lock_info.txn_ids;
        }
      }
    } else {
      // We are requesting shared access to a shared lock, so just grant it.
      lock_info.txn_ids.push_back(txn_lock_info.txn_ids[0]);
      // Using std::max means that expiration time never goes down even when
      // a transaction is removed from the list. The correct solution would be
      // to track expiry for every transaction, but this would also work for
      // now.
      lock_info.expiration_time =
          std::max(lock_info.expiration_time, txn_lock_info.expiration_time);
    }
  } else {  // Lock not held.
    // Check lock limit
    if (max_num_locks_ > 0 &&
        lock_map->lock_cnt.load(std::memory_order_acquire) >= max_num_locks_) {
      result = Status::Busy(Status::SubCode::kLockLimit);
    } else {
      // acquire lock
      stripe->keys.insert({key, txn_lock_info});

      // Maintain lock count if there is a limit on the number of locks
      if (max_num_locks_) {
        lock_map->lock_cnt++;
      }
    }
  }

  return result;
}
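// Compatibility rules implemented above: shared + shared is granted
// immediately; any combination involving an exclusive lock succeeds only
// if the requester is the sole current holder (re-lock or upgrade in
// place) or the existing lock has expired and can be stolen. Otherwise
// the caller gets TimedOut(kLockTimeout) plus the blocking transaction
// ids in *txn_ids.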

void TransactionLockMgr::UnLockKey(const PessimisticTransaction* txn,
                                   const std::string& key,
                                   LockMapStripe* stripe, LockMap* lock_map,
                                   Env* env) {
#ifdef NDEBUG
  (void)env;
#endif
  TransactionID txn_id = txn->GetID();

  auto stripe_iter = stripe->keys.find(key);
  if (stripe_iter != stripe->keys.end()) {
    auto& txns = stripe_iter->second.txn_ids;
    auto txn_it = std::find(txns.begin(), txns.end(), txn_id);
    // Found the key we locked. Unlock it.
    if (txn_it != txns.end()) {
      if (txns.size() == 1) {
        stripe->keys.erase(stripe_iter);
      } else {
        auto last_it = txns.end() - 1;
        if (txn_it != last_it) {
          *txn_it = *last_it;
        }
        txns.pop_back();
      }

      if (max_num_locks_ > 0) {
        // Maintain lock count if there is a limit on the number of locks.
        assert(lock_map->lock_cnt.load(std::memory_order_relaxed) > 0);
        lock_map->lock_cnt--;
      }
    }
  } else {
    // This key is either not locked or locked by someone else. This should
    // only happen if the unlocking transaction has expired.
    assert(txn->GetExpirationTime() > 0 &&
           txn->GetExpirationTime() < env->NowMicros());
  }
}

void TransactionLockMgr::UnLock(PessimisticTransaction* txn,
                                uint32_t column_family_id,
                                const std::string& key, Env* env) {
  std::shared_ptr<LockMap> lock_map_ptr = GetLockMap(column_family_id);
  LockMap* lock_map = lock_map_ptr.get();
  if (lock_map == nullptr) {
    // Column Family must have been dropped.
    return;
  }

  // Lock the mutex for the stripe that this key hashes to
  size_t stripe_num = lock_map->GetStripe(key);
  assert(lock_map->lock_map_stripes_.size() > stripe_num);
  LockMapStripe* stripe = lock_map->lock_map_stripes_.at(stripe_num);

  stripe->stripe_mutex->Lock();
  UnLockKey(txn, key, stripe, lock_map, env);
  stripe->stripe_mutex->UnLock();

  // Signal waiting threads to retry locking
  stripe->stripe_cv->NotifyAll();
}

void TransactionLockMgr::UnLock(const PessimisticTransaction* txn,
                                const TransactionKeyMap* key_map, Env* env) {
  for (auto& key_map_iter : *key_map) {
    uint32_t column_family_id = key_map_iter.first;
    auto& keys = key_map_iter.second;

    std::shared_ptr<LockMap> lock_map_ptr = GetLockMap(column_family_id);
    LockMap* lock_map = lock_map_ptr.get();

    if (lock_map == nullptr) {
      // Column Family must have been dropped.
      return;
    }

    // Bucket keys by lock_map_ stripe
    std::unordered_map<size_t, std::vector<const std::string*>> keys_by_stripe(
        std::max(keys.size(), lock_map->num_stripes_));

    for (auto& key_iter : keys) {
      const std::string& key = key_iter.first;

      size_t stripe_num = lock_map->GetStripe(key);
      keys_by_stripe[stripe_num].push_back(&key);
    }

    // For each stripe, grab the stripe mutex and unlock all keys in this
    // stripe
    for (auto& stripe_iter : keys_by_stripe) {
      size_t stripe_num = stripe_iter.first;
      auto& stripe_keys = stripe_iter.second;

      assert(lock_map->lock_map_stripes_.size() > stripe_num);
      LockMapStripe* stripe = lock_map->lock_map_stripes_.at(stripe_num);

      stripe->stripe_mutex->Lock();

      for (const std::string* key : stripe_keys) {
        UnLockKey(txn, *key, stripe, lock_map, env);
      }

      stripe->stripe_mutex->UnLock();

      // Signal waiting threads to retry locking
      stripe->stripe_cv->NotifyAll();
    }
  }
}
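// Bucketing keys by stripe above means each stripe mutex is acquired at
// most once per column family when a transaction releases all of its
// locks, instead of once per key.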

TransactionLockMgr::LockStatusData TransactionLockMgr::GetLockStatusData() {
  LockStatusData data;
  // Lock order here is important. The correct order is lock_map_mutex_, then
  // for every column family ID in ascending order lock every stripe in
  // ascending order.
  InstrumentedMutexLock l(&lock_map_mutex_);

  std::vector<uint32_t> cf_ids;
  for (const auto& map : lock_maps_) {
    cf_ids.push_back(map.first);
  }
  std::sort(cf_ids.begin(), cf_ids.end());

  for (auto i : cf_ids) {
    const auto& stripes = lock_maps_[i]->lock_map_stripes_;
    // Iterate and lock all stripes in ascending order.
    for (const auto& j : stripes) {
      j->stripe_mutex->Lock();
      for (const auto& it : j->keys) {
        struct KeyLockInfo info;
        info.exclusive = it.second.exclusive;
        info.key = it.first;
        for (const auto& id : it.second.txn_ids) {
          info.ids.push_back(id);
        }
        data.insert({i, info});
      }
    }
  }

  // Unlock everything. Unlocking order is not important.
  for (auto i : cf_ids) {
    const auto& stripes = lock_maps_[i]->lock_map_stripes_;
    for (const auto& j : stripes) {
      j->stripe_mutex->UnLock();
    }
  }

  return data;
}
std::vector<DeadlockPath> TransactionLockMgr::GetDeadlockInfoBuffer() {
  return dlock_buffer_.PrepareBuffer();
}

void TransactionLockMgr::Resize(uint32_t target_size) {
  dlock_buffer_.Resize(target_size);
}

}  // namespace rocksdb
#endif  // ROCKSDB_LITE