#include "monitoring/instrumented_mutex.h"
#include "rocksdb/utilities/transaction.h"
#include "util/autovector.h"
+#include "util/hash_containers.h"
#include "util/hash_map.h"
#include "util/thread_local.h"
#include "utilities/transactions/lock/lock_manager.h"
struct LockMap;
struct LockMapStripe;
-struct DeadlockInfoBuffer {
+template <class Path>
+class DeadlockInfoBufferTempl {
private:
- std::vector<DeadlockPath> paths_buffer_;
+ std::vector<Path> paths_buffer_;
uint32_t buffer_idx_;
std::mutex paths_buffer_mutex_;
- std::vector<DeadlockPath> Normalize();
+
+ std::vector<Path> Normalize() {
+ auto working = paths_buffer_;
+
+ if (working.empty()) {
+ return working;
+ }
+
+ // Next write occurs at a nonexistent path's slot
+ if (paths_buffer_[buffer_idx_].empty()) {
+ working.resize(buffer_idx_);
+ } else {
+ std::rotate(working.begin(), working.begin() + buffer_idx_,
+ working.end());
+ }
+
+ return working;
+ }
public:
- explicit DeadlockInfoBuffer(uint32_t n_latest_dlocks)
+ explicit DeadlockInfoBufferTempl(uint32_t n_latest_dlocks)
: paths_buffer_(n_latest_dlocks), buffer_idx_(0) {}
- void AddNewPath(DeadlockPath path);
- void Resize(uint32_t target_size);
- std::vector<DeadlockPath> PrepareBuffer();
+
+ void AddNewPath(Path path) {
+ std::lock_guard<std::mutex> lock(paths_buffer_mutex_);
+
+ if (paths_buffer_.empty()) {
+ return;
+ }
+
+ paths_buffer_[buffer_idx_] = std::move(path);
+ buffer_idx_ = (buffer_idx_ + 1) % paths_buffer_.size();
+ }
+
+ void Resize(uint32_t target_size) {
+ std::lock_guard<std::mutex> lock(paths_buffer_mutex_);
+
+ paths_buffer_ = Normalize();
+
+ // Drop the deadlocks that will no longer be needed ater the normalize
+ if (target_size < paths_buffer_.size()) {
+ paths_buffer_.erase(
+ paths_buffer_.begin(),
+ paths_buffer_.begin() + (paths_buffer_.size() - target_size));
+ buffer_idx_ = 0;
+ }
+ // Resize the buffer to the target size and restore the buffer's idx
+ else {
+ auto prev_size = paths_buffer_.size();
+ paths_buffer_.resize(target_size);
+ buffer_idx_ = (uint32_t)prev_size;
+ }
+ }
+
+ std::vector<Path> PrepareBuffer() {
+ std::lock_guard<std::mutex> lock(paths_buffer_mutex_);
+
+ // Reversing the normalized vector returns the latest deadlocks first
+ auto working = Normalize();
+ std::reverse(working.begin(), working.end());
+
+ return working;
+ }
};
+using DeadlockInfoBuffer = DeadlockInfoBufferTempl<DeadlockPath>;
+
struct TrackedTrxInfo {
autovector<TransactionID> m_neighbors;
uint32_t m_cf_id;
PointLockManager(const PointLockManager&) = delete;
PointLockManager& operator=(const PointLockManager&) = delete;
- ~PointLockManager() override;
+ ~PointLockManager() override {}
bool IsPointLockSupported() const override { return true; }
return PointLockTrackerFactory::Get();
}
+ // Creates a new LockMap for this column family. Caller should guarantee
+ // that this column family does not already exist.
void AddColumnFamily(const ColumnFamilyHandle* cf) override;
+ // Deletes the LockMap for this column family. Caller should guarantee that
+ // this column family is no longer in use.
void RemoveColumnFamily(const ColumnFamilyHandle* cf) override;
Status TryLock(PessimisticTransaction* txn, ColumnFamilyId column_family_id,
InstrumentedMutex lock_map_mutex_;
// Map of ColumnFamilyId to locked key info
- using LockMaps = std::unordered_map<uint32_t, std::shared_ptr<LockMap>>;
+ using LockMaps = UnorderedMap<uint32_t, std::shared_ptr<LockMap>>;
LockMaps lock_maps_;
// Thread-local cache of entries in lock_maps_. This is an optimization
Status AcquireWithTimeout(PessimisticTransaction* txn, LockMap* lock_map,
LockMapStripe* stripe, uint32_t column_family_id,
const std::string& key, Env* env, int64_t timeout,
- LockInfo&& lock_info);
+ const LockInfo& lock_info);
Status AcquireLocked(LockMap* lock_map, LockMapStripe* stripe,
const std::string& key, Env* env,
- LockInfo&& lock_info, uint64_t* wait_time,
+ const LockInfo& lock_info, uint64_t* wait_time,
autovector<TransactionID>* txn_ids);
void UnLockKey(PessimisticTransaction* txn, const std::string& key,