// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#ifndef ROCKSDB_LITE

#include "utilities/transactions/transaction_lock_mgr.h"

#include <cinttypes>

#include <algorithm>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <string>
#include <vector>

#include "monitoring/perf_context_imp.h"
#include "rocksdb/slice.h"
#include "rocksdb/utilities/transaction_db_mutex.h"
#include "test_util/sync_point.h"
#include "util/cast_util.h"
#include "util/hash.h"
#include "util/thread_local.h"
#include "utilities/transactions/pessimistic_transaction_db.h"

namespace ROCKSDB_NAMESPACE {

struct LockInfo {
  bool exclusive;
  autovector<TransactionID> txn_ids;

  // Transaction locks are not valid after this time in microseconds
  uint64_t expiration_time;

  LockInfo(TransactionID id, uint64_t time, bool ex)
      : exclusive(ex), expiration_time(time) {
    txn_ids.push_back(id);
  }
  LockInfo(const LockInfo& lock_info)
      : exclusive(lock_info.exclusive),
        txn_ids(lock_info.txn_ids),
        expiration_time(lock_info.expiration_time) {}
};

struct LockMapStripe {
  explicit LockMapStripe(std::shared_ptr<TransactionDBMutexFactory> factory) {
    stripe_mutex = factory->AllocateMutex();
    stripe_cv = factory->AllocateCondVar();
    assert(stripe_mutex);
    assert(stripe_cv);
  }

  // Mutex must be held before modifying keys map
  std::shared_ptr<TransactionDBMutex> stripe_mutex;

  // Condition Variable per stripe for waiting on a lock
  std::shared_ptr<TransactionDBCondVar> stripe_cv;

  // Locked keys mapped to the info about the transactions that locked them.
  // TODO(agiardullo): Explore performance of other data structures.
  std::unordered_map<std::string, LockInfo> keys;
};

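// Each stripe guards a disjoint shard of the keys in one column family:
// transactions whose keys hash to different stripes contend on different
// mutexes instead of a single table-wide lock.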
// Map of #num_stripes LockMapStripes
struct LockMap {
  explicit LockMap(size_t num_stripes,
                   std::shared_ptr<TransactionDBMutexFactory> factory)
      : num_stripes_(num_stripes) {
    lock_map_stripes_.reserve(num_stripes);
    for (size_t i = 0; i < num_stripes; i++) {
      LockMapStripe* stripe = new LockMapStripe(factory);
      lock_map_stripes_.push_back(stripe);
    }
  }

  ~LockMap() {
    for (auto stripe : lock_map_stripes_) {
      delete stripe;
    }
  }

  // Number of separate LockMapStripes to create, each with its own Mutex
  const size_t num_stripes_;

  // Count of keys that are currently locked in this column family.
  // (Only maintained if TransactionLockMgr::max_num_locks_ is positive.)
  std::atomic<int64_t> lock_cnt{0};

  std::vector<LockMapStripe*> lock_map_stripes_;

  size_t GetStripe(const std::string& key) const;
};

void DeadlockInfoBuffer::AddNewPath(DeadlockPath path) {
  std::lock_guard<std::mutex> lock(paths_buffer_mutex_);

  if (paths_buffer_.empty()) {
    return;
  }

  paths_buffer_[buffer_idx_] = std::move(path);
  buffer_idx_ = (buffer_idx_ + 1) % paths_buffer_.size();
}

void DeadlockInfoBuffer::Resize(uint32_t target_size) {
  std::lock_guard<std::mutex> lock(paths_buffer_mutex_);

  paths_buffer_ = Normalize();

  // Drop the deadlocks that will no longer be needed after the normalize
  if (target_size < paths_buffer_.size()) {
    paths_buffer_.erase(
        paths_buffer_.begin(),
        paths_buffer_.begin() + (paths_buffer_.size() - target_size));
    buffer_idx_ = 0;
  }
  // Resize the buffer to the target size and restore the buffer's idx
  else {
    auto prev_size = paths_buffer_.size();
    paths_buffer_.resize(target_size);
    buffer_idx_ = (uint32_t)prev_size;
  }
}

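// Unwraps the circular buffer into chronological order, oldest path first.
// If the slot at buffer_idx_ has never been written, the buffer has not yet
// wrapped around, so only the filled prefix is kept.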
std::vector<DeadlockPath> DeadlockInfoBuffer::Normalize() {
  auto working = paths_buffer_;

  if (working.empty()) {
    return working;
  }

  // Next write occurs at a nonexistent path's slot
  if (paths_buffer_[buffer_idx_].empty()) {
    working.resize(buffer_idx_);
  } else {
    std::rotate(working.begin(), working.begin() + buffer_idx_, working.end());
  }

  return working;
}

std::vector<DeadlockPath> DeadlockInfoBuffer::PrepareBuffer() {
  std::lock_guard<std::mutex> lock(paths_buffer_mutex_);

  // Reversing the normalized vector returns the latest deadlocks first
  auto working = Normalize();
  std::reverse(working.begin(), working.end());

  return working;
}

namespace {
void UnrefLockMapsCache(void* ptr) {
  // Called when a thread exits or a ThreadLocalPtr gets destroyed.
  auto lock_maps_cache =
      static_cast<std::unordered_map<uint32_t, std::shared_ptr<LockMap>>*>(ptr);
  delete lock_maps_cache;
}
}  // anonymous namespace

TransactionLockMgr::TransactionLockMgr(
    TransactionDB* txn_db, size_t default_num_stripes, int64_t max_num_locks,
    uint32_t max_num_deadlocks,
    std::shared_ptr<TransactionDBMutexFactory> mutex_factory)
    : txn_db_impl_(nullptr),
      default_num_stripes_(default_num_stripes),
      max_num_locks_(max_num_locks),
      lock_maps_cache_(new ThreadLocalPtr(&UnrefLockMapsCache)),
      dlock_buffer_(max_num_deadlocks),
      mutex_factory_(mutex_factory) {
  assert(txn_db);
  txn_db_impl_ =
      static_cast_with_check<PessimisticTransactionDB, TransactionDB>(txn_db);
}

TransactionLockMgr::~TransactionLockMgr() {}

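// fastrange64() maps the 64-bit hash of the key uniformly onto
// [0, num_stripes_) via a multiply-and-shift, which is cheaper than a
// modulo reduction.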
size_t LockMap::GetStripe(const std::string& key) const {
  assert(num_stripes_ > 0);
  return fastrange64(GetSliceNPHash64(key), num_stripes_);
}

void TransactionLockMgr::AddColumnFamily(uint32_t column_family_id) {
  InstrumentedMutexLock l(&lock_map_mutex_);

  if (lock_maps_.find(column_family_id) == lock_maps_.end()) {
    lock_maps_.emplace(column_family_id,
                       std::make_shared<LockMap>(default_num_stripes_,
                                                 mutex_factory_));
  } else {
    // column_family already exists in lock map
    assert(false);
  }
}

void TransactionLockMgr::RemoveColumnFamily(uint32_t column_family_id) {
  // Remove lock_map for this column family. Since the lock map is stored
  // as a shared ptr, concurrent transactions can still keep using it
  // until they release their references to it.
  {
    InstrumentedMutexLock l(&lock_map_mutex_);

    auto lock_maps_iter = lock_maps_.find(column_family_id);
    assert(lock_maps_iter != lock_maps_.end());

    lock_maps_.erase(lock_maps_iter);
  }  // lock_map_mutex_

  // Clear all thread-local caches
  autovector<void*> local_caches;
  lock_maps_cache_->Scrape(&local_caches, nullptr);
  for (auto cache : local_caches) {
    delete static_cast<LockMaps*>(cache);
  }
}

// Look up the LockMap std::shared_ptr for a given column_family_id.
// Note: The LockMap is only valid as long as the caller is still holding on
// to the returned std::shared_ptr.
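// The thread-local cache is consulted first so the common case avoids
// taking lock_map_mutex_; a miss falls back to the shared map and then
// populates the cache for subsequent lookups on this thread.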
std::shared_ptr<LockMap> TransactionLockMgr::GetLockMap(
    uint32_t column_family_id) {
  // First check thread-local cache
  if (lock_maps_cache_->Get() == nullptr) {
    lock_maps_cache_->Reset(new LockMaps());
  }

  auto lock_maps_cache = static_cast<LockMaps*>(lock_maps_cache_->Get());

  auto lock_map_iter = lock_maps_cache->find(column_family_id);
  if (lock_map_iter != lock_maps_cache->end()) {
    // Found lock map for this column family.
    return lock_map_iter->second;
  }

  // Not found in local cache, grab mutex and check shared LockMaps
  InstrumentedMutexLock l(&lock_map_mutex_);

  lock_map_iter = lock_maps_.find(column_family_id);
  if (lock_map_iter == lock_maps_.end()) {
    return std::shared_ptr<LockMap>(nullptr);
  } else {
    // Found lock map. Store in thread-local cache and return.
    std::shared_ptr<LockMap>& lock_map = lock_map_iter->second;
    lock_maps_cache->insert({column_family_id, lock_map});

    return lock_map;
  }
}

// Returns true if this lock has expired and can be acquired by another
// transaction.
// If false, sets *expire_time to the expiration time of the lock according
// to Env->NowMicros() or 0 if no expiration.
bool TransactionLockMgr::IsLockExpired(TransactionID txn_id,
                                       const LockInfo& lock_info, Env* env,
                                       uint64_t* expire_time) {
  auto now = env->NowMicros();

  bool expired =
      (lock_info.expiration_time > 0 && lock_info.expiration_time <= now);

  if (!expired && lock_info.expiration_time > 0) {
    // Report the timestamp in microseconds at which the lock will expire
    *expire_time = lock_info.expiration_time;
  } else {
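    // Either the lock has expired or it never expires. It can be stolen
    // only if every other holder's locks can be invalidated as expired;
    // if any holder is still live, the lock cannot be taken over.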
    for (auto id : lock_info.txn_ids) {
      if (txn_id == id) {
        continue;
      }

      bool success = txn_db_impl_->TryStealingExpiredTransactionLocks(id);
      if (!success) {
        expired = false;
        break;
      }
      *expire_time = 0;
    }
  }

  return expired;
}

Status TransactionLockMgr::TryLock(PessimisticTransaction* txn,
                                   uint32_t column_family_id,
                                   const std::string& key, Env* env,
                                   bool exclusive) {
  // Lookup lock map for this column family id
  std::shared_ptr<LockMap> lock_map_ptr = GetLockMap(column_family_id);
  LockMap* lock_map = lock_map_ptr.get();
  if (lock_map == nullptr) {
    char msg[255];
    snprintf(msg, sizeof(msg), "Column family id not found: %" PRIu32,
             column_family_id);

    return Status::InvalidArgument(msg);
  }

  // Need to lock the mutex for the stripe that this key hashes to
  size_t stripe_num = lock_map->GetStripe(key);
  assert(lock_map->lock_map_stripes_.size() > stripe_num);
  LockMapStripe* stripe = lock_map->lock_map_stripes_.at(stripe_num);

  LockInfo lock_info(txn->GetID(), txn->GetExpirationTime(), exclusive);
  int64_t timeout = txn->GetLockTimeout();

  return AcquireWithTimeout(txn, lock_map, stripe, column_family_id, key, env,
                            timeout, std::move(lock_info));
}

// Helper function for TryLock().
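// Tries to take the lock; on conflict it blocks on the stripe's condition
// variable until the lock is released, the holder's expiration time passes,
// or the timeout elapses, re-attempting the acquisition after each wake-up.
// Note: AcquireLocked() only consumes lock_info when it actually inserts it
// into the stripe, so lock_info remains valid across failed attempts.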
Status TransactionLockMgr::AcquireWithTimeout(
    PessimisticTransaction* txn, LockMap* lock_map, LockMapStripe* stripe,
    uint32_t column_family_id, const std::string& key, Env* env,
    int64_t timeout, LockInfo&& lock_info) {
  Status result;
  uint64_t end_time = 0;

  if (timeout > 0) {
    uint64_t start_time = env->NowMicros();
    end_time = start_time + timeout;
  }

  if (timeout < 0) {
    // If timeout is negative, we wait indefinitely to acquire the lock
    result = stripe->stripe_mutex->Lock();
  } else {
    result = stripe->stripe_mutex->TryLockFor(timeout);
  }

  if (!result.ok()) {
    // failed to acquire mutex
    return result;
  }

  // Acquire lock if we are able to
  uint64_t expire_time_hint = 0;
  autovector<TransactionID> wait_ids;
  result = AcquireLocked(lock_map, stripe, key, env, std::move(lock_info),
                         &expire_time_hint, &wait_ids);

  if (!result.ok() && timeout != 0) {
    PERF_TIMER_GUARD(key_lock_wait_time);
    PERF_COUNTER_ADD(key_lock_wait_count, 1);
    // If we weren't able to acquire the lock, we will keep retrying as long
    // as the timeout allows.
    bool timed_out = false;
    do {
      // Decide how long to wait
      int64_t cv_end_time = -1;

      // Check if held lock's expiration time is sooner than our timeout
      if (expire_time_hint > 0 &&
          (timeout < 0 || (timeout > 0 && expire_time_hint < end_time))) {
        // expiration time is sooner than our timeout
        cv_end_time = expire_time_hint;
      } else if (timeout >= 0) {
        cv_end_time = end_time;
      }

      assert(result.IsBusy() || wait_ids.size() != 0);

      // We are dependent on a transaction to finish, so perform deadlock
      // detection.
      if (wait_ids.size() != 0) {
        if (txn->IsDeadlockDetect()) {
          if (IncrementWaiters(txn, wait_ids, key, column_family_id,
                               lock_info.exclusive, env)) {
            result = Status::Busy(Status::SubCode::kDeadlock);
            stripe->stripe_mutex->UnLock();
            return result;
          }
        }
        txn->SetWaitingTxn(wait_ids, column_family_id, &key);
      }

      TEST_SYNC_POINT("TransactionLockMgr::AcquireWithTimeout:WaitingTxn");
      if (cv_end_time < 0) {
        // Wait indefinitely
        result = stripe->stripe_cv->Wait(stripe->stripe_mutex);
      } else {
        uint64_t now = env->NowMicros();
        if (static_cast<uint64_t>(cv_end_time) > now) {
          result = stripe->stripe_cv->WaitFor(stripe->stripe_mutex,
                                              cv_end_time - now);
        }
      }

      if (wait_ids.size() != 0) {
        txn->ClearWaitingTxn();
        if (txn->IsDeadlockDetect()) {
          DecrementWaiters(txn, wait_ids);
        }
      }

      if (result.IsTimedOut()) {
        timed_out = true;
        // Even though we timed out, we will still make one more attempt to
        // acquire lock below (it is possible the lock expired and we
        // were never signaled).
      }

      if (result.ok() || result.IsTimedOut()) {
        result = AcquireLocked(lock_map, stripe, key, env,
                               std::move(lock_info), &expire_time_hint,
                               &wait_ids);
      }
    } while (!result.ok() && !timed_out);
  }

  stripe->stripe_mutex->UnLock();

  return result;
}

void TransactionLockMgr::DecrementWaiters(
    const PessimisticTransaction* txn,
    const autovector<TransactionID>& wait_ids) {
  std::lock_guard<std::mutex> lock(wait_txn_map_mutex_);
  DecrementWaitersImpl(txn, wait_ids);
}

void TransactionLockMgr::DecrementWaitersImpl(
    const PessimisticTransaction* txn,
    const autovector<TransactionID>& wait_ids) {
  auto id = txn->GetID();
  assert(wait_txn_map_.Contains(id));
  wait_txn_map_.Delete(id);

  for (auto wait_id : wait_ids) {
    rev_wait_txn_map_.Get(wait_id)--;
    if (rev_wait_txn_map_.Get(wait_id) == 0) {
      rev_wait_txn_map_.Delete(wait_id);
    }
  }
}

bool TransactionLockMgr::IncrementWaiters(
    const PessimisticTransaction* txn,
    const autovector<TransactionID>& wait_ids, const std::string& key,
    const uint32_t& cf_id, const bool& exclusive, Env* const env) {
  auto id = txn->GetID();
  std::vector<int> queue_parents(
      static_cast<size_t>(txn->GetDeadlockDetectDepth()));
  std::vector<TransactionID> queue_values(
      static_cast<size_t>(txn->GetDeadlockDetectDepth()));
  std::lock_guard<std::mutex> lock(wait_txn_map_mutex_);
  assert(!wait_txn_map_.Contains(id));

  wait_txn_map_.Insert(id, {wait_ids, cf_id, exclusive, key});

  for (auto wait_id : wait_ids) {
    if (rev_wait_txn_map_.Contains(wait_id)) {
      rev_wait_txn_map_.Get(wait_id)++;
    } else {
      rev_wait_txn_map_.Insert(wait_id, 1);
    }
  }

  // No deadlock if nobody is waiting on self.
  if (!rev_wait_txn_map_.Contains(id)) {
    return false;
  }

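  // Breadth-first search of the wait-for graph, starting from this
  // transaction's immediate dependencies. queue_parents records each
  // visited node's predecessor so the cycle can be reconstructed if the
  // search reaches this transaction's own id.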
  const auto* next_ids = &wait_ids;
  int parent = -1;
  int64_t deadlock_time = 0;
  for (int tail = 0, head = 0; head < txn->GetDeadlockDetectDepth(); head++) {
    int i = 0;
    if (next_ids) {
      for (; i < static_cast<int>(next_ids->size()) &&
             tail + i < txn->GetDeadlockDetectDepth();
           i++) {
        queue_values[tail + i] = (*next_ids)[i];
        queue_parents[tail + i] = parent;
      }
      tail += i;
    }

    // No more items in the list, meaning no deadlock.
    if (tail == head) {
      return false;
    }

    auto next = queue_values[head];
    if (next == id) {
      std::vector<DeadlockInfo> path;
      while (head != -1) {
        assert(wait_txn_map_.Contains(queue_values[head]));

        auto extracted_info = wait_txn_map_.Get(queue_values[head]);
        path.push_back({queue_values[head], extracted_info.m_cf_id,
                        extracted_info.m_exclusive,
                        extracted_info.m_waiting_key});
        head = queue_parents[head];
      }
      env->GetCurrentTime(&deadlock_time);
      std::reverse(path.begin(), path.end());
      dlock_buffer_.AddNewPath(DeadlockPath(path, deadlock_time));
      deadlock_time = 0;
      DecrementWaitersImpl(txn, wait_ids);
      return true;
    } else if (!wait_txn_map_.Contains(next)) {
      next_ids = nullptr;
      continue;
    } else {
      parent = head;
      next_ids = &(wait_txn_map_.Get(next).m_neighbors);
    }
  }

  // Wait cycle too big, just assume deadlock.
  env->GetCurrentTime(&deadlock_time);
  dlock_buffer_.AddNewPath(DeadlockPath(deadlock_time, true));
  DecrementWaitersImpl(txn, wait_ids);
  return true;
}

// Try to lock this key after we have acquired the mutex.
// Sets *expire_time to the expiration time in microseconds
// or 0 if no expiration.
// REQUIRED: Stripe mutex must be held.
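// On conflict, returns Status::TimedOut(kLockTimeout) and fills *txn_ids
// with the transactions currently holding the lock; returns
// Status::Busy(kLockLimit) when taking the lock would exceed max_num_locks_.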
Status TransactionLockMgr::AcquireLocked(LockMap* lock_map,
                                         LockMapStripe* stripe,
                                         const std::string& key, Env* env,
                                         LockInfo&& txn_lock_info,
                                         uint64_t* expire_time,
                                         autovector<TransactionID>* txn_ids) {
  assert(txn_lock_info.txn_ids.size() == 1);

  Status result;
  // Check if this key is already locked
  auto stripe_iter = stripe->keys.find(key);
  if (stripe_iter != stripe->keys.end()) {
    // Lock already held
    LockInfo& lock_info = stripe_iter->second;
    assert(lock_info.txn_ids.size() == 1 || !lock_info.exclusive);

    if (lock_info.exclusive || txn_lock_info.exclusive) {
      if (lock_info.txn_ids.size() == 1 &&
          lock_info.txn_ids[0] == txn_lock_info.txn_ids[0]) {
        // The list contains one txn and we're it, so just take it.
        lock_info.exclusive = txn_lock_info.exclusive;
        lock_info.expiration_time = txn_lock_info.expiration_time;
      } else {
        // Check if it's expired. Skips over txn_lock_info.txn_ids[0] in case
        // it's there for a shared lock with multiple holders which was not
        // caught in the first case.
        if (IsLockExpired(txn_lock_info.txn_ids[0], lock_info, env,
                          expire_time)) {
          // lock is expired, can steal it
          lock_info.txn_ids = txn_lock_info.txn_ids;
          lock_info.exclusive = txn_lock_info.exclusive;
          lock_info.expiration_time = txn_lock_info.expiration_time;
          // lock_cnt does not change
        } else {
          result = Status::TimedOut(Status::SubCode::kLockTimeout);
          *txn_ids = lock_info.txn_ids;
        }
      }
    } else {
      // We are requesting shared access to a shared lock, so just grant it.
      lock_info.txn_ids.push_back(txn_lock_info.txn_ids[0]);
      // Using std::max means that expiration time never goes down even when
      // a transaction is removed from the list. The correct solution would be
      // to track expiry for every transaction, but this would also work for
      // now.
      lock_info.expiration_time =
          std::max(lock_info.expiration_time, txn_lock_info.expiration_time);
    }
  } else {  // Lock not held.
    // Check lock limit
    if (max_num_locks_ > 0 &&
        lock_map->lock_cnt.load(std::memory_order_acquire) >= max_num_locks_) {
      result = Status::Busy(Status::SubCode::kLockLimit);
    } else {
      // acquire lock
      stripe->keys.emplace(key, std::move(txn_lock_info));

      // Maintain lock count if there is a limit on the number of locks
      if (max_num_locks_) {
        lock_map->lock_cnt++;
      }
    }
  }

  return result;
}

void TransactionLockMgr::UnLockKey(const PessimisticTransaction* txn,
                                   const std::string& key,
                                   LockMapStripe* stripe, LockMap* lock_map,
                                   Env* env) {
#ifdef NDEBUG
  (void)env;
#endif
  TransactionID txn_id = txn->GetID();

  auto stripe_iter = stripe->keys.find(key);
  if (stripe_iter != stripe->keys.end()) {
    auto& txns = stripe_iter->second.txn_ids;
    auto txn_it = std::find(txns.begin(), txns.end(), txn_id);
    // Found the key we locked. unlock it.
    if (txn_it != txns.end()) {
      if (txns.size() == 1) {
        stripe->keys.erase(stripe_iter);
      } else {
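        // Several transactions hold this lock in shared mode: swap-and-pop
        // removes this transaction's id without shifting the rest.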
        auto last_it = txns.end() - 1;
        if (txn_it != last_it) {
          *txn_it = *last_it;
        }
        txns.pop_back();
      }

      if (max_num_locks_ > 0) {
        // Maintain lock count if there is a limit on the number of locks.
        assert(lock_map->lock_cnt.load(std::memory_order_relaxed) > 0);
        lock_map->lock_cnt--;
      }
    }
  } else {
    // This key is either not locked or locked by someone else. This should
    // only happen if the unlocking transaction has expired.
    assert(txn->GetExpirationTime() > 0 &&
           txn->GetExpirationTime() < env->NowMicros());
  }
}

void TransactionLockMgr::UnLock(PessimisticTransaction* txn,
                                uint32_t column_family_id,
                                const std::string& key, Env* env) {
  std::shared_ptr<LockMap> lock_map_ptr = GetLockMap(column_family_id);
  LockMap* lock_map = lock_map_ptr.get();
  if (lock_map == nullptr) {
    // Column Family must have been dropped.
    return;
  }

  // Lock the mutex for the stripe that this key hashes to
  size_t stripe_num = lock_map->GetStripe(key);
  assert(lock_map->lock_map_stripes_.size() > stripe_num);
  LockMapStripe* stripe = lock_map->lock_map_stripes_.at(stripe_num);

  stripe->stripe_mutex->Lock();
  UnLockKey(txn, key, stripe, lock_map, env);
  stripe->stripe_mutex->UnLock();

  // Signal waiting threads to retry locking
  stripe->stripe_cv->NotifyAll();
}

void TransactionLockMgr::UnLock(const PessimisticTransaction* txn,
                                const TransactionKeyMap* key_map, Env* env) {
  for (auto& key_map_iter : *key_map) {
    uint32_t column_family_id = key_map_iter.first;
    auto& keys = key_map_iter.second;

    std::shared_ptr<LockMap> lock_map_ptr = GetLockMap(column_family_id);
    LockMap* lock_map = lock_map_ptr.get();

    if (lock_map == nullptr) {
      // Column Family must have been dropped.
      return;
    }

    // Bucket keys by lock_map_ stripe
    std::unordered_map<size_t, std::vector<const std::string*>> keys_by_stripe(
        std::max(keys.size(), lock_map->num_stripes_));

    for (auto& key_iter : keys) {
      const std::string& key = key_iter.first;

      size_t stripe_num = lock_map->GetStripe(key);
      keys_by_stripe[stripe_num].push_back(&key);
    }

    // For each stripe, grab the stripe mutex and unlock all keys in this
    // stripe
    for (auto& stripe_iter : keys_by_stripe) {
      size_t stripe_num = stripe_iter.first;
      auto& stripe_keys = stripe_iter.second;

      assert(lock_map->lock_map_stripes_.size() > stripe_num);
      LockMapStripe* stripe = lock_map->lock_map_stripes_.at(stripe_num);

      stripe->stripe_mutex->Lock();

      for (const std::string* key : stripe_keys) {
        UnLockKey(txn, *key, stripe, lock_map, env);
      }

      stripe->stripe_mutex->UnLock();

      // Signal waiting threads to retry locking
      stripe->stripe_cv->NotifyAll();
    }
  }
}

TransactionLockMgr::LockStatusData TransactionLockMgr::GetLockStatusData() {
  LockStatusData data;
  // Lock order here is important. The correct order is lock_map_mutex_, then
  // for every column family ID in ascending order lock every stripe in
  // ascending order.
  InstrumentedMutexLock l(&lock_map_mutex_);

  std::vector<uint32_t> cf_ids;
  for (const auto& map : lock_maps_) {
    cf_ids.push_back(map.first);
  }
  std::sort(cf_ids.begin(), cf_ids.end());

  for (auto i : cf_ids) {
    const auto& stripes = lock_maps_[i]->lock_map_stripes_;
    // Iterate and lock all stripes in ascending order.
    for (const auto& j : stripes) {
      j->stripe_mutex->Lock();
      for (const auto& it : j->keys) {
        struct KeyLockInfo info;
        info.exclusive = it.second.exclusive;
        info.key = it.first;
        for (const auto& id : it.second.txn_ids) {
          info.ids.push_back(id);
        }
        data.insert({i, info});
      }
    }
  }

  // Unlock everything. Unlocking order is not important.
  for (auto i : cf_ids) {
    const auto& stripes = lock_maps_[i]->lock_map_stripes_;
    for (const auto& j : stripes) {
      j->stripe_mutex->UnLock();
    }
  }

  return data;
}

std::vector<DeadlockPath> TransactionLockMgr::GetDeadlockInfoBuffer() {
  return dlock_buffer_.PrepareBuffer();
}

void TransactionLockMgr::Resize(uint32_t target_size) {
  dlock_buffer_.Resize(target_size);
}

}  // namespace ROCKSDB_NAMESPACE
#endif  // ROCKSDB_LITE