]>
Commit | Line | Data |
---|---|---|
7c673cae | 1 | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
11fdf7f2 TL |
2 | // This source code is licensed under both the GPLv2 (found in the |
3 | // COPYING file in the root directory) and Apache 2.0 License | |
4 | // (found in the LICENSE.Apache file in the root directory). | |
7c673cae FG |
5 | |
6 | #ifndef ROCKSDB_LITE | |
7 | ||
11fdf7f2 | 8 | #include "utilities/transactions/optimistic_transaction.h" |
7c673cae | 9 | |
7c673cae | 10 | #include <string> |
7c673cae FG |
11 | |
12 | #include "db/column_family.h" | |
f67539c2 | 13 | #include "db/db_impl/db_impl.h" |
7c673cae FG |
14 | #include "rocksdb/comparator.h" |
15 | #include "rocksdb/db.h" | |
16 | #include "rocksdb/status.h" | |
17 | #include "rocksdb/utilities/optimistic_transaction_db.h" | |
11fdf7f2 | 18 | #include "util/cast_util.h" |
7c673cae | 19 | #include "util/string_util.h" |
20effc67 | 20 | #include "utilities/transactions/lock/point/point_lock_tracker.h" |
f67539c2 TL |
21 | #include "utilities/transactions/optimistic_transaction.h" |
22 | #include "utilities/transactions/optimistic_transaction_db_impl.h" | |
20effc67 | 23 | #include "utilities/transactions/transaction_util.h" |
7c673cae | 24 | |
f67539c2 | 25 | namespace ROCKSDB_NAMESPACE { |
7c673cae FG |
26 | |
27 | struct WriteOptions; | |
28 | ||
11fdf7f2 | 29 | OptimisticTransaction::OptimisticTransaction( |
7c673cae FG |
30 | OptimisticTransactionDB* txn_db, const WriteOptions& write_options, |
31 | const OptimisticTransactionOptions& txn_options) | |
20effc67 TL |
32 | : TransactionBaseImpl(txn_db->GetBaseDB(), write_options, |
33 | PointLockTrackerFactory::Get()), | |
34 | txn_db_(txn_db) { | |
7c673cae FG |
35 | Initialize(txn_options); |
36 | } | |
37 | ||
11fdf7f2 | 38 | void OptimisticTransaction::Initialize( |
7c673cae FG |
39 | const OptimisticTransactionOptions& txn_options) { |
40 | if (txn_options.set_snapshot) { | |
41 | SetSnapshot(); | |
42 | } | |
43 | } | |
44 | ||
11fdf7f2 | 45 | void OptimisticTransaction::Reinitialize( |
7c673cae FG |
46 | OptimisticTransactionDB* txn_db, const WriteOptions& write_options, |
47 | const OptimisticTransactionOptions& txn_options) { | |
48 | TransactionBaseImpl::Reinitialize(txn_db->GetBaseDB(), write_options); | |
49 | Initialize(txn_options); | |
50 | } | |
51 | ||
11fdf7f2 | 52 | OptimisticTransaction::~OptimisticTransaction() {} |
7c673cae | 53 | |
11fdf7f2 | 54 | void OptimisticTransaction::Clear() { TransactionBaseImpl::Clear(); } |
7c673cae | 55 | |
11fdf7f2 | 56 | Status OptimisticTransaction::Prepare() { |
7c673cae FG |
57 | return Status::InvalidArgument( |
58 | "Two phase commit not supported for optimistic transactions."); | |
59 | } | |
60 | ||
11fdf7f2 | 61 | Status OptimisticTransaction::Commit() { |
f67539c2 TL |
62 | auto txn_db_impl = static_cast_with_check<OptimisticTransactionDBImpl, |
63 | OptimisticTransactionDB>(txn_db_); | |
64 | assert(txn_db_impl); | |
65 | switch (txn_db_impl->GetValidatePolicy()) { | |
66 | case OccValidationPolicy::kValidateParallel: | |
67 | return CommitWithParallelValidate(); | |
68 | case OccValidationPolicy::kValidateSerial: | |
69 | return CommitWithSerialValidate(); | |
70 | default: | |
71 | assert(0); | |
72 | } | |
73 | // unreachable, just void compiler complain | |
74 | return Status::OK(); | |
75 | } | |
76 | ||
77 | Status OptimisticTransaction::CommitWithSerialValidate() { | |
7c673cae FG |
78 | // Set up callback which will call CheckTransactionForConflicts() to |
79 | // check whether this transaction is safe to be committed. | |
80 | OptimisticTransactionCallback callback(this); | |
81 | ||
20effc67 | 82 | DBImpl* db_impl = static_cast_with_check<DBImpl>(db_->GetRootDB()); |
7c673cae FG |
83 | |
84 | Status s = db_impl->WriteWithCallback( | |
85 | write_options_, GetWriteBatch()->GetWriteBatch(), &callback); | |
86 | ||
87 | if (s.ok()) { | |
88 | Clear(); | |
89 | } | |
90 | ||
91 | return s; | |
92 | } | |
93 | ||
f67539c2 TL |
94 | Status OptimisticTransaction::CommitWithParallelValidate() { |
95 | auto txn_db_impl = static_cast_with_check<OptimisticTransactionDBImpl, | |
96 | OptimisticTransactionDB>(txn_db_); | |
97 | assert(txn_db_impl); | |
20effc67 | 98 | DBImpl* db_impl = static_cast_with_check<DBImpl>(db_->GetRootDB()); |
f67539c2 TL |
99 | assert(db_impl); |
100 | const size_t space = txn_db_impl->GetLockBucketsSize(); | |
101 | std::set<size_t> lk_idxes; | |
102 | std::vector<std::unique_lock<std::mutex>> lks; | |
20effc67 TL |
103 | std::unique_ptr<LockTracker::ColumnFamilyIterator> cf_it( |
104 | tracked_locks_->GetColumnFamilyIterator()); | |
105 | assert(cf_it != nullptr); | |
106 | while (cf_it->HasNext()) { | |
107 | ColumnFamilyId cf = cf_it->Next(); | |
108 | std::unique_ptr<LockTracker::KeyIterator> key_it( | |
109 | tracked_locks_->GetKeyIterator(cf)); | |
110 | assert(key_it != nullptr); | |
111 | while (key_it->HasNext()) { | |
112 | const std::string& key = key_it->Next(); | |
113 | lk_idxes.insert(FastRange64(GetSliceNPHash64(key), space)); | |
f67539c2 TL |
114 | } |
115 | } | |
116 | // NOTE: in a single txn, all bucket-locks are taken in ascending order. | |
117 | // In this way, txns from different threads all obey this rule so that | |
118 | // deadlock can be avoided. | |
119 | for (auto v : lk_idxes) { | |
120 | lks.emplace_back(txn_db_impl->LockBucket(v)); | |
121 | } | |
122 | ||
20effc67 | 123 | Status s = TransactionUtil::CheckKeysForConflicts(db_impl, *tracked_locks_, |
f67539c2 TL |
124 | true /* cache_only */); |
125 | if (!s.ok()) { | |
126 | return s; | |
127 | } | |
128 | ||
129 | s = db_impl->Write(write_options_, GetWriteBatch()->GetWriteBatch()); | |
130 | if (s.ok()) { | |
131 | Clear(); | |
132 | } | |
133 | ||
134 | return s; | |
135 | } | |
136 | ||
11fdf7f2 | 137 | Status OptimisticTransaction::Rollback() { |
7c673cae FG |
138 | Clear(); |
139 | return Status::OK(); | |
140 | } | |
141 | ||
142 | // Record this key so that we can check it for conflicts at commit time. | |
143 | // | |
144 | // 'exclusive' is unused for OptimisticTransaction. | |
11fdf7f2 TL |
145 | Status OptimisticTransaction::TryLock(ColumnFamilyHandle* column_family, |
146 | const Slice& key, bool read_only, | |
494da23a TL |
147 | bool exclusive, const bool do_validate, |
148 | const bool assume_tracked) { | |
149 | assert(!assume_tracked); // not supported | |
150 | (void)assume_tracked; | |
151 | if (!do_validate) { | |
7c673cae FG |
152 | return Status::OK(); |
153 | } | |
154 | uint32_t cfh_id = GetColumnFamilyID(column_family); | |
155 | ||
156 | SetSnapshotIfNeeded(); | |
157 | ||
158 | SequenceNumber seq; | |
159 | if (snapshot_) { | |
160 | seq = snapshot_->GetSequenceNumber(); | |
161 | } else { | |
162 | seq = db_->GetLatestSequenceNumber(); | |
163 | } | |
164 | ||
165 | std::string key_str = key.ToString(); | |
166 | ||
167 | TrackKey(cfh_id, key_str, seq, read_only, exclusive); | |
168 | ||
169 | // Always return OK. Confilct checking will happen at commit time. | |
170 | return Status::OK(); | |
171 | } | |
172 | ||
173 | // Returns OK if it is safe to commit this transaction. Returns Status::Busy | |
174 | // if there are read or write conflicts that would prevent us from committing OR | |
175 | // if we can not determine whether there would be any such conflicts. | |
176 | // | |
177 | // Should only be called on writer thread in order to avoid any race conditions | |
178 | // in detecting write conflicts. | |
11fdf7f2 | 179 | Status OptimisticTransaction::CheckTransactionForConflicts(DB* db) { |
7c673cae FG |
180 | Status result; |
181 | ||
20effc67 | 182 | auto db_impl = static_cast_with_check<DBImpl>(db); |
7c673cae FG |
183 | |
184 | // Since we are on the write thread and do not want to block other writers, | |
185 | // we will do a cache-only conflict check. This can result in TryAgain | |
186 | // getting returned if there is not sufficient memtable history to check | |
187 | // for conflicts. | |
20effc67 | 188 | return TransactionUtil::CheckKeysForConflicts(db_impl, *tracked_locks_, |
7c673cae FG |
189 | true /* cache_only */); |
190 | } | |
191 | ||
11fdf7f2 | 192 | Status OptimisticTransaction::SetName(const TransactionName& /* unused */) { |
7c673cae FG |
193 | return Status::InvalidArgument("Optimistic transactions cannot be named."); |
194 | } | |
195 | ||
f67539c2 | 196 | } // namespace ROCKSDB_NAMESPACE |
7c673cae FG |
197 | |
198 | #endif // ROCKSDB_LITE |