// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#ifndef ROCKSDB_LITE

#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif

#include "utilities/transactions/transaction_lock_mgr.h"

#include <inttypes.h>

#include <algorithm>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <string>
#include <vector>

#include "monitoring/perf_context_imp.h"
#include "rocksdb/slice.h"
#include "rocksdb/utilities/transaction_db_mutex.h"
#include "util/cast_util.h"
#include "util/hash.h"
#include "util/sync_point.h"
#include "util/thread_local.h"
#include "utilities/transactions/pessimistic_transaction_db.h"

namespace rocksdb {

struct LockInfo {
  bool exclusive;
  autovector<TransactionID> txn_ids;

  // Transaction locks are not valid after this time (in microseconds)
  uint64_t expiration_time;

  LockInfo(TransactionID id, uint64_t time, bool ex)
      : exclusive(ex), expiration_time(time) {
    txn_ids.push_back(id);
  }
  LockInfo(const LockInfo& lock_info)
      : exclusive(lock_info.exclusive),
        txn_ids(lock_info.txn_ids),
        expiration_time(lock_info.expiration_time) {}
};

struct LockMapStripe {
  explicit LockMapStripe(std::shared_ptr<TransactionDBMutexFactory> factory) {
    stripe_mutex = factory->AllocateMutex();
    stripe_cv = factory->AllocateCondVar();
    assert(stripe_mutex);
    assert(stripe_cv);
  }

  // Mutex must be held before modifying keys map
  std::shared_ptr<TransactionDBMutex> stripe_mutex;

  // Condition Variable per stripe for waiting on a lock
  std::shared_ptr<TransactionDBCondVar> stripe_cv;

  // Locked keys mapped to the info about the transactions that locked them.
  // TODO(agiardullo): Explore performance of other data structures.
  std::unordered_map<std::string, LockInfo> keys;
};

// Map of #num_stripes LockMapStripes
struct LockMap {
  explicit LockMap(size_t num_stripes,
                   std::shared_ptr<TransactionDBMutexFactory> factory)
      : num_stripes_(num_stripes) {
    lock_map_stripes_.reserve(num_stripes);
    for (size_t i = 0; i < num_stripes; i++) {
      LockMapStripe* stripe = new LockMapStripe(factory);
      lock_map_stripes_.push_back(stripe);
    }
  }

  ~LockMap() {
    for (auto stripe : lock_map_stripes_) {
      delete stripe;
    }
  }

  // Number of separate LockMapStripes to create, each with its own Mutex
  const size_t num_stripes_;

  // Count of keys that are currently locked in this column family.
  // (Only maintained if TransactionLockMgr::max_num_locks_ is positive.)
  std::atomic<int64_t> lock_cnt{0};

  std::vector<LockMapStripe*> lock_map_stripes_;

  size_t GetStripe(const std::string& key) const;
};
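
// A note on striping: every key hashes deterministically to one of
// num_stripes_ stripes, so two transactions contend on a stripe_mutex only
// when their keys land on the same stripe. A minimal sketch of the idea
// (hypothetical stand-alone usage, not code that exists in this file):
//
//   LockMap lock_map(16 /* num_stripes */, mutex_factory);
//   size_t s1 = lock_map.GetStripe("key_a");
//   size_t s2 = lock_map.GetStripe("key_b");
//   // If s1 != s2, locking "key_a" and "key_b" touches different
//   // stripe_mutex instances and proceeds fully in parallel.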

void DeadlockInfoBuffer::AddNewPath(DeadlockPath path) {
  std::lock_guard<std::mutex> lock(paths_buffer_mutex_);

  if (paths_buffer_.empty()) {
    return;
  }

  paths_buffer_[buffer_idx_] = std::move(path);
  buffer_idx_ = (buffer_idx_ + 1) % paths_buffer_.size();
}

void DeadlockInfoBuffer::Resize(uint32_t target_size) {
  std::lock_guard<std::mutex> lock(paths_buffer_mutex_);

  paths_buffer_ = Normalize();

  // Drop the deadlocks that will no longer be needed after the normalize
  if (target_size < paths_buffer_.size()) {
    paths_buffer_.erase(
        paths_buffer_.begin(),
        paths_buffer_.begin() + (paths_buffer_.size() - target_size));
    buffer_idx_ = 0;
  }
  // Resize the buffer to the target size and restore the buffer's idx
  else {
    auto prev_size = paths_buffer_.size();
    paths_buffer_.resize(target_size);
    buffer_idx_ = (uint32_t)prev_size;
  }
}

std::vector<DeadlockPath> DeadlockInfoBuffer::Normalize() {
  auto working = paths_buffer_;

  if (working.empty()) {
    return working;
  }

  // Next write occurs at a nonexistent path's slot
  if (paths_buffer_[buffer_idx_].empty()) {
    working.resize(buffer_idx_);
  } else {
    std::rotate(working.begin(), working.begin() + buffer_idx_, working.end());
  }

  return working;
}

std::vector<DeadlockPath> DeadlockInfoBuffer::PrepareBuffer() {
  std::lock_guard<std::mutex> lock(paths_buffer_mutex_);

  // Reversing the normalized vector returns the latest deadlocks first
  auto working = Normalize();
  std::reverse(working.begin(), working.end());

  return working;
}
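
// How the pieces above fit together: paths_buffer_ is a fixed-capacity ring
// and buffer_idx_ always points at the next slot to overwrite. Normalize()
// rewinds the ring into chronological (oldest-first) order, and
// PrepareBuffer() reverses that so callers see the newest deadlock first.
// A worked example, assuming a capacity of 3 with paths P2..P4 recorded in
// that order (P1 was already overwritten by P4):
//
//   paths_buffer_   : [P4, P2, P3]   buffer_idx_ = 1 (P2 is oldest)
//   Normalize()     : [P2, P3, P4]   (rotate left by buffer_idx_)
//   PrepareBuffer() : [P4, P3, P2]   (newest deadlock first)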

namespace {
void UnrefLockMapsCache(void* ptr) {
  // Called when a thread exits or a ThreadLocalPtr gets destroyed.
  auto lock_maps_cache =
      static_cast<std::unordered_map<uint32_t, std::shared_ptr<LockMap>>*>(ptr);
  delete lock_maps_cache;
}
}  // anonymous namespace

TransactionLockMgr::TransactionLockMgr(
    TransactionDB* txn_db, size_t default_num_stripes, int64_t max_num_locks,
    uint32_t max_num_deadlocks,
    std::shared_ptr<TransactionDBMutexFactory> mutex_factory)
    : txn_db_impl_(nullptr),
      default_num_stripes_(default_num_stripes),
      max_num_locks_(max_num_locks),
      lock_maps_cache_(new ThreadLocalPtr(&UnrefLockMapsCache)),
      dlock_buffer_(max_num_deadlocks),
      mutex_factory_(mutex_factory) {
  assert(txn_db);
  txn_db_impl_ =
      static_cast_with_check<PessimisticTransactionDB, TransactionDB>(txn_db);
}

TransactionLockMgr::~TransactionLockMgr() {}

size_t LockMap::GetStripe(const std::string& key) const {
  assert(num_stripes_ > 0);
  size_t stripe = static_cast<size_t>(GetSliceNPHash64(key)) % num_stripes_;
  return stripe;
}

void TransactionLockMgr::AddColumnFamily(uint32_t column_family_id) {
  InstrumentedMutexLock l(&lock_map_mutex_);

  if (lock_maps_.find(column_family_id) == lock_maps_.end()) {
    lock_maps_.emplace(column_family_id,
                       std::shared_ptr<LockMap>(
                           new LockMap(default_num_stripes_, mutex_factory_)));
  } else {
    // column_family already exists in lock map
    assert(false);
  }
}

void TransactionLockMgr::RemoveColumnFamily(uint32_t column_family_id) {
  // Remove lock_map for this column family. Since the lock map is stored
  // as a shared ptr, concurrent transactions can still keep using it
  // until they release their references to it.
  {
    InstrumentedMutexLock l(&lock_map_mutex_);

    auto lock_maps_iter = lock_maps_.find(column_family_id);
    assert(lock_maps_iter != lock_maps_.end());

    lock_maps_.erase(lock_maps_iter);
  }  // lock_map_mutex_

  // Clear all thread-local caches
  autovector<void*> local_caches;
  lock_maps_cache_->Scrape(&local_caches, nullptr);
  for (auto cache : local_caches) {
    delete static_cast<LockMaps*>(cache);
  }
}

// Look up the LockMap std::shared_ptr for a given column_family_id.
// Note: The LockMap is only valid as long as the caller is still holding on
// to the returned std::shared_ptr.
std::shared_ptr<LockMap> TransactionLockMgr::GetLockMap(
    uint32_t column_family_id) {
  // First check thread-local cache
  if (lock_maps_cache_->Get() == nullptr) {
    lock_maps_cache_->Reset(new LockMaps());
  }

  auto lock_maps_cache = static_cast<LockMaps*>(lock_maps_cache_->Get());

  auto lock_map_iter = lock_maps_cache->find(column_family_id);
  if (lock_map_iter != lock_maps_cache->end()) {
    // Found lock map for this column family.
    return lock_map_iter->second;
  }

  // Not found in local cache, grab mutex and check shared LockMaps
  InstrumentedMutexLock l(&lock_map_mutex_);

  lock_map_iter = lock_maps_.find(column_family_id);
  if (lock_map_iter == lock_maps_.end()) {
    return std::shared_ptr<LockMap>(nullptr);
  } else {
    // Found lock map. Store in thread-local cache and return.
    std::shared_ptr<LockMap>& lock_map = lock_map_iter->second;
    lock_maps_cache->insert({column_family_id, lock_map});

    return lock_map;
  }
}
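
// The two-level lookup above is the fast path for every lock operation:
// once a column family's LockMap shared_ptr is cached in this thread's
// lock_maps_cache_, TryLock()/UnLock() reach it without ever taking the
// global lock_map_mutex_. RemoveColumnFamily() scrapes and deletes all of
// the thread-local caches, so the next lookup on any thread falls through
// to the shared lock_maps_ map and observes the drop.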

// Returns true if this lock has expired and can be acquired by another
// transaction.
// If false, sets *expire_time to the expiration time of the lock according
// to Env->NowMicros() or 0 if no expiration.
bool TransactionLockMgr::IsLockExpired(TransactionID txn_id,
                                       const LockInfo& lock_info, Env* env,
                                       uint64_t* expire_time) {
  auto now = env->NowMicros();

  bool expired =
      (lock_info.expiration_time > 0 && lock_info.expiration_time <= now);

  if (!expired && lock_info.expiration_time > 0) {
    // Return the absolute time (in microseconds) at which the lock expires
    *expire_time = lock_info.expiration_time;
  } else {
    for (auto id : lock_info.txn_ids) {
      if (txn_id == id) {
        continue;
      }

      bool success = txn_db_impl_->TryStealingExpiredTransactionLocks(id);
      if (!success) {
        expired = false;
        break;
      }
      *expire_time = 0;
    }
  }

  return expired;
}
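
// Expiration and lock stealing, in short: expiration_time == 0 means the
// lock never expires. Otherwise, once NowMicros() passes expiration_time, a
// waiter may take the lock over, but only after
// TryStealingExpiredTransactionLocks() succeeds for every other holder; the
// intent of that handshake is that an expired transaction cannot wake up
// and commit using a lock that has already been handed to someone else.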

Status TransactionLockMgr::TryLock(PessimisticTransaction* txn,
                                   uint32_t column_family_id,
                                   const std::string& key, Env* env,
                                   bool exclusive) {
  // Lookup lock map for this column family id
  std::shared_ptr<LockMap> lock_map_ptr = GetLockMap(column_family_id);
  LockMap* lock_map = lock_map_ptr.get();
  if (lock_map == nullptr) {
    char msg[255];
    snprintf(msg, sizeof(msg), "Column family id not found: %" PRIu32,
             column_family_id);

    return Status::InvalidArgument(msg);
  }

  // Need to lock the mutex for the stripe that this key hashes to
  size_t stripe_num = lock_map->GetStripe(key);
  assert(lock_map->lock_map_stripes_.size() > stripe_num);
  LockMapStripe* stripe = lock_map->lock_map_stripes_.at(stripe_num);

  LockInfo lock_info(txn->GetID(), txn->GetExpirationTime(), exclusive);
  int64_t timeout = txn->GetLockTimeout();

  return AcquireWithTimeout(txn, lock_map, stripe, column_family_id, key, env,
                            timeout, lock_info);
}
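
// TryLock() is not invoked by users directly; it is reached through the
// public transaction API. A hedged sketch of a typical call path
// (illustrative only, setup abbreviated):
//
//   TransactionDB* db = ...;  // opened via TransactionDB::Open()
//   Transaction* txn = db->BeginTransaction(WriteOptions());
//   std::string value;
//   // GetForUpdate() takes an exclusive lock on "key", which lands in
//   // TransactionLockMgr::TryLock() for the key's column family:
//   Status s = txn->GetForUpdate(ReadOptions(), "key", &value);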

// Helper function for TryLock().
Status TransactionLockMgr::AcquireWithTimeout(
    PessimisticTransaction* txn, LockMap* lock_map, LockMapStripe* stripe,
    uint32_t column_family_id, const std::string& key, Env* env,
    int64_t timeout, const LockInfo& lock_info) {
  Status result;
  uint64_t end_time = 0;

  if (timeout > 0) {
    uint64_t start_time = env->NowMicros();
    end_time = start_time + timeout;
  }

  if (timeout < 0) {
    // If timeout is negative, we wait indefinitely to acquire the lock
    result = stripe->stripe_mutex->Lock();
  } else {
    result = stripe->stripe_mutex->TryLockFor(timeout);
  }

  if (!result.ok()) {
    // failed to acquire mutex
    return result;
  }

  // Acquire lock if we are able to
  uint64_t expire_time_hint = 0;
  autovector<TransactionID> wait_ids;
  result = AcquireLocked(lock_map, stripe, key, env, lock_info,
                         &expire_time_hint, &wait_ids);

  if (!result.ok() && timeout != 0) {
    PERF_TIMER_GUARD(key_lock_wait_time);
    PERF_COUNTER_ADD(key_lock_wait_count, 1);
    // If we weren't able to acquire the lock, we will keep retrying as long
    // as the timeout allows.
    bool timed_out = false;
    do {
      // Decide how long to wait
      int64_t cv_end_time = -1;

      // Check if held lock's expiration time is sooner than our timeout
      if (expire_time_hint > 0 &&
          (timeout < 0 || (timeout > 0 && expire_time_hint < end_time))) {
        // expiration time is sooner than our timeout
        cv_end_time = expire_time_hint;
      } else if (timeout >= 0) {
        cv_end_time = end_time;
      }

      assert(result.IsBusy() || wait_ids.size() != 0);

      // We are dependent on a transaction to finish, so perform deadlock
      // detection.
      if (wait_ids.size() != 0) {
        if (txn->IsDeadlockDetect()) {
          if (IncrementWaiters(txn, wait_ids, key, column_family_id,
                               lock_info.exclusive, env)) {
            result = Status::Busy(Status::SubCode::kDeadlock);
            stripe->stripe_mutex->UnLock();
            return result;
          }
        }
        txn->SetWaitingTxn(wait_ids, column_family_id, &key);
      }

      TEST_SYNC_POINT("TransactionLockMgr::AcquireWithTimeout:WaitingTxn");
      if (cv_end_time < 0) {
        // Wait indefinitely
        result = stripe->stripe_cv->Wait(stripe->stripe_mutex);
      } else {
        uint64_t now = env->NowMicros();
        if (static_cast<uint64_t>(cv_end_time) > now) {
          result = stripe->stripe_cv->WaitFor(stripe->stripe_mutex,
                                              cv_end_time - now);
        }
      }

      if (wait_ids.size() != 0) {
        txn->ClearWaitingTxn();
        if (txn->IsDeadlockDetect()) {
          DecrementWaiters(txn, wait_ids);
        }
      }

      if (result.IsTimedOut()) {
        timed_out = true;
        // Even though we timed out, we will still make one more attempt to
        // acquire lock below (it is possible the lock expired and we
        // were never signaled).
      }

      if (result.ok() || result.IsTimedOut()) {
        result = AcquireLocked(lock_map, stripe, key, env, lock_info,
                               &expire_time_hint, &wait_ids);
      }
    } while (!result.ok() && !timed_out);
  }

  stripe->stripe_mutex->UnLock();

  return result;
}
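
// Timeout semantics implemented above:
//   timeout < 0  -- wait indefinitely, waking only on a stripe_cv signal or
//                   when a held lock's expiration hint fires;
//   timeout == 0 -- a single AcquireLocked() attempt with no waiting;
//   timeout > 0  -- retry until end_time, with each condition-variable wait
//                   capped by min(end_time, expire_time_hint).
// The extra AcquireLocked() after a timed-out wait is deliberate: the holder
// may have expired without anyone calling NotifyAll(), so one last attempt
// avoids failing spuriously.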

void TransactionLockMgr::DecrementWaiters(
    const PessimisticTransaction* txn,
    const autovector<TransactionID>& wait_ids) {
  std::lock_guard<std::mutex> lock(wait_txn_map_mutex_);
  DecrementWaitersImpl(txn, wait_ids);
}

void TransactionLockMgr::DecrementWaitersImpl(
    const PessimisticTransaction* txn,
    const autovector<TransactionID>& wait_ids) {
  auto id = txn->GetID();
  assert(wait_txn_map_.Contains(id));
  wait_txn_map_.Delete(id);

  for (auto wait_id : wait_ids) {
    rev_wait_txn_map_.Get(wait_id)--;
    if (rev_wait_txn_map_.Get(wait_id) == 0) {
      rev_wait_txn_map_.Delete(wait_id);
    }
  }
}

bool TransactionLockMgr::IncrementWaiters(
    const PessimisticTransaction* txn,
    const autovector<TransactionID>& wait_ids, const std::string& key,
    const uint32_t& cf_id, const bool& exclusive, Env* const env) {
  auto id = txn->GetID();
  std::vector<int> queue_parents(
      static_cast<size_t>(txn->GetDeadlockDetectDepth()));
  std::vector<TransactionID> queue_values(
      static_cast<size_t>(txn->GetDeadlockDetectDepth()));
  std::lock_guard<std::mutex> lock(wait_txn_map_mutex_);
  assert(!wait_txn_map_.Contains(id));

  wait_txn_map_.Insert(id, {wait_ids, cf_id, key, exclusive});

  for (auto wait_id : wait_ids) {
    if (rev_wait_txn_map_.Contains(wait_id)) {
      rev_wait_txn_map_.Get(wait_id)++;
    } else {
      rev_wait_txn_map_.Insert(wait_id, 1);
    }
  }

  // No deadlock if nobody is waiting on self.
  if (!rev_wait_txn_map_.Contains(id)) {
    return false;
  }

  const auto* next_ids = &wait_ids;
  int parent = -1;
  int64_t deadlock_time = 0;
  for (int tail = 0, head = 0; head < txn->GetDeadlockDetectDepth(); head++) {
    int i = 0;
    if (next_ids) {
      for (; i < static_cast<int>(next_ids->size()) &&
             tail + i < txn->GetDeadlockDetectDepth();
           i++) {
        queue_values[tail + i] = (*next_ids)[i];
        queue_parents[tail + i] = parent;
      }
      tail += i;
    }

    // No more items in the list, meaning no deadlock.
    if (tail == head) {
      return false;
    }

    auto next = queue_values[head];
    if (next == id) {
      std::vector<DeadlockInfo> path;
      while (head != -1) {
        assert(wait_txn_map_.Contains(queue_values[head]));

        auto extracted_info = wait_txn_map_.Get(queue_values[head]);
        path.push_back({queue_values[head], extracted_info.m_cf_id,
                        extracted_info.m_exclusive,
                        extracted_info.m_waiting_key});
        head = queue_parents[head];
      }
      env->GetCurrentTime(&deadlock_time);
      std::reverse(path.begin(), path.end());
      dlock_buffer_.AddNewPath(DeadlockPath(path, deadlock_time));
      deadlock_time = 0;
      DecrementWaitersImpl(txn, wait_ids);
      return true;
    } else if (!wait_txn_map_.Contains(next)) {
      next_ids = nullptr;
      continue;
    } else {
      parent = head;
      next_ids = &(wait_txn_map_.Get(next).m_neighbors);
    }
  }

  // Wait cycle too big, just assume deadlock.
  env->GetCurrentTime(&deadlock_time);
  dlock_buffer_.AddNewPath(DeadlockPath(deadlock_time, true));
  DecrementWaitersImpl(txn, wait_ids);
  return true;
}
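
// The loop above is a bounded breadth-first search of the wait-for graph:
// queue_values is the BFS queue of transaction IDs and queue_parents records
// each node's predecessor, so that when the search reaches the starting id
// again, the cycle can be replayed through the parent links into a
// DeadlockPath. A worked example with three transactions:
//
//   T1 waits on T2, T2 waits on T3, and T3 now requests a lock held by T1.
//   IncrementWaiters(T3, {T1}) expands T1 -> {T2} -> {T3}; reaching T3's own
//   id proves a cycle, so the path [T1, T2, T3] is recorded in dlock_buffer_
//   and AcquireWithTimeout() returns Status::Busy(kDeadlock) to T3.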

// Try to lock this key after we have acquired the mutex.
// Sets *expire_time to the expiration time in microseconds
// or 0 if no expiration.
// REQUIRED: Stripe mutex must be held.
Status TransactionLockMgr::AcquireLocked(LockMap* lock_map,
                                         LockMapStripe* stripe,
                                         const std::string& key, Env* env,
                                         const LockInfo& txn_lock_info,
                                         uint64_t* expire_time,
                                         autovector<TransactionID>* txn_ids) {
  assert(txn_lock_info.txn_ids.size() == 1);

  Status result;
  // Check if this key is already locked
  auto stripe_iter = stripe->keys.find(key);
  if (stripe_iter != stripe->keys.end()) {
    // Lock already held
    LockInfo& lock_info = stripe_iter->second;
    assert(lock_info.txn_ids.size() == 1 || !lock_info.exclusive);

    if (lock_info.exclusive || txn_lock_info.exclusive) {
      if (lock_info.txn_ids.size() == 1 &&
          lock_info.txn_ids[0] == txn_lock_info.txn_ids[0]) {
        // The list contains one txn and we're it, so just take it.
        lock_info.exclusive = txn_lock_info.exclusive;
        lock_info.expiration_time = txn_lock_info.expiration_time;
      } else {
        // Check if it's expired. Skips over txn_lock_info.txn_ids[0] in case
        // it's there for a shared lock with multiple holders which was not
        // caught in the first case.
        if (IsLockExpired(txn_lock_info.txn_ids[0], lock_info, env,
                          expire_time)) {
          // lock is expired, can steal it
          lock_info.txn_ids = txn_lock_info.txn_ids;
          lock_info.exclusive = txn_lock_info.exclusive;
          lock_info.expiration_time = txn_lock_info.expiration_time;
          // lock_cnt does not change
        } else {
          result = Status::TimedOut(Status::SubCode::kLockTimeout);
          *txn_ids = lock_info.txn_ids;
        }
      }
    } else {
      // We are requesting shared access to a shared lock, so just grant it.
      lock_info.txn_ids.push_back(txn_lock_info.txn_ids[0]);
      // Using std::max means that expiration time never goes down even when
      // a transaction is removed from the list. The correct solution would be
      // to track expiry for every transaction, but this would also work for
      // now.
      lock_info.expiration_time =
          std::max(lock_info.expiration_time, txn_lock_info.expiration_time);
    }
  } else {  // Lock not held.
    // Check lock limit
    if (max_num_locks_ > 0 &&
        lock_map->lock_cnt.load(std::memory_order_acquire) >= max_num_locks_) {
      result = Status::Busy(Status::SubCode::kLockLimit);
    } else {
      // acquire lock
      stripe->keys.insert({key, txn_lock_info});

      // Maintain lock count if there is a limit on the number of locks
      if (max_num_locks_) {
        lock_map->lock_cnt++;
      }
    }
  }

  return result;
}
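
// The decision table implemented above for a key that is already locked
// (a request from the sole current holder upgrades/downgrades in place):
//
//   held \ requested |  shared             |  exclusive
//   -----------------+---------------------+---------------------------------
//   shared           |  grant, add holder  |  steal if expired, else TimedOut
//   exclusive        |  steal if expired, else TimedOut (both columns)
//
// On TimedOut, *txn_ids is filled with the current holders so the caller can
// wait on them and run deadlock detection against them.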

void TransactionLockMgr::UnLockKey(const PessimisticTransaction* txn,
                                   const std::string& key,
                                   LockMapStripe* stripe, LockMap* lock_map,
                                   Env* env) {
#ifdef NDEBUG
  (void)env;
#endif
  TransactionID txn_id = txn->GetID();

  auto stripe_iter = stripe->keys.find(key);
  if (stripe_iter != stripe->keys.end()) {
    auto& txns = stripe_iter->second.txn_ids;
    auto txn_it = std::find(txns.begin(), txns.end(), txn_id);
    // Found the key we locked. unlock it.
    if (txn_it != txns.end()) {
      if (txns.size() == 1) {
        stripe->keys.erase(stripe_iter);
      } else {
        auto last_it = txns.end() - 1;
        if (txn_it != last_it) {
          *txn_it = *last_it;
        }
        txns.pop_back();
      }

      if (max_num_locks_ > 0) {
        // Maintain lock count if there is a limit on the number of locks.
        assert(lock_map->lock_cnt.load(std::memory_order_relaxed) > 0);
        lock_map->lock_cnt--;
      }
    }
  } else {
    // This key is either not locked or locked by someone else. This should
    // only happen if the unlocking transaction has expired.
    assert(txn->GetExpirationTime() > 0 &&
           txn->GetExpirationTime() < env->NowMicros());
  }
}

void TransactionLockMgr::UnLock(PessimisticTransaction* txn,
                                uint32_t column_family_id,
                                const std::string& key, Env* env) {
  std::shared_ptr<LockMap> lock_map_ptr = GetLockMap(column_family_id);
  LockMap* lock_map = lock_map_ptr.get();
  if (lock_map == nullptr) {
    // Column Family must have been dropped.
    return;
  }

  // Lock the mutex for the stripe that this key hashes to
  size_t stripe_num = lock_map->GetStripe(key);
  assert(lock_map->lock_map_stripes_.size() > stripe_num);
  LockMapStripe* stripe = lock_map->lock_map_stripes_.at(stripe_num);

  stripe->stripe_mutex->Lock();
  UnLockKey(txn, key, stripe, lock_map, env);
  stripe->stripe_mutex->UnLock();

  // Signal waiting threads to retry locking
  stripe->stripe_cv->NotifyAll();
}

void TransactionLockMgr::UnLock(const PessimisticTransaction* txn,
                                const TransactionKeyMap* key_map, Env* env) {
  for (auto& key_map_iter : *key_map) {
    uint32_t column_family_id = key_map_iter.first;
    auto& keys = key_map_iter.second;

    std::shared_ptr<LockMap> lock_map_ptr = GetLockMap(column_family_id);
    LockMap* lock_map = lock_map_ptr.get();

    if (lock_map == nullptr) {
      // Column Family must have been dropped.
      return;
    }

    // Bucket keys by lock_map_ stripe
    std::unordered_map<size_t, std::vector<const std::string*>> keys_by_stripe(
        std::max(keys.size(), lock_map->num_stripes_));

    for (auto& key_iter : keys) {
      const std::string& key = key_iter.first;

      size_t stripe_num = lock_map->GetStripe(key);
      keys_by_stripe[stripe_num].push_back(&key);
    }

    // For each stripe, grab the stripe mutex and unlock all keys in this
    // stripe
    for (auto& stripe_iter : keys_by_stripe) {
      size_t stripe_num = stripe_iter.first;
      auto& stripe_keys = stripe_iter.second;

      assert(lock_map->lock_map_stripes_.size() > stripe_num);
      LockMapStripe* stripe = lock_map->lock_map_stripes_.at(stripe_num);

      stripe->stripe_mutex->Lock();

      for (const std::string* key : stripe_keys) {
        UnLockKey(txn, *key, stripe, lock_map, env);
      }

      stripe->stripe_mutex->UnLock();

      // Signal waiting threads to retry locking
      stripe->stripe_cv->NotifyAll();
    }
  }
}
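
// Bucketing keys by stripe before unlocking is a batching optimization:
// rather than one Lock()/UnLock()/NotifyAll() round trip per key, each
// stripe's mutex is taken exactly once for all of the transaction's keys
// that hash to it, and the stripe's waiters are notified once afterwards.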

TransactionLockMgr::LockStatusData TransactionLockMgr::GetLockStatusData() {
  LockStatusData data;
  // Lock order here is important. The correct order is lock_map_mutex_, then
  // for every column family ID in ascending order lock every stripe in
  // ascending order.
  InstrumentedMutexLock l(&lock_map_mutex_);

  std::vector<uint32_t> cf_ids;
  for (const auto& map : lock_maps_) {
    cf_ids.push_back(map.first);
  }
  std::sort(cf_ids.begin(), cf_ids.end());

  for (auto i : cf_ids) {
    const auto& stripes = lock_maps_[i]->lock_map_stripes_;
    // Iterate and lock all stripes in ascending order.
    for (const auto& j : stripes) {
      j->stripe_mutex->Lock();
      for (const auto& it : j->keys) {
        struct KeyLockInfo info;
        info.exclusive = it.second.exclusive;
        info.key = it.first;
        for (const auto& id : it.second.txn_ids) {
          info.ids.push_back(id);
        }
        data.insert({i, info});
      }
    }
  }

  // Unlock everything. Unlocking order is not important.
  for (auto i : cf_ids) {
    const auto& stripes = lock_maps_[i]->lock_map_stripes_;
    for (const auto& j : stripes) {
      j->stripe_mutex->UnLock();
    }
  }

  return data;
}
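
// The ordering discipline above is what makes concurrent status scans safe:
// because every caller acquires stripe mutexes in the same global order
// (ascending column family id, then ascending stripe index), two concurrent
// GetLockStatusData() calls can never each hold a stripe mutex the other
// one is waiting for.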

std::vector<DeadlockPath> TransactionLockMgr::GetDeadlockInfoBuffer() {
  return dlock_buffer_.PrepareBuffer();
}

void TransactionLockMgr::Resize(uint32_t target_size) {
  dlock_buffer_.Resize(target_size);
}

}  // namespace rocksdb
#endif  // ROCKSDB_LITE