#include <inttypes.h>
#include <algorithm>
+#include <atomic>
#include <functional>
#include <string>
#include <thread>
#include "rocksdb/utilities/transaction_db.h"
#include "table/mock_table.h"
#include "util/fault_injection_test_env.h"
+#include "util/mutexlock.h"
#include "util/random.h"
#include "util/string_util.h"
#include "util/sync_point.h"
public:
WritePreparedTxnDBMock(DBImpl* db_impl, TransactionDBOptions& opt)
: WritePreparedTxnDB(db_impl, opt) {}
- WritePreparedTxnDBMock(DBImpl* db_impl, TransactionDBOptions& opt,
- size_t snapshot_cache_size)
- : WritePreparedTxnDB(db_impl, opt, snapshot_cache_size) {}
- WritePreparedTxnDBMock(DBImpl* db_impl, TransactionDBOptions& opt,
- size_t snapshot_cache_size, size_t commit_cache_size)
- : WritePreparedTxnDB(db_impl, opt, snapshot_cache_size,
- commit_cache_size) {}
void SetDBSnapshots(const std::vector<SequenceNumber>& snapshots) {
snapshots_ = snapshots;
}
void TakeSnapshot(SequenceNumber seq) { snapshots_.push_back(seq); }
protected:
- virtual const std::vector<SequenceNumber> GetSnapshotListFromDB(
+ const std::vector<SequenceNumber> GetSnapshotListFromDB(
SequenceNumber /* unused */) override {
return snapshots_;
}
: TransactionTestBase(use_stackable_db, two_write_queue, write_policy){};
protected:
+ void UpdateTransactionDBOptions(size_t snapshot_cache_bits,
+ size_t commit_cache_bits) {
+ txn_db_options.wp_snapshot_cache_bits = snapshot_cache_bits;
+ txn_db_options.wp_commit_cache_bits = commit_cache_bits;
+ }
+ void UpdateTransactionDBOptions(size_t snapshot_cache_bits) {
+ txn_db_options.wp_snapshot_cache_bits = snapshot_cache_bits;
+ }
// If expect_update is set, check if it actually updated old_commit_map_. If
// it did not and yet suggested not to check the next snapshot, do the
// opposite to check if it was not a bad suggestion.
MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, false);
}
+// Reproduce the bug with two snapshots with the same seuqence number and test
+// that the release of the first snapshot will not affect the reads by the other
+// snapshot
+TEST_P(WritePreparedTransactionTest, DoubleSnapshot) {
+ TransactionOptions txn_options;
+ Status s;
+
+ // Insert initial value
+ ASSERT_OK(db->Put(WriteOptions(), "key", "value1"));
+
+ WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
+ Transaction* txn =
+ wp_db->BeginTransaction(WriteOptions(), txn_options, nullptr);
+ ASSERT_OK(txn->SetName("txn"));
+ ASSERT_OK(txn->Put("key", "value2"));
+ ASSERT_OK(txn->Prepare());
+ // Three snapshots with the same seq number
+ const Snapshot* snapshot0 = wp_db->GetSnapshot();
+ const Snapshot* snapshot1 = wp_db->GetSnapshot();
+ const Snapshot* snapshot2 = wp_db->GetSnapshot();
+ ASSERT_OK(txn->Commit());
+ SequenceNumber cache_size = wp_db->COMMIT_CACHE_SIZE;
+ SequenceNumber overlap_seq = txn->GetId() + cache_size;
+ delete txn;
+
+ // 4th snapshot with a larger seq
+ const Snapshot* snapshot3 = wp_db->GetSnapshot();
+ // Cause an eviction to advance max evicted seq number
+ // This also fetches the 4 snapshots from db since their seq is lower than the
+ // new max
+ wp_db->AddCommitted(overlap_seq, overlap_seq);
+
+ ReadOptions ropt;
+ // It should see the value before commit
+ ropt.snapshot = snapshot2;
+ PinnableSlice pinnable_val;
+ s = wp_db->Get(ropt, wp_db->DefaultColumnFamily(), "key", &pinnable_val);
+ ASSERT_OK(s);
+ ASSERT_TRUE(pinnable_val == "value1");
+ pinnable_val.Reset();
+
+ wp_db->ReleaseSnapshot(snapshot1);
+
+ // It should still see the value before commit
+ s = wp_db->Get(ropt, wp_db->DefaultColumnFamily(), "key", &pinnable_val);
+ ASSERT_OK(s);
+ ASSERT_TRUE(pinnable_val == "value1");
+ pinnable_val.Reset();
+
+ // Cause an eviction to advance max evicted seq number and trigger updating
+ // the snapshot list
+ overlap_seq += cache_size;
+ wp_db->AddCommitted(overlap_seq, overlap_seq);
+
+ // It should still see the value before commit
+ s = wp_db->Get(ropt, wp_db->DefaultColumnFamily(), "key", &pinnable_val);
+ ASSERT_OK(s);
+ ASSERT_TRUE(pinnable_val == "value1");
+ pinnable_val.Reset();
+
+ wp_db->ReleaseSnapshot(snapshot0);
+ wp_db->ReleaseSnapshot(snapshot2);
+ wp_db->ReleaseSnapshot(snapshot3);
+}
+
+size_t UniqueCnt(std::vector<SequenceNumber> vec) {
+ std::set<SequenceNumber> aset;
+ for (auto i : vec) {
+ aset.insert(i);
+ }
+ return aset.size();
+}
// Test that the entries in old_commit_map_ get garbage collected properly
TEST_P(WritePreparedTransactionTest, OldCommitMapGC) {
const size_t snapshot_cache_bits = 0;
const size_t commit_cache_bits = 0;
DBImpl* mock_db = new DBImpl(options, dbname);
- std::unique_ptr<WritePreparedTxnDBMock> wp_db(new WritePreparedTxnDBMock(
- mock_db, txn_db_options, snapshot_cache_bits, commit_cache_bits));
+ UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
+ std::unique_ptr<WritePreparedTxnDBMock> wp_db(
+ new WritePreparedTxnDBMock(mock_db, txn_db_options));
SequenceNumber seq = 0;
// Take the first snapshot that overlaps with two txn
ASSERT_FALSE(wp_db->old_commit_map_empty_.load());
ReadLock rl(&wp_db->old_commit_map_mutex_);
ASSERT_EQ(3, wp_db->old_commit_map_.size());
- ASSERT_EQ(2, wp_db->old_commit_map_[snap_seq1].size());
- ASSERT_EQ(1, wp_db->old_commit_map_[snap_seq2].size());
- ASSERT_EQ(1, wp_db->old_commit_map_[snap_seq3].size());
+ ASSERT_EQ(2, UniqueCnt(wp_db->old_commit_map_[snap_seq1]));
+ ASSERT_EQ(1, UniqueCnt(wp_db->old_commit_map_[snap_seq2]));
+ ASSERT_EQ(1, UniqueCnt(wp_db->old_commit_map_[snap_seq3]));
}
// Verify that the 2nd snapshot is cleaned up after the release
ASSERT_FALSE(wp_db->old_commit_map_empty_.load());
ReadLock rl(&wp_db->old_commit_map_mutex_);
ASSERT_EQ(2, wp_db->old_commit_map_.size());
- ASSERT_EQ(2, wp_db->old_commit_map_[snap_seq1].size());
- ASSERT_EQ(1, wp_db->old_commit_map_[snap_seq3].size());
+ ASSERT_EQ(2, UniqueCnt(wp_db->old_commit_map_[snap_seq1]));
+ ASSERT_EQ(1, UniqueCnt(wp_db->old_commit_map_[snap_seq3]));
}
// Verify that the 1st snapshot is cleaned up after the release
ASSERT_FALSE(wp_db->old_commit_map_empty_.load());
ReadLock rl(&wp_db->old_commit_map_mutex_);
ASSERT_EQ(1, wp_db->old_commit_map_.size());
- ASSERT_EQ(1, wp_db->old_commit_map_[snap_seq3].size());
+ ASSERT_EQ(1, UniqueCnt(wp_db->old_commit_map_[snap_seq3]));
}
// Verify that the 3rd snapshot is cleaned up after the release
std::vector<SequenceNumber> snapshots = {100l, 200l, 300l, 400l, 500l,
600l, 700l, 800l, 900l};
const size_t snapshot_cache_bits = 2;
+ const uint64_t cache_size = 1ul << snapshot_cache_bits;
// Safety check to express the intended size in the test. Can be adjusted if
// the snapshots lists changed.
assert((1ul << snapshot_cache_bits) * 2 + 1 == snapshots.size());
DBImpl* mock_db = new DBImpl(options, dbname);
+ UpdateTransactionDBOptions(snapshot_cache_bits);
std::unique_ptr<WritePreparedTxnDBMock> wp_db(
- new WritePreparedTxnDBMock(mock_db, txn_db_options, snapshot_cache_bits));
+ new WritePreparedTxnDBMock(mock_db, txn_db_options));
SequenceNumber version = 1000l;
ASSERT_EQ(0, wp_db->snapshots_total_);
wp_db->UpdateSnapshots(snapshots, version);
commit_entry.prep_seq <= snapshots.back();
ASSERT_EQ(expect_update, !wp_db->old_commit_map_empty_);
}
+
+ // Test that search will include multiple snapshot from snapshot cache
+ {
+ // exclude first and last item in the cache
+ CommitEntry commit_entry = {snapshots.front() + 1,
+ snapshots[cache_size - 1] - 1};
+ wp_db->old_commit_map_empty_ = true; // reset
+ wp_db->old_commit_map_.clear();
+ wp_db->CheckAgainstSnapshots(commit_entry);
+ ASSERT_EQ(wp_db->old_commit_map_.size(), cache_size - 2);
+ }
+
+ // Test that search will include multiple snapshot from old snapshots
+ {
+ // include two in the middle
+ CommitEntry commit_entry = {snapshots[cache_size] + 1,
+ snapshots[cache_size + 2] + 1};
+ wp_db->old_commit_map_empty_ = true; // reset
+ wp_db->old_commit_map_.clear();
+ wp_db->CheckAgainstSnapshots(commit_entry);
+ ASSERT_EQ(wp_db->old_commit_map_.size(), 2);
+ }
+
+ // Test that search will include both snapshot cache and old snapshots
+ // Case 1: includes all in snapshot cache
+ {
+ CommitEntry commit_entry = {snapshots.front() - 1, snapshots.back() + 1};
+ wp_db->old_commit_map_empty_ = true; // reset
+ wp_db->old_commit_map_.clear();
+ wp_db->CheckAgainstSnapshots(commit_entry);
+ ASSERT_EQ(wp_db->old_commit_map_.size(), snapshots.size());
+ }
+
+ // Case 2: includes all snapshot caches except the smallest
+ {
+ CommitEntry commit_entry = {snapshots.front() + 1, snapshots.back() + 1};
+ wp_db->old_commit_map_empty_ = true; // reset
+ wp_db->old_commit_map_.clear();
+ wp_db->CheckAgainstSnapshots(commit_entry);
+ ASSERT_EQ(wp_db->old_commit_map_.size(), snapshots.size() - 1);
+ }
+
+ // Case 3: includes only the largest of snapshot cache
+ {
+ CommitEntry commit_entry = {snapshots[cache_size - 1] - 1,
+ snapshots.back() + 1};
+ wp_db->old_commit_map_empty_ = true; // reset
+ wp_db->old_commit_map_.clear();
+ wp_db->CheckAgainstSnapshots(commit_entry);
+ ASSERT_EQ(wp_db->old_commit_map_.size(), snapshots.size() - cache_size + 1);
+ }
}
// This test is too slow for travis
// Choose the cache size so that the new snapshot list could replace all the
// existing items in the cache and also have some overflow.
DBImpl* mock_db = new DBImpl(options, dbname);
+ UpdateTransactionDBOptions(snapshot_cache_bits);
std::unique_ptr<WritePreparedTxnDBMock> wp_db(
- new WritePreparedTxnDBMock(mock_db, txn_db_options, snapshot_cache_bits));
+ new WritePreparedTxnDBMock(mock_db, txn_db_options));
const size_t extra = 2;
size_t loop_id = 0;
// Add up to extra items that do not fit into the cache
}
}
+// A new snapshot should always be always larger than max_evicted_seq_
+// Otherwise the snapshot does not go through AdvanceMaxEvictedSeq
+TEST_P(WritePreparedTransactionTest, NewSnapshotLargerThanMax) {
+ WriteOptions woptions;
+ TransactionOptions txn_options;
+ WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
+ Transaction* txn0 = db->BeginTransaction(woptions, txn_options);
+ ASSERT_OK(txn0->Put(Slice("key"), Slice("value")));
+ ASSERT_OK(txn0->Commit());
+ const SequenceNumber seq = txn0->GetId(); // is also prepare seq
+ delete txn0;
+ std::vector<Transaction*> txns;
+ // Inc seq without committing anything
+ for (int i = 0; i < 10; i++) {
+ Transaction* txn = db->BeginTransaction(woptions, txn_options);
+ ASSERT_OK(txn->SetName("xid" + std::to_string(i)));
+ ASSERT_OK(txn->Put(Slice("key" + std::to_string(i)), Slice("value")));
+ ASSERT_OK(txn->Prepare());
+ txns.push_back(txn);
+ }
+
+ // The new commit is seq + 10
+ ASSERT_OK(db->Put(woptions, "key", "value"));
+ auto snap = wp_db->GetSnapshot();
+ const SequenceNumber last_seq = snap->GetSequenceNumber();
+ wp_db->ReleaseSnapshot(snap);
+ ASSERT_LT(seq, last_seq);
+ // Otherwise our test is not effective
+ ASSERT_LT(last_seq - seq, wp_db->INC_STEP_FOR_MAX_EVICTED);
+
+ // Evict seq out of commit cache
+ const SequenceNumber overwrite_seq = seq + wp_db->COMMIT_CACHE_SIZE;
+ // Check that the next write could make max go beyond last
+ auto last_max = wp_db->max_evicted_seq_.load();
+ wp_db->AddCommitted(overwrite_seq, overwrite_seq);
+ // Check that eviction has advanced the max
+ ASSERT_LT(last_max, wp_db->max_evicted_seq_.load());
+ // Check that the new max has not advanced the last seq
+ ASSERT_LT(wp_db->max_evicted_seq_.load(), last_seq);
+ for (auto txn : txns) {
+ txn->Rollback();
+ delete txn;
+ }
+}
+
+// A new snapshot should always be always larger than max_evicted_seq_
+// In very rare cases max could be below last published seq. Test that
+// taking snapshot will wait for max to catch up.
+TEST_P(WritePreparedTransactionTest, MaxCatchupWithNewSnapshot) {
+ const size_t snapshot_cache_bits = 7; // same as default
+ const size_t commit_cache_bits = 0; // only 1 entry => frequent eviction
+ UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
+ ReOpen();
+ WriteOptions woptions;
+ WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
+
+ const int writes = 50;
+ const int batch_cnt = 4;
+ rocksdb::port::Thread t1([&]() {
+ for (int i = 0; i < writes; i++) {
+ WriteBatch batch;
+ // For duplicate keys cause 4 commit entries, each evicting an entry that
+ // is not published yet, thus causing max ecited seq go higher than last
+ // published.
+ for (int b = 0; b < batch_cnt; b++) {
+ batch.Put("foo", "foo");
+ }
+ db->Write(woptions, &batch);
+ }
+ });
+
+ rocksdb::port::Thread t2([&]() {
+ while (wp_db->max_evicted_seq_ == 0) { // wait for insert thread
+ std::this_thread::yield();
+ }
+ for (int i = 0; i < 10; i++) {
+ auto snap = db->GetSnapshot();
+ if (snap->GetSequenceNumber() != 0) {
+ ASSERT_LT(wp_db->max_evicted_seq_, snap->GetSequenceNumber());
+ } // seq 0 is ok to be less than max since nothing is visible to it
+ db->ReleaseSnapshot(snap);
+ }
+ });
+
+ t1.join();
+ t2.join();
+
+ // Make sure that the test has worked and seq number has advanced as we
+ // thought
+ auto snap = db->GetSnapshot();
+ ASSERT_GT(snap->GetSequenceNumber(), batch_cnt * writes - 1);
+ db->ReleaseSnapshot(snap);
+}
+
+// Check that old_commit_map_ cleanup works correctly if the snapshot equals
+// max_evicted_seq_.
+TEST_P(WritePreparedTransactionTest, CleanupSnapshotEqualToMax) {
+ const size_t snapshot_cache_bits = 7; // same as default
+ const size_t commit_cache_bits = 0; // only 1 entry => frequent eviction
+ UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
+ ReOpen();
+ WriteOptions woptions;
+ WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
+ // Insert something to increase seq
+ ASSERT_OK(db->Put(woptions, "key", "value"));
+ auto snap = db->GetSnapshot();
+ auto snap_seq = snap->GetSequenceNumber();
+ // Another insert should trigger eviction + load snapshot from db
+ ASSERT_OK(db->Put(woptions, "key", "value"));
+ // This is the scenario that we check agaisnt
+ ASSERT_EQ(snap_seq, wp_db->max_evicted_seq_);
+ // old_commit_map_ now has some data that needs gc
+ ASSERT_EQ(1, wp_db->snapshots_total_);
+ ASSERT_EQ(1, wp_db->old_commit_map_.size());
+
+ db->ReleaseSnapshot(snap);
+
+ // Another insert should trigger eviction + load snapshot from db
+ ASSERT_OK(db->Put(woptions, "key", "value"));
+
+ // the snapshot and related metadata must be properly garbage collected
+ ASSERT_EQ(0, wp_db->snapshots_total_);
+ ASSERT_TRUE(wp_db->snapshots_all_.empty());
+ ASSERT_EQ(0, wp_db->old_commit_map_.size());
+}
+
+TEST_P(WritePreparedTransactionTest, AdvanceSeqByOne) {
+ auto snap = db->GetSnapshot();
+ auto seq1 = snap->GetSequenceNumber();
+ db->ReleaseSnapshot(snap);
+
+ WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
+ wp_db->AdvanceSeqByOne();
+
+ snap = db->GetSnapshot();
+ auto seq2 = snap->GetSequenceNumber();
+ db->ReleaseSnapshot(snap);
+
+ ASSERT_LT(seq1, seq2);
+}
+
+// Test that the txn Initilize calls the overridden functions
+TEST_P(WritePreparedTransactionTest, TxnInitialize) {
+ TransactionOptions txn_options;
+ WriteOptions write_options;
+ ASSERT_OK(db->Put(write_options, "key", "value"));
+ Transaction* txn0 = db->BeginTransaction(write_options, txn_options);
+ ASSERT_OK(txn0->SetName("xid"));
+ ASSERT_OK(txn0->Put(Slice("key"), Slice("value1")));
+ ASSERT_OK(txn0->Prepare());
+
+ // SetSnapshot is overridden to update min_uncommitted_
+ txn_options.set_snapshot = true;
+ Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
+ auto snap = txn1->GetSnapshot();
+ auto snap_impl = reinterpret_cast<const SnapshotImpl*>(snap);
+ // If ::Initialize calls the overriden SetSnapshot, min_uncommitted_ must be
+ // udpated
+ ASSERT_GT(snap_impl->min_uncommitted_, kMinUnCommittedSeq);
+
+ txn0->Rollback();
+ txn1->Rollback();
+ delete txn0;
+ delete txn1;
+}
+
// This tests that transactions with duplicate keys perform correctly after max
// is advancing their prepared sequence numbers. This will not be the case if
// for example the txn does not add the prepared seq for the second sub-batch to
-// the PrepareHeap structure.
+// the PreparedHeap structure.
TEST_P(WritePreparedTransactionTest, AdvanceMaxEvictedSeqWithDuplicatesTest) {
WriteOptions write_options;
TransactionOptions txn_options;
WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
// Ensure that all the prepared sequence numbers will be removed from the
- // PrepareHeap.
+ // PreparedHeap.
SequenceNumber new_max = wp_db->COMMIT_CACHE_SIZE;
wp_db->AdvanceMaxEvictedSeq(0, new_max);
assert(db != nullptr);
db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
seq = db_impl->TEST_GetLastVisibleSequence();
- ASSERT_EQ(exp_seq, seq);
+ ASSERT_LE(exp_seq, seq);
// Check if flush preserves the last sequence number
db_impl->Flush(fopt);
seq = db_impl->GetLatestSequenceNumber();
- ASSERT_EQ(exp_seq, seq);
+ ASSERT_LE(exp_seq, seq);
// Check if recovery after flush preserves the last sequence number
db_impl->FlushWAL(true);
assert(db != nullptr);
db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
seq = db_impl->GetLatestSequenceNumber();
- ASSERT_EQ(exp_seq, seq);
+ ASSERT_LE(exp_seq, seq);
}
}
}
// After recovery the commit map is empty while the max is set. The code would
-// go through a different path which requires a separate test.
+// go through a different path which requires a separate test. Test that the
+// committed data before the restart is visible to all snapshots.
TEST_P(WritePreparedTransactionTest, IsInSnapshotEmptyMapTest) {
+ for (bool end_with_prepare : {false, true}) {
+ ReOpen();
+ WriteOptions woptions;
+ ASSERT_OK(db->Put(woptions, "key", "value"));
+ ASSERT_OK(db->Put(woptions, "key", "value"));
+ ASSERT_OK(db->Put(woptions, "key", "value"));
+ SequenceNumber prepare_seq = kMaxSequenceNumber;
+ if (end_with_prepare) {
+ TransactionOptions txn_options;
+ Transaction* txn = db->BeginTransaction(woptions, txn_options);
+ ASSERT_OK(txn->SetName("xid0"));
+ ASSERT_OK(txn->Prepare());
+ prepare_seq = txn->GetId();
+ delete txn;
+ }
+ dynamic_cast<WritePreparedTxnDB*>(db)->TEST_Crash();
+ auto db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
+ db_impl->FlushWAL(true);
+ ReOpenNoDelete();
+ WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
+ assert(wp_db != nullptr);
+ ASSERT_GT(wp_db->max_evicted_seq_, 0); // max after recovery
+ // Take a snapshot right after recovery
+ const Snapshot* snap = db->GetSnapshot();
+ auto snap_seq = snap->GetSequenceNumber();
+ ASSERT_GT(snap_seq, 0);
+
+ for (SequenceNumber seq = 0;
+ seq <= wp_db->max_evicted_seq_ && seq != prepare_seq; seq++) {
+ ASSERT_TRUE(wp_db->IsInSnapshot(seq, snap_seq));
+ }
+ if (end_with_prepare) {
+ ASSERT_FALSE(wp_db->IsInSnapshot(prepare_seq, snap_seq));
+ }
+ // trivial check
+ ASSERT_FALSE(wp_db->IsInSnapshot(snap_seq + 1, snap_seq));
+
+ db->ReleaseSnapshot(snap);
+
+ ASSERT_OK(db->Put(woptions, "key", "value"));
+ // Take a snapshot after some writes
+ snap = db->GetSnapshot();
+ snap_seq = snap->GetSequenceNumber();
+ for (SequenceNumber seq = 0;
+ seq <= wp_db->max_evicted_seq_ && seq != prepare_seq; seq++) {
+ ASSERT_TRUE(wp_db->IsInSnapshot(seq, snap_seq));
+ }
+ if (end_with_prepare) {
+ ASSERT_FALSE(wp_db->IsInSnapshot(prepare_seq, snap_seq));
+ }
+ // trivial check
+ ASSERT_FALSE(wp_db->IsInSnapshot(snap_seq + 1, snap_seq));
+
+ db->ReleaseSnapshot(snap);
+ }
+}
+
+// Shows the contract of IsInSnapshot when called on invalid/released snapshots
+TEST_P(WritePreparedTransactionTest, IsInSnapshotReleased) {
WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
- wp_db->max_evicted_seq_ = 100;
- ASSERT_FALSE(wp_db->IsInSnapshot(50, 40));
- ASSERT_TRUE(wp_db->IsInSnapshot(50, 50));
- ASSERT_TRUE(wp_db->IsInSnapshot(50, 100));
- ASSERT_TRUE(wp_db->IsInSnapshot(50, 150));
- ASSERT_FALSE(wp_db->IsInSnapshot(100, 80));
- ASSERT_TRUE(wp_db->IsInSnapshot(100, 100));
- ASSERT_TRUE(wp_db->IsInSnapshot(100, 150));
+ WriteOptions woptions;
+ ASSERT_OK(db->Put(woptions, "key", "value"));
+ // snap seq = 1
+ const Snapshot* snap1 = db->GetSnapshot();
+ ASSERT_OK(db->Put(woptions, "key", "value"));
+ ASSERT_OK(db->Put(woptions, "key", "value"));
+ // snap seq = 3
+ const Snapshot* snap2 = db->GetSnapshot();
+ const SequenceNumber seq = 1;
+ // Evict seq out of commit cache
+ size_t overwrite_seq = wp_db->COMMIT_CACHE_SIZE + seq;
+ wp_db->AddCommitted(overwrite_seq, overwrite_seq);
+ SequenceNumber snap_seq;
+ uint64_t min_uncommitted = kMinUnCommittedSeq;
+ bool released;
+
+ released = false;
+ snap_seq = snap1->GetSequenceNumber();
+ ASSERT_LE(seq, snap_seq);
+ // Valid snapshot lower than max
+ ASSERT_LE(snap_seq, wp_db->max_evicted_seq_);
+ ASSERT_TRUE(wp_db->IsInSnapshot(seq, snap_seq, min_uncommitted, &released));
+ ASSERT_FALSE(released);
+
+ released = false;
+ snap_seq = snap1->GetSequenceNumber();
+ // Invaid snapshot lower than max
+ ASSERT_LE(snap_seq + 1, wp_db->max_evicted_seq_);
+ ASSERT_TRUE(
+ wp_db->IsInSnapshot(seq, snap_seq + 1, min_uncommitted, &released));
+ ASSERT_TRUE(released);
+
+ db->ReleaseSnapshot(snap1);
+
+ released = false;
+ // Released snapshot lower than max
+ ASSERT_TRUE(wp_db->IsInSnapshot(seq, snap_seq, min_uncommitted, &released));
+ // The release does not take affect until the next max advance
+ ASSERT_FALSE(released);
+
+ released = false;
+ // Invaid snapshot lower than max
+ ASSERT_TRUE(
+ wp_db->IsInSnapshot(seq, snap_seq + 1, min_uncommitted, &released));
+ ASSERT_TRUE(released);
+
+ // This make the snapshot release to reflect in txn db structures
+ wp_db->AdvanceMaxEvictedSeq(wp_db->max_evicted_seq_,
+ wp_db->max_evicted_seq_ + 1);
+
+ released = false;
+ // Released snapshot lower than max
+ ASSERT_TRUE(wp_db->IsInSnapshot(seq, snap_seq, min_uncommitted, &released));
+ ASSERT_TRUE(released);
+
+ released = false;
+ // Invaid snapshot lower than max
+ ASSERT_TRUE(
+ wp_db->IsInSnapshot(seq, snap_seq + 1, min_uncommitted, &released));
+ ASSERT_TRUE(released);
+
+ snap_seq = snap2->GetSequenceNumber();
+
+ released = false;
+ // Unreleased snapshot lower than max
+ ASSERT_TRUE(wp_db->IsInSnapshot(seq, snap_seq, min_uncommitted, &released));
+ ASSERT_FALSE(released);
+
+ db->ReleaseSnapshot(snap2);
}
// Test WritePreparedTxnDB's IsInSnapshot against different ordering of
// The set of commit seq numbers to be excluded from IsInSnapshot queries
std::set<uint64_t> commit_seqs;
DBImpl* mock_db = new DBImpl(options, dbname);
- std::unique_ptr<WritePreparedTxnDBMock> wp_db(new WritePreparedTxnDBMock(
- mock_db, txn_db_options, snapshot_cache_bits, commit_cache_bits));
+ UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
+ std::unique_ptr<WritePreparedTxnDBMock> wp_db(
+ new WritePreparedTxnDBMock(mock_db, txn_db_options));
// We continue until max advances a bit beyond the snapshot.
while (!snapshot || wp_db->max_evicted_seq_ < snapshot + 100) {
// do prepare for a transaction
db->ReleaseSnapshot(snapshot2);
}
+TEST_P(WritePreparedTransactionTest, SmallestUncommittedOptimization) {
+ const size_t snapshot_cache_bits = 7; // same as default
+ const size_t commit_cache_bits = 0; // disable commit cache
+ for (bool has_recent_prepare : {true, false}) {
+ UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
+ ReOpen();
+
+ ASSERT_OK(db->Put(WriteOptions(), "key1", "value1"));
+ auto* transaction =
+ db->BeginTransaction(WriteOptions(), TransactionOptions(), nullptr);
+ ASSERT_OK(transaction->SetName("txn"));
+ ASSERT_OK(transaction->Delete("key1"));
+ ASSERT_OK(transaction->Prepare());
+ // snapshot1 should get min_uncommitted from prepared_txns_ heap.
+ auto snapshot1 = db->GetSnapshot();
+ ASSERT_EQ(transaction->GetId(),
+ ((SnapshotImpl*)snapshot1)->min_uncommitted_);
+ // Add a commit to advance max_evicted_seq and move the prepared transaction
+ // into delayed_prepared_ set.
+ ASSERT_OK(db->Put(WriteOptions(), "key2", "value2"));
+ Transaction* txn2 = nullptr;
+ if (has_recent_prepare) {
+ txn2 =
+ db->BeginTransaction(WriteOptions(), TransactionOptions(), nullptr);
+ ASSERT_OK(txn2->SetName("txn2"));
+ ASSERT_OK(txn2->Put("key3", "value3"));
+ ASSERT_OK(txn2->Prepare());
+ }
+ // snapshot2 should get min_uncommitted from delayed_prepared_ set.
+ auto snapshot2 = db->GetSnapshot();
+ ASSERT_EQ(transaction->GetId(),
+ ((SnapshotImpl*)snapshot1)->min_uncommitted_);
+ ASSERT_OK(transaction->Commit());
+ delete transaction;
+ if (has_recent_prepare) {
+ ASSERT_OK(txn2->Commit());
+ delete txn2;
+ }
+ VerifyKeys({{"key1", "NOT_FOUND"}});
+ VerifyKeys({{"key1", "value1"}}, snapshot1);
+ VerifyKeys({{"key1", "value1"}}, snapshot2);
+ db->ReleaseSnapshot(snapshot1);
+ db->ReleaseSnapshot(snapshot2);
+ }
+}
+
+// Insert two values, v1 and v2, for a key. Between prepare and commit of v2
+// take two snapshots, s1 and s2. Release s1 during compaction.
+// Test to make sure compaction doesn't get confused and think s1 can see both
+// values, and thus compact out the older value by mistake.
+TEST_P(WritePreparedTransactionTest, ReleaseSnapshotDuringCompaction) {
+ const size_t snapshot_cache_bits = 7; // same as default
+ const size_t commit_cache_bits = 0; // minimum commit cache
+ UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
+ ReOpen();
+
+ ASSERT_OK(db->Put(WriteOptions(), "key1", "value1_1"));
+ auto* transaction =
+ db->BeginTransaction(WriteOptions(), TransactionOptions(), nullptr);
+ ASSERT_OK(transaction->SetName("txn"));
+ ASSERT_OK(transaction->Put("key1", "value1_2"));
+ ASSERT_OK(transaction->Prepare());
+ auto snapshot1 = db->GetSnapshot();
+ // Increment sequence number.
+ ASSERT_OK(db->Put(WriteOptions(), "key2", "value2"));
+ auto snapshot2 = db->GetSnapshot();
+ ASSERT_OK(transaction->Commit());
+ delete transaction;
+ VerifyKeys({{"key1", "value1_2"}});
+ VerifyKeys({{"key1", "value1_1"}}, snapshot1);
+ VerifyKeys({{"key1", "value1_1"}}, snapshot2);
+ // Add a flush to avoid compaction to fallback to trivial move.
+
+ auto callback = [&](void*) {
+ // Release snapshot1 after CompactionIterator init.
+ // CompactionIterator need to figure out the earliest snapshot
+ // that can see key1:value1_2 is kMaxSequenceNumber, not
+ // snapshot1 or snapshot2.
+ db->ReleaseSnapshot(snapshot1);
+ // Add some keys to advance max_evicted_seq.
+ ASSERT_OK(db->Put(WriteOptions(), "key3", "value3"));
+ ASSERT_OK(db->Put(WriteOptions(), "key4", "value4"));
+ };
+ SyncPoint::GetInstance()->SetCallBack("CompactionIterator:AfterInit",
+ callback);
+ SyncPoint::GetInstance()->EnableProcessing();
+
+ ASSERT_OK(db->Flush(FlushOptions()));
+ VerifyKeys({{"key1", "value1_2"}});
+ VerifyKeys({{"key1", "value1_1"}}, snapshot2);
+ db->ReleaseSnapshot(snapshot2);
+ SyncPoint::GetInstance()->ClearAllCallBacks();
+}
+
+// Insert two values, v1 and v2, for a key. Take two snapshots, s1 and s2,
+// after committing v2. Release s1 during compaction, right after compaction
+// processes v2 and before processes v1. Test to make sure compaction doesn't
+// get confused and believe v1 and v2 are visible to different snapshot
+// (v1 by s2, v2 by s1) and refuse to compact out v1.
+TEST_P(WritePreparedTransactionTest, ReleaseSnapshotDuringCompaction2) {
+ const size_t snapshot_cache_bits = 7; // same as default
+ const size_t commit_cache_bits = 0; // minimum commit cache
+ UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
+ ReOpen();
+
+ ASSERT_OK(db->Put(WriteOptions(), "key1", "value1"));
+ ASSERT_OK(db->Put(WriteOptions(), "key1", "value2"));
+ SequenceNumber v2_seq = db->GetLatestSequenceNumber();
+ auto* s1 = db->GetSnapshot();
+ // Advance sequence number.
+ ASSERT_OK(db->Put(WriteOptions(), "key2", "dummy"));
+ auto* s2 = db->GetSnapshot();
+
+ int count_value = 0;
+ auto callback = [&](void* arg) {
+ auto* ikey = reinterpret_cast<ParsedInternalKey*>(arg);
+ if (ikey->user_key == "key1") {
+ count_value++;
+ if (count_value == 2) {
+ // Processing v1.
+ db->ReleaseSnapshot(s1);
+ // Add some keys to advance max_evicted_seq and update
+ // old_commit_map.
+ ASSERT_OK(db->Put(WriteOptions(), "key3", "dummy"));
+ ASSERT_OK(db->Put(WriteOptions(), "key4", "dummy"));
+ }
+ }
+ };
+ SyncPoint::GetInstance()->SetCallBack("CompactionIterator:ProcessKV",
+ callback);
+ SyncPoint::GetInstance()->EnableProcessing();
+
+ ASSERT_OK(db->Flush(FlushOptions()));
+ // value1 should be compact out.
+ VerifyInternalKeys({{"key1", "value2", v2_seq, kTypeValue}});
+
+ // cleanup
+ db->ReleaseSnapshot(s2);
+ SyncPoint::GetInstance()->ClearAllCallBacks();
+}
+
+// Insert two values, v1 and v2, for a key. Insert another dummy key
+// so to evict the commit cache for v2, while v1 is still in commit cache.
+// Take two snapshots, s1 and s2. Release s1 during compaction.
+// Since commit cache for v2 is evicted, and old_commit_map don't have
+// s1 (it is released),
+// TODO(myabandeh): how can we be sure that the v2's commit info is evicted
+// (and not v1's)? Instead of putting a dummy, we can directly call
+// AddCommitted(v2_seq + cache_size, ...) to evict v2's entry from commit cache.
+TEST_P(WritePreparedTransactionTest, ReleaseSnapshotDuringCompaction3) {
+ const size_t snapshot_cache_bits = 7; // same as default
+ const size_t commit_cache_bits = 1; // commit cache size = 2
+ UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
+ ReOpen();
+
+ // Add a dummy key to evict v2 commit cache, but keep v1 commit cache.
+ // It also advance max_evicted_seq and can trigger old_commit_map cleanup.
+ auto add_dummy = [&]() {
+ auto* txn_dummy =
+ db->BeginTransaction(WriteOptions(), TransactionOptions(), nullptr);
+ ASSERT_OK(txn_dummy->SetName("txn_dummy"));
+ ASSERT_OK(txn_dummy->Put("dummy", "dummy"));
+ ASSERT_OK(txn_dummy->Prepare());
+ ASSERT_OK(txn_dummy->Commit());
+ delete txn_dummy;
+ };
+
+ ASSERT_OK(db->Put(WriteOptions(), "key1", "value1"));
+ auto* txn =
+ db->BeginTransaction(WriteOptions(), TransactionOptions(), nullptr);
+ ASSERT_OK(txn->SetName("txn"));
+ ASSERT_OK(txn->Put("key1", "value2"));
+ ASSERT_OK(txn->Prepare());
+ // TODO(myabandeh): replace it with GetId()?
+ auto v2_seq = db->GetLatestSequenceNumber();
+ ASSERT_OK(txn->Commit());
+ delete txn;
+ auto* s1 = db->GetSnapshot();
+ // Dummy key to advance sequence number.
+ add_dummy();
+ auto* s2 = db->GetSnapshot();
+
+ auto callback = [&](void*) {
+ db->ReleaseSnapshot(s1);
+ // Add some dummy entries to trigger s1 being cleanup from old_commit_map.
+ add_dummy();
+ add_dummy();
+ };
+ SyncPoint::GetInstance()->SetCallBack("CompactionIterator:AfterInit",
+ callback);
+ SyncPoint::GetInstance()->EnableProcessing();
+
+ ASSERT_OK(db->Flush(FlushOptions()));
+ // value1 should be compact out.
+ VerifyInternalKeys({{"key1", "value2", v2_seq, kTypeValue}});
+
+ db->ReleaseSnapshot(s2);
+ SyncPoint::GetInstance()->ClearAllCallBacks();
+}
+
+TEST_P(WritePreparedTransactionTest, ReleaseEarliestSnapshotDuringCompaction) {
+ const size_t snapshot_cache_bits = 7; // same as default
+ const size_t commit_cache_bits = 0; // minimum commit cache
+ UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
+ ReOpen();
+
+ ASSERT_OK(db->Put(WriteOptions(), "key1", "value1"));
+ auto* transaction =
+ db->BeginTransaction(WriteOptions(), TransactionOptions(), nullptr);
+ ASSERT_OK(transaction->SetName("txn"));
+ ASSERT_OK(transaction->Delete("key1"));
+ ASSERT_OK(transaction->Prepare());
+ SequenceNumber del_seq = db->GetLatestSequenceNumber();
+ auto snapshot1 = db->GetSnapshot();
+ // Increment sequence number.
+ ASSERT_OK(db->Put(WriteOptions(), "key2", "value2"));
+ auto snapshot2 = db->GetSnapshot();
+ ASSERT_OK(transaction->Commit());
+ delete transaction;
+ VerifyKeys({{"key1", "NOT_FOUND"}});
+ VerifyKeys({{"key1", "value1"}}, snapshot1);
+ VerifyKeys({{"key1", "value1"}}, snapshot2);
+ ASSERT_OK(db->Flush(FlushOptions()));
+
+ auto callback = [&](void* compaction) {
+ // Release snapshot1 after CompactionIterator init.
+ // CompactionIterator need to double check and find out snapshot2 is now
+ // the earliest existing snapshot.
+ if (compaction != nullptr) {
+ db->ReleaseSnapshot(snapshot1);
+ // Add some keys to advance max_evicted_seq.
+ ASSERT_OK(db->Put(WriteOptions(), "key3", "value3"));
+ ASSERT_OK(db->Put(WriteOptions(), "key4", "value4"));
+ }
+ };
+ SyncPoint::GetInstance()->SetCallBack("CompactionIterator:AfterInit",
+ callback);
+ SyncPoint::GetInstance()->EnableProcessing();
+
+ // Dummy keys to avoid compaction trivially move files and get around actual
+ // compaction logic.
+ ASSERT_OK(db->Put(WriteOptions(), "a", "dummy"));
+ ASSERT_OK(db->Put(WriteOptions(), "z", "dummy"));
+ ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr));
+ // Only verify for key1. Both the put and delete for the key should be kept.
+ // Since the delete tombstone is not visible to snapshot2, we need to keep
+ // at least one version of the key, for write-conflict check.
+ VerifyInternalKeys({{"key1", "", del_seq, kTypeDeletion},
+ {"key1", "value1", 0, kTypeValue}});
+ db->ReleaseSnapshot(snapshot2);
+ SyncPoint::GetInstance()->ClearAllCallBacks();
+}
+
// A more complex test to verify compaction/flush should keep keys visible
// to snapshots.
TEST_P(WritePreparedTransactionTest,
delete transaction;
}
+TEST_P(WritePreparedTransactionTest, CommitAndSnapshotDuringCompaction) {
+ options.disable_auto_compactions = true;
+ ReOpen();
+
+ const Snapshot* snapshot = nullptr;
+ ASSERT_OK(db->Put(WriteOptions(), "key1", "value1"));
+ auto* txn = db->BeginTransaction(WriteOptions());
+ ASSERT_OK(txn->SetName("txn"));
+ ASSERT_OK(txn->Put("key1", "value2"));
+ ASSERT_OK(txn->Prepare());
+
+ auto callback = [&](void*) {
+ // Snapshot is taken after compaction start. It should be taken into
+ // consideration for whether to compact out value1.
+ snapshot = db->GetSnapshot();
+ ASSERT_OK(txn->Commit());
+ delete txn;
+ };
+ SyncPoint::GetInstance()->SetCallBack("CompactionIterator:AfterInit",
+ callback);
+ SyncPoint::GetInstance()->EnableProcessing();
+ ASSERT_OK(db->Flush(FlushOptions()));
+ ASSERT_NE(nullptr, snapshot);
+ VerifyKeys({{"key1", "value2"}});
+ VerifyKeys({{"key1", "value1"}}, snapshot);
+ db->ReleaseSnapshot(snapshot);
+}
+
TEST_P(WritePreparedTransactionTest, Iterate) {
auto verify_state = [](Iterator* iter, const std::string& key,
const std::string& value) {
delete iter;
}
+// Committing an delayed prepared has two non-atomic steps: update commit cache,
+// remove seq from delayed_prepared_. The read in IsInSnapshot also involves two
+// non-atomic steps of checking these two data structures. This test breaks each
+// in the middle to ensure correctness in spite of non-atomic execution.
+// Note: This test is limitted to the case where snapshot is larger than the
+// max_evicted_seq_.
+TEST_P(WritePreparedTransactionTest, NonAtomicCommitOfDelayedPrepared) {
+ const size_t snapshot_cache_bits = 7; // same as default
+ const size_t commit_cache_bits = 3; // 8 entries
+ for (auto split_read : {true, false}) {
+ std::vector<bool> split_options = {false};
+ if (split_read) {
+ // Also test for break before mutex
+ split_options.push_back(true);
+ }
+ for (auto split_before_mutex : split_options) {
+ UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
+ ReOpen();
+ WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
+ DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
+ // Fill up the commit cache
+ std::string init_value("value1");
+ for (int i = 0; i < 10; i++) {
+ db->Put(WriteOptions(), Slice("key1"), Slice(init_value));
+ }
+ // Prepare a transaction but do not commit it
+ Transaction* txn =
+ db->BeginTransaction(WriteOptions(), TransactionOptions());
+ ASSERT_OK(txn->SetName("xid"));
+ ASSERT_OK(txn->Put(Slice("key1"), Slice("value2")));
+ ASSERT_OK(txn->Prepare());
+ // Commit a bunch of entries to advance max evicted seq and make the
+ // prepared a delayed prepared
+ for (int i = 0; i < 10; i++) {
+ db->Put(WriteOptions(), Slice("key3"), Slice("value3"));
+ }
+ // The snapshot should not see the delayed prepared entry
+ auto snap = db->GetSnapshot();
+
+ if (split_read) {
+ if (split_before_mutex) {
+ // split before acquiring prepare_mutex_
+ rocksdb::SyncPoint::GetInstance()->LoadDependency(
+ {{"WritePreparedTxnDB::IsInSnapshot:prepared_mutex_:pause",
+ "AtomicCommitOfDelayedPrepared:Commit:before"},
+ {"AtomicCommitOfDelayedPrepared:Commit:after",
+ "WritePreparedTxnDB::IsInSnapshot:prepared_mutex_:resume"}});
+ } else {
+ // split right after reading from the commit cache
+ rocksdb::SyncPoint::GetInstance()->LoadDependency(
+ {{"WritePreparedTxnDB::IsInSnapshot:GetCommitEntry:pause",
+ "AtomicCommitOfDelayedPrepared:Commit:before"},
+ {"AtomicCommitOfDelayedPrepared:Commit:after",
+ "WritePreparedTxnDB::IsInSnapshot:GetCommitEntry:resume"}});
+ }
+ } else { // split commit
+ // split right before removing from delayed_prepared_
+ rocksdb::SyncPoint::GetInstance()->LoadDependency(
+ {{"WritePreparedTxnDB::RemovePrepared:pause",
+ "AtomicCommitOfDelayedPrepared:Read:before"},
+ {"AtomicCommitOfDelayedPrepared:Read:after",
+ "WritePreparedTxnDB::RemovePrepared:resume"}});
+ }
+ SyncPoint::GetInstance()->EnableProcessing();
+
+ rocksdb::port::Thread commit_thread([&]() {
+ TEST_SYNC_POINT("AtomicCommitOfDelayedPrepared:Commit:before");
+ ASSERT_OK(txn->Commit());
+ if (split_before_mutex) {
+ // Do bunch of inserts to evict the commit entry from the cache. This
+ // would prevent the 2nd look into commit cache under prepare_mutex_
+ // to see the commit entry.
+ auto seq = db_impl->TEST_GetLastVisibleSequence();
+ size_t tries = 0;
+ while (wp_db->max_evicted_seq_ < seq && tries < 50) {
+ db->Put(WriteOptions(), Slice("key3"), Slice("value3"));
+ tries++;
+ };
+ ASSERT_LT(tries, 50);
+ }
+ TEST_SYNC_POINT("AtomicCommitOfDelayedPrepared:Commit:after");
+ delete txn;
+ });
+
+ rocksdb::port::Thread read_thread([&]() {
+ TEST_SYNC_POINT("AtomicCommitOfDelayedPrepared:Read:before");
+ ReadOptions roptions;
+ roptions.snapshot = snap;
+ PinnableSlice value;
+ auto s = db->Get(roptions, db->DefaultColumnFamily(), "key1", &value);
+ ASSERT_OK(s);
+ // It should not see the commit of delayed prepared
+ ASSERT_TRUE(value == init_value);
+ TEST_SYNC_POINT("AtomicCommitOfDelayedPrepared:Read:after");
+ db->ReleaseSnapshot(snap);
+ });
+
+ read_thread.join();
+ commit_thread.join();
+ rocksdb::SyncPoint::GetInstance()->DisableProcessing();
+ rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();
+ } // for split_before_mutex
+ } // for split_read
+}
+
+// When max evicted seq advances a prepared seq, it involves two updates: i)
+// adding prepared seq to delayed_prepared_, ii) updating max_evicted_seq_.
+// ::IsInSnapshot also reads these two values in a non-atomic way. This test
+// ensures correctness if the update occurs after ::IsInSnapshot reads
+// delayed_prepared_empty_ and before it reads max_evicted_seq_.
+// Note: this test focuses on read snapshot larger than max_evicted_seq_.
+TEST_P(WritePreparedTransactionTest, NonAtomicUpdateOfDelayedPrepared) {
+ const size_t snapshot_cache_bits = 7; // same as default
+ const size_t commit_cache_bits = 3; // 8 entries
+ UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
+ ReOpen();
+ WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
+ // Fill up the commit cache
+ std::string init_value("value1");
+ for (int i = 0; i < 10; i++) {
+ db->Put(WriteOptions(), Slice("key1"), Slice(init_value));
+ }
+ // Prepare a transaction but do not commit it
+ Transaction* txn = db->BeginTransaction(WriteOptions(), TransactionOptions());
+ ASSERT_OK(txn->SetName("xid"));
+ ASSERT_OK(txn->Put(Slice("key1"), Slice("value2")));
+ ASSERT_OK(txn->Prepare());
+ // Create a gap between prepare seq and snapshot seq
+ db->Put(WriteOptions(), Slice("key3"), Slice("value3"));
+ db->Put(WriteOptions(), Slice("key3"), Slice("value3"));
+ // The snapshot should not see the delayed prepared entry
+ auto snap = db->GetSnapshot();
+ ASSERT_LT(txn->GetId(), snap->GetSequenceNumber());
+
+ // split right after reading delayed_prepared_empty_
+ rocksdb::SyncPoint::GetInstance()->LoadDependency(
+ {{"WritePreparedTxnDB::IsInSnapshot:delayed_prepared_empty_:pause",
+ "AtomicUpdateOfDelayedPrepared:before"},
+ {"AtomicUpdateOfDelayedPrepared:after",
+ "WritePreparedTxnDB::IsInSnapshot:delayed_prepared_empty_:resume"}});
+ SyncPoint::GetInstance()->EnableProcessing();
+
+ rocksdb::port::Thread commit_thread([&]() {
+ TEST_SYNC_POINT("AtomicUpdateOfDelayedPrepared:before");
+ // Commit a bunch of entries to advance max evicted seq and make the
+ // prepared a delayed prepared
+ size_t tries = 0;
+ while (wp_db->max_evicted_seq_ < txn->GetId() && tries < 50) {
+ db->Put(WriteOptions(), Slice("key3"), Slice("value3"));
+ tries++;
+ };
+ ASSERT_LT(tries, 50);
+ // This is the case on which the test focuses
+ ASSERT_LT(wp_db->max_evicted_seq_, snap->GetSequenceNumber());
+ TEST_SYNC_POINT("AtomicUpdateOfDelayedPrepared:after");
+ });
+
+ rocksdb::port::Thread read_thread([&]() {
+ ReadOptions roptions;
+ roptions.snapshot = snap;
+ PinnableSlice value;
+ auto s = db->Get(roptions, db->DefaultColumnFamily(), "key1", &value);
+ ASSERT_OK(s);
+ // It should not see the uncommitted value of delayed prepared
+ ASSERT_TRUE(value == init_value);
+ db->ReleaseSnapshot(snap);
+ });
+
+ read_thread.join();
+ commit_thread.join();
+ ASSERT_OK(txn->Commit());
+ delete txn;
+ rocksdb::SyncPoint::GetInstance()->DisableProcessing();
+ rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();
+}
+
+// Eviction from commit cache and update of max evicted seq are two non-atomic
+// steps. Similarly the read of max_evicted_seq_ in ::IsInSnapshot and reading
+// from commit cache are two non-atomic steps. This tests if the update occurs
+// after reading max_evicted_seq_ and before reading the commit cache.
+// Note: the test focuses on snapshot larger than max_evicted_seq_
+TEST_P(WritePreparedTransactionTest, NonAtomicUpdateOfMaxEvictedSeq) {
+ const size_t snapshot_cache_bits = 7; // same as default
+ const size_t commit_cache_bits = 3; // 8 entries
+ UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
+ ReOpen();
+ WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
+ // Fill up the commit cache
+ std::string init_value("value1");
+ std::string last_value("value_final");
+ for (int i = 0; i < 10; i++) {
+ db->Put(WriteOptions(), Slice("key1"), Slice(init_value));
+ }
+ // Do an uncommitted write to prevent min_uncommitted optimization
+ Transaction* txn1 =
+ db->BeginTransaction(WriteOptions(), TransactionOptions());
+ ASSERT_OK(txn1->SetName("xid1"));
+ ASSERT_OK(txn1->Put(Slice("key0"), last_value));
+ ASSERT_OK(txn1->Prepare());
+ // Do a write with prepare to get the prepare seq
+ Transaction* txn = db->BeginTransaction(WriteOptions(), TransactionOptions());
+ ASSERT_OK(txn->SetName("xid"));
+ ASSERT_OK(txn->Put(Slice("key1"), last_value));
+ ASSERT_OK(txn->Prepare());
+ ASSERT_OK(txn->Commit());
+ // Create a gap between commit entry and snapshot seq
+ db->Put(WriteOptions(), Slice("key3"), Slice("value3"));
+ db->Put(WriteOptions(), Slice("key3"), Slice("value3"));
+ // The snapshot should see the last commit
+ auto snap = db->GetSnapshot();
+ ASSERT_LE(txn->GetId(), snap->GetSequenceNumber());
+
+ // split right after reading max_evicted_seq_
+ rocksdb::SyncPoint::GetInstance()->LoadDependency(
+ {{"WritePreparedTxnDB::IsInSnapshot:max_evicted_seq_:pause",
+ "NonAtomicUpdateOfMaxEvictedSeq:before"},
+ {"NonAtomicUpdateOfMaxEvictedSeq:after",
+ "WritePreparedTxnDB::IsInSnapshot:max_evicted_seq_:resume"}});
+ SyncPoint::GetInstance()->EnableProcessing();
+
+ rocksdb::port::Thread commit_thread([&]() {
+ TEST_SYNC_POINT("NonAtomicUpdateOfMaxEvictedSeq:before");
+ // Commit a bunch of entries to advance max evicted seq beyond txn->GetId()
+ size_t tries = 0;
+ while (wp_db->max_evicted_seq_ < txn->GetId() && tries < 50) {
+ db->Put(WriteOptions(), Slice("key3"), Slice("value3"));
+ tries++;
+ };
+ ASSERT_LT(tries, 50);
+ // This is the case on which the test focuses
+ ASSERT_LT(wp_db->max_evicted_seq_, snap->GetSequenceNumber());
+ TEST_SYNC_POINT("NonAtomicUpdateOfMaxEvictedSeq:after");
+ });
+
+ rocksdb::port::Thread read_thread([&]() {
+ ReadOptions roptions;
+ roptions.snapshot = snap;
+ PinnableSlice value;
+ auto s = db->Get(roptions, db->DefaultColumnFamily(), "key1", &value);
+ ASSERT_OK(s);
+ // It should see the committed value of the evicted entry
+ ASSERT_TRUE(value == last_value);
+ db->ReleaseSnapshot(snap);
+ });
+
+ read_thread.join();
+ commit_thread.join();
+ delete txn;
+ txn1->Commit();
+ delete txn1;
+ rocksdb::SyncPoint::GetInstance()->DisableProcessing();
+ rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();
+}
+
+// Test when we add a prepared seq when the max_evicted_seq_ already goes beyond
+// that. The test focuses on a race condition between AddPrepared and
+// AdvanceMaxEvictedSeq functions.
+TEST_P(WritePreparedTransactionTest, AddPreparedBeforeMax) {
+ if (!options.two_write_queues) {
+ // This test is only for two write queues
+ return;
+ }
+ const size_t snapshot_cache_bits = 7; // same as default
+ // 1 entry to advance max after the 2nd commit
+ const size_t commit_cache_bits = 0;
+ UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
+ ReOpen();
+ WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
+ std::string some_value("value_some");
+ std::string uncommitted_value("value_uncommitted");
+ // Prepare two uncommitted transactions
+ Transaction* txn1 =
+ db->BeginTransaction(WriteOptions(), TransactionOptions());
+ ASSERT_OK(txn1->SetName("xid1"));
+ ASSERT_OK(txn1->Put(Slice("key1"), some_value));
+ ASSERT_OK(txn1->Prepare());
+ Transaction* txn2 =
+ db->BeginTransaction(WriteOptions(), TransactionOptions());
+ ASSERT_OK(txn2->SetName("xid2"));
+ ASSERT_OK(txn2->Put(Slice("key2"), some_value));
+ ASSERT_OK(txn2->Prepare());
+ // Start the txn here so the other thread could get its id
+ Transaction* txn = db->BeginTransaction(WriteOptions(), TransactionOptions());
+ ASSERT_OK(txn->SetName("xid"));
+ ASSERT_OK(txn->Put(Slice("key0"), uncommitted_value));
+ port::Mutex txn_mutex_;
+
+ // t1) Insert prepared entry, t2) commit other entires to advance max
+ // evicted sec and finish checking the existing prepared entires, t1)
+ // AddPrepared, t2) update max_evicted_seq_
+ rocksdb::SyncPoint::GetInstance()->LoadDependency({
+ {"AddPrepared::begin:pause", "AddPreparedBeforeMax::read_thread:start"},
+ {"AdvanceMaxEvictedSeq::update_max:pause", "AddPrepared::begin:resume"},
+ {"AddPrepared::end", "AdvanceMaxEvictedSeq::update_max:resume"},
+ });
+ SyncPoint::GetInstance()->EnableProcessing();
+
+ rocksdb::port::Thread write_thread([&]() {
+ txn_mutex_.Lock();
+ ASSERT_OK(txn->Prepare());
+ txn_mutex_.Unlock();
+ });
+
+ rocksdb::port::Thread read_thread([&]() {
+ TEST_SYNC_POINT("AddPreparedBeforeMax::read_thread:start");
+ // Publish seq number with a commit
+ ASSERT_OK(txn1->Commit());
+ // Since the commit cache size is one the 2nd commit evict the 1st one and
+ // invokes AdcanceMaxEvictedSeq
+ ASSERT_OK(txn2->Commit());
+
+ ReadOptions roptions;
+ PinnableSlice value;
+ // The snapshot should not see the uncommitted value from write_thread
+ auto snap = db->GetSnapshot();
+ ASSERT_LT(wp_db->max_evicted_seq_, snap->GetSequenceNumber());
+ // This is the scenario that we test for
+ txn_mutex_.Lock();
+ ASSERT_GT(wp_db->max_evicted_seq_, txn->GetId());
+ txn_mutex_.Unlock();
+ roptions.snapshot = snap;
+ auto s = db->Get(roptions, db->DefaultColumnFamily(), "key0", &value);
+ ASSERT_TRUE(s.IsNotFound());
+ db->ReleaseSnapshot(snap);
+ });
+
+ read_thread.join();
+ write_thread.join();
+ delete txn1;
+ delete txn2;
+ ASSERT_OK(txn->Commit());
+ delete txn;
+ rocksdb::SyncPoint::GetInstance()->DisableProcessing();
+ rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();
+}
+
+// When an old prepared entry gets committed, there is a gap between the time
+// that it is published and when it is cleaned up from old_prepared_. This test
+// stresses such cases.
+TEST_P(WritePreparedTransactionTest, CommitOfDelayedPrepared) {
+ const size_t snapshot_cache_bits = 7; // same as default
+ for (const size_t commit_cache_bits : {0, 2, 3}) {
+ for (const size_t sub_batch_cnt : {1, 2, 3}) {
+ UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
+ ReOpen();
+ std::atomic<const Snapshot*> snap = {nullptr};
+ std::atomic<SequenceNumber> exp_prepare = {0};
+ // Value is synchronized via snap
+ PinnableSlice value;
+ // Take a snapshot after publish and before RemovePrepared:Start
+ auto callback = [&](void* param) {
+ SequenceNumber prep_seq = *((SequenceNumber*)param);
+ if (prep_seq == exp_prepare.load()) { // only for write_thread
+ ASSERT_EQ(nullptr, snap.load());
+ snap.store(db->GetSnapshot());
+ ReadOptions roptions;
+ roptions.snapshot = snap.load();
+ auto s = db->Get(roptions, db->DefaultColumnFamily(), "key", &value);
+ ASSERT_OK(s);
+ }
+ };
+ SyncPoint::GetInstance()->SetCallBack("RemovePrepared:Start", callback);
+ SyncPoint::GetInstance()->EnableProcessing();
+ // Thread to cause frequent evictions
+ rocksdb::port::Thread eviction_thread([&]() {
+ // Too many txns might cause commit_seq - prepare_seq in another thread
+ // to go beyond DELTA_UPPERBOUND
+ for (int i = 0; i < 25 * (1 << commit_cache_bits); i++) {
+ db->Put(WriteOptions(), Slice("key1"), Slice("value1"));
+ }
+ });
+ rocksdb::port::Thread write_thread([&]() {
+ for (int i = 0; i < 25 * (1 << commit_cache_bits); i++) {
+ Transaction* txn =
+ db->BeginTransaction(WriteOptions(), TransactionOptions());
+ ASSERT_OK(txn->SetName("xid"));
+ std::string val_str = "value" + ToString(i);
+ for (size_t b = 0; b < sub_batch_cnt; b++) {
+ ASSERT_OK(txn->Put(Slice("key"), val_str));
+ }
+ ASSERT_OK(txn->Prepare());
+ // Let an eviction to kick in
+ std::this_thread::yield();
+
+ exp_prepare.store(txn->GetId());
+ ASSERT_OK(txn->Commit());
+ delete txn;
+
+ // Read with the snapshot taken before delayed_prepared_ cleanup
+ ReadOptions roptions;
+ roptions.snapshot = snap.load();
+ ASSERT_NE(nullptr, roptions.snapshot);
+ PinnableSlice value2;
+ auto s = db->Get(roptions, db->DefaultColumnFamily(), "key", &value2);
+ ASSERT_OK(s);
+ // It should see its own write
+ ASSERT_TRUE(val_str == value2);
+ // The value read by snapshot should not change
+ ASSERT_STREQ(value2.ToString().c_str(), value.ToString().c_str());
+
+ db->ReleaseSnapshot(roptions.snapshot);
+ snap.store(nullptr);
+ }
+ });
+ write_thread.join();
+ eviction_thread.join();
+ }
+ rocksdb::SyncPoint::GetInstance()->DisableProcessing();
+ rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();
+ }
+}
+
// Test that updating the commit map will not affect the existing snapshots
TEST_P(WritePreparedTransactionTest, AtomicCommit) {
for (bool skip_prepare : {true, false}) {