X-Git-Url: https://git.proxmox.com/?a=blobdiff_plain;f=ceph%2Fsrc%2Frocksdb%2Futilities%2Ftransactions%2Fpessimistic_transaction_db.cc;h=73520f9abbf98364e91126a5da15e0053c0f0c51;hb=20effc670b57271cb089376d6d0800990e5218d5;hp=6b016ef72a821b568476e41443a7a89b16daf29b;hpb=11fdf7f228cb605e22a0e495ebabd3329db96b81;p=ceph.git diff --git a/ceph/src/rocksdb/utilities/transactions/pessimistic_transaction_db.cc b/ceph/src/rocksdb/utilities/transactions/pessimistic_transaction_db.cc index 6b016ef72..73520f9ab 100644 --- a/ceph/src/rocksdb/utilities/transactions/pessimistic_transaction_db.cc +++ b/ceph/src/rocksdb/utilities/transactions/pessimistic_transaction_db.cc @@ -5,42 +5,33 @@ #ifndef ROCKSDB_LITE -#ifndef __STDC_FORMAT_MACROS -#define __STDC_FORMAT_MACROS -#endif - #include "utilities/transactions/pessimistic_transaction_db.h" -#include +#include #include #include #include -#include "db/db_impl.h" +#include "db/db_impl/db_impl.h" #include "rocksdb/db.h" #include "rocksdb/options.h" #include "rocksdb/utilities/transaction_db.h" +#include "test_util/sync_point.h" #include "util/cast_util.h" #include "util/mutexlock.h" -#include "util/sync_point.h" #include "utilities/transactions/pessimistic_transaction.h" #include "utilities/transactions/transaction_db_mutex_impl.h" #include "utilities/transactions/write_prepared_txn_db.h" #include "utilities/transactions/write_unprepared_txn_db.h" -namespace rocksdb { +namespace ROCKSDB_NAMESPACE { PessimisticTransactionDB::PessimisticTransactionDB( DB* db, const TransactionDBOptions& txn_db_options) : TransactionDB(db), - db_impl_(static_cast_with_check(db)), + db_impl_(static_cast_with_check(db)), txn_db_options_(txn_db_options), - lock_mgr_(this, txn_db_options_.num_stripes, txn_db_options.max_num_locks, - txn_db_options_.max_num_deadlocks, - txn_db_options_.custom_mutex_factory - ? txn_db_options_.custom_mutex_factory - : std::shared_ptr( - new TransactionDBMutexFactoryImpl())) { + lock_manager_(NewLockManager(this, txn_db_options)) { assert(db_impl_ != nullptr); info_log_ = db_impl_->GetDBOptions().info_log; } @@ -64,14 +55,9 @@ PessimisticTransactionDB::PessimisticTransactionDB( PessimisticTransactionDB::PessimisticTransactionDB( StackableDB* db, const TransactionDBOptions& txn_db_options) : TransactionDB(db), - db_impl_(static_cast_with_check(db->GetRootDB())), + db_impl_(static_cast_with_check(db->GetRootDB())), txn_db_options_(txn_db_options), - lock_mgr_(this, txn_db_options_.num_stripes, txn_db_options.max_num_locks, - txn_db_options_.max_num_deadlocks, - txn_db_options_.custom_mutex_factory - ? txn_db_options_.custom_mutex_factory - : std::shared_ptr( - new TransactionDBMutexFactoryImpl())) { + lock_manager_(NewLockManager(this, txn_db_options)) { assert(db_impl_ != nullptr); } @@ -117,11 +103,11 @@ Status PessimisticTransactionDB::Initialize( Status s = EnableAutoCompaction(compaction_enabled_cf_handles); // create 'real' transactions from recovered shell transactions - auto dbimpl = reinterpret_cast(GetRootDB()); + auto dbimpl = static_cast_with_check(GetRootDB()); assert(dbimpl != nullptr); auto rtrxs = dbimpl->recovered_transactions(); - for (auto it = rtrxs.begin(); it != rtrxs.end(); it++) { + for (auto it = rtrxs.begin(); it != rtrxs.end(); ++it) { auto recovered_trx = it->second; assert(recovered_trx); assert(recovered_trx->batches_.size() == 1); @@ -146,7 +132,9 @@ Status PessimisticTransactionDB::Initialize( assert(real_trx); real_trx->SetLogNumber(batch_info.log_number_); assert(seq != kMaxSequenceNumber); - real_trx->SetId(seq); + if (GetTxnDBOptions().write_policy != WRITE_COMMITTED) { + real_trx->SetId(seq); + } s = real_trx->SetName(recovered_trx->name_); if (!s.ok()) { @@ -219,9 +207,24 @@ Status TransactionDB::Open( std::vector* handles, TransactionDB** dbptr) { Status s; DB* db = nullptr; + if (txn_db_options.write_policy == WRITE_COMMITTED && + db_options.unordered_write) { + return Status::NotSupported( + "WRITE_COMMITTED is incompatible with unordered_writes"); + } + if (txn_db_options.write_policy == WRITE_UNPREPARED && + db_options.unordered_write) { + // TODO(lth): support it + return Status::NotSupported( + "WRITE_UNPREPARED is currently incompatible with unordered_writes"); + } + if (txn_db_options.write_policy == WRITE_PREPARED && + db_options.unordered_write && !db_options.two_write_queues) { + return Status::NotSupported( + "WRITE_PREPARED is incompatible with unordered_writes if " + "two_write_queues is not enabled."); + } - ROCKS_LOG_WARN(db_options.info_log, "Transaction write_policy is %" PRId32, - static_cast(txn_db_options.write_policy)); std::vector column_families_copy = column_families; std::vector compaction_enabled_cf_indices; DBOptions db_options_2pc = db_options; @@ -236,6 +239,9 @@ Status TransactionDB::Open( s = DBImpl::Open(db_options_2pc, dbname, column_families_copy, handles, &db, use_seq_per_batch, use_batch_per_txn); if (s.ok()) { + ROCKS_LOG_WARN(db->GetDBOptions().info_log, + "Transaction write_policy is %" PRId32, + static_cast(txn_db_options.write_policy)); s = WrapDB(db, txn_db_options, compaction_enabled_cf_indices, *handles, dbptr); } @@ -255,9 +261,11 @@ void TransactionDB::PrepareWrap( for (size_t i = 0; i < column_families->size(); i++) { ColumnFamilyOptions* cf_options = &(*column_families)[i].options; - if (cf_options->max_write_buffer_number_to_maintain == 0) { - // Setting to -1 will set the History size to max_write_buffer_number. - cf_options->max_write_buffer_number_to_maintain = -1; + if (cf_options->max_write_buffer_size_to_maintain == 0 && + cf_options->max_write_buffer_number_to_maintain == 0) { + // Setting to -1 will set the History size to + // max_write_buffer_number * write_buffer_size. + cf_options->max_write_buffer_size_to_maintain = -1; } if (!cf_options->disable_auto_compactions) { // Disable compactions momentarily to prevent race with DB::Open @@ -337,11 +345,11 @@ Status TransactionDB::WrapStackableDB( return s; } -// Let TransactionLockMgr know that this column family exists so it can +// Let LockManager know that this column family exists so it can // allocate a LockMap for it. void PessimisticTransactionDB::AddColumnFamily( const ColumnFamilyHandle* handle) { - lock_mgr_.AddColumnFamily(handle->GetID()); + lock_manager_->AddColumnFamily(handle); } Status PessimisticTransactionDB::CreateColumnFamily( @@ -355,14 +363,14 @@ Status PessimisticTransactionDB::CreateColumnFamily( s = db_->CreateColumnFamily(options, column_family_name, handle); if (s.ok()) { - lock_mgr_.AddColumnFamily((*handle)->GetID()); + lock_manager_->AddColumnFamily(*handle); UpdateCFComparatorMap(*handle); } return s; } -// Let TransactionLockMgr know that it can deallocate the LockMap for this +// Let LockManager know that it can deallocate the LockMap for this // column family. Status PessimisticTransactionDB::DropColumnFamily( ColumnFamilyHandle* column_family) { @@ -370,7 +378,7 @@ Status PessimisticTransactionDB::DropColumnFamily( Status s = db_->DropColumnFamily(column_family); if (s.ok()) { - lock_mgr_.RemoveColumnFamily(column_family->GetID()); + lock_manager_->RemoveColumnFamily(column_family); } return s; @@ -380,17 +388,17 @@ Status PessimisticTransactionDB::TryLock(PessimisticTransaction* txn, uint32_t cfh_id, const std::string& key, bool exclusive) { - return lock_mgr_.TryLock(txn, cfh_id, key, GetEnv(), exclusive); + return lock_manager_->TryLock(txn, cfh_id, key, GetEnv(), exclusive); } void PessimisticTransactionDB::UnLock(PessimisticTransaction* txn, - const TransactionKeyMap* keys) { - lock_mgr_.UnLock(txn, keys, GetEnv()); + const LockTracker& keys) { + lock_manager_->UnLock(txn, keys, GetEnv()); } void PessimisticTransactionDB::UnLock(PessimisticTransaction* txn, uint32_t cfh_id, const std::string& key) { - lock_mgr_.UnLock(txn, cfh_id, key, GetEnv()); + lock_manager_->UnLock(txn, cfh_id, key, GetEnv()); } // Used when wrapping DB write operations in a transaction @@ -502,23 +510,16 @@ Status PessimisticTransactionDB::Merge(const WriteOptions& options, Status PessimisticTransactionDB::Write(const WriteOptions& opts, WriteBatch* updates) { - // Need to lock all keys in this batch to prevent write conflicts with - // concurrent transactions. - Transaction* txn = BeginInternalTransaction(opts); - txn->DisableIndexing(); - - auto txn_impl = - static_cast_with_check(txn); - - // Since commitBatch sorts the keys before locking, concurrent Write() - // operations will not cause a deadlock. - // In order to avoid a deadlock with a concurrent Transaction, Transactions - // should use a lock timeout. - Status s = txn_impl->CommitBatch(updates); - - delete txn; + return WriteWithConcurrencyControl(opts, updates); +} - return s; +Status WriteCommittedTxnDB::Write(const WriteOptions& opts, + WriteBatch* updates) { + if (txn_db_options_.skip_concurrency_control) { + return db_impl_->Write(opts, updates); + } else { + return WriteWithConcurrencyControl(opts, updates); + } } Status WriteCommittedTxnDB::Write( @@ -527,7 +528,7 @@ Status WriteCommittedTxnDB::Write( if (optimizations.skip_concurrency_control) { return db_impl_->Write(opts, updates); } else { - return Write(opts, updates); + return WriteWithConcurrencyControl(opts, updates); } } @@ -558,8 +559,7 @@ bool PessimisticTransactionDB::TryStealingExpiredTransactionLocks( void PessimisticTransactionDB::ReinitializeTransaction( Transaction* txn, const WriteOptions& write_options, const TransactionOptions& txn_options) { - auto txn_impl = - static_cast_with_check(txn); + auto txn_impl = static_cast_with_check(txn); txn_impl->Reinitialize(this, write_options, txn_options); } @@ -580,24 +580,23 @@ void PessimisticTransactionDB::GetAllPreparedTransactions( assert(transv); transv->clear(); std::lock_guard lock(name_map_mutex_); - for (auto it = transactions_.begin(); it != transactions_.end(); it++) { + for (auto it = transactions_.begin(); it != transactions_.end(); ++it) { if (it->second->GetState() == Transaction::PREPARED) { transv->push_back(it->second); } } } -TransactionLockMgr::LockStatusData -PessimisticTransactionDB::GetLockStatusData() { - return lock_mgr_.GetLockStatusData(); +LockManager::PointLockStatus PessimisticTransactionDB::GetLockStatusData() { + return lock_manager_->GetPointLockStatus(); } std::vector PessimisticTransactionDB::GetDeadlockInfoBuffer() { - return lock_mgr_.GetDeadlockInfoBuffer(); + return lock_manager_->GetDeadlockInfoBuffer(); } void PessimisticTransactionDB::SetDeadlockInfoBufferSize(uint32_t target_size) { - lock_mgr_.Resize(target_size); + lock_manager_->Resize(target_size); } void PessimisticTransactionDB::RegisterTransaction(Transaction* txn) { @@ -617,5 +616,5 @@ void PessimisticTransactionDB::UnregisterTransaction(Transaction* txn) { transactions_.erase(it); } -} // namespace rocksdb +} // namespace ROCKSDB_NAMESPACE #endif // ROCKSDB_LITE