git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rocksdb/utilities/transactions/pessimistic_transaction_db.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / rocksdb / utilities / transactions / pessimistic_transaction_db.cc
index 6b016ef72a821b568476e41443a7a89b16daf29b..73520f9abbf98364e91126a5da15e0053c0f0c51 100644 (file)
@@ -5,42 +5,33 @@
 
 #ifndef ROCKSDB_LITE
 
-#ifndef __STDC_FORMAT_MACROS
-#define __STDC_FORMAT_MACROS
-#endif
-
 #include "utilities/transactions/pessimistic_transaction_db.h"
 
-#include <inttypes.h>
+#include <cinttypes>
 #include <string>
 #include <unordered_set>
 #include <vector>
 
-#include "db/db_impl.h"
+#include "db/db_impl/db_impl.h"
 #include "rocksdb/db.h"
 #include "rocksdb/options.h"
 #include "rocksdb/utilities/transaction_db.h"
+#include "test_util/sync_point.h"
 #include "util/cast_util.h"
 #include "util/mutexlock.h"
-#include "util/sync_point.h"
 #include "utilities/transactions/pessimistic_transaction.h"
 #include "utilities/transactions/transaction_db_mutex_impl.h"
 #include "utilities/transactions/write_prepared_txn_db.h"
 #include "utilities/transactions/write_unprepared_txn_db.h"
 
