1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
8 #include "utilities/transactions/optimistic_transaction.h"
12 #include "db/column_family.h"
13 #include "db/db_impl/db_impl.h"
14 #include "rocksdb/comparator.h"
15 #include "rocksdb/db.h"
16 #include "rocksdb/status.h"
17 #include "rocksdb/utilities/optimistic_transaction_db.h"
18 #include "util/cast_util.h"
19 #include "util/string_util.h"
20 #include "utilities/transactions/lock/point/point_lock_tracker.h"
21 #include "utilities/transactions/optimistic_transaction.h"
22 #include "utilities/transactions/optimistic_transaction_db_impl.h"
23 #include "utilities/transactions/transaction_util.h"
25 namespace ROCKSDB_NAMESPACE
{
// Constructs an optimistic transaction on top of `txn_db`.
//
// txn_db        - optimistic transaction DB; its base DB (GetBaseDB()) is
//                 what the TransactionBaseImpl base class is given.
// write_options - write options forwarded to the base class.
// txn_options   - per-transaction options, applied through Initialize().
29 OptimisticTransaction::OptimisticTransaction(
30 OptimisticTransactionDB
* txn_db
, const WriteOptions
& write_options
,
31 const OptimisticTransactionOptions
& txn_options
)
// The base class also receives the point lock tracker factory, which it is
// handed alongside the base DB and the write options.
32 : TransactionBaseImpl(txn_db
->GetBaseDB(), write_options
,
33 PointLockTrackerFactory::Get()),
// Apply the per-transaction options.
35 Initialize(txn_options
);
// Applies the per-transaction options in `txn_options`. Shared by the
// constructor and Reinitialize().
38 void OptimisticTransaction::Initialize(
39 const OptimisticTransactionOptions
& txn_options
) {
// NOTE(review): the action taken when set_snapshot is true is not visible
// here -- presumably a snapshot is taken right away; confirm.
40 if (txn_options
.set_snapshot
) {
// Resets this transaction object for reuse with a (possibly different)
// `txn_db`, `write_options` and `txn_options`.
//
// Mirrors the constructor: the base class is reinitialized with the base DB
// of `txn_db` and the write options, then the per-transaction options are
// applied through Initialize().
45 void OptimisticTransaction::Reinitialize(
46 OptimisticTransactionDB
* txn_db
, const WriteOptions
& write_options
,
47 const OptimisticTransactionOptions
& txn_options
) {
48 TransactionBaseImpl::Reinitialize(txn_db
->GetBaseDB(), write_options
);
49 Initialize(txn_options
);
// Destructor: the body is empty, so this class adds no cleanup of its own.
52 OptimisticTransaction::~OptimisticTransaction() {}
// Clears the transaction by forwarding to TransactionBaseImpl::Clear();
// nothing else is reset at this level.
54 void OptimisticTransaction::Clear() { TransactionBaseImpl::Clear(); }
// Two-phase commit is not available for optimistic transactions, so this
// unconditionally returns Status::InvalidArgument.
56 Status
OptimisticTransaction::Prepare() {
57 return Status::InvalidArgument(
58 "Two phase commit not supported for optimistic transactions.");
// Commits the transaction using the validation policy configured on the
// owning OptimisticTransactionDBImpl.
//
// Returns the Status produced by the policy-specific commit routine.
61 Status
OptimisticTransaction::Commit() {
// The policy lives on the concrete DB implementation, so downcast txn_db_.
62 auto txn_db_impl
= static_cast_with_check
<OptimisticTransactionDBImpl
,
63 OptimisticTransactionDB
>(txn_db_
);
// Dispatch on the OCC validation policy.
65 switch (txn_db_impl
->GetValidatePolicy()) {
66 case OccValidationPolicy::kValidateParallel
:
67 return CommitWithParallelValidate();
68 case OccValidationPolicy::kValidateSerial
:
69 return CommitWithSerialValidate();
73 // unreachable, just avoid compiler complaint
// Commit path for OccValidationPolicy::kValidateSerial.
//
// The write batch is submitted through DBImpl::WriteWithCallback() together
// with an OptimisticTransactionCallback bound to this transaction.
// NOTE(review): what happens to `s` after the write is not visible here.
77 Status
OptimisticTransaction::CommitWithSerialValidate() {
78 // Set up callback which will call CheckTransactionForConflicts() to
79 // check whether this transaction is safe to be committed.
80 OptimisticTransactionCallback
callback(this);
// WriteWithCallback() is a DBImpl method, so downcast the root DB.
82 DBImpl
* db_impl
= static_cast_with_check
<DBImpl
>(db_
->GetRootDB());
// Submit this transaction's write batch along with the callback.
84 Status s
= db_impl
->WriteWithCallback(
85 write_options_
, GetWriteBatch()->GetWriteBatch(), &callback
);
// Commit path for OccValidationPolicy::kValidateParallel.
//
// Visible steps:
//   1. Map every tracked key to a lock-bucket index.
//   2. Lock those buckets in ascending index order.
//   3. Run a cache-only conflict check over the tracked keys.
//   4. Write the transaction's batch with DBImpl::Write().
// NOTE(review): the status handling between steps 3 and 4, and after step 4,
// is not visible here.
94 Status
OptimisticTransaction::CommitWithParallelValidate() {
95 auto txn_db_impl
= static_cast_with_check
<OptimisticTransactionDBImpl
,
96 OptimisticTransactionDB
>(txn_db_
);
98 DBImpl
* db_impl
= static_cast_with_check
<DBImpl
>(db_
->GetRootDB());
// `space` is the number of lock buckets; key hashes are reduced into
// [0, space) below.
100 const size_t space
= txn_db_impl
->GetLockBucketsSize();
// std::set keeps the bucket indices unique and sorted, which is what lets
// the locking loop below walk them in ascending order.
101 std::set
<size_t> lk_idxes
;
// Holds the bucket locks acquired below.
102 std::vector
<std::unique_lock
<std::mutex
>> lks
;
// Walk every column family that has tracked keys ...
103 std::unique_ptr
<LockTracker::ColumnFamilyIterator
> cf_it(
104 tracked_locks_
->GetColumnFamilyIterator());
105 assert(cf_it
!= nullptr);
106 while (cf_it
->HasNext()) {
107 ColumnFamilyId cf
= cf_it
->Next();
// ... and every tracked key inside it.
108 std::unique_ptr
<LockTracker::KeyIterator
> key_it(
109 tracked_locks_
->GetKeyIterator(cf
));
110 assert(key_it
!= nullptr);
111 while (key_it
->HasNext()) {
112 const std::string
& key
= key_it
->Next();
// Hash the key and reduce it to a bucket index. Only the key bytes are
// hashed; the column family id is not part of the hash input.
113 lk_idxes
.insert(FastRange64(GetSliceNPHash64(key
), space
));
116 // NOTE: in a single txn, all bucket-locks are taken in ascending order.
117 // In this way, txns from different threads all obey this rule so that
118 // deadlock can be avoided.
119 for (auto v
: lk_idxes
) {
120 lks
.emplace_back(txn_db_impl
->LockBucket(v
));
// Validate the tracked keys using only in-memory state (cache_only = true).
123 Status s
= TransactionUtil::CheckKeysForConflicts(db_impl
, *tracked_locks_
,
124 true /* cache_only */);
// Apply the write batch; this path calls Write() without a callback.
129 s
= db_impl
->Write(write_options_
, GetWriteBatch()->GetWriteBatch());
// NOTE(review): only the signature is visible here; presumably this discards
// the transaction's pending state -- confirm against the full body.
137 Status
OptimisticTransaction::Rollback() {
142 // Record this key so that we can check it for conflicts at commit time.
144 // 'exclusive' is unused for OptimisticTransaction.
//
// column_family  - resolved to a numeric id with GetColumnFamilyID().
// key            - copied into a std::string before being tracked.
// read_only      - forwarded to TrackKey().
// exclusive      - forwarded to TrackKey() as-is.
// do_validate    - not referenced by any statement visible here.
// assume_tracked - must be false; asserted below as unsupported.
145 Status
OptimisticTransaction::TryLock(ColumnFamilyHandle
* column_family
,
146 const Slice
& key
, bool read_only
,
147 bool exclusive
, const bool do_validate
,
148 const bool assume_tracked
) {
149 assert(!assume_tracked
); // not supported
// Keeps the parameter "used" when assert() is compiled out.
150 (void)assume_tracked
;
154 uint32_t cfh_id
= GetColumnFamilyID(column_family
);
156 SetSnapshotIfNeeded();
// The sequence number tracked with the key comes from one of two sources:
// the transaction's snapshot or the DB's latest sequence number.
// NOTE(review): the condition choosing between them is not visible here --
// presumably the snapshot is used when one is set; confirm.
160 seq
= snapshot_
->GetSequenceNumber();
162 seq
= db_
->GetLatestSequenceNumber();
165 std::string key_str
= key
.ToString();
// Record (column family, key, sequence number) for the commit-time check.
167 TrackKey(cfh_id
, key_str
, seq
, read_only
, exclusive
);
169 // Always return OK. Conflict checking will happen at commit time.
173 // Returns OK if it is safe to commit this transaction. Returns Status::Busy
174 // if there are read or write conflicts that would prevent us from committing OR
175 // if we can not determine whether there would be any such conflicts.
177 // Should only be called on writer thread in order to avoid any race conditions
178 // in detecting write conflicts.
//
// db - the database to validate against; it is downcast to DBImpl for the
//      conflict check.
179 Status
OptimisticTransaction::CheckTransactionForConflicts(DB
* db
) {
182 auto db_impl
= static_cast_with_check
<DBImpl
>(db
);
184 // Since we are on the write thread and do not want to block other writers,
185 // we will do a cache-only conflict check. This can result in TryAgain
186 // getting returned if there is not sufficient memtable history to check for conflicts.
188 return TransactionUtil::CheckKeysForConflicts(db_impl
, *tracked_locks_
,
189 true /* cache_only */);
// Optimistic transactions cannot be named: the argument is ignored and
// Status::InvalidArgument is returned unconditionally.
192 Status
OptimisticTransaction::SetName(const TransactionName
& /* unused */) {
193 return Status::InvalidArgument("Optimistic transactions cannot be named.");
196 } // namespace ROCKSDB_NAMESPACE
198 #endif // ROCKSDB_LITE