]>
Commit | Line | Data |
---|---|---|
11fdf7f2 TL |
1 | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | // This source code is licensed under both the GPLv2 (found in the | |
3 | // COPYING file in the root directory) and Apache 2.0 License | |
4 | // (found in the LICENSE.Apache file in the root directory). | |
5 | ||
6 | #pragma once | |
7 | #ifndef ROCKSDB_LITE | |
8 | ||
9 | #include <mutex> | |
10 | #include <queue> | |
11 | #include <set> | |
12 | #include <string> | |
13 | #include <unordered_map> | |
14 | #include <vector> | |
15 | ||
16 | #include "db/db_iter.h" | |
17 | #include "db/read_callback.h" | |
18 | #include "db/snapshot_checker.h" | |
19 | #include "rocksdb/db.h" | |
20 | #include "rocksdb/options.h" | |
21 | #include "rocksdb/utilities/transaction_db.h" | |
f67539c2 | 22 | #include "util/cast_util.h" |
20effc67 | 23 | #include "utilities/transactions/lock/lock_manager.h" |
1e59de90 | 24 | #include "utilities/transactions/lock/range/range_lock_manager.h" |
11fdf7f2 | 25 | #include "utilities/transactions/pessimistic_transaction.h" |
11fdf7f2 TL |
26 | #include "utilities/transactions/write_prepared_txn.h" |
27 | ||
f67539c2 | 28 | namespace ROCKSDB_NAMESPACE { |
11fdf7f2 TL |
29 | |
30 | class PessimisticTransactionDB : public TransactionDB { | |
31 | public: | |
32 | explicit PessimisticTransactionDB(DB* db, | |
33 | const TransactionDBOptions& txn_db_options); | |
34 | ||
35 | explicit PessimisticTransactionDB(StackableDB* db, | |
36 | const TransactionDBOptions& txn_db_options); | |
37 | ||
38 | virtual ~PessimisticTransactionDB(); | |
39 | ||
40 | virtual const Snapshot* GetSnapshot() override { return db_->GetSnapshot(); } | |
41 | ||
42 | virtual Status Initialize( | |
43 | const std::vector<size_t>& compaction_enabled_cf_indices, | |
44 | const std::vector<ColumnFamilyHandle*>& handles); | |
45 | ||
46 | Transaction* BeginTransaction(const WriteOptions& write_options, | |
47 | const TransactionOptions& txn_options, | |
48 | Transaction* old_txn) override = 0; | |
49 | ||
50 | using StackableDB::Put; | |
51 | virtual Status Put(const WriteOptions& options, | |
52 | ColumnFamilyHandle* column_family, const Slice& key, | |
53 | const Slice& val) override; | |
54 | ||
55 | using StackableDB::Delete; | |
56 | virtual Status Delete(const WriteOptions& wopts, | |
57 | ColumnFamilyHandle* column_family, | |
58 | const Slice& key) override; | |
59 | ||
60 | using StackableDB::SingleDelete; | |
61 | virtual Status SingleDelete(const WriteOptions& wopts, | |
62 | ColumnFamilyHandle* column_family, | |
63 | const Slice& key) override; | |
64 | ||
65 | using StackableDB::Merge; | |
66 | virtual Status Merge(const WriteOptions& options, | |
67 | ColumnFamilyHandle* column_family, const Slice& key, | |
68 | const Slice& value) override; | |
69 | ||
70 | using TransactionDB::Write; | |
71 | virtual Status Write(const WriteOptions& opts, WriteBatch* updates) override; | |
f67539c2 TL |
72 | inline Status WriteWithConcurrencyControl(const WriteOptions& opts, |
73 | WriteBatch* updates) { | |
1e59de90 TL |
74 | Status s; |
75 | if (opts.protection_bytes_per_key > 0) { | |
76 | s = WriteBatchInternal::UpdateProtectionInfo( | |
77 | updates, opts.protection_bytes_per_key); | |
78 | } | |
79 | if (s.ok()) { | |
80 | // Need to lock all keys in this batch to prevent write conflicts with | |
81 | // concurrent transactions. | |
82 | Transaction* txn = BeginInternalTransaction(opts); | |
83 | txn->DisableIndexing(); | |
84 | ||
85 | auto txn_impl = static_cast_with_check<PessimisticTransaction>(txn); | |
86 | ||
87 | // Since commitBatch sorts the keys before locking, concurrent Write() | |
88 | // operations will not cause a deadlock. | |
89 | // In order to avoid a deadlock with a concurrent Transaction, | |
90 | // Transactions should use a lock timeout. | |
91 | s = txn_impl->CommitBatch(updates); | |
92 | ||
93 | delete txn; | |
94 | } | |
f67539c2 TL |
95 | |
96 | return s; | |
97 | } | |
11fdf7f2 TL |
98 | |
99 | using StackableDB::CreateColumnFamily; | |
100 | virtual Status CreateColumnFamily(const ColumnFamilyOptions& options, | |
101 | const std::string& column_family_name, | |
102 | ColumnFamilyHandle** handle) override; | |
103 | ||
1e59de90 TL |
104 | Status CreateColumnFamilies( |
105 | const ColumnFamilyOptions& options, | |
106 | const std::vector<std::string>& column_family_names, | |
107 | std::vector<ColumnFamilyHandle*>* handles) override; | |
108 | ||
109 | Status CreateColumnFamilies( | |
110 | const std::vector<ColumnFamilyDescriptor>& column_families, | |
111 | std::vector<ColumnFamilyHandle*>* handles) override; | |
112 | ||
11fdf7f2 TL |
113 | using StackableDB::DropColumnFamily; |
114 | virtual Status DropColumnFamily(ColumnFamilyHandle* column_family) override; | |
115 | ||
1e59de90 TL |
116 | Status DropColumnFamilies( |
117 | const std::vector<ColumnFamilyHandle*>& column_families) override; | |
118 | ||
11fdf7f2 TL |
119 | Status TryLock(PessimisticTransaction* txn, uint32_t cfh_id, |
120 | const std::string& key, bool exclusive); | |
1e59de90 TL |
121 | Status TryRangeLock(PessimisticTransaction* txn, uint32_t cfh_id, |
122 | const Endpoint& start_endp, const Endpoint& end_endp); | |
11fdf7f2 | 123 | |
20effc67 | 124 | void UnLock(PessimisticTransaction* txn, const LockTracker& keys); |
11fdf7f2 TL |
125 | void UnLock(PessimisticTransaction* txn, uint32_t cfh_id, |
126 | const std::string& key); | |
127 | ||
128 | void AddColumnFamily(const ColumnFamilyHandle* handle); | |
129 | ||
130 | static TransactionDBOptions ValidateTxnDBOptions( | |
131 | const TransactionDBOptions& txn_db_options); | |
132 | ||
133 | const TransactionDBOptions& GetTxnDBOptions() const { | |
134 | return txn_db_options_; | |
135 | } | |
136 | ||
137 | void InsertExpirableTransaction(TransactionID tx_id, | |
138 | PessimisticTransaction* tx); | |
139 | void RemoveExpirableTransaction(TransactionID tx_id); | |
140 | ||
141 | // If transaction is no longer available, locks can be stolen | |
142 | // If transaction is available, try stealing locks directly from transaction | |
143 | // It is the caller's responsibility to ensure that the referred transaction | |
144 | // is expirable (GetExpirationTime() > 0) and that it is expired. | |
145 | bool TryStealingExpiredTransactionLocks(TransactionID tx_id); | |
146 | ||
147 | Transaction* GetTransactionByName(const TransactionName& name) override; | |
148 | ||
149 | void RegisterTransaction(Transaction* txn); | |
150 | void UnregisterTransaction(Transaction* txn); | |
151 | ||
152 | // not thread safe. current use case is during recovery (single thread) | |
153 | void GetAllPreparedTransactions(std::vector<Transaction*>* trans) override; | |
154 | ||
20effc67 | 155 | LockManager::PointLockStatus GetLockStatusData() override; |
11fdf7f2 TL |
156 | |
157 | std::vector<DeadlockPath> GetDeadlockInfoBuffer() override; | |
158 | void SetDeadlockInfoBufferSize(uint32_t target_size) override; | |
159 | ||
160 | // The default implementation does nothing. The actual implementation is moved | |
161 | // to the child classes that actually need this information. This was due to | |
162 | // an odd performance drop we observed when the added std::atomic member to | |
163 | // the base class even when the subclass do not read it in the fast path. | |
164 | virtual void UpdateCFComparatorMap(const std::vector<ColumnFamilyHandle*>&) {} | |
165 | virtual void UpdateCFComparatorMap(ColumnFamilyHandle*) {} | |
166 | ||
20effc67 TL |
167 | // Use the returned factory to create LockTrackers in transactions. |
168 | const LockTrackerFactory& GetLockTrackerFactory() const { | |
169 | return lock_manager_->GetLockTrackerFactory(); | |
170 | } | |
171 | ||
1e59de90 TL |
172 | std::pair<Status, std::shared_ptr<const Snapshot>> CreateTimestampedSnapshot( |
173 | TxnTimestamp ts) override; | |
174 | ||
175 | std::shared_ptr<const Snapshot> GetTimestampedSnapshot( | |
176 | TxnTimestamp ts) const override; | |
177 | ||
178 | void ReleaseTimestampedSnapshotsOlderThan(TxnTimestamp ts) override; | |
179 | ||
180 | Status GetTimestampedSnapshots(TxnTimestamp ts_lb, TxnTimestamp ts_ub, | |
181 | std::vector<std::shared_ptr<const Snapshot>>& | |
182 | timestamped_snapshots) const override; | |
183 | ||
11fdf7f2 TL |
184 | protected: |
185 | DBImpl* db_impl_; | |
186 | std::shared_ptr<Logger> info_log_; | |
187 | const TransactionDBOptions txn_db_options_; | |
188 | ||
1e59de90 TL |
189 | static Status FailIfBatchHasTs(const WriteBatch* wb); |
190 | ||
191 | static Status FailIfCfEnablesTs(const DB* db, | |
192 | const ColumnFamilyHandle* column_family); | |
193 | ||
11fdf7f2 TL |
194 | void ReinitializeTransaction( |
195 | Transaction* txn, const WriteOptions& write_options, | |
196 | const TransactionOptions& txn_options = TransactionOptions()); | |
197 | ||
198 | virtual Status VerifyCFOptions(const ColumnFamilyOptions& cf_options); | |
199 | ||
200 | private: | |
201 | friend class WritePreparedTxnDB; | |
202 | friend class WritePreparedTxnDBMock; | |
203 | friend class WriteUnpreparedTxn; | |
f67539c2 | 204 | friend class TransactionTest_DoubleCrashInRecovery_Test; |
11fdf7f2 TL |
205 | friend class TransactionTest_DoubleEmptyWrite_Test; |
206 | friend class TransactionTest_DuplicateKeys_Test; | |
207 | friend class TransactionTest_PersistentTwoPhaseTransactionTest_Test; | |
11fdf7f2 TL |
208 | friend class TransactionTest_TwoPhaseDoubleRecoveryTest_Test; |
209 | friend class TransactionTest_TwoPhaseOutOfOrderDelete_Test; | |
f67539c2 | 210 | friend class TransactionStressTest_TwoPhaseLongPrepareTest_Test; |
11fdf7f2 TL |
211 | friend class WriteUnpreparedTransactionTest_RecoveryTest_Test; |
212 | friend class WriteUnpreparedTransactionTest_MarkLogWithPrepSection_Test; | |
20effc67 | 213 | |
1e59de90 TL |
214 | Transaction* BeginInternalTransaction(const WriteOptions& options); |
215 | ||
216 | std::shared_ptr<LockManager> lock_manager_; | |
11fdf7f2 TL |
217 | |
218 | // Must be held when adding/dropping column families. | |
219 | InstrumentedMutex column_family_mutex_; | |
11fdf7f2 TL |
220 | |
221 | // Used to ensure that no locks are stolen from an expirable transaction | |
222 | // that has started a commit. Only transactions with an expiration time | |
223 | // should be in this map. | |
224 | std::mutex map_mutex_; | |
225 | std::unordered_map<TransactionID, PessimisticTransaction*> | |
226 | expirable_transactions_map_; | |
227 | ||
228 | // map from name to two phase transaction instance | |
229 | std::mutex name_map_mutex_; | |
230 | std::unordered_map<TransactionName, Transaction*> transactions_; | |
231 | ||
232 | // Signal that we are testing a crash scenario. Some asserts could be relaxed | |
233 | // in such cases. | |
234 | virtual void TEST_Crash() {} | |
235 | }; | |
236 | ||
237 | // A PessimisticTransactionDB that writes the data to the DB after the commit. | |
238 | // In this way the DB only contains the committed data. | |
239 | class WriteCommittedTxnDB : public PessimisticTransactionDB { | |
240 | public: | |
241 | explicit WriteCommittedTxnDB(DB* db, | |
242 | const TransactionDBOptions& txn_db_options) | |
243 | : PessimisticTransactionDB(db, txn_db_options) {} | |
244 | ||
245 | explicit WriteCommittedTxnDB(StackableDB* db, | |
246 | const TransactionDBOptions& txn_db_options) | |
247 | : PessimisticTransactionDB(db, txn_db_options) {} | |
248 | ||
249 | virtual ~WriteCommittedTxnDB() {} | |
250 | ||
251 | Transaction* BeginTransaction(const WriteOptions& write_options, | |
252 | const TransactionOptions& txn_options, | |
253 | Transaction* old_txn) override; | |
254 | ||
255 | // Optimized version of ::Write that makes use of skip_concurrency_control | |
256 | // hint | |
257 | using TransactionDB::Write; | |
258 | virtual Status Write(const WriteOptions& opts, | |
259 | const TransactionDBWriteOptimizations& optimizations, | |
260 | WriteBatch* updates) override; | |
f67539c2 | 261 | virtual Status Write(const WriteOptions& opts, WriteBatch* updates) override; |
11fdf7f2 TL |
262 | }; |
263 | ||
1e59de90 TL |
264 | inline Status PessimisticTransactionDB::FailIfBatchHasTs( |
265 | const WriteBatch* batch) { | |
266 | if (batch != nullptr && WriteBatchInternal::HasKeyWithTimestamp(*batch)) { | |
267 | return Status::NotSupported( | |
268 | "Writes with timestamp must go through transaction API instead of " | |
269 | "TransactionDB."); | |
270 | } | |
271 | return Status::OK(); | |
272 | } | |
273 | ||
274 | inline Status PessimisticTransactionDB::FailIfCfEnablesTs( | |
275 | const DB* db, const ColumnFamilyHandle* column_family) { | |
276 | assert(db); | |
277 | column_family = column_family ? column_family : db->DefaultColumnFamily(); | |
278 | assert(column_family); | |
279 | const Comparator* const ucmp = column_family->GetComparator(); | |
280 | assert(ucmp); | |
281 | if (ucmp->timestamp_size() > 0) { | |
282 | return Status::NotSupported( | |
283 | "Write operation with user timestamp must go through the transaction " | |
284 | "API instead of TransactionDB."); | |
285 | } | |
286 | return Status::OK(); | |
287 | } | |
288 | ||
289 | class SnapshotCreationCallback : public PostMemTableCallback { | |
290 | public: | |
291 | explicit SnapshotCreationCallback( | |
292 | DBImpl* dbi, TxnTimestamp commit_ts, | |
293 | const std::shared_ptr<TransactionNotifier>& notifier, | |
294 | std::shared_ptr<const Snapshot>& snapshot) | |
295 | : db_impl_(dbi), | |
296 | commit_ts_(commit_ts), | |
297 | snapshot_notifier_(notifier), | |
298 | snapshot_(snapshot) { | |
299 | assert(db_impl_); | |
300 | } | |
301 | ||
302 | ~SnapshotCreationCallback() override { | |
303 | snapshot_creation_status_.PermitUncheckedError(); | |
304 | } | |
305 | ||
306 | Status operator()(SequenceNumber seq, bool disable_memtable) override; | |
307 | ||
308 | private: | |
309 | DBImpl* const db_impl_; | |
310 | const TxnTimestamp commit_ts_; | |
311 | std::shared_ptr<TransactionNotifier> snapshot_notifier_; | |
312 | std::shared_ptr<const Snapshot>& snapshot_; | |
313 | ||
314 | Status snapshot_creation_status_; | |
315 | }; | |
316 | ||
f67539c2 | 317 | } // namespace ROCKSDB_NAMESPACE |
11fdf7f2 | 318 | #endif // ROCKSDB_LITE |