git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/utilities/transactions/pessimistic_transaction_db.h
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / rocksdb / utilities / transactions / pessimistic_transaction_db.h
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5
6 #pragma once
7 #ifndef ROCKSDB_LITE
8
9 #include <mutex>
10 #include <queue>
11 #include <set>
12 #include <string>
13 #include <unordered_map>
14 #include <vector>
15
16 #include "db/db_iter.h"
17 #include "db/read_callback.h"
18 #include "db/snapshot_checker.h"
19 #include "rocksdb/db.h"
20 #include "rocksdb/options.h"
21 #include "rocksdb/utilities/transaction_db.h"
22 #include "util/cast_util.h"
23 #include "utilities/transactions/lock/lock_manager.h"
24 #include "utilities/transactions/lock/range/range_lock_manager.h"
25 #include "utilities/transactions/pessimistic_transaction.h"
26 #include "utilities/transactions/write_prepared_txn.h"
27
28 namespace ROCKSDB_NAMESPACE {
29
30 class PessimisticTransactionDB : public TransactionDB {
31 public:
32 explicit PessimisticTransactionDB(DB* db,
33 const TransactionDBOptions& txn_db_options);
34
35 explicit PessimisticTransactionDB(StackableDB* db,
36 const TransactionDBOptions& txn_db_options);
37
38 virtual ~PessimisticTransactionDB();
39
40 virtual const Snapshot* GetSnapshot() override { return db_->GetSnapshot(); }
41
42 virtual Status Initialize(
43 const std::vector<size_t>& compaction_enabled_cf_indices,
44 const std::vector<ColumnFamilyHandle*>& handles);
45
46 Transaction* BeginTransaction(const WriteOptions& write_options,
47 const TransactionOptions& txn_options,
48 Transaction* old_txn) override = 0;
49
50 using StackableDB::Put;
51 virtual Status Put(const WriteOptions& options,
52 ColumnFamilyHandle* column_family, const Slice& key,
53 const Slice& val) override;
54
55 using StackableDB::Delete;
56 virtual Status Delete(const WriteOptions& wopts,
57 ColumnFamilyHandle* column_family,
58 const Slice& key) override;
59
60 using StackableDB::SingleDelete;
61 virtual Status SingleDelete(const WriteOptions& wopts,
62 ColumnFamilyHandle* column_family,
63 const Slice& key) override;
64
65 using StackableDB::Merge;
66 virtual Status Merge(const WriteOptions& options,
67 ColumnFamilyHandle* column_family, const Slice& key,
68 const Slice& value) override;
69
70 using TransactionDB::Write;
71 virtual Status Write(const WriteOptions& opts, WriteBatch* updates) override;
72 inline Status WriteWithConcurrencyControl(const WriteOptions& opts,
73 WriteBatch* updates) {
74 Status s;
75 if (opts.protection_bytes_per_key > 0) {
76 s = WriteBatchInternal::UpdateProtectionInfo(
77 updates, opts.protection_bytes_per_key);
78 }
79 if (s.ok()) {
80 // Need to lock all keys in this batch to prevent write conflicts with
81 // concurrent transactions.
82 Transaction* txn = BeginInternalTransaction(opts);
83 txn->DisableIndexing();
84
85 auto txn_impl = static_cast_with_check<PessimisticTransaction>(txn);
86
87 // Since commitBatch sorts the keys before locking, concurrent Write()
88 // operations will not cause a deadlock.
89 // In order to avoid a deadlock with a concurrent Transaction,
90 // Transactions should use a lock timeout.
91 s = txn_impl->CommitBatch(updates);
92
93 delete txn;
94 }
95
96 return s;
97 }
98
99 using StackableDB::CreateColumnFamily;
100 virtual Status CreateColumnFamily(const ColumnFamilyOptions& options,
101 const std::string& column_family_name,
102 ColumnFamilyHandle** handle) override;
103
104 Status CreateColumnFamilies(
105 const ColumnFamilyOptions& options,
106 const std::vector<std::string>& column_family_names,
107 std::vector<ColumnFamilyHandle*>* handles) override;
108
109 Status CreateColumnFamilies(
110 const std::vector<ColumnFamilyDescriptor>& column_families,
111 std::vector<ColumnFamilyHandle*>* handles) override;
112
113 using StackableDB::DropColumnFamily;
114 virtual Status DropColumnFamily(ColumnFamilyHandle* column_family) override;
115
116 Status DropColumnFamilies(
117 const std::vector<ColumnFamilyHandle*>& column_families) override;
118
119 Status TryLock(PessimisticTransaction* txn, uint32_t cfh_id,
120 const std::string& key, bool exclusive);
121 Status TryRangeLock(PessimisticTransaction* txn, uint32_t cfh_id,
122 const Endpoint& start_endp, const Endpoint& end_endp);
123
124 void UnLock(PessimisticTransaction* txn, const LockTracker& keys);
125 void UnLock(PessimisticTransaction* txn, uint32_t cfh_id,
126 const std::string& key);
127
128 void AddColumnFamily(const ColumnFamilyHandle* handle);
129
130 static TransactionDBOptions ValidateTxnDBOptions(
131 const TransactionDBOptions& txn_db_options);
132
133 const TransactionDBOptions& GetTxnDBOptions() const {
134 return txn_db_options_;
135 }
136
137 void InsertExpirableTransaction(TransactionID tx_id,
138 PessimisticTransaction* tx);
139 void RemoveExpirableTransaction(TransactionID tx_id);
140
141 // If transaction is no longer available, locks can be stolen
142 // If transaction is available, try stealing locks directly from transaction
143 // It is the caller's responsibility to ensure that the referred transaction
144 // is expirable (GetExpirationTime() > 0) and that it is expired.
145 bool TryStealingExpiredTransactionLocks(TransactionID tx_id);
146
147 Transaction* GetTransactionByName(const TransactionName& name) override;
148
149 void RegisterTransaction(Transaction* txn);
150 void UnregisterTransaction(Transaction* txn);
151
152 // not thread safe. current use case is during recovery (single thread)
153 void GetAllPreparedTransactions(std::vector<Transaction*>* trans) override;
154
155 LockManager::PointLockStatus GetLockStatusData() override;
156
157 std::vector<DeadlockPath> GetDeadlockInfoBuffer() override;
158 void SetDeadlockInfoBufferSize(uint32_t target_size) override;
159
160 // The default implementation does nothing. The actual implementation is moved
161 // to the child classes that actually need this information. This was due to
162 // an odd performance drop we observed when the added std::atomic member to
163 // the base class even when the subclass do not read it in the fast path.
164 virtual void UpdateCFComparatorMap(const std::vector<ColumnFamilyHandle*>&) {}
165 virtual void UpdateCFComparatorMap(ColumnFamilyHandle*) {}
166
167 // Use the returned factory to create LockTrackers in transactions.
168 const LockTrackerFactory& GetLockTrackerFactory() const {
169 return lock_manager_->GetLockTrackerFactory();
170 }
171
172 std::pair<Status, std::shared_ptr<const Snapshot>> CreateTimestampedSnapshot(
173 TxnTimestamp ts) override;
174
175 std::shared_ptr<const Snapshot> GetTimestampedSnapshot(
176 TxnTimestamp ts) const override;
177
178 void ReleaseTimestampedSnapshotsOlderThan(TxnTimestamp ts) override;
179
180 Status GetTimestampedSnapshots(TxnTimestamp ts_lb, TxnTimestamp ts_ub,
181 std::vector<std::shared_ptr<const Snapshot>>&
182 timestamped_snapshots) const override;
183
184 protected:
185 DBImpl* db_impl_;
186 std::shared_ptr<Logger> info_log_;
187 const TransactionDBOptions txn_db_options_;
188
189 static Status FailIfBatchHasTs(const WriteBatch* wb);
190
191 static Status FailIfCfEnablesTs(const DB* db,
192 const ColumnFamilyHandle* column_family);
193
194 void ReinitializeTransaction(
195 Transaction* txn, const WriteOptions& write_options,
196 const TransactionOptions& txn_options = TransactionOptions());
197
198 virtual Status VerifyCFOptions(const ColumnFamilyOptions& cf_options);
199
200 private:
201 friend class WritePreparedTxnDB;
202 friend class WritePreparedTxnDBMock;
203 friend class WriteUnpreparedTxn;
204 friend class TransactionTest_DoubleCrashInRecovery_Test;
205 friend class TransactionTest_DoubleEmptyWrite_Test;
206 friend class TransactionTest_DuplicateKeys_Test;
207 friend class TransactionTest_PersistentTwoPhaseTransactionTest_Test;
208 friend class TransactionTest_TwoPhaseDoubleRecoveryTest_Test;
209 friend class TransactionTest_TwoPhaseOutOfOrderDelete_Test;
210 friend class TransactionStressTest_TwoPhaseLongPrepareTest_Test;
211 friend class WriteUnpreparedTransactionTest_RecoveryTest_Test;
212 friend class WriteUnpreparedTransactionTest_MarkLogWithPrepSection_Test;
213
214 Transaction* BeginInternalTransaction(const WriteOptions& options);
215
216 std::shared_ptr<LockManager> lock_manager_;
217
218 // Must be held when adding/dropping column families.
219 InstrumentedMutex column_family_mutex_;
220
221 // Used to ensure that no locks are stolen from an expirable transaction
222 // that has started a commit. Only transactions with an expiration time
223 // should be in this map.
224 std::mutex map_mutex_;
225 std::unordered_map<TransactionID, PessimisticTransaction*>
226 expirable_transactions_map_;
227
228 // map from name to two phase transaction instance
229 std::mutex name_map_mutex_;
230 std::unordered_map<TransactionName, Transaction*> transactions_;
231
232 // Signal that we are testing a crash scenario. Some asserts could be relaxed
233 // in such cases.
234 virtual void TEST_Crash() {}
235 };
236
237 // A PessimisticTransactionDB that writes the data to the DB after the commit.
238 // In this way the DB only contains the committed data.
239 class WriteCommittedTxnDB : public PessimisticTransactionDB {
240 public:
241 explicit WriteCommittedTxnDB(DB* db,
242 const TransactionDBOptions& txn_db_options)
243 : PessimisticTransactionDB(db, txn_db_options) {}
244
245 explicit WriteCommittedTxnDB(StackableDB* db,
246 const TransactionDBOptions& txn_db_options)
247 : PessimisticTransactionDB(db, txn_db_options) {}
248
249 virtual ~WriteCommittedTxnDB() {}
250
251 Transaction* BeginTransaction(const WriteOptions& write_options,
252 const TransactionOptions& txn_options,
253 Transaction* old_txn) override;
254
255 // Optimized version of ::Write that makes use of skip_concurrency_control
256 // hint
257 using TransactionDB::Write;
258 virtual Status Write(const WriteOptions& opts,
259 const TransactionDBWriteOptimizations& optimizations,
260 WriteBatch* updates) override;
261 virtual Status Write(const WriteOptions& opts, WriteBatch* updates) override;
262 };
263
264 inline Status PessimisticTransactionDB::FailIfBatchHasTs(
265 const WriteBatch* batch) {
266 if (batch != nullptr && WriteBatchInternal::HasKeyWithTimestamp(*batch)) {
267 return Status::NotSupported(
268 "Writes with timestamp must go through transaction API instead of "
269 "TransactionDB.");
270 }
271 return Status::OK();
272 }
273
274 inline Status PessimisticTransactionDB::FailIfCfEnablesTs(
275 const DB* db, const ColumnFamilyHandle* column_family) {
276 assert(db);
277 column_family = column_family ? column_family : db->DefaultColumnFamily();
278 assert(column_family);
279 const Comparator* const ucmp = column_family->GetComparator();
280 assert(ucmp);
281 if (ucmp->timestamp_size() > 0) {
282 return Status::NotSupported(
283 "Write operation with user timestamp must go through the transaction "
284 "API instead of TransactionDB.");
285 }
286 return Status::OK();
287 }
288
289 class SnapshotCreationCallback : public PostMemTableCallback {
290 public:
291 explicit SnapshotCreationCallback(
292 DBImpl* dbi, TxnTimestamp commit_ts,
293 const std::shared_ptr<TransactionNotifier>& notifier,
294 std::shared_ptr<const Snapshot>& snapshot)
295 : db_impl_(dbi),
296 commit_ts_(commit_ts),
297 snapshot_notifier_(notifier),
298 snapshot_(snapshot) {
299 assert(db_impl_);
300 }
301
302 ~SnapshotCreationCallback() override {
303 snapshot_creation_status_.PermitUncheckedError();
304 }
305
306 Status operator()(SequenceNumber seq, bool disable_memtable) override;
307
308 private:
309 DBImpl* const db_impl_;
310 const TxnTimestamp commit_ts_;
311 std::shared_ptr<TransactionNotifier> snapshot_notifier_;
312 std::shared_ptr<const Snapshot>& snapshot_;
313
314 Status snapshot_creation_status_;
315 };
316
317 } // namespace ROCKSDB_NAMESPACE
318 #endif // ROCKSDB_LITE