1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
8 #include "utilities/transactions/pessimistic_transaction_db.h"
12 #include <unordered_set>
15 #include "db/db_impl/db_impl.h"
16 #include "rocksdb/db.h"
17 #include "rocksdb/options.h"
18 #include "rocksdb/utilities/transaction_db.h"
19 #include "test_util/sync_point.h"
20 #include "util/cast_util.h"
21 #include "util/mutexlock.h"
22 #include "utilities/transactions/pessimistic_transaction.h"
23 #include "utilities/transactions/transaction_db_mutex_impl.h"
24 #include "utilities/transactions/write_prepared_txn_db.h"
25 #include "utilities/transactions/write_unprepared_txn_db.h"
27 namespace ROCKSDB_NAMESPACE
{
// Constructs a PessimisticTransactionDB on top of a plain DB.
// - db_impl_ is `db` cast (with a debug-checked static cast) to DBImpl and
//   must be non-null.
// - txn_db_options_ keeps a copy of the caller's options.
// - lock_mgr_ is configured from num_stripes / max_num_locks /
//   max_num_deadlocks and uses the caller's custom_mutex_factory, falling
//   back to a newly allocated TransactionDBMutexFactoryImpl when none is set.
// - info_log_ is taken from the underlying DB's options.
29 PessimisticTransactionDB::PessimisticTransactionDB(
30 DB
* db
, const TransactionDBOptions
& txn_db_options
)
32 db_impl_(static_cast_with_check
<DBImpl
, DB
>(db
)),
33 txn_db_options_(txn_db_options
),
// NOTE(review): this initializer reads both the member txn_db_options_ and
// the parameter txn_db_options. Reading the member here is only well-defined
// if txn_db_options_ is declared before lock_mgr_ in the class (members are
// initialized in declaration order) -- confirm against the header, or read
// every field from the parameter for consistency.
34 lock_mgr_(this, txn_db_options_
.num_stripes
, txn_db_options
.max_num_locks
,
35 txn_db_options_
.max_num_deadlocks
,
36 txn_db_options_
.custom_mutex_factory
37 ? txn_db_options_
.custom_mutex_factory
38 : std::shared_ptr
<TransactionDBMutexFactory
>(
39 new TransactionDBMutexFactoryImpl())) {
40 assert(db_impl_
!= nullptr);
41 info_log_
= db_impl_
->GetDBOptions().info_log
;
44 // Support initializing PessimisticTransactionDB from a stackable db
46 // PessimisticTransactionDB
// Same initialization as the DB* constructor, except db_impl_ is obtained
// from the StackableDB's root DB (db->GetRootDB()).
// NOTE(review): no info_log_ assignment is visible in this constructor,
// unlike the DB* overload -- confirm whether it should also be set here.
60 PessimisticTransactionDB::PessimisticTransactionDB(
61 StackableDB
* db
, const TransactionDBOptions
& txn_db_options
)
63 db_impl_(static_cast_with_check
<DBImpl
, DB
>(db
->GetRootDB())),
64 txn_db_options_(txn_db_options
),
// NOTE(review): mixes txn_db_options_ (member) and txn_db_options
// (parameter); see the member-initialization-order caveat on the DB* ctor.
65 lock_mgr_(this, txn_db_options_
.num_stripes
, txn_db_options
.max_num_locks
,
66 txn_db_options_
.max_num_deadlocks
,
67 txn_db_options_
.custom_mutex_factory
68 ? txn_db_options_
.custom_mutex_factory
69 : std::shared_ptr
<TransactionDBMutexFactory
>(
70 new TransactionDBMutexFactoryImpl())) {
71 assert(db_impl_
!= nullptr);
// Deletes every transaction still registered in transactions_.
// NOTE(review): the loop always deletes begin()->second and only terminates
// if deleting a transaction also erases its entry from transactions_ (for
// example via UnregisterTransaction); that dependency is not visible in this
// file -- see the TODO below.
74 PessimisticTransactionDB::~PessimisticTransactionDB() {
75 while (!transactions_
.empty()) {
76 delete transactions_
.begin()->second
;
77 // TODO(myabandeh): this seems to be an unsafe approach as it is not quite
78 // clear whether delete would also remove the entry from transactions_.
// Validation hook for column family options. It is called from Initialize()
// for every existing column family and from CreateColumnFamily() before a
// new one is created. In this version the options argument is unnamed, so
// the options are not inspected here.
82 Status
PessimisticTransactionDB::VerifyCFOptions(const ColumnFamilyOptions
&) {
// Finishes setting up the transaction DB after the underlying DB is open:
//  1. registers every column family handle with the lock manager;
//  2. validates each column family's options via VerifyCFOptions();
//  3. re-enables auto compaction for the handles selected by
//     compaction_enabled_cf_indices (indices into `handles`);
//  4. turns each transaction recovered from the WAL into a real Transaction
//     in the PREPARED state, then discards DBImpl's recovered records.
// NOTE(review): handling of a non-OK `s` between these steps is not visible
// in this view -- confirm each failure is returned before continuing.
86 Status
PessimisticTransactionDB::Initialize(
87 const std::vector
<size_t>& compaction_enabled_cf_indices
,
88 const std::vector
<ColumnFamilyHandle
*>& handles
) {
89 for (auto cf_ptr
: handles
) {
90 AddColumnFamily(cf_ptr
);
93 for (auto handle
: handles
) {
94 ColumnFamilyDescriptor cfd
;
95 Status s
= handle
->GetDescriptor(&cfd
);
99 s
= VerifyCFOptions(cfd
.options
);
105 // Re-enable compaction for the column families that initially had
106 // compaction enabled.
107 std::vector
<ColumnFamilyHandle
*> compaction_enabled_cf_handles
;
108 compaction_enabled_cf_handles
.reserve(compaction_enabled_cf_indices
.size());
// NOTE(review): handles[index] is not bounds-checked; this assumes every
// index is < handles.size() (PrepareWrap records indices into the same
// column family list that produced `handles`).
109 for (auto index
: compaction_enabled_cf_indices
) {
110 compaction_enabled_cf_handles
.push_back(handles
[index
]);
113 Status s
= EnableAutoCompaction(compaction_enabled_cf_handles
);
115 // create 'real' transactions from recovered shell transactions
116 auto dbimpl
= static_cast_with_check
<DBImpl
, DB
>(GetRootDB());
117 assert(dbimpl
!= nullptr);
// NOTE(review): plain `auto` takes a copy of whatever
// recovered_transactions() returns. If it returns a reference to a
// container, `const auto&` would avoid the copy; the container is only
// cleared after the loop (DeleteAllRecoveredTransactions), so confirm before
// changing.
118 auto rtrxs
= dbimpl
->recovered_transactions();
120 for (auto it
= rtrxs
.begin(); it
!= rtrxs
.end(); ++it
) {
121 auto recovered_trx
= it
->second
;
122 assert(recovered_trx
);
123 assert(recovered_trx
->batches_
.size() == 1);
124 const auto& seq
= recovered_trx
->batches_
.begin()->first
;
125 const auto& batch_info
= recovered_trx
->batches_
.begin()->second
;
126 assert(batch_info
.log_number_
);
127 assert(recovered_trx
->name_
.length());
// Each recovered transaction is re-created with sync=true writes.
129 WriteOptions w_options
;
130 w_options
.sync
= true;
131 TransactionOptions t_options
;
132 // This helps avoid deadlocks on keys that, although present in the WAL,
133 // did not go through concurrency control. This includes the merge that
134 // MyRocks uses for auto-inc columns. It is safe to do so, since (i) if
135 // there is a conflict between the keys of two transactions that must be
136 // avoided, it was already avoided by the application, MyRocks, before the
137 // restart, and (ii) the application, MyRocks, guarantees to rollback/commit the
138 // recovered transactions before new transactions start.
139 t_options
.skip_concurrency_control
= true;
141 Transaction
* real_trx
= BeginTransaction(w_options
, t_options
, nullptr);
143 real_trx
->SetLogNumber(batch_info
.log_number_
);
144 assert(seq
!= kMaxSequenceNumber
);
// For write policies other than WRITE_COMMITTED, the transaction id is set
// to the recovered batch's sequence number.
145 if (GetTxnDBOptions().write_policy
!= WRITE_COMMITTED
) {
146 real_trx
->SetId(seq
);
149 s
= real_trx
->SetName(recovered_trx
->name_
);
154 s
= real_trx
->RebuildFromWriteBatch(batch_info
.batch_
);
155 // WriteCommitted sets this to 0 to disable this check, which is specific to
156 // WritePrepared txns
157 assert(batch_info
.batch_cnt_
== 0 ||
158 real_trx
->GetWriteBatch()->SubBatchCnt() == batch_info
.batch_cnt_
);
159 real_trx
->SetState(Transaction::PREPARED
);
// The recovered shells are no longer needed once real transactions exist.
165 dbimpl
->DeleteAllRecoveredTransactions();
// Begins a write-committed transaction. When old_txn is non-null, it is
// reinitialized in place with the new options via ReinitializeTransaction().
// Otherwise a new WriteCommittedTxn is heap-allocated and returned.
// NOTE(review): the return for the reuse path is not visible in this view;
// presumably it returns old_txn -- confirm.
170 Transaction
* WriteCommittedTxnDB::BeginTransaction(
171 const WriteOptions
& write_options
, const TransactionOptions
& txn_options
,
172 Transaction
* old_txn
) {
173 if (old_txn
!= nullptr) {
174 ReinitializeTransaction(old_txn
, write_options
, txn_options
);
177 return new WriteCommittedTxn(this, write_options
, txn_options
);
// Returns a sanitized copy of txn_db_options. A num_stripes of 0 is replaced
// by 1, presumably because the lock manager needs at least one stripe.
// Any other adjustments are not visible in this view.
181 TransactionDBOptions
PessimisticTransactionDB::ValidateTxnDBOptions(
182 const TransactionDBOptions
& txn_db_options
) {
183 TransactionDBOptions validated
= txn_db_options
;
185 if (txn_db_options
.num_stripes
== 0) {
186 validated
.num_stripes
= 1;
// Convenience overload: opens a TransactionDB with only the default column
// family. The DB and column family options are both derived from `options`.
// The call is delegated to the column-family overload of
// TransactionDB::Open, which is expected to return exactly one handle.
192 Status
TransactionDB::Open(const Options
& options
,
193 const TransactionDBOptions
& txn_db_options
,
194 const std::string
& dbname
, TransactionDB
** dbptr
) {
195 DBOptions
db_options(options
);
196 ColumnFamilyOptions
cf_options(options
);
197 std::vector
<ColumnFamilyDescriptor
> column_families
;
198 column_families
.push_back(
199 ColumnFamilyDescriptor(kDefaultColumnFamilyName
, cf_options
));
200 std::vector
<ColumnFamilyHandle
*> handles
;
201 Status s
= TransactionDB::Open(db_options
, txn_db_options
, dbname
,
202 column_families
, &handles
, dbptr
);
204 assert(handles
.size() == 1);
205 // The handle can be deleted since DBImpl always holds a reference to the
206 // default column family
// Opens a TransactionDB with the given column families.
// - unordered_write is rejected for WRITE_COMMITTED and WRITE_UNPREPARED, and
//   for WRITE_PREPARED unless two_write_queues is enabled.
// - The descriptors and DB options are copied and adjusted by PrepareWrap()
//   (memtable history, temporarily disabled auto compaction, allow_2pc).
// - The DB is opened with seq_per_batch / batch_per_txn derived from
//   write_policy.
// - The result is wrapped by WrapDB(), whose Initialize() step re-enables
//   the auto compactions that PrepareWrap() disabled.
213 Status
TransactionDB::Open(
214 const DBOptions
& db_options
, const TransactionDBOptions
& txn_db_options
,
215 const std::string
& dbname
,
216 const std::vector
<ColumnFamilyDescriptor
>& column_families
,
217 std::vector
<ColumnFamilyHandle
*>* handles
, TransactionDB
** dbptr
) {
220 if (txn_db_options
.write_policy
== WRITE_COMMITTED
&&
221 db_options
.unordered_write
) {
222 return Status::NotSupported(
223 "WRITE_COMMITTED is incompatible with unordered_writes");
225 if (txn_db_options
.write_policy
== WRITE_UNPREPARED
&&
226 db_options
.unordered_write
) {
227 // TODO(lth): support it
228 return Status::NotSupported(
229 "WRITE_UNPREPARED is currently incompatible with unordered_writes");
231 if (txn_db_options
.write_policy
== WRITE_PREPARED
&&
232 db_options
.unordered_write
&& !db_options
.two_write_queues
) {
233 return Status::NotSupported(
234 "WRITE_PREPARED is incompatible with unordered_writes if "
235 "two_write_queues is not enabled.");
// Work on copies so the caller's descriptors and options stay untouched.
238 std::vector
<ColumnFamilyDescriptor
> column_families_copy
= column_families
;
239 std::vector
<size_t> compaction_enabled_cf_indices
;
240 DBOptions db_options_2pc
= db_options
;
241 PrepareWrap(&db_options_2pc
, &column_families_copy
,
242 &compaction_enabled_cf_indices
);
243 const bool use_seq_per_batch
=
244 txn_db_options
.write_policy
== WRITE_PREPARED
||
245 txn_db_options
.write_policy
== WRITE_UNPREPARED
;
246 const bool use_batch_per_txn
=
247 txn_db_options
.write_policy
== WRITE_COMMITTED
||
248 txn_db_options
.write_policy
== WRITE_PREPARED
;
249 s
= DBImpl::Open(db_options_2pc
, dbname
, column_families_copy
, handles
, &db
,
250 use_seq_per_batch
, use_batch_per_txn
);
// NOTE(review): this dereferences db. Confirm it only runs after a
// successful DBImpl::Open. Also, PRId32 expects an int32_t argument;
// static_cast<int32_t> would match the format exactly.
252 ROCKS_LOG_WARN(db
->GetDBOptions().info_log
,
253 "Transaction write_policy is %" PRId32
,
254 static_cast<int>(txn_db_options
.write_policy
));
255 s
= WrapDB(db
, txn_db_options
, compaction_enabled_cf_indices
, *handles
,
259 // just in case it was not deleted (and not set to nullptr).
// Adjusts options in place so that the DB can later be wrapped as a
// TransactionDB:
// - compaction_enabled_cf_indices is cleared first;
// - for each column family with both max_write_buffer_size_to_maintain and
//   max_write_buffer_number_to_maintain equal to 0, memtable history is
//   enabled by setting max_write_buffer_size_to_maintain to -1;
// - each column family that had auto compaction enabled gets it disabled,
//   and its index is appended to compaction_enabled_cf_indices so the caller
//   can re-enable it after opening;
// - db_options->allow_2pc is set to true.
265 void TransactionDB::PrepareWrap(
266 DBOptions
* db_options
, std::vector
<ColumnFamilyDescriptor
>* column_families
,
267 std::vector
<size_t>* compaction_enabled_cf_indices
) {
268 compaction_enabled_cf_indices
->clear();
270 // Enable MemTable History if not already enabled
271 for (size_t i
= 0; i
< column_families
->size(); i
++) {
272 ColumnFamilyOptions
* cf_options
= &(*column_families
)[i
].options
;
274 if (cf_options
->max_write_buffer_size_to_maintain
== 0 &&
275 cf_options
->max_write_buffer_number_to_maintain
== 0) {
276 // Setting to -1 will set the History size to
277 // max_write_buffer_number * write_buffer_size.
278 cf_options
->max_write_buffer_size_to_maintain
= -1;
280 if (!cf_options
->disable_auto_compactions
) {
281 // Disable compactions momentarily to prevent race with DB::Open
282 cf_options
->disable_auto_compactions
= true;
283 compaction_enabled_cf_indices
->push_back(i
);
286 db_options
->allow_2pc
= true;
// Wraps an already-open DB (see the precondition in the parameter comment)
// in the PessimisticTransactionDB subclass selected by write_policy. The
// subclass is constructed with validated options; comparators are then
// registered and Initialize() is run. The new object is owned by a
// unique_ptr until it is handed to the caller through *dbptr.
289 Status
TransactionDB::WrapDB(
290 // make sure this db is already opened with memtable history enabled,
291 // auto compaction disabled and 2 phase commit enabled
292 DB
* db
, const TransactionDBOptions
& txn_db_options
,
293 const std::vector
<size_t>& compaction_enabled_cf_indices
,
294 const std::vector
<ColumnFamilyHandle
*>& handles
, TransactionDB
** dbptr
) {
295 assert(db
!= nullptr);
296 assert(dbptr
!= nullptr);
298 std::unique_ptr
<PessimisticTransactionDB
> txn_db
;
299 switch (txn_db_options
.write_policy
) {
300 case WRITE_UNPREPARED
:
301 txn_db
.reset(new WriteUnpreparedTxnDB(
302 db
, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options
)));
// Presumably the WRITE_PREPARED branch; its case label and the preceding
// `break` are not visible in this view.
305 txn_db
.reset(new WritePreparedTxnDB(
306 db
, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options
)));
308 case WRITE_COMMITTED
:
310 txn_db
.reset(new WriteCommittedTxnDB(
311 db
, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options
)));
313 txn_db
->UpdateCFComparatorMap(handles
);
314 Status s
= txn_db
->Initialize(compaction_enabled_cf_indices
, handles
);
315 // In case of a failure at this point, db is deleted via the txn_db destructor
316 // and set to nullptr.
318 *dbptr
= txn_db
.release();
// StackableDB counterpart of WrapDB(). It constructs the write-policy
// specific PessimisticTransactionDB around an already-open stackable DB,
// registers comparators and runs Initialize(). On completion, the object
// owned by the unique_ptr is handed to the caller through *dbptr.
323 Status
TransactionDB::WrapStackableDB(
324 // make sure this stackable_db is already opened with memtable history
325 // enabled, auto compaction disabled and 2 phase commit enabled
326 StackableDB
* db
, const TransactionDBOptions
& txn_db_options
,
327 const std::vector
<size_t>& compaction_enabled_cf_indices
,
328 const std::vector
<ColumnFamilyHandle
*>& handles
, TransactionDB
** dbptr
) {
329 assert(db
!= nullptr);
330 assert(dbptr
!= nullptr);
332 std::unique_ptr
<PessimisticTransactionDB
> txn_db
;
334 switch (txn_db_options
.write_policy
) {
335 case WRITE_UNPREPARED
:
336 txn_db
.reset(new WriteUnpreparedTxnDB(
337 db
, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options
)));
// Presumably the WRITE_PREPARED branch; its case label and the preceding
// `break` are not visible in this view.
340 txn_db
.reset(new WritePreparedTxnDB(
341 db
, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options
)));
343 case WRITE_COMMITTED
:
345 txn_db
.reset(new WriteCommittedTxnDB(
346 db
, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options
)));
348 txn_db
->UpdateCFComparatorMap(handles
);
349 Status s
= txn_db
->Initialize(compaction_enabled_cf_indices
, handles
);
350 // In case of a failure at this point, db is deleted via the txn_db destructor
351 // and set to nullptr.
353 *dbptr
= txn_db
.release();
358 // Let TransactionLockMgr know that this column family exists so it can
359 // allocate a LockMap for it.
// Called from Initialize() for every handle passed to it.
360 void PessimisticTransactionDB::AddColumnFamily(
361 const ColumnFamilyHandle
* handle
) {
362 lock_mgr_
.AddColumnFamily(handle
->GetID());
// Creates a column family after validating its options with
// VerifyCFOptions(). The new family is then registered with the lock
// manager and the comparator map. column_family_mutex_ is held through a
// scoped InstrumentedMutexLock for the rest of the function.
// NOTE(review): the early returns on a non-OK `s` are not visible in this
// view -- confirm that a failed create skips the registration steps.
365 Status
PessimisticTransactionDB::CreateColumnFamily(
366 const ColumnFamilyOptions
& options
, const std::string
& column_family_name
,
367 ColumnFamilyHandle
** handle
) {
368 InstrumentedMutexLock
l(&column_family_mutex_
);
369 Status s
= VerifyCFOptions(options
);
374 s
= db_
->CreateColumnFamily(options
, column_family_name
, handle
);
376 lock_mgr_
.AddColumnFamily((*handle
)->GetID());
377 UpdateCFComparatorMap(*handle
);
383 // Let TransactionLockMgr know that it can deallocate the LockMap for this column family.
// Drops the column family from the underlying DB and then removes it from
// the lock manager. column_family_mutex_ is held through a scoped
// InstrumentedMutexLock.
385 Status
PessimisticTransactionDB::DropColumnFamily(
386 ColumnFamilyHandle
* column_family
) {
387 InstrumentedMutexLock
l(&column_family_mutex_
);
389 Status s
= db_
->DropColumnFamily(column_family
);
391 lock_mgr_
.RemoveColumnFamily(column_family
->GetID());
// Tries to lock `key` in column family cfh_id on behalf of txn. The call is
// forwarded to the lock manager together with this DB's Env and the
// `exclusive` flag.
397 Status
PessimisticTransactionDB::TryLock(PessimisticTransaction
* txn
,
399 const std::string
& key
,
401 return lock_mgr_
.TryLock(txn
, cfh_id
, key
, GetEnv(), exclusive
);
// Releases txn's locks on every key in `keys` by forwarding to the lock
// manager with this DB's Env.
404 void PessimisticTransactionDB::UnLock(PessimisticTransaction
* txn
,
405 const TransactionKeyMap
* keys
) {
406 lock_mgr_
.UnLock(txn
, keys
, GetEnv());
// Releases txn's lock on a single key in column family cfh_id by forwarding
// to the lock manager with this DB's Env.
409 void PessimisticTransactionDB::UnLock(PessimisticTransaction
* txn
,
410 uint32_t cfh_id
, const std::string
& key
) {
411 lock_mgr_
.UnLock(txn
, cfh_id
, key
, GetEnv());
414 // Used when wrapping DB write operations in a transaction
// Starts a fresh transaction (default TransactionOptions, no reuse). Its
// lock timeout is then set to txn_db_options_.default_lock_timeout.
415 Transaction
* PessimisticTransactionDB::BeginInternalTransaction(
416 const WriteOptions
& options
) {
417 TransactionOptions txn_options
;
418 Transaction
* txn
= BeginTransaction(options
, txn_options
, nullptr);
420 // Use default timeout for non-transactional writes
421 txn
->SetLockTimeout(txn_db_options_
.default_lock_timeout
);
425 // All user Put, Merge, Delete, and Write requests must be intercepted to make
426 // sure that they lock all keys that they are writing to avoid causing conflicts
427 // with any concurrent transactions. The easiest way to do this is to wrap all
428 // write operations in a transaction.
430 // Put(), Merge(), and Delete() only lock a single key per call. Write() will
431 // sort its keys before locking them. This guarantees that TransactionDB write
432 // methods cannot deadlock with each other (but still could deadlock with a
// Put(): performs the write through an internal transaction with write-batch
// indexing disabled. The commit and cleanup of txn fall on lines not visible
// in this view.
434 Status
PessimisticTransactionDB::Put(const WriteOptions
& options
,
435 ColumnFamilyHandle
* column_family
,
436 const Slice
& key
, const Slice
& val
) {
439 Transaction
* txn
= BeginInternalTransaction(options
);
440 txn
->DisableIndexing();
442 // Since the client didn't create a transaction, they don't care about
443 // conflict checking for this write. So we just need to do PutUntracked().
444 s
= txn
->PutUntracked(column_family
, key
, val
);
// Delete(): performs the delete through an internal transaction with
// write-batch indexing disabled. The commit and cleanup of txn fall on lines
// not visible in this view.
455 Status
PessimisticTransactionDB::Delete(const WriteOptions
& wopts
,
456 ColumnFamilyHandle
* column_family
,
460 Transaction
* txn
= BeginInternalTransaction(wopts
);
461 txn
->DisableIndexing();
463 // Since the client didn't create a transaction, they don't care about
464 // conflict checking for this write. So we just need to do
465 // DeleteUntracked().
466 s
= txn
->DeleteUntracked(column_family
, key
);
// SingleDelete(): performs the single-delete through an internal transaction
// with write-batch indexing disabled. The commit and cleanup of txn fall on
// lines not visible in this view.
477 Status
PessimisticTransactionDB::SingleDelete(const WriteOptions
& wopts
,
478 ColumnFamilyHandle
* column_family
,
482 Transaction
* txn
= BeginInternalTransaction(wopts
);
483 txn
->DisableIndexing();
485 // Since the client didn't create a transaction, they don't care about
486 // conflict checking for this write. So we just need to do
487 // SingleDeleteUntracked().
488 s
= txn
->SingleDeleteUntracked(column_family
, key
);
// Merge(): performs the merge through an internal transaction with
// write-batch indexing disabled. The commit and cleanup of txn fall on lines
// not visible in this view.
499 Status
PessimisticTransactionDB::Merge(const WriteOptions
& options
,
500 ColumnFamilyHandle
* column_family
,
501 const Slice
& key
, const Slice
& value
) {
504 Transaction
* txn
= BeginInternalTransaction(options
);
505 txn
->DisableIndexing();
507 // Since the client didn't create a transaction, they don't care about
508 // conflict checking for this write. So we just need to do
510 s
= txn
->MergeUntracked(column_family
, key
, value
);
// Default Write(): the batch is always applied under concurrency control.
521 Status
PessimisticTransactionDB::Write(const WriteOptions
& opts
,
522 WriteBatch
* updates
) {
523 return WriteWithConcurrencyControl(opts
, updates
);
// Write-committed Write(). If the DB was configured with
// txn_db_options_.skip_concurrency_control, the batch goes straight to
// DBImpl without locking. Otherwise it goes through
// WriteWithConcurrencyControl().
526 Status
WriteCommittedTxnDB::Write(const WriteOptions
& opts
,
527 WriteBatch
* updates
) {
528 if (txn_db_options_
.skip_concurrency_control
) {
529 return db_impl_
->Write(opts
, updates
);
531 return WriteWithConcurrencyControl(opts
, updates
);
// Write-committed Write() with per-call optimizations. The locking bypass is
// decided by optimizations.skip_concurrency_control rather than the
// DB-wide option.
535 Status
WriteCommittedTxnDB::Write(
536 const WriteOptions
& opts
,
537 const TransactionDBWriteOptimizations
& optimizations
, WriteBatch
* updates
) {
538 if (optimizations
.skip_concurrency_control
) {
539 return db_impl_
->Write(opts
, updates
);
541 return WriteWithConcurrencyControl(opts
, updates
);
// Records tx under tx_id in expirable_transactions_map_, guarded by
// map_mutex_. Only transactions with a positive expiration time may be
// registered (asserted).
// NOTE(review): assuming a standard map type, insert() leaves an existing
// entry for tx_id unchanged rather than overwriting it.
545 void PessimisticTransactionDB::InsertExpirableTransaction(
546 TransactionID tx_id
, PessimisticTransaction
* tx
) {
547 assert(tx
->GetExpirationTime() > 0);
548 std::lock_guard
<std::mutex
> lock(map_mutex_
);
549 expirable_transactions_map_
.insert({tx_id
, tx
});
// Removes tx_id from expirable_transactions_map_, guarded by map_mutex_.
// Assuming a standard map type, erasing an unknown id is a no-op.
552 void PessimisticTransactionDB::RemoveExpirableTransaction(TransactionID tx_id
) {
553 std::lock_guard
<std::mutex
> lock(map_mutex_
);
554 expirable_transactions_map_
.erase(tx_id
);
// Tries to steal the locks held by the expirable transaction tx_id.
// map_mutex_ stays held while TryStealingLocks() runs. Because
// RemoveExpirableTransaction() takes the same mutex, the entry cannot be
// removed concurrently. The result for an id that is not in the map is
// returned on a line not visible in this view.
557 bool PessimisticTransactionDB::TryStealingExpiredTransactionLocks(
558 TransactionID tx_id
) {
559 std::lock_guard
<std::mutex
> lock(map_mutex_
);
561 auto tx_it
= expirable_transactions_map_
.find(tx_id
);
562 if (tx_it
== expirable_transactions_map_
.end()) {
565 PessimisticTransaction
& tx
= *(tx_it
->second
);
566 return tx
.TryStealingLocks();
// Reinitializes an existing transaction so that it can be reused with new
// write and transaction options. txn must be a PessimisticTransaction; this
// is verified by static_cast_with_check.
569 void PessimisticTransactionDB::ReinitializeTransaction(
570 Transaction
* txn
, const WriteOptions
& write_options
,
571 const TransactionOptions
& txn_options
) {
573 static_cast_with_check
<PessimisticTransaction
, Transaction
>(txn
);
575 txn_impl
->Reinitialize(this, write_options
, txn_options
);
// Looks up a registered transaction by name, under name_map_mutex_. The
// not-found result is returned on a line not visible here; presumably it is
// nullptr, since RegisterTransaction() asserts `== nullptr` for names that
// are not yet registered.
578 Transaction
* PessimisticTransactionDB::GetTransactionByName(
579 const TransactionName
& name
) {
580 std::lock_guard
<std::mutex
> lock(name_map_mutex_
);
581 auto it
= transactions_
.find(name
);
582 if (it
== transactions_
.end()) {
// Appends every registered transaction in the PREPARED state to *transv,
// under name_map_mutex_. Whether *transv is validated or cleared first is
// not visible in this view.
589 void PessimisticTransactionDB::GetAllPreparedTransactions(
590 std::vector
<Transaction
*>* transv
) {
593 std::lock_guard
<std::mutex
> lock(name_map_mutex_
);
594 for (auto it
= transactions_
.begin(); it
!= transactions_
.end(); ++it
) {
595 if (it
->second
->GetState() == Transaction::PREPARED
) {
596 transv
->push_back(it
->second
);
// Returns the lock manager's snapshot of current lock status.
601 TransactionLockMgr::LockStatusData
602 PessimisticTransactionDB::GetLockStatusData() {
603 return lock_mgr_
.GetLockStatusData();
// Returns the deadlock paths recorded by the lock manager.
606 std::vector
<DeadlockPath
> PessimisticTransactionDB::GetDeadlockInfoBuffer() {
607 return lock_mgr_
.GetDeadlockInfoBuffer();
// Resizes the lock manager's deadlock info buffer to target_size.
610 void PessimisticTransactionDB::SetDeadlockInfoBufferSize(uint32_t target_size
) {
611 lock_mgr_
.Resize(target_size
);
// Adds txn to the name -> transaction map. txn must be named, not yet
// registered, and in the STARTED state (all asserted).
// GetTransactionByName() acquires name_map_mutex_ itself, so the duplicate
// check has to run before this function takes that (non-recursive
// std::mutex) lock.
// NOTE(review): the duplicate check is assert-only and not atomic with the
// insert; in release builds an existing entry with the same name would be
// silently overwritten.
614 void PessimisticTransactionDB::RegisterTransaction(Transaction
* txn
) {
616 assert(txn
->GetName().length() > 0);
617 assert(GetTransactionByName(txn
->GetName()) == nullptr);
618 assert(txn
->GetState() == Transaction::STARTED
);
619 std::lock_guard
<std::mutex
> lock(name_map_mutex_
);
620 transactions_
[txn
->GetName()] = txn
;
// Removes txn from the name -> transaction map, under name_map_mutex_.
// txn must currently be registered; this is only asserted, so in release
// builds erase(end()) would be undefined behavior.
623 void PessimisticTransactionDB::UnregisterTransaction(Transaction
* txn
) {
625 std::lock_guard
<std::mutex
> lock(name_map_mutex_
);
626 auto it
= transactions_
.find(txn
->GetName());
627 assert(it
!= transactions_
.end());
628 transactions_
.erase(it
);
631 } // namespace ROCKSDB_NAMESPACE
632 #endif // ROCKSDB_LITE