#ifndef ROCKSDB_LITE
-#ifndef __STDC_FORMAT_MACROS
-#define __STDC_FORMAT_MACROS
-#endif
-
#include "utilities/transactions/pessimistic_transaction_db.h"
-#include <inttypes.h>
+#include <cinttypes>
#include <string>
#include <unordered_set>
#include <vector>
-#include "db/db_impl.h"
+#include "db/db_impl/db_impl.h"
#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/utilities/transaction_db.h"
+#include "test_util/sync_point.h"
#include "util/cast_util.h"
#include "util/mutexlock.h"
-#include "util/sync_point.h"
#include "utilities/transactions/pessimistic_transaction.h"
#include "utilities/transactions/transaction_db_mutex_impl.h"
#include "utilities/transactions/write_prepared_txn_db.h"
#include "utilities/transactions/write_unprepared_txn_db.h"
-namespace rocksdb {
+namespace ROCKSDB_NAMESPACE {
PessimisticTransactionDB::PessimisticTransactionDB(
DB* db, const TransactionDBOptions& txn_db_options)
: TransactionDB(db),
- db_impl_(static_cast_with_check<DBImpl, DB>(db)),
+ db_impl_(static_cast_with_check<DBImpl>(db)),
txn_db_options_(txn_db_options),
- lock_mgr_(this, txn_db_options_.num_stripes, txn_db_options.max_num_locks,
- txn_db_options_.max_num_deadlocks,
- txn_db_options_.custom_mutex_factory
- ? txn_db_options_.custom_mutex_factory
- : std::shared_ptr<TransactionDBMutexFactory>(
- new TransactionDBMutexFactoryImpl())) {
+ lock_manager_(NewLockManager(this, txn_db_options)) {
assert(db_impl_ != nullptr);
info_log_ = db_impl_->GetDBOptions().info_log;
}
PessimisticTransactionDB::PessimisticTransactionDB(
StackableDB* db, const TransactionDBOptions& txn_db_options)
: TransactionDB(db),
- db_impl_(static_cast_with_check<DBImpl, DB>(db->GetRootDB())),
+ db_impl_(static_cast_with_check<DBImpl>(db->GetRootDB())),
txn_db_options_(txn_db_options),
- lock_mgr_(this, txn_db_options_.num_stripes, txn_db_options.max_num_locks,
- txn_db_options_.max_num_deadlocks,
- txn_db_options_.custom_mutex_factory
- ? txn_db_options_.custom_mutex_factory
- : std::shared_ptr<TransactionDBMutexFactory>(
- new TransactionDBMutexFactoryImpl())) {
+ lock_manager_(NewLockManager(this, txn_db_options)) {
assert(db_impl_ != nullptr);
}
Status s = EnableAutoCompaction(compaction_enabled_cf_handles);
// create 'real' transactions from recovered shell transactions
- auto dbimpl = reinterpret_cast<DBImpl*>(GetRootDB());
+ auto dbimpl = static_cast_with_check<DBImpl>(GetRootDB());
assert(dbimpl != nullptr);
auto rtrxs = dbimpl->recovered_transactions();
- for (auto it = rtrxs.begin(); it != rtrxs.end(); it++) {
+ for (auto it = rtrxs.begin(); it != rtrxs.end(); ++it) {
auto recovered_trx = it->second;
assert(recovered_trx);
assert(recovered_trx->batches_.size() == 1);
assert(real_trx);
real_trx->SetLogNumber(batch_info.log_number_);
assert(seq != kMaxSequenceNumber);
- real_trx->SetId(seq);
+ if (GetTxnDBOptions().write_policy != WRITE_COMMITTED) {
+ real_trx->SetId(seq);
+ }
s = real_trx->SetName(recovered_trx->name_);
if (!s.ok()) {
std::vector<ColumnFamilyHandle*>* handles, TransactionDB** dbptr) {
Status s;
DB* db = nullptr;
+ if (txn_db_options.write_policy == WRITE_COMMITTED &&
+ db_options.unordered_write) {
+ return Status::NotSupported(
+ "WRITE_COMMITTED is incompatible with unordered_writes");
+ }
+ if (txn_db_options.write_policy == WRITE_UNPREPARED &&
+ db_options.unordered_write) {
+ // TODO(lth): support it
+ return Status::NotSupported(
+ "WRITE_UNPREPARED is currently incompatible with unordered_writes");
+ }
+ if (txn_db_options.write_policy == WRITE_PREPARED &&
+ db_options.unordered_write && !db_options.two_write_queues) {
+ return Status::NotSupported(
+ "WRITE_PREPARED is incompatible with unordered_writes if "
+ "two_write_queues is not enabled.");
+ }
- ROCKS_LOG_WARN(db_options.info_log, "Transaction write_policy is %" PRId32,
- static_cast<int>(txn_db_options.write_policy));
std::vector<ColumnFamilyDescriptor> column_families_copy = column_families;
std::vector<size_t> compaction_enabled_cf_indices;
DBOptions db_options_2pc = db_options;
s = DBImpl::Open(db_options_2pc, dbname, column_families_copy, handles, &db,
use_seq_per_batch, use_batch_per_txn);
if (s.ok()) {
+ ROCKS_LOG_WARN(db->GetDBOptions().info_log,
+ "Transaction write_policy is %" PRId32,
+ static_cast<int>(txn_db_options.write_policy));
s = WrapDB(db, txn_db_options, compaction_enabled_cf_indices, *handles,
dbptr);
}
for (size_t i = 0; i < column_families->size(); i++) {
ColumnFamilyOptions* cf_options = &(*column_families)[i].options;
- if (cf_options->max_write_buffer_number_to_maintain == 0) {
- // Setting to -1 will set the History size to max_write_buffer_number.
- cf_options->max_write_buffer_number_to_maintain = -1;
+ if (cf_options->max_write_buffer_size_to_maintain == 0 &&
+ cf_options->max_write_buffer_number_to_maintain == 0) {
+ // Setting to -1 will set the History size to
+ // max_write_buffer_number * write_buffer_size.
+ cf_options->max_write_buffer_size_to_maintain = -1;
}
if (!cf_options->disable_auto_compactions) {
// Disable compactions momentarily to prevent race with DB::Open
return s;
}
-// Let TransactionLockMgr know that this column family exists so it can
+// Let LockManager know that this column family exists so it can
// allocate a LockMap for it.
void PessimisticTransactionDB::AddColumnFamily(
const ColumnFamilyHandle* handle) {
- lock_mgr_.AddColumnFamily(handle->GetID());
+ lock_manager_->AddColumnFamily(handle);
}
Status PessimisticTransactionDB::CreateColumnFamily(
s = db_->CreateColumnFamily(options, column_family_name, handle);
if (s.ok()) {
- lock_mgr_.AddColumnFamily((*handle)->GetID());
+ lock_manager_->AddColumnFamily(*handle);
UpdateCFComparatorMap(*handle);
}
return s;
}
-// Let TransactionLockMgr know that it can deallocate the LockMap for this
+// Let LockManager know that it can deallocate the LockMap for this
// column family.
Status PessimisticTransactionDB::DropColumnFamily(
ColumnFamilyHandle* column_family) {
Status s = db_->DropColumnFamily(column_family);
if (s.ok()) {
- lock_mgr_.RemoveColumnFamily(column_family->GetID());
+ lock_manager_->RemoveColumnFamily(column_family);
}
return s;
uint32_t cfh_id,
const std::string& key,
bool exclusive) {
- return lock_mgr_.TryLock(txn, cfh_id, key, GetEnv(), exclusive);
+ return lock_manager_->TryLock(txn, cfh_id, key, GetEnv(), exclusive);
}
void PessimisticTransactionDB::UnLock(PessimisticTransaction* txn,
- const TransactionKeyMap* keys) {
- lock_mgr_.UnLock(txn, keys, GetEnv());
+ const LockTracker& keys) {
+ lock_manager_->UnLock(txn, keys, GetEnv());
}
void PessimisticTransactionDB::UnLock(PessimisticTransaction* txn,
uint32_t cfh_id, const std::string& key) {
- lock_mgr_.UnLock(txn, cfh_id, key, GetEnv());
+ lock_manager_->UnLock(txn, cfh_id, key, GetEnv());
}
// Used when wrapping DB write operations in a transaction
Status PessimisticTransactionDB::Write(const WriteOptions& opts,
WriteBatch* updates) {
- // Need to lock all keys in this batch to prevent write conflicts with
- // concurrent transactions.
- Transaction* txn = BeginInternalTransaction(opts);
- txn->DisableIndexing();
-
- auto txn_impl =
- static_cast_with_check<PessimisticTransaction, Transaction>(txn);
-
- // Since commitBatch sorts the keys before locking, concurrent Write()
- // operations will not cause a deadlock.
- // In order to avoid a deadlock with a concurrent Transaction, Transactions
- // should use a lock timeout.
- Status s = txn_impl->CommitBatch(updates);
-
- delete txn;
+ return WriteWithConcurrencyControl(opts, updates);
+}
- return s;
+Status WriteCommittedTxnDB::Write(const WriteOptions& opts,
+ WriteBatch* updates) {
+ if (txn_db_options_.skip_concurrency_control) {
+ return db_impl_->Write(opts, updates);
+ } else {
+ return WriteWithConcurrencyControl(opts, updates);
+ }
}
Status WriteCommittedTxnDB::Write(
if (optimizations.skip_concurrency_control) {
return db_impl_->Write(opts, updates);
} else {
- return Write(opts, updates);
+ return WriteWithConcurrencyControl(opts, updates);
}
}
void PessimisticTransactionDB::ReinitializeTransaction(
Transaction* txn, const WriteOptions& write_options,
const TransactionOptions& txn_options) {
- auto txn_impl =
- static_cast_with_check<PessimisticTransaction, Transaction>(txn);
+ auto txn_impl = static_cast_with_check<PessimisticTransaction>(txn);
txn_impl->Reinitialize(this, write_options, txn_options);
}
assert(transv);
transv->clear();
std::lock_guard<std::mutex> lock(name_map_mutex_);
- for (auto it = transactions_.begin(); it != transactions_.end(); it++) {
+ for (auto it = transactions_.begin(); it != transactions_.end(); ++it) {
if (it->second->GetState() == Transaction::PREPARED) {
transv->push_back(it->second);
}
}
}
-TransactionLockMgr::LockStatusData
-PessimisticTransactionDB::GetLockStatusData() {
- return lock_mgr_.GetLockStatusData();
+LockManager::PointLockStatus PessimisticTransactionDB::GetLockStatusData() {
+ return lock_manager_->GetPointLockStatus();
}
std::vector<DeadlockPath> PessimisticTransactionDB::GetDeadlockInfoBuffer() {
- return lock_mgr_.GetDeadlockInfoBuffer();
+ return lock_manager_->GetDeadlockInfoBuffer();
}
void PessimisticTransactionDB::SetDeadlockInfoBufferSize(uint32_t target_size) {
- lock_mgr_.Resize(target_size);
+ lock_manager_->Resize(target_size);
}
void PessimisticTransactionDB::RegisterTransaction(Transaction* txn) {
transactions_.erase(it);
}
-} // namespace rocksdb
+} // namespace ROCKSDB_NAMESPACE
#endif // ROCKSDB_LITE