]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/utilities/transactions/transaction_lock_mgr.cc
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / rocksdb / utilities / transactions / transaction_lock_mgr.cc
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5
6 #ifndef ROCKSDB_LITE
7
8 #include "utilities/transactions/transaction_lock_mgr.h"
9
10 #include <cinttypes>
11
12 #include <algorithm>
13 #include <condition_variable>
14 #include <functional>
15 #include <mutex>
16 #include <string>
17 #include <vector>
18
19 #include "monitoring/perf_context_imp.h"
20 #include "rocksdb/slice.h"
21 #include "rocksdb/utilities/transaction_db_mutex.h"
22 #include "test_util/sync_point.h"
23 #include "util/cast_util.h"
24 #include "util/hash.h"
25 #include "util/thread_local.h"
26 #include "utilities/transactions/pessimistic_transaction_db.h"
27
28 namespace ROCKSDB_NAMESPACE {
29
30 struct LockInfo {
31 bool exclusive;
32 autovector<TransactionID> txn_ids;
33
34 // Transaction locks are not valid after this time in us
35 uint64_t expiration_time;
36
37 LockInfo(TransactionID id, uint64_t time, bool ex)
38 : exclusive(ex), expiration_time(time) {
39 txn_ids.push_back(id);
40 }
41 LockInfo(const LockInfo& lock_info)
42 : exclusive(lock_info.exclusive),
43 txn_ids(lock_info.txn_ids),
44 expiration_time(lock_info.expiration_time) {}
45 };
46
47 struct LockMapStripe {
48 explicit LockMapStripe(std::shared_ptr<TransactionDBMutexFactory> factory) {
49 stripe_mutex = factory->AllocateMutex();
50 stripe_cv = factory->AllocateCondVar();
51 assert(stripe_mutex);
52 assert(stripe_cv);
53 }
54
55 // Mutex must be held before modifying keys map
56 std::shared_ptr<TransactionDBMutex> stripe_mutex;
57
58 // Condition Variable per stripe for waiting on a lock
59 std::shared_ptr<TransactionDBCondVar> stripe_cv;
60
61 // Locked keys mapped to the info about the transactions that locked them.
62 // TODO(agiardullo): Explore performance of other data structures.
63 std::unordered_map<std::string, LockInfo> keys;
64 };
65
66 // Map of #num_stripes LockMapStripes
67 struct LockMap {
68 explicit LockMap(size_t num_stripes,
69 std::shared_ptr<TransactionDBMutexFactory> factory)
70 : num_stripes_(num_stripes) {
71 lock_map_stripes_.reserve(num_stripes);
72 for (size_t i = 0; i < num_stripes; i++) {
73 LockMapStripe* stripe = new LockMapStripe(factory);
74 lock_map_stripes_.push_back(stripe);
75 }
76 }
77
78 ~LockMap() {
79 for (auto stripe : lock_map_stripes_) {
80 delete stripe;
81 }
82 }
83
84 // Number of sepearate LockMapStripes to create, each with their own Mutex
85 const size_t num_stripes_;
86
87 // Count of keys that are currently locked in this column family.
88 // (Only maintained if TransactionLockMgr::max_num_locks_ is positive.)
89 std::atomic<int64_t> lock_cnt{0};
90
91 std::vector<LockMapStripe*> lock_map_stripes_;
92
93 size_t GetStripe(const std::string& key) const;
94 };
95
96 void DeadlockInfoBuffer::AddNewPath(DeadlockPath path) {
97 std::lock_guard<std::mutex> lock(paths_buffer_mutex_);
98
99 if (paths_buffer_.empty()) {
100 return;
101 }
102
103 paths_buffer_[buffer_idx_] = std::move(path);
104 buffer_idx_ = (buffer_idx_ + 1) % paths_buffer_.size();
105 }
106
107 void DeadlockInfoBuffer::Resize(uint32_t target_size) {
108 std::lock_guard<std::mutex> lock(paths_buffer_mutex_);
109
110 paths_buffer_ = Normalize();
111
112 // Drop the deadlocks that will no longer be needed ater the normalize
113 if (target_size < paths_buffer_.size()) {
114 paths_buffer_.erase(
115 paths_buffer_.begin(),
116 paths_buffer_.begin() + (paths_buffer_.size() - target_size));
117 buffer_idx_ = 0;
118 }
119 // Resize the buffer to the target size and restore the buffer's idx
120 else {
121 auto prev_size = paths_buffer_.size();
122 paths_buffer_.resize(target_size);
123 buffer_idx_ = (uint32_t)prev_size;
124 }
125 }
126
127 std::vector<DeadlockPath> DeadlockInfoBuffer::Normalize() {
128 auto working = paths_buffer_;
129
130 if (working.empty()) {
131 return working;
132 }
133
134 // Next write occurs at a nonexistent path's slot
135 if (paths_buffer_[buffer_idx_].empty()) {
136 working.resize(buffer_idx_);
137 } else {
138 std::rotate(working.begin(), working.begin() + buffer_idx_, working.end());
139 }
140
141 return working;
142 }
143
144 std::vector<DeadlockPath> DeadlockInfoBuffer::PrepareBuffer() {
145 std::lock_guard<std::mutex> lock(paths_buffer_mutex_);
146
147 // Reversing the normalized vector returns the latest deadlocks first
148 auto working = Normalize();
149 std::reverse(working.begin(), working.end());
150
151 return working;
152 }
153
154 namespace {
155 void UnrefLockMapsCache(void* ptr) {
156 // Called when a thread exits or a ThreadLocalPtr gets destroyed.
157 auto lock_maps_cache =
158 static_cast<std::unordered_map<uint32_t, std::shared_ptr<LockMap>>*>(ptr);
159 delete lock_maps_cache;
160 }
161 } // anonymous namespace
162
163 TransactionLockMgr::TransactionLockMgr(
164 TransactionDB* txn_db, size_t default_num_stripes, int64_t max_num_locks,
165 uint32_t max_num_deadlocks,
166 std::shared_ptr<TransactionDBMutexFactory> mutex_factory)
167 : txn_db_impl_(nullptr),
168 default_num_stripes_(default_num_stripes),
169 max_num_locks_(max_num_locks),
170 lock_maps_cache_(new ThreadLocalPtr(&UnrefLockMapsCache)),
171 dlock_buffer_(max_num_deadlocks),
172 mutex_factory_(mutex_factory) {
173 assert(txn_db);
174 txn_db_impl_ =
175 static_cast_with_check<PessimisticTransactionDB, TransactionDB>(txn_db);
176 }
177
178 TransactionLockMgr::~TransactionLockMgr() {}
179
180 size_t LockMap::GetStripe(const std::string& key) const {
181 assert(num_stripes_ > 0);
182 return fastrange64(GetSliceNPHash64(key), num_stripes_);
183 }
184
185 void TransactionLockMgr::AddColumnFamily(uint32_t column_family_id) {
186 InstrumentedMutexLock l(&lock_map_mutex_);
187
188 if (lock_maps_.find(column_family_id) == lock_maps_.end()) {
189 lock_maps_.emplace(column_family_id,
190 std::make_shared<LockMap>(default_num_stripes_, mutex_factory_));
191 } else {
192 // column_family already exists in lock map
193 assert(false);
194 }
195 }
196
197 void TransactionLockMgr::RemoveColumnFamily(uint32_t column_family_id) {
198 // Remove lock_map for this column family. Since the lock map is stored
199 // as a shared ptr, concurrent transactions can still keep using it
200 // until they release their references to it.
201 {
202 InstrumentedMutexLock l(&lock_map_mutex_);
203
204 auto lock_maps_iter = lock_maps_.find(column_family_id);
205 assert(lock_maps_iter != lock_maps_.end());
206
207 lock_maps_.erase(lock_maps_iter);
208 } // lock_map_mutex_
209
210 // Clear all thread-local caches
211 autovector<void*> local_caches;
212 lock_maps_cache_->Scrape(&local_caches, nullptr);
213 for (auto cache : local_caches) {
214 delete static_cast<LockMaps*>(cache);
215 }
216 }
217
218 // Look up the LockMap std::shared_ptr for a given column_family_id.
219 // Note: The LockMap is only valid as long as the caller is still holding on
220 // to the returned std::shared_ptr.
221 std::shared_ptr<LockMap> TransactionLockMgr::GetLockMap(
222 uint32_t column_family_id) {
223 // First check thread-local cache
224 if (lock_maps_cache_->Get() == nullptr) {
225 lock_maps_cache_->Reset(new LockMaps());
226 }
227
228 auto lock_maps_cache = static_cast<LockMaps*>(lock_maps_cache_->Get());
229
230 auto lock_map_iter = lock_maps_cache->find(column_family_id);
231 if (lock_map_iter != lock_maps_cache->end()) {
232 // Found lock map for this column family.
233 return lock_map_iter->second;
234 }
235
236 // Not found in local cache, grab mutex and check shared LockMaps
237 InstrumentedMutexLock l(&lock_map_mutex_);
238
239 lock_map_iter = lock_maps_.find(column_family_id);
240 if (lock_map_iter == lock_maps_.end()) {
241 return std::shared_ptr<LockMap>(nullptr);
242 } else {
243 // Found lock map. Store in thread-local cache and return.
244 std::shared_ptr<LockMap>& lock_map = lock_map_iter->second;
245 lock_maps_cache->insert({column_family_id, lock_map});
246
247 return lock_map;
248 }
249 }
250
251 // Returns true if this lock has expired and can be acquired by another
252 // transaction.
253 // If false, sets *expire_time to the expiration time of the lock according
254 // to Env->GetMicros() or 0 if no expiration.
255 bool TransactionLockMgr::IsLockExpired(TransactionID txn_id,
256 const LockInfo& lock_info, Env* env,
257 uint64_t* expire_time) {
258 auto now = env->NowMicros();
259
260 bool expired =
261 (lock_info.expiration_time > 0 && lock_info.expiration_time <= now);
262
263 if (!expired && lock_info.expiration_time > 0) {
264 // return how many microseconds until lock will be expired
265 *expire_time = lock_info.expiration_time;
266 } else {
267 for (auto id : lock_info.txn_ids) {
268 if (txn_id == id) {
269 continue;
270 }
271
272 bool success = txn_db_impl_->TryStealingExpiredTransactionLocks(id);
273 if (!success) {
274 expired = false;
275 break;
276 }
277 *expire_time = 0;
278 }
279 }
280
281 return expired;
282 }
283
284 Status TransactionLockMgr::TryLock(PessimisticTransaction* txn,
285 uint32_t column_family_id,
286 const std::string& key, Env* env,
287 bool exclusive) {
288 // Lookup lock map for this column family id
289 std::shared_ptr<LockMap> lock_map_ptr = GetLockMap(column_family_id);
290 LockMap* lock_map = lock_map_ptr.get();
291 if (lock_map == nullptr) {
292 char msg[255];
293 snprintf(msg, sizeof(msg), "Column family id not found: %" PRIu32,
294 column_family_id);
295
296 return Status::InvalidArgument(msg);
297 }
298
299 // Need to lock the mutex for the stripe that this key hashes to
300 size_t stripe_num = lock_map->GetStripe(key);
301 assert(lock_map->lock_map_stripes_.size() > stripe_num);
302 LockMapStripe* stripe = lock_map->lock_map_stripes_.at(stripe_num);
303
304 LockInfo lock_info(txn->GetID(), txn->GetExpirationTime(), exclusive);
305 int64_t timeout = txn->GetLockTimeout();
306
307 return AcquireWithTimeout(txn, lock_map, stripe, column_family_id, key, env,
308 timeout, std::move(lock_info));
309 }
310
311 // Helper function for TryLock().
312 Status TransactionLockMgr::AcquireWithTimeout(
313 PessimisticTransaction* txn, LockMap* lock_map, LockMapStripe* stripe,
314 uint32_t column_family_id, const std::string& key, Env* env,
315 int64_t timeout, LockInfo&& lock_info) {
316 Status result;
317 uint64_t end_time = 0;
318
319 if (timeout > 0) {
320 uint64_t start_time = env->NowMicros();
321 end_time = start_time + timeout;
322 }
323
324 if (timeout < 0) {
325 // If timeout is negative, we wait indefinitely to acquire the lock
326 result = stripe->stripe_mutex->Lock();
327 } else {
328 result = stripe->stripe_mutex->TryLockFor(timeout);
329 }
330
331 if (!result.ok()) {
332 // failed to acquire mutex
333 return result;
334 }
335
336 // Acquire lock if we are able to
337 uint64_t expire_time_hint = 0;
338 autovector<TransactionID> wait_ids;
339 result = AcquireLocked(lock_map, stripe, key, env, std::move(lock_info),
340 &expire_time_hint, &wait_ids);
341
342 if (!result.ok() && timeout != 0) {
343 PERF_TIMER_GUARD(key_lock_wait_time);
344 PERF_COUNTER_ADD(key_lock_wait_count, 1);
345 // If we weren't able to acquire the lock, we will keep retrying as long
346 // as the timeout allows.
347 bool timed_out = false;
348 do {
349 // Decide how long to wait
350 int64_t cv_end_time = -1;
351
352 // Check if held lock's expiration time is sooner than our timeout
353 if (expire_time_hint > 0 &&
354 (timeout < 0 || (timeout > 0 && expire_time_hint < end_time))) {
355 // expiration time is sooner than our timeout
356 cv_end_time = expire_time_hint;
357 } else if (timeout >= 0) {
358 cv_end_time = end_time;
359 }
360
361 assert(result.IsBusy() || wait_ids.size() != 0);
362
363 // We are dependent on a transaction to finish, so perform deadlock
364 // detection.
365 if (wait_ids.size() != 0) {
366 if (txn->IsDeadlockDetect()) {
367 if (IncrementWaiters(txn, wait_ids, key, column_family_id,
368 lock_info.exclusive, env)) {
369 result = Status::Busy(Status::SubCode::kDeadlock);
370 stripe->stripe_mutex->UnLock();
371 return result;
372 }
373 }
374 txn->SetWaitingTxn(wait_ids, column_family_id, &key);
375 }
376
377 TEST_SYNC_POINT("TransactionLockMgr::AcquireWithTimeout:WaitingTxn");
378 if (cv_end_time < 0) {
379 // Wait indefinitely
380 result = stripe->stripe_cv->Wait(stripe->stripe_mutex);
381 } else {
382 uint64_t now = env->NowMicros();
383 if (static_cast<uint64_t>(cv_end_time) > now) {
384 result = stripe->stripe_cv->WaitFor(stripe->stripe_mutex,
385 cv_end_time - now);
386 }
387 }
388
389 if (wait_ids.size() != 0) {
390 txn->ClearWaitingTxn();
391 if (txn->IsDeadlockDetect()) {
392 DecrementWaiters(txn, wait_ids);
393 }
394 }
395
396 if (result.IsTimedOut()) {
397 timed_out = true;
398 // Even though we timed out, we will still make one more attempt to
399 // acquire lock below (it is possible the lock expired and we
400 // were never signaled).
401 }
402
403 if (result.ok() || result.IsTimedOut()) {
404 result = AcquireLocked(lock_map, stripe, key, env, std::move(lock_info),
405 &expire_time_hint, &wait_ids);
406 }
407 } while (!result.ok() && !timed_out);
408 }
409
410 stripe->stripe_mutex->UnLock();
411
412 return result;
413 }
414
415 void TransactionLockMgr::DecrementWaiters(
416 const PessimisticTransaction* txn,
417 const autovector<TransactionID>& wait_ids) {
418 std::lock_guard<std::mutex> lock(wait_txn_map_mutex_);
419 DecrementWaitersImpl(txn, wait_ids);
420 }
421
422 void TransactionLockMgr::DecrementWaitersImpl(
423 const PessimisticTransaction* txn,
424 const autovector<TransactionID>& wait_ids) {
425 auto id = txn->GetID();
426 assert(wait_txn_map_.Contains(id));
427 wait_txn_map_.Delete(id);
428
429 for (auto wait_id : wait_ids) {
430 rev_wait_txn_map_.Get(wait_id)--;
431 if (rev_wait_txn_map_.Get(wait_id) == 0) {
432 rev_wait_txn_map_.Delete(wait_id);
433 }
434 }
435 }
436
437 bool TransactionLockMgr::IncrementWaiters(
438 const PessimisticTransaction* txn,
439 const autovector<TransactionID>& wait_ids, const std::string& key,
440 const uint32_t& cf_id, const bool& exclusive, Env* const env) {
441 auto id = txn->GetID();
442 std::vector<int> queue_parents(static_cast<size_t>(txn->GetDeadlockDetectDepth()));
443 std::vector<TransactionID> queue_values(static_cast<size_t>(txn->GetDeadlockDetectDepth()));
444 std::lock_guard<std::mutex> lock(wait_txn_map_mutex_);
445 assert(!wait_txn_map_.Contains(id));
446
447 wait_txn_map_.Insert(id, {wait_ids, cf_id, exclusive, key});
448
449 for (auto wait_id : wait_ids) {
450 if (rev_wait_txn_map_.Contains(wait_id)) {
451 rev_wait_txn_map_.Get(wait_id)++;
452 } else {
453 rev_wait_txn_map_.Insert(wait_id, 1);
454 }
455 }
456
457 // No deadlock if nobody is waiting on self.
458 if (!rev_wait_txn_map_.Contains(id)) {
459 return false;
460 }
461
462 const auto* next_ids = &wait_ids;
463 int parent = -1;
464 int64_t deadlock_time = 0;
465 for (int tail = 0, head = 0; head < txn->GetDeadlockDetectDepth(); head++) {
466 int i = 0;
467 if (next_ids) {
468 for (; i < static_cast<int>(next_ids->size()) &&
469 tail + i < txn->GetDeadlockDetectDepth();
470 i++) {
471 queue_values[tail + i] = (*next_ids)[i];
472 queue_parents[tail + i] = parent;
473 }
474 tail += i;
475 }
476
477 // No more items in the list, meaning no deadlock.
478 if (tail == head) {
479 return false;
480 }
481
482 auto next = queue_values[head];
483 if (next == id) {
484 std::vector<DeadlockInfo> path;
485 while (head != -1) {
486 assert(wait_txn_map_.Contains(queue_values[head]));
487
488 auto extracted_info = wait_txn_map_.Get(queue_values[head]);
489 path.push_back({queue_values[head], extracted_info.m_cf_id,
490 extracted_info.m_exclusive,
491 extracted_info.m_waiting_key});
492 head = queue_parents[head];
493 }
494 env->GetCurrentTime(&deadlock_time);
495 std::reverse(path.begin(), path.end());
496 dlock_buffer_.AddNewPath(DeadlockPath(path, deadlock_time));
497 deadlock_time = 0;
498 DecrementWaitersImpl(txn, wait_ids);
499 return true;
500 } else if (!wait_txn_map_.Contains(next)) {
501 next_ids = nullptr;
502 continue;
503 } else {
504 parent = head;
505 next_ids = &(wait_txn_map_.Get(next).m_neighbors);
506 }
507 }
508
509 // Wait cycle too big, just assume deadlock.
510 env->GetCurrentTime(&deadlock_time);
511 dlock_buffer_.AddNewPath(DeadlockPath(deadlock_time, true));
512 DecrementWaitersImpl(txn, wait_ids);
513 return true;
514 }
515
516 // Try to lock this key after we have acquired the mutex.
517 // Sets *expire_time to the expiration time in microseconds
518 // or 0 if no expiration.
519 // REQUIRED: Stripe mutex must be held.
520 Status TransactionLockMgr::AcquireLocked(LockMap* lock_map,
521 LockMapStripe* stripe,
522 const std::string& key, Env* env,
523 LockInfo&& txn_lock_info,
524 uint64_t* expire_time,
525 autovector<TransactionID>* txn_ids) {
526 assert(txn_lock_info.txn_ids.size() == 1);
527
528 Status result;
529 // Check if this key is already locked
530 auto stripe_iter = stripe->keys.find(key);
531 if (stripe_iter != stripe->keys.end()) {
532 // Lock already held
533 LockInfo& lock_info = stripe_iter->second;
534 assert(lock_info.txn_ids.size() == 1 || !lock_info.exclusive);
535
536 if (lock_info.exclusive || txn_lock_info.exclusive) {
537 if (lock_info.txn_ids.size() == 1 &&
538 lock_info.txn_ids[0] == txn_lock_info.txn_ids[0]) {
539 // The list contains one txn and we're it, so just take it.
540 lock_info.exclusive = txn_lock_info.exclusive;
541 lock_info.expiration_time = txn_lock_info.expiration_time;
542 } else {
543 // Check if it's expired. Skips over txn_lock_info.txn_ids[0] in case
544 // it's there for a shared lock with multiple holders which was not
545 // caught in the first case.
546 if (IsLockExpired(txn_lock_info.txn_ids[0], lock_info, env,
547 expire_time)) {
548 // lock is expired, can steal it
549 lock_info.txn_ids = txn_lock_info.txn_ids;
550 lock_info.exclusive = txn_lock_info.exclusive;
551 lock_info.expiration_time = txn_lock_info.expiration_time;
552 // lock_cnt does not change
553 } else {
554 result = Status::TimedOut(Status::SubCode::kLockTimeout);
555 *txn_ids = lock_info.txn_ids;
556 }
557 }
558 } else {
559 // We are requesting shared access to a shared lock, so just grant it.
560 lock_info.txn_ids.push_back(txn_lock_info.txn_ids[0]);
561 // Using std::max means that expiration time never goes down even when
562 // a transaction is removed from the list. The correct solution would be
563 // to track expiry for every transaction, but this would also work for
564 // now.
565 lock_info.expiration_time =
566 std::max(lock_info.expiration_time, txn_lock_info.expiration_time);
567 }
568 } else { // Lock not held.
569 // Check lock limit
570 if (max_num_locks_ > 0 &&
571 lock_map->lock_cnt.load(std::memory_order_acquire) >= max_num_locks_) {
572 result = Status::Busy(Status::SubCode::kLockLimit);
573 } else {
574 // acquire lock
575 stripe->keys.emplace(key, std::move(txn_lock_info));
576
577 // Maintain lock count if there is a limit on the number of locks
578 if (max_num_locks_) {
579 lock_map->lock_cnt++;
580 }
581 }
582 }
583
584 return result;
585 }
586
587 void TransactionLockMgr::UnLockKey(const PessimisticTransaction* txn,
588 const std::string& key,
589 LockMapStripe* stripe, LockMap* lock_map,
590 Env* env) {
591 #ifdef NDEBUG
592 (void)env;
593 #endif
594 TransactionID txn_id = txn->GetID();
595
596 auto stripe_iter = stripe->keys.find(key);
597 if (stripe_iter != stripe->keys.end()) {
598 auto& txns = stripe_iter->second.txn_ids;
599 auto txn_it = std::find(txns.begin(), txns.end(), txn_id);
600 // Found the key we locked. unlock it.
601 if (txn_it != txns.end()) {
602 if (txns.size() == 1) {
603 stripe->keys.erase(stripe_iter);
604 } else {
605 auto last_it = txns.end() - 1;
606 if (txn_it != last_it) {
607 *txn_it = *last_it;
608 }
609 txns.pop_back();
610 }
611
612 if (max_num_locks_ > 0) {
613 // Maintain lock count if there is a limit on the number of locks.
614 assert(lock_map->lock_cnt.load(std::memory_order_relaxed) > 0);
615 lock_map->lock_cnt--;
616 }
617 }
618 } else {
619 // This key is either not locked or locked by someone else. This should
620 // only happen if the unlocking transaction has expired.
621 assert(txn->GetExpirationTime() > 0 &&
622 txn->GetExpirationTime() < env->NowMicros());
623 }
624 }
625
626 void TransactionLockMgr::UnLock(PessimisticTransaction* txn,
627 uint32_t column_family_id,
628 const std::string& key, Env* env) {
629 std::shared_ptr<LockMap> lock_map_ptr = GetLockMap(column_family_id);
630 LockMap* lock_map = lock_map_ptr.get();
631 if (lock_map == nullptr) {
632 // Column Family must have been dropped.
633 return;
634 }
635
636 // Lock the mutex for the stripe that this key hashes to
637 size_t stripe_num = lock_map->GetStripe(key);
638 assert(lock_map->lock_map_stripes_.size() > stripe_num);
639 LockMapStripe* stripe = lock_map->lock_map_stripes_.at(stripe_num);
640
641 stripe->stripe_mutex->Lock();
642 UnLockKey(txn, key, stripe, lock_map, env);
643 stripe->stripe_mutex->UnLock();
644
645 // Signal waiting threads to retry locking
646 stripe->stripe_cv->NotifyAll();
647 }
648
649 void TransactionLockMgr::UnLock(const PessimisticTransaction* txn,
650 const TransactionKeyMap* key_map, Env* env) {
651 for (auto& key_map_iter : *key_map) {
652 uint32_t column_family_id = key_map_iter.first;
653 auto& keys = key_map_iter.second;
654
655 std::shared_ptr<LockMap> lock_map_ptr = GetLockMap(column_family_id);
656 LockMap* lock_map = lock_map_ptr.get();
657
658 if (lock_map == nullptr) {
659 // Column Family must have been dropped.
660 return;
661 }
662
663 // Bucket keys by lock_map_ stripe
664 std::unordered_map<size_t, std::vector<const std::string*>> keys_by_stripe(
665 std::max(keys.size(), lock_map->num_stripes_));
666
667 for (auto& key_iter : keys) {
668 const std::string& key = key_iter.first;
669
670 size_t stripe_num = lock_map->GetStripe(key);
671 keys_by_stripe[stripe_num].push_back(&key);
672 }
673
674 // For each stripe, grab the stripe mutex and unlock all keys in this stripe
675 for (auto& stripe_iter : keys_by_stripe) {
676 size_t stripe_num = stripe_iter.first;
677 auto& stripe_keys = stripe_iter.second;
678
679 assert(lock_map->lock_map_stripes_.size() > stripe_num);
680 LockMapStripe* stripe = lock_map->lock_map_stripes_.at(stripe_num);
681
682 stripe->stripe_mutex->Lock();
683
684 for (const std::string* key : stripe_keys) {
685 UnLockKey(txn, *key, stripe, lock_map, env);
686 }
687
688 stripe->stripe_mutex->UnLock();
689
690 // Signal waiting threads to retry locking
691 stripe->stripe_cv->NotifyAll();
692 }
693 }
694 }
695
696 TransactionLockMgr::LockStatusData TransactionLockMgr::GetLockStatusData() {
697 LockStatusData data;
698 // Lock order here is important. The correct order is lock_map_mutex_, then
699 // for every column family ID in ascending order lock every stripe in
700 // ascending order.
701 InstrumentedMutexLock l(&lock_map_mutex_);
702
703 std::vector<uint32_t> cf_ids;
704 for (const auto& map : lock_maps_) {
705 cf_ids.push_back(map.first);
706 }
707 std::sort(cf_ids.begin(), cf_ids.end());
708
709 for (auto i : cf_ids) {
710 const auto& stripes = lock_maps_[i]->lock_map_stripes_;
711 // Iterate and lock all stripes in ascending order.
712 for (const auto& j : stripes) {
713 j->stripe_mutex->Lock();
714 for (const auto& it : j->keys) {
715 struct KeyLockInfo info;
716 info.exclusive = it.second.exclusive;
717 info.key = it.first;
718 for (const auto& id : it.second.txn_ids) {
719 info.ids.push_back(id);
720 }
721 data.insert({i, info});
722 }
723 }
724 }
725
726 // Unlock everything. Unlocking order is not important.
727 for (auto i : cf_ids) {
728 const auto& stripes = lock_maps_[i]->lock_map_stripes_;
729 for (const auto& j : stripes) {
730 j->stripe_mutex->UnLock();
731 }
732 }
733
734 return data;
735 }
736 std::vector<DeadlockPath> TransactionLockMgr::GetDeadlockInfoBuffer() {
737 return dlock_buffer_.PrepareBuffer();
738 }
739
740 void TransactionLockMgr::Resize(uint32_t target_size) {
741 dlock_buffer_.Resize(target_size);
742 }
743
744 } // namespace ROCKSDB_NAMESPACE
745 #endif // ROCKSDB_LITE