#include "rocksdb/utilities/transaction_db.h"
#include "util/cast_util.h"
#include "utilities/transactions/lock/lock_manager.h"
+#include "utilities/transactions/lock/range/range_lock_manager.h"
#include "utilities/transactions/pessimistic_transaction.h"
#include "utilities/transactions/write_prepared_txn.h"
virtual Status Write(const WriteOptions& opts, WriteBatch* updates) override;
inline Status WriteWithConcurrencyControl(const WriteOptions& opts,
WriteBatch* updates) {
- // Need to lock all keys in this batch to prevent write conflicts with
- // concurrent transactions.
- Transaction* txn = BeginInternalTransaction(opts);
- txn->DisableIndexing();
-
- auto txn_impl = static_cast_with_check<PessimisticTransaction>(txn);
-
- // Since commitBatch sorts the keys before locking, concurrent Write()
- // operations will not cause a deadlock.
- // In order to avoid a deadlock with a concurrent Transaction, Transactions
- // should use a lock timeout.
- Status s = txn_impl->CommitBatch(updates);
-
- delete txn;
+ Status s;
+ if (opts.protection_bytes_per_key > 0) {
+ s = WriteBatchInternal::UpdateProtectionInfo(
+ updates, opts.protection_bytes_per_key);
+ }
+ if (s.ok()) {
+ // Need to lock all keys in this batch to prevent write conflicts with
+ // concurrent transactions.
+ Transaction* txn = BeginInternalTransaction(opts);
+ txn->DisableIndexing();
+
+ auto txn_impl = static_cast_with_check<PessimisticTransaction>(txn);
+
+ // Since commitBatch sorts the keys before locking, concurrent Write()
+ // operations will not cause a deadlock.
+ // In order to avoid a deadlock with a concurrent Transaction,
+ // Transactions should use a lock timeout.
+ s = txn_impl->CommitBatch(updates);
+
+ delete txn;
+ }
return s;
}
const std::string& column_family_name,
ColumnFamilyHandle** handle) override;
+ Status CreateColumnFamilies(
+ const ColumnFamilyOptions& options,
+ const std::vector<std::string>& column_family_names,
+ std::vector<ColumnFamilyHandle*>* handles) override;
+
+ Status CreateColumnFamilies(
+ const std::vector<ColumnFamilyDescriptor>& column_families,
+ std::vector<ColumnFamilyHandle*>* handles) override;
+
using StackableDB::DropColumnFamily;
virtual Status DropColumnFamily(ColumnFamilyHandle* column_family) override;
+ Status DropColumnFamilies(
+ const std::vector<ColumnFamilyHandle*>& column_families) override;
+
Status TryLock(PessimisticTransaction* txn, uint32_t cfh_id,
const std::string& key, bool exclusive);
+ Status TryRangeLock(PessimisticTransaction* txn, uint32_t cfh_id,
+ const Endpoint& start_endp, const Endpoint& end_endp);
void UnLock(PessimisticTransaction* txn, const LockTracker& keys);
void UnLock(PessimisticTransaction* txn, uint32_t cfh_id,
return lock_manager_->GetLockTrackerFactory();
}
+ std::pair<Status, std::shared_ptr<const Snapshot>> CreateTimestampedSnapshot(
+ TxnTimestamp ts) override;
+
+ std::shared_ptr<const Snapshot> GetTimestampedSnapshot(
+ TxnTimestamp ts) const override;
+
+ void ReleaseTimestampedSnapshotsOlderThan(TxnTimestamp ts) override;
+
+ Status GetTimestampedSnapshots(TxnTimestamp ts_lb, TxnTimestamp ts_ub,
+ std::vector<std::shared_ptr<const Snapshot>>&
+ timestamped_snapshots) const override;
+
protected:
DBImpl* db_impl_;
std::shared_ptr<Logger> info_log_;
const TransactionDBOptions txn_db_options_;
+ static Status FailIfBatchHasTs(const WriteBatch* wb);
+
+ static Status FailIfCfEnablesTs(const DB* db,
+ const ColumnFamilyHandle* column_family);
+
void ReinitializeTransaction(
Transaction* txn, const WriteOptions& write_options,
const TransactionOptions& txn_options = TransactionOptions());
friend class WriteUnpreparedTransactionTest_RecoveryTest_Test;
friend class WriteUnpreparedTransactionTest_MarkLogWithPrepSection_Test;
- std::unique_ptr<LockManager> lock_manager_;
+ Transaction* BeginInternalTransaction(const WriteOptions& options);
+
+ std::shared_ptr<LockManager> lock_manager_;
// Must be held when adding/dropping column families.
InstrumentedMutex column_family_mutex_;
- Transaction* BeginInternalTransaction(const WriteOptions& options);
// Used to ensure that no locks are stolen from an expirable transaction
// that has started a commit. Only transactions with an expiration time
virtual Status Write(const WriteOptions& opts, WriteBatch* updates) override;
};
+inline Status PessimisticTransactionDB::FailIfBatchHasTs(
+ const WriteBatch* batch) {
+ if (batch != nullptr && WriteBatchInternal::HasKeyWithTimestamp(*batch)) {
+ return Status::NotSupported(
+ "Writes with timestamp must go through transaction API instead of "
+ "TransactionDB.");
+ }
+ return Status::OK();
+}
+
+inline Status PessimisticTransactionDB::FailIfCfEnablesTs(
+ const DB* db, const ColumnFamilyHandle* column_family) {
+ assert(db);
+ column_family = column_family ? column_family : db->DefaultColumnFamily();
+ assert(column_family);
+ const Comparator* const ucmp = column_family->GetComparator();
+ assert(ucmp);
+ if (ucmp->timestamp_size() > 0) {
+ return Status::NotSupported(
+ "Write operation with user timestamp must go through the transaction "
+ "API instead of TransactionDB.");
+ }
+ return Status::OK();
+}
+
+class SnapshotCreationCallback : public PostMemTableCallback {
+ public:
+ explicit SnapshotCreationCallback(
+ DBImpl* dbi, TxnTimestamp commit_ts,
+ const std::shared_ptr<TransactionNotifier>& notifier,
+ std::shared_ptr<const Snapshot>& snapshot)
+ : db_impl_(dbi),
+ commit_ts_(commit_ts),
+ snapshot_notifier_(notifier),
+ snapshot_(snapshot) {
+ assert(db_impl_);
+ }
+
+ ~SnapshotCreationCallback() override {
+ snapshot_creation_status_.PermitUncheckedError();
+ }
+
+ Status operator()(SequenceNumber seq, bool disable_memtable) override;
+
+ private:
+ DBImpl* const db_impl_;
+ const TxnTimestamp commit_ts_;
+ std::shared_ptr<TransactionNotifier> snapshot_notifier_;
+ std::shared_ptr<const Snapshot>& snapshot_;
+
+ Status snapshot_creation_status_;
+};
+
} // namespace ROCKSDB_NAMESPACE
#endif // ROCKSDB_LITE