1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
#include <memory>
#include <unordered_map>

#include "db/db_iter.h"
#include "db/read_callback.h"
#include "db/snapshot_checker.h"
#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/utilities/transaction_db.h"
#include "util/cast_util.h"
#include "utilities/transactions/lock/lock_manager.h"
#include "utilities/transactions/lock/range/range_lock_manager.h"
#include "utilities/transactions/pessimistic_transaction.h"
#include "utilities/transactions/write_prepared_txn.h"
28 namespace ROCKSDB_NAMESPACE
{
30 class PessimisticTransactionDB
: public TransactionDB
{
32 explicit PessimisticTransactionDB(DB
* db
,
33 const TransactionDBOptions
& txn_db_options
);
35 explicit PessimisticTransactionDB(StackableDB
* db
,
36 const TransactionDBOptions
& txn_db_options
);
38 virtual ~PessimisticTransactionDB();
40 virtual const Snapshot
* GetSnapshot() override
{ return db_
->GetSnapshot(); }
42 virtual Status
Initialize(
43 const std::vector
<size_t>& compaction_enabled_cf_indices
,
44 const std::vector
<ColumnFamilyHandle
*>& handles
);
46 Transaction
* BeginTransaction(const WriteOptions
& write_options
,
47 const TransactionOptions
& txn_options
,
48 Transaction
* old_txn
) override
= 0;
50 using StackableDB::Put
;
51 virtual Status
Put(const WriteOptions
& options
,
52 ColumnFamilyHandle
* column_family
, const Slice
& key
,
53 const Slice
& val
) override
;
55 using StackableDB::Delete
;
56 virtual Status
Delete(const WriteOptions
& wopts
,
57 ColumnFamilyHandle
* column_family
,
58 const Slice
& key
) override
;
60 using StackableDB::SingleDelete
;
61 virtual Status
SingleDelete(const WriteOptions
& wopts
,
62 ColumnFamilyHandle
* column_family
,
63 const Slice
& key
) override
;
65 using StackableDB::Merge
;
66 virtual Status
Merge(const WriteOptions
& options
,
67 ColumnFamilyHandle
* column_family
, const Slice
& key
,
68 const Slice
& value
) override
;
70 using TransactionDB::Write
;
71 virtual Status
Write(const WriteOptions
& opts
, WriteBatch
* updates
) override
;
72 inline Status
WriteWithConcurrencyControl(const WriteOptions
& opts
,
73 WriteBatch
* updates
) {
75 if (opts
.protection_bytes_per_key
> 0) {
76 s
= WriteBatchInternal::UpdateProtectionInfo(
77 updates
, opts
.protection_bytes_per_key
);
80 // Need to lock all keys in this batch to prevent write conflicts with
81 // concurrent transactions.
82 Transaction
* txn
= BeginInternalTransaction(opts
);
83 txn
->DisableIndexing();
85 auto txn_impl
= static_cast_with_check
<PessimisticTransaction
>(txn
);
87 // Since commitBatch sorts the keys before locking, concurrent Write()
88 // operations will not cause a deadlock.
89 // In order to avoid a deadlock with a concurrent Transaction,
90 // Transactions should use a lock timeout.
91 s
= txn_impl
->CommitBatch(updates
);
99 using StackableDB::CreateColumnFamily
;
100 virtual Status
CreateColumnFamily(const ColumnFamilyOptions
& options
,
101 const std::string
& column_family_name
,
102 ColumnFamilyHandle
** handle
) override
;
104 Status
CreateColumnFamilies(
105 const ColumnFamilyOptions
& options
,
106 const std::vector
<std::string
>& column_family_names
,
107 std::vector
<ColumnFamilyHandle
*>* handles
) override
;
109 Status
CreateColumnFamilies(
110 const std::vector
<ColumnFamilyDescriptor
>& column_families
,
111 std::vector
<ColumnFamilyHandle
*>* handles
) override
;
113 using StackableDB::DropColumnFamily
;
114 virtual Status
DropColumnFamily(ColumnFamilyHandle
* column_family
) override
;
116 Status
DropColumnFamilies(
117 const std::vector
<ColumnFamilyHandle
*>& column_families
) override
;
119 Status
TryLock(PessimisticTransaction
* txn
, uint32_t cfh_id
,
120 const std::string
& key
, bool exclusive
);
121 Status
TryRangeLock(PessimisticTransaction
* txn
, uint32_t cfh_id
,
122 const Endpoint
& start_endp
, const Endpoint
& end_endp
);
124 void UnLock(PessimisticTransaction
* txn
, const LockTracker
& keys
);
125 void UnLock(PessimisticTransaction
* txn
, uint32_t cfh_id
,
126 const std::string
& key
);
128 void AddColumnFamily(const ColumnFamilyHandle
* handle
);
130 static TransactionDBOptions
ValidateTxnDBOptions(
131 const TransactionDBOptions
& txn_db_options
);
133 const TransactionDBOptions
& GetTxnDBOptions() const {
134 return txn_db_options_
;
137 void InsertExpirableTransaction(TransactionID tx_id
,
138 PessimisticTransaction
* tx
);
139 void RemoveExpirableTransaction(TransactionID tx_id
);
141 // If transaction is no longer available, locks can be stolen
142 // If transaction is available, try stealing locks directly from transaction
143 // It is the caller's responsibility to ensure that the referred transaction
144 // is expirable (GetExpirationTime() > 0) and that it is expired.
145 bool TryStealingExpiredTransactionLocks(TransactionID tx_id
);
147 Transaction
* GetTransactionByName(const TransactionName
& name
) override
;
149 void RegisterTransaction(Transaction
* txn
);
150 void UnregisterTransaction(Transaction
* txn
);
152 // not thread safe. current use case is during recovery (single thread)
153 void GetAllPreparedTransactions(std::vector
<Transaction
*>* trans
) override
;
155 LockManager::PointLockStatus
GetLockStatusData() override
;
157 std::vector
<DeadlockPath
> GetDeadlockInfoBuffer() override
;
158 void SetDeadlockInfoBufferSize(uint32_t target_size
) override
;
160 // The default implementation does nothing. The actual implementation is moved
161 // to the child classes that actually need this information. This was due to
162 // an odd performance drop we observed when the added std::atomic member to
163 // the base class even when the subclass do not read it in the fast path.
164 virtual void UpdateCFComparatorMap(const std::vector
<ColumnFamilyHandle
*>&) {}
165 virtual void UpdateCFComparatorMap(ColumnFamilyHandle
*) {}
167 // Use the returned factory to create LockTrackers in transactions.
168 const LockTrackerFactory
& GetLockTrackerFactory() const {
169 return lock_manager_
->GetLockTrackerFactory();
172 std::pair
<Status
, std::shared_ptr
<const Snapshot
>> CreateTimestampedSnapshot(
173 TxnTimestamp ts
) override
;
175 std::shared_ptr
<const Snapshot
> GetTimestampedSnapshot(
176 TxnTimestamp ts
) const override
;
178 void ReleaseTimestampedSnapshotsOlderThan(TxnTimestamp ts
) override
;
180 Status
GetTimestampedSnapshots(TxnTimestamp ts_lb
, TxnTimestamp ts_ub
,
181 std::vector
<std::shared_ptr
<const Snapshot
>>&
182 timestamped_snapshots
) const override
;
186 std::shared_ptr
<Logger
> info_log_
;
187 const TransactionDBOptions txn_db_options_
;
189 static Status
FailIfBatchHasTs(const WriteBatch
* wb
);
191 static Status
FailIfCfEnablesTs(const DB
* db
,
192 const ColumnFamilyHandle
* column_family
);
194 void ReinitializeTransaction(
195 Transaction
* txn
, const WriteOptions
& write_options
,
196 const TransactionOptions
& txn_options
= TransactionOptions());
198 virtual Status
VerifyCFOptions(const ColumnFamilyOptions
& cf_options
);
201 friend class WritePreparedTxnDB
;
202 friend class WritePreparedTxnDBMock
;
203 friend class WriteUnpreparedTxn
;
204 friend class TransactionTest_DoubleCrashInRecovery_Test
;
205 friend class TransactionTest_DoubleEmptyWrite_Test
;
206 friend class TransactionTest_DuplicateKeys_Test
;
207 friend class TransactionTest_PersistentTwoPhaseTransactionTest_Test
;
208 friend class TransactionTest_TwoPhaseDoubleRecoveryTest_Test
;
209 friend class TransactionTest_TwoPhaseOutOfOrderDelete_Test
;
210 friend class TransactionStressTest_TwoPhaseLongPrepareTest_Test
;
211 friend class WriteUnpreparedTransactionTest_RecoveryTest_Test
;
212 friend class WriteUnpreparedTransactionTest_MarkLogWithPrepSection_Test
;
214 Transaction
* BeginInternalTransaction(const WriteOptions
& options
);
216 std::shared_ptr
<LockManager
> lock_manager_
;
218 // Must be held when adding/dropping column families.
219 InstrumentedMutex column_family_mutex_
;
221 // Used to ensure that no locks are stolen from an expirable transaction
222 // that has started a commit. Only transactions with an expiration time
223 // should be in this map.
224 std::mutex map_mutex_
;
225 std::unordered_map
<TransactionID
, PessimisticTransaction
*>
226 expirable_transactions_map_
;
228 // map from name to two phase transaction instance
229 std::mutex name_map_mutex_
;
230 std::unordered_map
<TransactionName
, Transaction
*> transactions_
;
232 // Signal that we are testing a crash scenario. Some asserts could be relaxed
234 virtual void TEST_Crash() {}
237 // A PessimisticTransactionDB that writes the data to the DB after the commit.
238 // In this way the DB only contains the committed data.
239 class WriteCommittedTxnDB
: public PessimisticTransactionDB
{
241 explicit WriteCommittedTxnDB(DB
* db
,
242 const TransactionDBOptions
& txn_db_options
)
243 : PessimisticTransactionDB(db
, txn_db_options
) {}
245 explicit WriteCommittedTxnDB(StackableDB
* db
,
246 const TransactionDBOptions
& txn_db_options
)
247 : PessimisticTransactionDB(db
, txn_db_options
) {}
249 virtual ~WriteCommittedTxnDB() {}
251 Transaction
* BeginTransaction(const WriteOptions
& write_options
,
252 const TransactionOptions
& txn_options
,
253 Transaction
* old_txn
) override
;
255 // Optimized version of ::Write that makes use of skip_concurrency_control
257 using TransactionDB::Write
;
258 virtual Status
Write(const WriteOptions
& opts
,
259 const TransactionDBWriteOptimizations
& optimizations
,
260 WriteBatch
* updates
) override
;
261 virtual Status
Write(const WriteOptions
& opts
, WriteBatch
* updates
) override
;
264 inline Status
PessimisticTransactionDB::FailIfBatchHasTs(
265 const WriteBatch
* batch
) {
266 if (batch
!= nullptr && WriteBatchInternal::HasKeyWithTimestamp(*batch
)) {
267 return Status::NotSupported(
268 "Writes with timestamp must go through transaction API instead of "
274 inline Status
PessimisticTransactionDB::FailIfCfEnablesTs(
275 const DB
* db
, const ColumnFamilyHandle
* column_family
) {
277 column_family
= column_family
? column_family
: db
->DefaultColumnFamily();
278 assert(column_family
);
279 const Comparator
* const ucmp
= column_family
->GetComparator();
281 if (ucmp
->timestamp_size() > 0) {
282 return Status::NotSupported(
283 "Write operation with user timestamp must go through the transaction "
284 "API instead of TransactionDB.");
289 class SnapshotCreationCallback
: public PostMemTableCallback
{
291 explicit SnapshotCreationCallback(
292 DBImpl
* dbi
, TxnTimestamp commit_ts
,
293 const std::shared_ptr
<TransactionNotifier
>& notifier
,
294 std::shared_ptr
<const Snapshot
>& snapshot
)
296 commit_ts_(commit_ts
),
297 snapshot_notifier_(notifier
),
298 snapshot_(snapshot
) {
302 ~SnapshotCreationCallback() override
{
303 snapshot_creation_status_
.PermitUncheckedError();
306 Status
operator()(SequenceNumber seq
, bool disable_memtable
) override
;
309 DBImpl
* const db_impl_
;
310 const TxnTimestamp commit_ts_
;
311 std::shared_ptr
<TransactionNotifier
> snapshot_notifier_
;
312 std::shared_ptr
<const Snapshot
>& snapshot_
;
314 Status snapshot_creation_status_
;
317 } // namespace ROCKSDB_NAMESPACE
318 #endif // ROCKSDB_LITE