]>
Commit | Line | Data |
---|---|---|
7c673cae | 1 | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
11fdf7f2 TL |
2 | // This source code is licensed under both the GPLv2 (found in the |
3 | // COPYING file in the root directory) and Apache 2.0 License | |
4 | // (found in the LICENSE.Apache file in the root directory). | |
7c673cae FG |
5 | |
6 | #ifndef ROCKSDB_LITE | |
7 | ||
11fdf7f2 TL |
8 | #include "utilities/transactions/pessimistic_transaction_db.h" |
9 | ||
f67539c2 | 10 | #include <cinttypes> |
7c673cae FG |
11 | #include <string> |
12 | #include <unordered_set> | |
13 | #include <vector> | |
14 | ||
f67539c2 | 15 | #include "db/db_impl/db_impl.h" |
7c673cae FG |
16 | #include "rocksdb/db.h" |
17 | #include "rocksdb/options.h" | |
18 | #include "rocksdb/utilities/transaction_db.h" | |
f67539c2 | 19 | #include "test_util/sync_point.h" |
11fdf7f2 TL |
20 | #include "util/cast_util.h" |
21 | #include "util/mutexlock.h" | |
11fdf7f2 | 22 | #include "utilities/transactions/pessimistic_transaction.h" |
7c673cae | 23 | #include "utilities/transactions/transaction_db_mutex_impl.h" |
11fdf7f2 TL |
24 | #include "utilities/transactions/write_prepared_txn_db.h" |
25 | #include "utilities/transactions/write_unprepared_txn_db.h" | |
7c673cae | 26 | |
f67539c2 | 27 | namespace ROCKSDB_NAMESPACE { |
7c673cae | 28 | |
11fdf7f2 TL |
29 | PessimisticTransactionDB::PessimisticTransactionDB( |
30 | DB* db, const TransactionDBOptions& txn_db_options) | |
7c673cae | 31 | : TransactionDB(db), |
20effc67 | 32 | db_impl_(static_cast_with_check<DBImpl>(db)), |
7c673cae | 33 | txn_db_options_(txn_db_options), |
20effc67 | 34 | lock_manager_(NewLockManager(this, txn_db_options)) { |
7c673cae | 35 | assert(db_impl_ != nullptr); |
11fdf7f2 | 36 | info_log_ = db_impl_->GetDBOptions().info_log; |
7c673cae FG |
37 | } |
38 | ||
11fdf7f2 | 39 | // Support initiliazing PessimisticTransactionDB from a stackable db |
7c673cae | 40 | // |
11fdf7f2 | 41 | // PessimisticTransactionDB |
7c673cae FG |
42 | // ^ ^ |
43 | // | | | |
44 | // | + | |
45 | // | StackableDB | |
46 | // | ^ | |
47 | // | | | |
48 | // + + | |
49 | // DBImpl | |
50 | // ^ | |
51 | // |(inherit) | |
52 | // + | |
53 | // DB | |
54 | // | |
11fdf7f2 TL |
55 | PessimisticTransactionDB::PessimisticTransactionDB( |
56 | StackableDB* db, const TransactionDBOptions& txn_db_options) | |
7c673cae | 57 | : TransactionDB(db), |
20effc67 | 58 | db_impl_(static_cast_with_check<DBImpl>(db->GetRootDB())), |
7c673cae | 59 | txn_db_options_(txn_db_options), |
20effc67 | 60 | lock_manager_(NewLockManager(this, txn_db_options)) { |
7c673cae FG |
61 | assert(db_impl_ != nullptr); |
62 | } | |
63 | ||
11fdf7f2 | 64 | PessimisticTransactionDB::~PessimisticTransactionDB() { |
7c673cae FG |
65 | while (!transactions_.empty()) { |
66 | delete transactions_.begin()->second; | |
11fdf7f2 TL |
67 | // TODO(myabandeh): this seems to be an unsafe approach as it is not quite |
68 | // clear whether delete would also remove the entry from transactions_. | |
7c673cae FG |
69 | } |
70 | } | |
71 | ||
11fdf7f2 TL |
72 | Status PessimisticTransactionDB::VerifyCFOptions(const ColumnFamilyOptions&) { |
73 | return Status::OK(); | |
74 | } | |
75 | ||
76 | Status PessimisticTransactionDB::Initialize( | |
7c673cae FG |
77 | const std::vector<size_t>& compaction_enabled_cf_indices, |
78 | const std::vector<ColumnFamilyHandle*>& handles) { | |
79 | for (auto cf_ptr : handles) { | |
80 | AddColumnFamily(cf_ptr); | |
81 | } | |
11fdf7f2 TL |
82 | // Verify cf options |
83 | for (auto handle : handles) { | |
84 | ColumnFamilyDescriptor cfd; | |
85 | Status s = handle->GetDescriptor(&cfd); | |
86 | if (!s.ok()) { | |
87 | return s; | |
88 | } | |
89 | s = VerifyCFOptions(cfd.options); | |
90 | if (!s.ok()) { | |
91 | return s; | |
92 | } | |
93 | } | |
94 | ||
7c673cae FG |
95 | // Re-enable compaction for the column families that initially had |
96 | // compaction enabled. | |
97 | std::vector<ColumnFamilyHandle*> compaction_enabled_cf_handles; | |
98 | compaction_enabled_cf_handles.reserve(compaction_enabled_cf_indices.size()); | |
99 | for (auto index : compaction_enabled_cf_indices) { | |
100 | compaction_enabled_cf_handles.push_back(handles[index]); | |
101 | } | |
102 | ||
103 | Status s = EnableAutoCompaction(compaction_enabled_cf_handles); | |
104 | ||
105 | // create 'real' transactions from recovered shell transactions | |
20effc67 | 106 | auto dbimpl = static_cast_with_check<DBImpl>(GetRootDB()); |
7c673cae FG |
107 | assert(dbimpl != nullptr); |
108 | auto rtrxs = dbimpl->recovered_transactions(); | |
109 | ||
f67539c2 | 110 | for (auto it = rtrxs.begin(); it != rtrxs.end(); ++it) { |
7c673cae FG |
111 | auto recovered_trx = it->second; |
112 | assert(recovered_trx); | |
11fdf7f2 TL |
113 | assert(recovered_trx->batches_.size() == 1); |
114 | const auto& seq = recovered_trx->batches_.begin()->first; | |
115 | const auto& batch_info = recovered_trx->batches_.begin()->second; | |
116 | assert(batch_info.log_number_); | |
7c673cae FG |
117 | assert(recovered_trx->name_.length()); |
118 | ||
119 | WriteOptions w_options; | |
120 | w_options.sync = true; | |
121 | TransactionOptions t_options; | |
11fdf7f2 TL |
122 | // This would help avoiding deadlock for keys that although exist in the WAL |
123 | // did not go through concurrency control. This includes the merge that | |
124 | // MyRocks uses for auto-inc columns. It is safe to do so, since (i) if | |
125 | // there is a conflict between the keys of two transactions that must be | |
126 | // avoided, it is already avoided by the application, MyRocks, before the | |
127 | // restart (ii) application, MyRocks, guarntees to rollback/commit the | |
128 | // recovered transactions before new transactions start. | |
129 | t_options.skip_concurrency_control = true; | |
7c673cae FG |
130 | |
131 | Transaction* real_trx = BeginTransaction(w_options, t_options, nullptr); | |
132 | assert(real_trx); | |
11fdf7f2 TL |
133 | real_trx->SetLogNumber(batch_info.log_number_); |
134 | assert(seq != kMaxSequenceNumber); | |
494da23a TL |
135 | if (GetTxnDBOptions().write_policy != WRITE_COMMITTED) { |
136 | real_trx->SetId(seq); | |
137 | } | |
7c673cae FG |
138 | |
139 | s = real_trx->SetName(recovered_trx->name_); | |
140 | if (!s.ok()) { | |
141 | break; | |
142 | } | |
143 | ||
11fdf7f2 TL |
144 | s = real_trx->RebuildFromWriteBatch(batch_info.batch_); |
145 | // WriteCommitted set this to to disable this check that is specific to | |
146 | // WritePrepared txns | |
147 | assert(batch_info.batch_cnt_ == 0 || | |
148 | real_trx->GetWriteBatch()->SubBatchCnt() == batch_info.batch_cnt_); | |
7c673cae FG |
149 | real_trx->SetState(Transaction::PREPARED); |
150 | if (!s.ok()) { | |
151 | break; | |
152 | } | |
153 | } | |
154 | if (s.ok()) { | |
155 | dbimpl->DeleteAllRecoveredTransactions(); | |
156 | } | |
157 | return s; | |
158 | } | |
159 | ||
11fdf7f2 | 160 | Transaction* WriteCommittedTxnDB::BeginTransaction( |
7c673cae FG |
161 | const WriteOptions& write_options, const TransactionOptions& txn_options, |
162 | Transaction* old_txn) { | |
163 | if (old_txn != nullptr) { | |
164 | ReinitializeTransaction(old_txn, write_options, txn_options); | |
165 | return old_txn; | |
166 | } else { | |
11fdf7f2 | 167 | return new WriteCommittedTxn(this, write_options, txn_options); |
7c673cae FG |
168 | } |
169 | } | |
170 | ||
11fdf7f2 | 171 | TransactionDBOptions PessimisticTransactionDB::ValidateTxnDBOptions( |
7c673cae FG |
172 | const TransactionDBOptions& txn_db_options) { |
173 | TransactionDBOptions validated = txn_db_options; | |
174 | ||
175 | if (txn_db_options.num_stripes == 0) { | |
176 | validated.num_stripes = 1; | |
177 | } | |
178 | ||
179 | return validated; | |
180 | } | |
181 | ||
182 | Status TransactionDB::Open(const Options& options, | |
183 | const TransactionDBOptions& txn_db_options, | |
184 | const std::string& dbname, TransactionDB** dbptr) { | |
185 | DBOptions db_options(options); | |
186 | ColumnFamilyOptions cf_options(options); | |
187 | std::vector<ColumnFamilyDescriptor> column_families; | |
188 | column_families.push_back( | |
189 | ColumnFamilyDescriptor(kDefaultColumnFamilyName, cf_options)); | |
190 | std::vector<ColumnFamilyHandle*> handles; | |
191 | Status s = TransactionDB::Open(db_options, txn_db_options, dbname, | |
192 | column_families, &handles, dbptr); | |
193 | if (s.ok()) { | |
194 | assert(handles.size() == 1); | |
195 | // i can delete the handle since DBImpl is always holding a reference to | |
196 | // default column family | |
197 | delete handles[0]; | |
198 | } | |
199 | ||
200 | return s; | |
201 | } | |
202 | ||
203 | Status TransactionDB::Open( | |
204 | const DBOptions& db_options, const TransactionDBOptions& txn_db_options, | |
205 | const std::string& dbname, | |
206 | const std::vector<ColumnFamilyDescriptor>& column_families, | |
207 | std::vector<ColumnFamilyHandle*>* handles, TransactionDB** dbptr) { | |
208 | Status s; | |
11fdf7f2 | 209 | DB* db = nullptr; |
f67539c2 TL |
210 | if (txn_db_options.write_policy == WRITE_COMMITTED && |
211 | db_options.unordered_write) { | |
212 | return Status::NotSupported( | |
213 | "WRITE_COMMITTED is incompatible with unordered_writes"); | |
214 | } | |
215 | if (txn_db_options.write_policy == WRITE_UNPREPARED && | |
216 | db_options.unordered_write) { | |
217 | // TODO(lth): support it | |
218 | return Status::NotSupported( | |
219 | "WRITE_UNPREPARED is currently incompatible with unordered_writes"); | |
220 | } | |
221 | if (txn_db_options.write_policy == WRITE_PREPARED && | |
222 | db_options.unordered_write && !db_options.two_write_queues) { | |
223 | return Status::NotSupported( | |
224 | "WRITE_PREPARED is incompatible with unordered_writes if " | |
225 | "two_write_queues is not enabled."); | |
226 | } | |
7c673cae FG |
227 | |
228 | std::vector<ColumnFamilyDescriptor> column_families_copy = column_families; | |
229 | std::vector<size_t> compaction_enabled_cf_indices; | |
230 | DBOptions db_options_2pc = db_options; | |
231 | PrepareWrap(&db_options_2pc, &column_families_copy, | |
232 | &compaction_enabled_cf_indices); | |
11fdf7f2 TL |
233 | const bool use_seq_per_batch = |
234 | txn_db_options.write_policy == WRITE_PREPARED || | |
235 | txn_db_options.write_policy == WRITE_UNPREPARED; | |
236 | const bool use_batch_per_txn = | |
237 | txn_db_options.write_policy == WRITE_COMMITTED || | |
238 | txn_db_options.write_policy == WRITE_PREPARED; | |
239 | s = DBImpl::Open(db_options_2pc, dbname, column_families_copy, handles, &db, | |
240 | use_seq_per_batch, use_batch_per_txn); | |
7c673cae | 241 | if (s.ok()) { |
f67539c2 TL |
242 | ROCKS_LOG_WARN(db->GetDBOptions().info_log, |
243 | "Transaction write_policy is %" PRId32, | |
244 | static_cast<int>(txn_db_options.write_policy)); | |
7c673cae FG |
245 | s = WrapDB(db, txn_db_options, compaction_enabled_cf_indices, *handles, |
246 | dbptr); | |
247 | } | |
11fdf7f2 TL |
248 | if (!s.ok()) { |
249 | // just in case it was not deleted (and not set to nullptr). | |
250 | delete db; | |
251 | } | |
7c673cae FG |
252 | return s; |
253 | } | |
254 | ||
255 | void TransactionDB::PrepareWrap( | |
256 | DBOptions* db_options, std::vector<ColumnFamilyDescriptor>* column_families, | |
257 | std::vector<size_t>* compaction_enabled_cf_indices) { | |
258 | compaction_enabled_cf_indices->clear(); | |
259 | ||
260 | // Enable MemTable History if not already enabled | |
261 | for (size_t i = 0; i < column_families->size(); i++) { | |
262 | ColumnFamilyOptions* cf_options = &(*column_families)[i].options; | |
263 | ||
f67539c2 TL |
264 | if (cf_options->max_write_buffer_size_to_maintain == 0 && |
265 | cf_options->max_write_buffer_number_to_maintain == 0) { | |
266 | // Setting to -1 will set the History size to | |
267 | // max_write_buffer_number * write_buffer_size. | |
268 | cf_options->max_write_buffer_size_to_maintain = -1; | |
7c673cae FG |
269 | } |
270 | if (!cf_options->disable_auto_compactions) { | |
271 | // Disable compactions momentarily to prevent race with DB::Open | |
272 | cf_options->disable_auto_compactions = true; | |
273 | compaction_enabled_cf_indices->push_back(i); | |
274 | } | |
275 | } | |
276 | db_options->allow_2pc = true; | |
277 | } | |
278 | ||
279 | Status TransactionDB::WrapDB( | |
280 | // make sure this db is already opened with memtable history enabled, | |
281 | // auto compaction distabled and 2 phase commit enabled | |
282 | DB* db, const TransactionDBOptions& txn_db_options, | |
283 | const std::vector<size_t>& compaction_enabled_cf_indices, | |
284 | const std::vector<ColumnFamilyHandle*>& handles, TransactionDB** dbptr) { | |
11fdf7f2 TL |
285 | assert(db != nullptr); |
286 | assert(dbptr != nullptr); | |
287 | *dbptr = nullptr; | |
288 | std::unique_ptr<PessimisticTransactionDB> txn_db; | |
289 | switch (txn_db_options.write_policy) { | |
290 | case WRITE_UNPREPARED: | |
291 | txn_db.reset(new WriteUnpreparedTxnDB( | |
292 | db, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options))); | |
293 | break; | |
294 | case WRITE_PREPARED: | |
295 | txn_db.reset(new WritePreparedTxnDB( | |
296 | db, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options))); | |
297 | break; | |
298 | case WRITE_COMMITTED: | |
299 | default: | |
300 | txn_db.reset(new WriteCommittedTxnDB( | |
301 | db, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options))); | |
302 | } | |
303 | txn_db->UpdateCFComparatorMap(handles); | |
7c673cae | 304 | Status s = txn_db->Initialize(compaction_enabled_cf_indices, handles); |
11fdf7f2 TL |
305 | // In case of a failure at this point, db is deleted via the txn_db destructor |
306 | // and set to nullptr. | |
307 | if (s.ok()) { | |
308 | *dbptr = txn_db.release(); | |
309 | } | |
7c673cae FG |
310 | return s; |
311 | } | |
312 | ||
313 | Status TransactionDB::WrapStackableDB( | |
314 | // make sure this stackable_db is already opened with memtable history | |
11fdf7f2 | 315 | // enabled, auto compaction distabled and 2 phase commit enabled |
7c673cae FG |
316 | StackableDB* db, const TransactionDBOptions& txn_db_options, |
317 | const std::vector<size_t>& compaction_enabled_cf_indices, | |
318 | const std::vector<ColumnFamilyHandle*>& handles, TransactionDB** dbptr) { | |
11fdf7f2 TL |
319 | assert(db != nullptr); |
320 | assert(dbptr != nullptr); | |
321 | *dbptr = nullptr; | |
322 | std::unique_ptr<PessimisticTransactionDB> txn_db; | |
323 | ||
324 | switch (txn_db_options.write_policy) { | |
325 | case WRITE_UNPREPARED: | |
326 | txn_db.reset(new WriteUnpreparedTxnDB( | |
327 | db, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options))); | |
328 | break; | |
329 | case WRITE_PREPARED: | |
330 | txn_db.reset(new WritePreparedTxnDB( | |
331 | db, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options))); | |
332 | break; | |
333 | case WRITE_COMMITTED: | |
334 | default: | |
335 | txn_db.reset(new WriteCommittedTxnDB( | |
336 | db, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options))); | |
337 | } | |
338 | txn_db->UpdateCFComparatorMap(handles); | |
7c673cae | 339 | Status s = txn_db->Initialize(compaction_enabled_cf_indices, handles); |
11fdf7f2 TL |
340 | // In case of a failure at this point, db is deleted via the txn_db destructor |
341 | // and set to nullptr. | |
342 | if (s.ok()) { | |
343 | *dbptr = txn_db.release(); | |
344 | } | |
7c673cae FG |
345 | return s; |
346 | } | |
347 | ||
20effc67 | 348 | // Let LockManager know that this column family exists so it can |
7c673cae | 349 | // allocate a LockMap for it. |
11fdf7f2 TL |
350 | void PessimisticTransactionDB::AddColumnFamily( |
351 | const ColumnFamilyHandle* handle) { | |
20effc67 | 352 | lock_manager_->AddColumnFamily(handle); |
7c673cae FG |
353 | } |
354 | ||
11fdf7f2 | 355 | Status PessimisticTransactionDB::CreateColumnFamily( |
7c673cae FG |
356 | const ColumnFamilyOptions& options, const std::string& column_family_name, |
357 | ColumnFamilyHandle** handle) { | |
358 | InstrumentedMutexLock l(&column_family_mutex_); | |
11fdf7f2 TL |
359 | Status s = VerifyCFOptions(options); |
360 | if (!s.ok()) { | |
361 | return s; | |
362 | } | |
7c673cae | 363 | |
11fdf7f2 | 364 | s = db_->CreateColumnFamily(options, column_family_name, handle); |
7c673cae | 365 | if (s.ok()) { |
20effc67 | 366 | lock_manager_->AddColumnFamily(*handle); |
11fdf7f2 | 367 | UpdateCFComparatorMap(*handle); |
7c673cae FG |
368 | } |
369 | ||
370 | return s; | |
371 | } | |
372 | ||
20effc67 | 373 | // Let LockManager know that it can deallocate the LockMap for this |
7c673cae | 374 | // column family. |
11fdf7f2 TL |
375 | Status PessimisticTransactionDB::DropColumnFamily( |
376 | ColumnFamilyHandle* column_family) { | |
7c673cae FG |
377 | InstrumentedMutexLock l(&column_family_mutex_); |
378 | ||
379 | Status s = db_->DropColumnFamily(column_family); | |
380 | if (s.ok()) { | |
20effc67 | 381 | lock_manager_->RemoveColumnFamily(column_family); |
7c673cae FG |
382 | } |
383 | ||
384 | return s; | |
385 | } | |
386 | ||
11fdf7f2 TL |
387 | Status PessimisticTransactionDB::TryLock(PessimisticTransaction* txn, |
388 | uint32_t cfh_id, | |
389 | const std::string& key, | |
390 | bool exclusive) { | |
20effc67 | 391 | return lock_manager_->TryLock(txn, cfh_id, key, GetEnv(), exclusive); |
7c673cae FG |
392 | } |
393 | ||
11fdf7f2 | 394 | void PessimisticTransactionDB::UnLock(PessimisticTransaction* txn, |
20effc67 TL |
395 | const LockTracker& keys) { |
396 | lock_manager_->UnLock(txn, keys, GetEnv()); | |
7c673cae FG |
397 | } |
398 | ||
11fdf7f2 TL |
399 | void PessimisticTransactionDB::UnLock(PessimisticTransaction* txn, |
400 | uint32_t cfh_id, const std::string& key) { | |
20effc67 | 401 | lock_manager_->UnLock(txn, cfh_id, key, GetEnv()); |
7c673cae FG |
402 | } |
403 | ||
404 | // Used when wrapping DB write operations in a transaction | |
11fdf7f2 | 405 | Transaction* PessimisticTransactionDB::BeginInternalTransaction( |
7c673cae FG |
406 | const WriteOptions& options) { |
407 | TransactionOptions txn_options; | |
408 | Transaction* txn = BeginTransaction(options, txn_options, nullptr); | |
409 | ||
410 | // Use default timeout for non-transactional writes | |
411 | txn->SetLockTimeout(txn_db_options_.default_lock_timeout); | |
412 | return txn; | |
413 | } | |
414 | ||
415 | // All user Put, Merge, Delete, and Write requests must be intercepted to make | |
416 | // sure that they lock all keys that they are writing to avoid causing conflicts | |
11fdf7f2 | 417 | // with any concurrent transactions. The easiest way to do this is to wrap all |
7c673cae FG |
418 | // write operations in a transaction. |
419 | // | |
420 | // Put(), Merge(), and Delete() only lock a single key per call. Write() will | |
421 | // sort its keys before locking them. This guarantees that TransactionDB write | |
11fdf7f2 | 422 | // methods cannot deadlock with each other (but still could deadlock with a |
7c673cae | 423 | // Transaction). |
11fdf7f2 TL |
424 | Status PessimisticTransactionDB::Put(const WriteOptions& options, |
425 | ColumnFamilyHandle* column_family, | |
426 | const Slice& key, const Slice& val) { | |
7c673cae FG |
427 | Status s; |
428 | ||
429 | Transaction* txn = BeginInternalTransaction(options); | |
430 | txn->DisableIndexing(); | |
431 | ||
432 | // Since the client didn't create a transaction, they don't care about | |
433 | // conflict checking for this write. So we just need to do PutUntracked(). | |
434 | s = txn->PutUntracked(column_family, key, val); | |
435 | ||
436 | if (s.ok()) { | |
437 | s = txn->Commit(); | |
438 | } | |
439 | ||
440 | delete txn; | |
441 | ||
442 | return s; | |
443 | } | |
444 | ||
11fdf7f2 TL |
445 | Status PessimisticTransactionDB::Delete(const WriteOptions& wopts, |
446 | ColumnFamilyHandle* column_family, | |
447 | const Slice& key) { | |
7c673cae FG |
448 | Status s; |
449 | ||
450 | Transaction* txn = BeginInternalTransaction(wopts); | |
451 | txn->DisableIndexing(); | |
452 | ||
453 | // Since the client didn't create a transaction, they don't care about | |
454 | // conflict checking for this write. So we just need to do | |
455 | // DeleteUntracked(). | |
456 | s = txn->DeleteUntracked(column_family, key); | |
457 | ||
458 | if (s.ok()) { | |
459 | s = txn->Commit(); | |
460 | } | |
461 | ||
462 | delete txn; | |
463 | ||
464 | return s; | |
465 | } | |
466 | ||
11fdf7f2 TL |
467 | Status PessimisticTransactionDB::SingleDelete(const WriteOptions& wopts, |
468 | ColumnFamilyHandle* column_family, | |
469 | const Slice& key) { | |
470 | Status s; | |
471 | ||
472 | Transaction* txn = BeginInternalTransaction(wopts); | |
473 | txn->DisableIndexing(); | |
474 | ||
475 | // Since the client didn't create a transaction, they don't care about | |
476 | // conflict checking for this write. So we just need to do | |
477 | // SingleDeleteUntracked(). | |
478 | s = txn->SingleDeleteUntracked(column_family, key); | |
479 | ||
480 | if (s.ok()) { | |
481 | s = txn->Commit(); | |
482 | } | |
483 | ||
484 | delete txn; | |
485 | ||
486 | return s; | |
487 | } | |
488 | ||
489 | Status PessimisticTransactionDB::Merge(const WriteOptions& options, | |
490 | ColumnFamilyHandle* column_family, | |
491 | const Slice& key, const Slice& value) { | |
7c673cae FG |
492 | Status s; |
493 | ||
494 | Transaction* txn = BeginInternalTransaction(options); | |
495 | txn->DisableIndexing(); | |
496 | ||
497 | // Since the client didn't create a transaction, they don't care about | |
498 | // conflict checking for this write. So we just need to do | |
499 | // MergeUntracked(). | |
500 | s = txn->MergeUntracked(column_family, key, value); | |
501 | ||
502 | if (s.ok()) { | |
503 | s = txn->Commit(); | |
504 | } | |
505 | ||
506 | delete txn; | |
507 | ||
508 | return s; | |
509 | } | |
510 | ||
11fdf7f2 TL |
511 | Status PessimisticTransactionDB::Write(const WriteOptions& opts, |
512 | WriteBatch* updates) { | |
f67539c2 TL |
513 | return WriteWithConcurrencyControl(opts, updates); |
514 | } | |
7c673cae | 515 | |
f67539c2 TL |
516 | Status WriteCommittedTxnDB::Write(const WriteOptions& opts, |
517 | WriteBatch* updates) { | |
518 | if (txn_db_options_.skip_concurrency_control) { | |
519 | return db_impl_->Write(opts, updates); | |
520 | } else { | |
521 | return WriteWithConcurrencyControl(opts, updates); | |
522 | } | |
7c673cae FG |
523 | } |
524 | ||
11fdf7f2 TL |
525 | Status WriteCommittedTxnDB::Write( |
526 | const WriteOptions& opts, | |
527 | const TransactionDBWriteOptimizations& optimizations, WriteBatch* updates) { | |
528 | if (optimizations.skip_concurrency_control) { | |
529 | return db_impl_->Write(opts, updates); | |
530 | } else { | |
f67539c2 | 531 | return WriteWithConcurrencyControl(opts, updates); |
11fdf7f2 TL |
532 | } |
533 | } | |
534 | ||
535 | void PessimisticTransactionDB::InsertExpirableTransaction( | |
536 | TransactionID tx_id, PessimisticTransaction* tx) { | |
7c673cae FG |
537 | assert(tx->GetExpirationTime() > 0); |
538 | std::lock_guard<std::mutex> lock(map_mutex_); | |
539 | expirable_transactions_map_.insert({tx_id, tx}); | |
540 | } | |
541 | ||
11fdf7f2 | 542 | void PessimisticTransactionDB::RemoveExpirableTransaction(TransactionID tx_id) { |
7c673cae FG |
543 | std::lock_guard<std::mutex> lock(map_mutex_); |
544 | expirable_transactions_map_.erase(tx_id); | |
545 | } | |
546 | ||
11fdf7f2 | 547 | bool PessimisticTransactionDB::TryStealingExpiredTransactionLocks( |
7c673cae FG |
548 | TransactionID tx_id) { |
549 | std::lock_guard<std::mutex> lock(map_mutex_); | |
550 | ||
551 | auto tx_it = expirable_transactions_map_.find(tx_id); | |
552 | if (tx_it == expirable_transactions_map_.end()) { | |
553 | return true; | |
554 | } | |
11fdf7f2 | 555 | PessimisticTransaction& tx = *(tx_it->second); |
7c673cae FG |
556 | return tx.TryStealingLocks(); |
557 | } | |
558 | ||
11fdf7f2 | 559 | void PessimisticTransactionDB::ReinitializeTransaction( |
7c673cae FG |
560 | Transaction* txn, const WriteOptions& write_options, |
561 | const TransactionOptions& txn_options) { | |
20effc67 | 562 | auto txn_impl = static_cast_with_check<PessimisticTransaction>(txn); |
7c673cae FG |
563 | |
564 | txn_impl->Reinitialize(this, write_options, txn_options); | |
565 | } | |
566 | ||
11fdf7f2 | 567 | Transaction* PessimisticTransactionDB::GetTransactionByName( |
7c673cae FG |
568 | const TransactionName& name) { |
569 | std::lock_guard<std::mutex> lock(name_map_mutex_); | |
570 | auto it = transactions_.find(name); | |
571 | if (it == transactions_.end()) { | |
572 | return nullptr; | |
573 | } else { | |
574 | return it->second; | |
575 | } | |
576 | } | |
577 | ||
11fdf7f2 | 578 | void PessimisticTransactionDB::GetAllPreparedTransactions( |
7c673cae FG |
579 | std::vector<Transaction*>* transv) { |
580 | assert(transv); | |
581 | transv->clear(); | |
582 | std::lock_guard<std::mutex> lock(name_map_mutex_); | |
f67539c2 | 583 | for (auto it = transactions_.begin(); it != transactions_.end(); ++it) { |
7c673cae FG |
584 | if (it->second->GetState() == Transaction::PREPARED) { |
585 | transv->push_back(it->second); | |
586 | } | |
587 | } | |
588 | } | |
589 | ||
20effc67 TL |
590 | LockManager::PointLockStatus PessimisticTransactionDB::GetLockStatusData() { |
591 | return lock_manager_->GetPointLockStatus(); | |
7c673cae FG |
592 | } |
593 | ||
11fdf7f2 | 594 | std::vector<DeadlockPath> PessimisticTransactionDB::GetDeadlockInfoBuffer() { |
20effc67 | 595 | return lock_manager_->GetDeadlockInfoBuffer(); |
11fdf7f2 TL |
596 | } |
597 | ||
598 | void PessimisticTransactionDB::SetDeadlockInfoBufferSize(uint32_t target_size) { | |
20effc67 | 599 | lock_manager_->Resize(target_size); |
11fdf7f2 TL |
600 | } |
601 | ||
602 | void PessimisticTransactionDB::RegisterTransaction(Transaction* txn) { | |
7c673cae FG |
603 | assert(txn); |
604 | assert(txn->GetName().length() > 0); | |
605 | assert(GetTransactionByName(txn->GetName()) == nullptr); | |
606 | assert(txn->GetState() == Transaction::STARTED); | |
607 | std::lock_guard<std::mutex> lock(name_map_mutex_); | |
608 | transactions_[txn->GetName()] = txn; | |
609 | } | |
610 | ||
11fdf7f2 | 611 | void PessimisticTransactionDB::UnregisterTransaction(Transaction* txn) { |
7c673cae FG |
612 | assert(txn); |
613 | std::lock_guard<std::mutex> lock(name_map_mutex_); | |
614 | auto it = transactions_.find(txn->GetName()); | |
615 | assert(it != transactions_.end()); | |
616 | transactions_.erase(it); | |
617 | } | |
618 | ||
f67539c2 | 619 | } // namespace ROCKSDB_NAMESPACE |
7c673cae | 620 | #endif // ROCKSDB_LITE |