-namespace rocksdb {
+namespace ROCKSDB_NAMESPACE {
 
 PessimisticTransactionDB::PessimisticTransactionDB(
     DB* db, const TransactionDBOptions& txn_db_options)
     : TransactionDB(db),
-      db_impl_(static_cast_with_check<DBImpl, DB>(db)),
+      db_impl_(static_cast_with_check<DBImpl>(db)),
       txn_db_options_(txn_db_options),
-      lock_mgr_(this, txn_db_options_.num_stripes, txn_db_options.max_num_locks,
-                txn_db_options_.max_num_deadlocks,
-                txn_db_options_.custom_mutex_factory
-                    ? txn_db_options_.custom_mutex_factory
-                    : std::shared_ptr<TransactionDBMutexFactory>(
-                          new TransactionDBMutexFactoryImpl())) {
+      lock_manager_(NewLockManager(this, txn_db_options)) {
   assert(db_impl_ != nullptr);
   info_log_ = db_impl_->GetDBOptions().info_log;
 }
@@ -64,14 +55,9 @@ PessimisticTransactionDB::PessimisticTransactionDB(
 PessimisticTransactionDB::PessimisticTransactionDB(
     StackableDB* db, const TransactionDBOptions& txn_db_options)
     : TransactionDB(db),
-      db_impl_(static_cast_with_check<DBImpl, DB>(db->GetRootDB())),
+      db_impl_(static_cast_with_check<DBImpl>(db->GetRootDB())),
       txn_db_options_(txn_db_options),
-      lock_mgr_(this, txn_db_options_.num_stripes, txn_db_options.max_num_locks,
-                txn_db_options_.max_num_deadlocks,
-                txn_db_options_.custom_mutex_factory
-                    ? txn_db_options_.custom_mutex_factory
-                    : std::shared_ptr<TransactionDBMutexFactory>(
-                          new TransactionDBMutexFactoryImpl())) {
+      lock_manager_(NewLockManager(this, txn_db_options)) {
   assert(db_impl_ != nullptr);
 }
 
@@ -117,11 +103,11 @@ Status PessimisticTransactionDB::Initialize(
   Status s = EnableAutoCompaction(compaction_enabled_cf_handles);
 
   // create 'real' transactions from recovered shell transactions
-  auto dbimpl = reinterpret_cast<DBImpl*>(GetRootDB());
+  auto dbimpl = static_cast_with_check<DBImpl>(GetRootDB());
   assert(dbimpl != nullptr);
   auto rtrxs = dbimpl->recovered_transactions();
 
-  for (auto it = rtrxs.begin(); it != rtrxs.end(); it++) {
+  for (auto it = rtrxs.begin(); it != rtrxs.end(); ++it) {
     auto recovered_trx = it->second;
     assert(recovered_trx);
     assert(recovered_trx->batches_.size() == 1);
@@ -146,7 +132,9 @@ Status PessimisticTransactionDB::Initialize(
     assert(real_trx);
     real_trx->SetLogNumber(batch_info.log_number_);
     assert(seq != kMaxSequenceNumber);
-    real_trx->SetId(seq);
+    if (GetTxnDBOptions().write_policy != WRITE_COMMITTED) {
+      real_trx->SetId(seq);
+    }
 
     s = real_trx->SetName(recovered_trx->name_);
     if (!s.ok()) {
@@ -219,9 +207,24 @@ Status TransactionDB::Open(
     std::vector<ColumnFamilyHandle*>* handles, TransactionDB** dbptr) {
   Status s;
   DB* db = nullptr;
+  if (txn_db_options.write_policy == WRITE_COMMITTED &&
+      db_options.unordered_write) {
+    return Status::NotSupported(
+        "WRITE_COMMITTED is incompatible with unordered_writes");
+  }
+  if (txn_db_options.write_policy == WRITE_UNPREPARED &&
+      db_options.unordered_write) {
+    // TODO(lth): support it
+    return Status::NotSupported(
+        "WRITE_UNPREPARED is currently incompatible with unordered_writes");
+  }
+  if (txn_db_options.write_policy == WRITE_PREPARED &&
+      db_options.unordered_write && !db_options.two_write_queues) {
+    return Status::NotSupported(
+        "WRITE_PREPARED is incompatible with unordered_writes if "
+        "two_write_queues is not enabled.");
+  }
 
-  ROCKS_LOG_WARN(db_options.info_log, "Transaction write_policy is %" PRId32,
-                 static_cast<int>(txn_db_options.write_policy));
   std::vector<ColumnFamilyDescriptor> column_families_copy = column_families;
   std::vector<size_t> compaction_enabled_cf_indices;
   DBOptions db_options_2pc = db_options;
@@ -236,6 +239,9 @@ Status TransactionDB::Open(
   s = DBImpl::Open(db_options_2pc, dbname, column_families_copy, handles, &db,
                    use_seq_per_batch, use_batch_per_txn);
   if (s.ok()) {
+    ROCKS_LOG_WARN(db->GetDBOptions().info_log,
+                   "Transaction write_policy is %" PRId32,
+                   static_cast<int>(txn_db_options.write_policy));
     s = WrapDB(db, txn_db_options, compaction_enabled_cf_indices, *handles,
                dbptr);
   }
@@ -255,9 +261,11 @@ void TransactionDB::PrepareWrap(
   for (size_t i = 0; i < column_families->size(); i++) {
     ColumnFamilyOptions* cf_options = &(*column_families)[i].options;
 
-    if (cf_options->max_write_buffer_number_to_maintain == 0) {
-      // Setting to -1 will set the History size to max_write_buffer_number.
-      cf_options->max_write_buffer_number_to_maintain = -1;
+    if (cf_options->max_write_buffer_size_to_maintain == 0 &&
+        cf_options->max_write_buffer_number_to_maintain == 0) {
+      // Setting to -1 will set the History size to
+      // max_write_buffer_number * write_buffer_size.
+      cf_options->max_write_buffer_size_to_maintain = -1;
     }
     if (!cf_options->disable_auto_compactions) {
       // Disable compactions momentarily to prevent race with DB::Open
@@ -337,11 +345,11 @@ Status TransactionDB::WrapStackableDB(
   return s;
 }
 
-// Let TransactionLockMgr know that this column family exists so it can
+// Let LockManager know that this column family exists so it can
 // allocate a LockMap for it.
 void PessimisticTransactionDB::AddColumnFamily(
     const ColumnFamilyHandle* handle) {
-  lock_mgr_.AddColumnFamily(handle->GetID());
+  lock_manager_->AddColumnFamily(handle);
 }
 
 Status PessimisticTransactionDB::CreateColumnFamily(
@@ -355,14 +363,14 @@ Status PessimisticTransactionDB::CreateColumnFamily(
 
   s = db_->CreateColumnFamily(options, column_family_name, handle);
   if (s.ok()) {
-    lock_mgr_.AddColumnFamily((*handle)->GetID());
+    lock_manager_->AddColumnFamily(*handle);
     UpdateCFComparatorMap(*handle);
   }
 
   return s;
 }
 
-// Let TransactionLockMgr know that it can deallocate the LockMap for this
+// Let LockManager know that it can deallocate the LockMap for this
 // column family.
 Status PessimisticTransactionDB::DropColumnFamily(
     ColumnFamilyHandle* column_family) {
@@ -370,7 +378,7 @@ Status PessimisticTransactionDB::DropColumnFamily(
 
   Status s = db_->DropColumnFamily(column_family);
   if (s.ok()) {
-    lock_mgr_.RemoveColumnFamily(column_family->GetID());
+    lock_manager_->RemoveColumnFamily(column_family);
   }
 
   return s;
@@ -380,17 +388,17 @@ Status PessimisticTransactionDB::TryLock(PessimisticTransaction* txn,
                                          uint32_t cfh_id,
                                          const std::string& key,
                                          bool exclusive) {
-  return lock_mgr_.TryLock(txn, cfh_id, key, GetEnv(), exclusive);
+  return lock_manager_->TryLock(txn, cfh_id, key, GetEnv(), exclusive);
 }
 
 void PessimisticTransactionDB::UnLock(PessimisticTransaction* txn,
-                                      const TransactionKeyMap* keys) {
-  lock_mgr_.UnLock(txn, keys, GetEnv());
+                                      const LockTracker& keys) {
+  lock_manager_->UnLock(txn, keys, GetEnv());
 }
 
 void PessimisticTransactionDB::UnLock(PessimisticTransaction* txn,
                                       uint32_t cfh_id, const std::string& key) {
-  lock_mgr_.UnLock(txn, cfh_id, key, GetEnv());
+  lock_manager_->UnLock(txn, cfh_id, key, GetEnv());
 }
 
 // Used when wrapping DB write operations in a transaction
@@ -502,23 +510,16 @@ Status PessimisticTransactionDB::Merge(const WriteOptions& options,
 
 Status PessimisticTransactionDB::Write(const WriteOptions& opts,
                                        WriteBatch* updates) {
-  // Need to lock all keys in this batch to prevent write conflicts with
-  // concurrent transactions.
-  Transaction* txn = BeginInternalTransaction(opts);
-  txn->DisableIndexing();
-
-  auto txn_impl =
-      static_cast_with_check<PessimisticTransaction, Transaction>(txn);
-
-  // Since commitBatch sorts the keys before locking, concurrent Write()
-  // operations will not cause a deadlock.
-  // In order to avoid a deadlock with a concurrent Transaction, Transactions
-  // should use a lock timeout.
-  Status s = txn_impl->CommitBatch(updates);
-
-  delete txn;
+  return WriteWithConcurrencyControl(opts, updates);
+}
 
-  return s;
+Status WriteCommittedTxnDB::Write(const WriteOptions& opts,
+                                  WriteBatch* updates) {
+  if (txn_db_options_.skip_concurrency_control) {
+    return db_impl_->Write(opts, updates);
+  } else {
+    return WriteWithConcurrencyControl(opts, updates);
+  }
 }
 
 Status WriteCommittedTxnDB::Write(
@@ -527,7 +528,7 @@ Status WriteCommittedTxnDB::Write(
   if (optimizations.skip_concurrency_control) {
     return db_impl_->Write(opts, updates);
   } else {
-    return Write(opts, updates);
+    return WriteWithConcurrencyControl(opts, updates);
   }
 }
 
@@ -558,8 +559,7 @@ bool PessimisticTransactionDB::TryStealingExpiredTransactionLocks(
 void PessimisticTransactionDB::ReinitializeTransaction(
     Transaction* txn, const WriteOptions& write_options,
     const TransactionOptions& txn_options) {
-  auto txn_impl =
-      static_cast_with_check<PessimisticTransaction, Transaction>(txn);
+  auto txn_impl = static_cast_with_check<PessimisticTransaction>(txn);
 
   txn_impl->Reinitialize(this, write_options, txn_options);
 }
@@ -580,24 +580,23 @@ void PessimisticTransactionDB::GetAllPreparedTransactions(
   assert(transv);
   transv->clear();
   std::lock_guard<std::mutex> lock(name_map_mutex_);
-  for (auto it = transactions_.begin(); it != transactions_.end(); it++) {
+  for (auto it = transactions_.begin(); it != transactions_.end(); ++it) {
     if (it->second->GetState() == Transaction::PREPARED) {
       transv->push_back(it->second);
     }
   }
 }
 
-TransactionLockMgr::LockStatusData
-PessimisticTransactionDB::GetLockStatusData() {
-  return lock_mgr_.GetLockStatusData();
+LockManager::PointLockStatus PessimisticTransactionDB::GetLockStatusData() {
+  return lock_manager_->GetPointLockStatus();
 }
 
 std::vector<DeadlockPath> PessimisticTransactionDB::GetDeadlockInfoBuffer() {
-  return lock_mgr_.GetDeadlockInfoBuffer();
+  return lock_manager_->GetDeadlockInfoBuffer();
 }
 
 void PessimisticTransactionDB::SetDeadlockInfoBufferSize(uint32_t target_size) {
-  lock_mgr_.Resize(target_size);
+  lock_manager_->Resize(target_size);
 }
 
 void PessimisticTransactionDB::RegisterTransaction(Transaction* txn) {
@@ -617,5 +616,5 @@ void PessimisticTransactionDB::UnregisterTransaction(Transaction* txn) {
   transactions_.erase(it);
 }
 
-}  //  namespace rocksdb
+}  // namespace ROCKSDB_NAMESPACE
 #endif  // ROCKSDB_LITE