]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rocksdb/utilities/transactions/write_prepared_transaction_test.cc
import 14.2.4 nautilus point release
[ceph.git] / ceph / src / rocksdb / utilities / transactions / write_prepared_transaction_test.cc
index 127f8cc8648b21d123ff1954b97b1b86712e15e7..c0f5a10682adb18debd2c9adfe0ee4946c00a4f6 100644 (file)
@@ -13,6 +13,7 @@
 
 #include <inttypes.h>
 #include <algorithm>
+#include <atomic>
 #include <functional>
 #include <string>
 #include <thread>
@@ -27,6 +28,7 @@
 #include "rocksdb/utilities/transaction_db.h"
 #include "table/mock_table.h"
 #include "util/fault_injection_test_env.h"
+#include "util/mutexlock.h"
 #include "util/random.h"
 #include "util/string_util.h"
 #include "util/sync_point.h"
@@ -322,20 +324,13 @@ class WritePreparedTxnDBMock : public WritePreparedTxnDB {
  public:
   WritePreparedTxnDBMock(DBImpl* db_impl, TransactionDBOptions& opt)
       : WritePreparedTxnDB(db_impl, opt) {}
-  WritePreparedTxnDBMock(DBImpl* db_impl, TransactionDBOptions& opt,
-                         size_t snapshot_cache_size)
-      : WritePreparedTxnDB(db_impl, opt, snapshot_cache_size) {}
-  WritePreparedTxnDBMock(DBImpl* db_impl, TransactionDBOptions& opt,
-                         size_t snapshot_cache_size, size_t commit_cache_size)
-      : WritePreparedTxnDB(db_impl, opt, snapshot_cache_size,
-                           commit_cache_size) {}
   void SetDBSnapshots(const std::vector<SequenceNumber>& snapshots) {
     snapshots_ = snapshots;
   }
   void TakeSnapshot(SequenceNumber seq) { snapshots_.push_back(seq); }
 
  protected:
-  virtual const std::vector<SequenceNumber> GetSnapshotListFromDB(
+  const std::vector<SequenceNumber> GetSnapshotListFromDB(
       SequenceNumber /* unused */) override {
     return snapshots_;
   }
@@ -351,6 +346,14 @@ class WritePreparedTransactionTestBase : public TransactionTestBase {
       : TransactionTestBase(use_stackable_db, two_write_queue, write_policy){};
 
  protected:
+  void UpdateTransactionDBOptions(size_t snapshot_cache_bits,
+                                  size_t commit_cache_bits) {
+    txn_db_options.wp_snapshot_cache_bits = snapshot_cache_bits;
+    txn_db_options.wp_commit_cache_bits = commit_cache_bits;
+  }
+  void UpdateTransactionDBOptions(size_t snapshot_cache_bits) {
+    txn_db_options.wp_snapshot_cache_bits = snapshot_cache_bits;
+  }
   // If expect_update is set, check if it actually updated old_commit_map_. If
   // it did not and yet suggested not to check the next snapshot, do the
   // opposite to check if it was not a bad suggestion.
@@ -731,13 +734,86 @@ TEST_P(WritePreparedTransactionTest, MaybeUpdateOldCommitMap) {
   MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, false);
 }
 
+// Reproduce the bug with two snapshots with the same seuqence number and test
+// that the release of the first snapshot will not affect the reads by the other
+// snapshot
+TEST_P(WritePreparedTransactionTest, DoubleSnapshot) {
+  TransactionOptions txn_options;
+  Status s;
+
+  // Insert initial value
+  ASSERT_OK(db->Put(WriteOptions(), "key", "value1"));
+
+  WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
+  Transaction* txn =
+      wp_db->BeginTransaction(WriteOptions(), txn_options, nullptr);
+  ASSERT_OK(txn->SetName("txn"));
+  ASSERT_OK(txn->Put("key", "value2"));
+  ASSERT_OK(txn->Prepare());
+  // Three snapshots with the same seq number
+  const Snapshot* snapshot0 = wp_db->GetSnapshot();
+  const Snapshot* snapshot1 = wp_db->GetSnapshot();
+  const Snapshot* snapshot2 = wp_db->GetSnapshot();
+  ASSERT_OK(txn->Commit());
+  SequenceNumber cache_size = wp_db->COMMIT_CACHE_SIZE;
+  SequenceNumber overlap_seq = txn->GetId() + cache_size;
+  delete txn;
+
+  // 4th snapshot with a larger seq
+  const Snapshot* snapshot3 = wp_db->GetSnapshot();
+  // Cause an eviction to advance max evicted seq number
+  // This also fetches the 4 snapshots from db since their seq is lower than the
+  // new max
+  wp_db->AddCommitted(overlap_seq, overlap_seq);
+
+  ReadOptions ropt;
+  // It should see the value before commit
+  ropt.snapshot = snapshot2;
+  PinnableSlice pinnable_val;
+  s = wp_db->Get(ropt, wp_db->DefaultColumnFamily(), "key", &pinnable_val);
+  ASSERT_OK(s);
+  ASSERT_TRUE(pinnable_val == "value1");
+  pinnable_val.Reset();
+
+  wp_db->ReleaseSnapshot(snapshot1);
+
+  // It should still see the value before commit
+  s = wp_db->Get(ropt, wp_db->DefaultColumnFamily(), "key", &pinnable_val);
+  ASSERT_OK(s);
+  ASSERT_TRUE(pinnable_val == "value1");
+  pinnable_val.Reset();
+
+  // Cause an eviction to advance max evicted seq number and trigger updating
+  // the snapshot list
+  overlap_seq += cache_size;
+  wp_db->AddCommitted(overlap_seq, overlap_seq);
+
+  // It should still see the value before commit
+  s = wp_db->Get(ropt, wp_db->DefaultColumnFamily(), "key", &pinnable_val);
+  ASSERT_OK(s);
+  ASSERT_TRUE(pinnable_val == "value1");
+  pinnable_val.Reset();
+
+  wp_db->ReleaseSnapshot(snapshot0);
+  wp_db->ReleaseSnapshot(snapshot2);
+  wp_db->ReleaseSnapshot(snapshot3);
+}
+
+size_t UniqueCnt(std::vector<SequenceNumber> vec) {
+  std::set<SequenceNumber> aset;
+  for (auto i : vec) {
+    aset.insert(i);
+  }
+  return aset.size();
+}
 // Test that the entries in old_commit_map_ get garbage collected properly
 TEST_P(WritePreparedTransactionTest, OldCommitMapGC) {
   const size_t snapshot_cache_bits = 0;
   const size_t commit_cache_bits = 0;
   DBImpl* mock_db = new DBImpl(options, dbname);
-  std::unique_ptr<WritePreparedTxnDBMock> wp_db(new WritePreparedTxnDBMock(
-      mock_db, txn_db_options, snapshot_cache_bits, commit_cache_bits));
+  UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
+  std::unique_ptr<WritePreparedTxnDBMock> wp_db(
+      new WritePreparedTxnDBMock(mock_db, txn_db_options));
 
   SequenceNumber seq = 0;
   // Take the first snapshot that overlaps with two txn
@@ -779,9 +855,9 @@ TEST_P(WritePreparedTransactionTest, OldCommitMapGC) {
     ASSERT_FALSE(wp_db->old_commit_map_empty_.load());
     ReadLock rl(&wp_db->old_commit_map_mutex_);
     ASSERT_EQ(3, wp_db->old_commit_map_.size());
-    ASSERT_EQ(2, wp_db->old_commit_map_[snap_seq1].size());
-    ASSERT_EQ(1, wp_db->old_commit_map_[snap_seq2].size());
-    ASSERT_EQ(1, wp_db->old_commit_map_[snap_seq3].size());
+    ASSERT_EQ(2, UniqueCnt(wp_db->old_commit_map_[snap_seq1]));
+    ASSERT_EQ(1, UniqueCnt(wp_db->old_commit_map_[snap_seq2]));
+    ASSERT_EQ(1, UniqueCnt(wp_db->old_commit_map_[snap_seq3]));
   }
 
   // Verify that the 2nd snapshot is cleaned up after the release
@@ -790,8 +866,8 @@ TEST_P(WritePreparedTransactionTest, OldCommitMapGC) {
     ASSERT_FALSE(wp_db->old_commit_map_empty_.load());
     ReadLock rl(&wp_db->old_commit_map_mutex_);
     ASSERT_EQ(2, wp_db->old_commit_map_.size());
-    ASSERT_EQ(2, wp_db->old_commit_map_[snap_seq1].size());
-    ASSERT_EQ(1, wp_db->old_commit_map_[snap_seq3].size());
+    ASSERT_EQ(2, UniqueCnt(wp_db->old_commit_map_[snap_seq1]));
+    ASSERT_EQ(1, UniqueCnt(wp_db->old_commit_map_[snap_seq3]));
   }
 
   // Verify that the 1st snapshot is cleaned up after the release
@@ -800,7 +876,7 @@ TEST_P(WritePreparedTransactionTest, OldCommitMapGC) {
     ASSERT_FALSE(wp_db->old_commit_map_empty_.load());
     ReadLock rl(&wp_db->old_commit_map_mutex_);
     ASSERT_EQ(1, wp_db->old_commit_map_.size());
-    ASSERT_EQ(1, wp_db->old_commit_map_[snap_seq3].size());
+    ASSERT_EQ(1, UniqueCnt(wp_db->old_commit_map_[snap_seq3]));
   }
 
   // Verify that the 3rd snapshot is cleaned up after the release
@@ -816,12 +892,14 @@ TEST_P(WritePreparedTransactionTest, CheckAgainstSnapshotsTest) {
   std::vector<SequenceNumber> snapshots = {100l, 200l, 300l, 400l, 500l,
                                            600l, 700l, 800l, 900l};
   const size_t snapshot_cache_bits = 2;
+  const uint64_t cache_size = 1ul << snapshot_cache_bits;
   // Safety check to express the intended size in the test. Can be adjusted if
   // the snapshots lists changed.
   assert((1ul << snapshot_cache_bits) * 2 + 1 == snapshots.size());
   DBImpl* mock_db = new DBImpl(options, dbname);
+  UpdateTransactionDBOptions(snapshot_cache_bits);
   std::unique_ptr<WritePreparedTxnDBMock> wp_db(
-      new WritePreparedTxnDBMock(mock_db, txn_db_options, snapshot_cache_bits));
+      new WritePreparedTxnDBMock(mock_db, txn_db_options));
   SequenceNumber version = 1000l;
   ASSERT_EQ(0, wp_db->snapshots_total_);
   wp_db->UpdateSnapshots(snapshots, version);
@@ -843,6 +921,57 @@ TEST_P(WritePreparedTransactionTest, CheckAgainstSnapshotsTest) {
                          commit_entry.prep_seq <= snapshots.back();
     ASSERT_EQ(expect_update, !wp_db->old_commit_map_empty_);
   }
+
+  // Test that search will include multiple snapshot from snapshot cache
+  {
+    // exclude first and last item in the cache
+    CommitEntry commit_entry = {snapshots.front() + 1,
+                                snapshots[cache_size - 1] - 1};
+    wp_db->old_commit_map_empty_ = true;  // reset
+    wp_db->old_commit_map_.clear();
+    wp_db->CheckAgainstSnapshots(commit_entry);
+    ASSERT_EQ(wp_db->old_commit_map_.size(), cache_size - 2);
+  }
+
+  // Test that search will include multiple snapshot from old snapshots
+  {
+    // include two in the middle
+    CommitEntry commit_entry = {snapshots[cache_size] + 1,
+                                snapshots[cache_size + 2] + 1};
+    wp_db->old_commit_map_empty_ = true;  // reset
+    wp_db->old_commit_map_.clear();
+    wp_db->CheckAgainstSnapshots(commit_entry);
+    ASSERT_EQ(wp_db->old_commit_map_.size(), 2);
+  }
+
+  // Test that search will include both snapshot cache and old snapshots
+  // Case 1: includes all in snapshot cache
+  {
+    CommitEntry commit_entry = {snapshots.front() - 1, snapshots.back() + 1};
+    wp_db->old_commit_map_empty_ = true;  // reset
+    wp_db->old_commit_map_.clear();
+    wp_db->CheckAgainstSnapshots(commit_entry);
+    ASSERT_EQ(wp_db->old_commit_map_.size(), snapshots.size());
+  }
+
+  // Case 2: includes all snapshot caches except the smallest
+  {
+    CommitEntry commit_entry = {snapshots.front() + 1, snapshots.back() + 1};
+    wp_db->old_commit_map_empty_ = true;  // reset
+    wp_db->old_commit_map_.clear();
+    wp_db->CheckAgainstSnapshots(commit_entry);
+    ASSERT_EQ(wp_db->old_commit_map_.size(), snapshots.size() - 1);
+  }
+
+  // Case 3: includes only the largest of snapshot cache
+  {
+    CommitEntry commit_entry = {snapshots[cache_size - 1] - 1,
+                                snapshots.back() + 1};
+    wp_db->old_commit_map_empty_ = true;  // reset
+    wp_db->old_commit_map_.clear();
+    wp_db->CheckAgainstSnapshots(commit_entry);
+    ASSERT_EQ(wp_db->old_commit_map_.size(), snapshots.size() - cache_size + 1);
+  }
 }
 
 // This test is too slow for travis
@@ -864,8 +993,9 @@ TEST_P(SnapshotConcurrentAccessTest, SnapshotConcurrentAccessTest) {
   // Choose the cache size so that the new snapshot list could replace all the
   // existing items in the cache and also have some overflow.
   DBImpl* mock_db = new DBImpl(options, dbname);
+  UpdateTransactionDBOptions(snapshot_cache_bits);
   std::unique_ptr<WritePreparedTxnDBMock> wp_db(
-      new WritePreparedTxnDBMock(mock_db, txn_db_options, snapshot_cache_bits));
+      new WritePreparedTxnDBMock(mock_db, txn_db_options));
   const size_t extra = 2;
   size_t loop_id = 0;
   // Add up to extra items that do not fit into the cache
@@ -987,10 +1117,176 @@ TEST_P(WritePreparedTransactionTest, AdvanceMaxEvictedSeqBasicTest) {
   }
 }
 
+// A new snapshot should always be always larger than max_evicted_seq_
+// Otherwise the snapshot does not go through AdvanceMaxEvictedSeq
+TEST_P(WritePreparedTransactionTest, NewSnapshotLargerThanMax) {
+  WriteOptions woptions;
+  TransactionOptions txn_options;
+  WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
+  Transaction* txn0 = db->BeginTransaction(woptions, txn_options);
+  ASSERT_OK(txn0->Put(Slice("key"), Slice("value")));
+  ASSERT_OK(txn0->Commit());
+  const SequenceNumber seq = txn0->GetId();  // is also prepare seq
+  delete txn0;
+  std::vector<Transaction*> txns;
+  // Inc seq without committing anything
+  for (int i = 0; i < 10; i++) {
+    Transaction* txn = db->BeginTransaction(woptions, txn_options);
+    ASSERT_OK(txn->SetName("xid" + std::to_string(i)));
+    ASSERT_OK(txn->Put(Slice("key" + std::to_string(i)), Slice("value")));
+    ASSERT_OK(txn->Prepare());
+    txns.push_back(txn);
+  }
+
+  // The new commit is seq + 10
+  ASSERT_OK(db->Put(woptions, "key", "value"));
+  auto snap = wp_db->GetSnapshot();
+  const SequenceNumber last_seq = snap->GetSequenceNumber();
+  wp_db->ReleaseSnapshot(snap);
+  ASSERT_LT(seq, last_seq);
+  // Otherwise our test is not effective
+  ASSERT_LT(last_seq - seq, wp_db->INC_STEP_FOR_MAX_EVICTED);
+
+  // Evict seq out of commit cache
+  const SequenceNumber overwrite_seq = seq + wp_db->COMMIT_CACHE_SIZE;
+  // Check that the next write could make max go beyond last
+  auto last_max = wp_db->max_evicted_seq_.load();
+  wp_db->AddCommitted(overwrite_seq, overwrite_seq);
+  // Check that eviction has advanced the max
+  ASSERT_LT(last_max, wp_db->max_evicted_seq_.load());
+  // Check that the new max has not advanced the last seq
+  ASSERT_LT(wp_db->max_evicted_seq_.load(), last_seq);
+  for (auto txn : txns) {
+    txn->Rollback();
+    delete txn;
+  }
+}
+
+// A new snapshot should always be always larger than max_evicted_seq_
+// In very rare cases max could be below last published seq. Test that
+// taking snapshot will wait for max to catch up.
+TEST_P(WritePreparedTransactionTest, MaxCatchupWithNewSnapshot) {
+  const size_t snapshot_cache_bits = 7;  // same as default
+  const size_t commit_cache_bits = 0;    // only 1 entry => frequent eviction
+  UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
+  ReOpen();
+  WriteOptions woptions;
+  WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
+
+  const int writes = 50;
+  const int batch_cnt = 4;
+  rocksdb::port::Thread t1([&]() {
+    for (int i = 0; i < writes; i++) {
+      WriteBatch batch;
+      // For duplicate keys cause 4 commit entries, each evicting an entry that
+      // is not published yet, thus causing max ecited seq go higher than last
+      // published.
+      for (int b = 0; b < batch_cnt; b++) {
+        batch.Put("foo", "foo");
+      }
+      db->Write(woptions, &batch);
+    }
+  });
+
+  rocksdb::port::Thread t2([&]() {
+    while (wp_db->max_evicted_seq_ == 0) {  // wait for insert thread
+      std::this_thread::yield();
+    }
+    for (int i = 0; i < 10; i++) {
+      auto snap = db->GetSnapshot();
+      if (snap->GetSequenceNumber() != 0) {
+        ASSERT_LT(wp_db->max_evicted_seq_, snap->GetSequenceNumber());
+      }  // seq 0 is ok to be less than max since nothing is visible to it
+      db->ReleaseSnapshot(snap);
+    }
+  });
+
+  t1.join();
+  t2.join();
+
+  // Make sure that the test has worked and seq number has advanced as we
+  // thought
+  auto snap = db->GetSnapshot();
+  ASSERT_GT(snap->GetSequenceNumber(), batch_cnt * writes - 1);
+  db->ReleaseSnapshot(snap);
+}
+
+// Check that old_commit_map_ cleanup works correctly if the snapshot equals
+// max_evicted_seq_.
+TEST_P(WritePreparedTransactionTest, CleanupSnapshotEqualToMax) {
+  const size_t snapshot_cache_bits = 7;  // same as default
+  const size_t commit_cache_bits = 0;    // only 1 entry => frequent eviction
+  UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
+  ReOpen();
+  WriteOptions woptions;
+  WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
+  // Insert something to increase seq
+  ASSERT_OK(db->Put(woptions, "key", "value"));
+  auto snap = db->GetSnapshot();
+  auto snap_seq = snap->GetSequenceNumber();
+  // Another insert should trigger eviction + load snapshot from db
+  ASSERT_OK(db->Put(woptions, "key", "value"));
+  // This is the scenario that we check agaisnt
+  ASSERT_EQ(snap_seq, wp_db->max_evicted_seq_);
+  // old_commit_map_ now has some data that needs gc
+  ASSERT_EQ(1, wp_db->snapshots_total_);
+  ASSERT_EQ(1, wp_db->old_commit_map_.size());
+
+  db->ReleaseSnapshot(snap);
+
+  // Another insert should trigger eviction + load snapshot from db
+  ASSERT_OK(db->Put(woptions, "key", "value"));
+
+  // the snapshot and related metadata must be properly garbage collected
+  ASSERT_EQ(0, wp_db->snapshots_total_);
+  ASSERT_TRUE(wp_db->snapshots_all_.empty());
+  ASSERT_EQ(0, wp_db->old_commit_map_.size());
+}
+
+TEST_P(WritePreparedTransactionTest, AdvanceSeqByOne) {
+  auto snap = db->GetSnapshot();
+  auto seq1 = snap->GetSequenceNumber();
+  db->ReleaseSnapshot(snap);
+
+  WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
+  wp_db->AdvanceSeqByOne();
+
+  snap = db->GetSnapshot();
+  auto seq2 = snap->GetSequenceNumber();
+  db->ReleaseSnapshot(snap);
+
+  ASSERT_LT(seq1, seq2);
+}
+
+// Test that the txn Initilize calls the overridden functions
+TEST_P(WritePreparedTransactionTest, TxnInitialize) {
+  TransactionOptions txn_options;
+  WriteOptions write_options;
+  ASSERT_OK(db->Put(write_options, "key", "value"));
+  Transaction* txn0 = db->BeginTransaction(write_options, txn_options);
+  ASSERT_OK(txn0->SetName("xid"));
+  ASSERT_OK(txn0->Put(Slice("key"), Slice("value1")));
+  ASSERT_OK(txn0->Prepare());
+
+  // SetSnapshot is overridden to update min_uncommitted_
+  txn_options.set_snapshot = true;
+  Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
+  auto snap = txn1->GetSnapshot();
+  auto snap_impl = reinterpret_cast<const SnapshotImpl*>(snap);
+  // If ::Initialize calls the overriden SetSnapshot, min_uncommitted_ must be
+  // udpated
+  ASSERT_GT(snap_impl->min_uncommitted_, kMinUnCommittedSeq);
+
+  txn0->Rollback();
+  txn1->Rollback();
+  delete txn0;
+  delete txn1;
+}
+
 // This tests that transactions with duplicate keys perform correctly after max
 // is advancing their prepared sequence numbers. This will not be the case if
 // for example the txn does not add the prepared seq for the second sub-batch to
-// the PrepareHeap structure.
+// the PreparedHeap structure.
 TEST_P(WritePreparedTransactionTest, AdvanceMaxEvictedSeqWithDuplicatesTest) {
   WriteOptions write_options;
   TransactionOptions txn_options;
@@ -1002,7 +1298,7 @@ TEST_P(WritePreparedTransactionTest, AdvanceMaxEvictedSeqWithDuplicatesTest) {
 
   WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
   // Ensure that all the prepared sequence numbers will be removed from the
-  // PrepareHeap.
+  // PreparedHeap.
   SequenceNumber new_max = wp_db->COMMIT_CACHE_SIZE;
   wp_db->AdvanceMaxEvictedSeq(0, new_max);
 
@@ -1150,12 +1446,12 @@ TEST_P(SeqAdvanceConcurrentTest, SeqAdvanceConcurrentTest) {
     assert(db != nullptr);
     db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
     seq = db_impl->TEST_GetLastVisibleSequence();
-    ASSERT_EQ(exp_seq, seq);
+    ASSERT_LE(exp_seq, seq);
 
     // Check if flush preserves the last sequence number
     db_impl->Flush(fopt);
     seq = db_impl->GetLatestSequenceNumber();
-    ASSERT_EQ(exp_seq, seq);
+    ASSERT_LE(exp_seq, seq);
 
     // Check if recovery after flush preserves the last sequence number
     db_impl->FlushWAL(true);
@@ -1163,7 +1459,7 @@ TEST_P(SeqAdvanceConcurrentTest, SeqAdvanceConcurrentTest) {
     assert(db != nullptr);
     db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
     seq = db_impl->GetLatestSequenceNumber();
-    ASSERT_EQ(exp_seq, seq);
+    ASSERT_LE(exp_seq, seq);
   }
 }
 
@@ -1308,17 +1604,138 @@ TEST_P(WritePreparedTransactionTest, BasicRecoveryTest) {
 }
 
 // After recovery the commit map is empty while the max is set. The code would
-// go through a different path which requires a separate test.
+// go through a different path which requires a separate test. Test that the
+// committed data before the restart is visible to all snapshots.
 TEST_P(WritePreparedTransactionTest, IsInSnapshotEmptyMapTest) {
+  for (bool end_with_prepare : {false, true}) {
+    ReOpen();
+    WriteOptions woptions;
+    ASSERT_OK(db->Put(woptions, "key", "value"));
+    ASSERT_OK(db->Put(woptions, "key", "value"));
+    ASSERT_OK(db->Put(woptions, "key", "value"));
+    SequenceNumber prepare_seq = kMaxSequenceNumber;
+    if (end_with_prepare) {
+      TransactionOptions txn_options;
+      Transaction* txn = db->BeginTransaction(woptions, txn_options);
+      ASSERT_OK(txn->SetName("xid0"));
+      ASSERT_OK(txn->Prepare());
+      prepare_seq = txn->GetId();
+      delete txn;
+    }
+    dynamic_cast<WritePreparedTxnDB*>(db)->TEST_Crash();
+    auto db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
+    db_impl->FlushWAL(true);
+    ReOpenNoDelete();
+    WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
+    assert(wp_db != nullptr);
+    ASSERT_GT(wp_db->max_evicted_seq_, 0);  // max after recovery
+    // Take a snapshot right after recovery
+    const Snapshot* snap = db->GetSnapshot();
+    auto snap_seq = snap->GetSequenceNumber();
+    ASSERT_GT(snap_seq, 0);
+
+    for (SequenceNumber seq = 0;
+         seq <= wp_db->max_evicted_seq_ && seq != prepare_seq; seq++) {
+      ASSERT_TRUE(wp_db->IsInSnapshot(seq, snap_seq));
+    }
+    if (end_with_prepare) {
+      ASSERT_FALSE(wp_db->IsInSnapshot(prepare_seq, snap_seq));
+    }
+    // trivial check
+    ASSERT_FALSE(wp_db->IsInSnapshot(snap_seq + 1, snap_seq));
+
+    db->ReleaseSnapshot(snap);
+
+    ASSERT_OK(db->Put(woptions, "key", "value"));
+    // Take a snapshot after some writes
+    snap = db->GetSnapshot();
+    snap_seq = snap->GetSequenceNumber();
+    for (SequenceNumber seq = 0;
+         seq <= wp_db->max_evicted_seq_ && seq != prepare_seq; seq++) {
+      ASSERT_TRUE(wp_db->IsInSnapshot(seq, snap_seq));
+    }
+    if (end_with_prepare) {
+      ASSERT_FALSE(wp_db->IsInSnapshot(prepare_seq, snap_seq));
+    }
+    // trivial check
+    ASSERT_FALSE(wp_db->IsInSnapshot(snap_seq + 1, snap_seq));
+
+    db->ReleaseSnapshot(snap);
+  }
+}
+
+// Shows the contract of IsInSnapshot when called on invalid/released snapshots
+TEST_P(WritePreparedTransactionTest, IsInSnapshotReleased) {
   WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
-  wp_db->max_evicted_seq_ = 100;
-  ASSERT_FALSE(wp_db->IsInSnapshot(50, 40));
-  ASSERT_TRUE(wp_db->IsInSnapshot(50, 50));
-  ASSERT_TRUE(wp_db->IsInSnapshot(50, 100));
-  ASSERT_TRUE(wp_db->IsInSnapshot(50, 150));
-  ASSERT_FALSE(wp_db->IsInSnapshot(100, 80));
-  ASSERT_TRUE(wp_db->IsInSnapshot(100, 100));
-  ASSERT_TRUE(wp_db->IsInSnapshot(100, 150));
+  WriteOptions woptions;
+  ASSERT_OK(db->Put(woptions, "key", "value"));
+  // snap seq = 1
+  const Snapshot* snap1 = db->GetSnapshot();
+  ASSERT_OK(db->Put(woptions, "key", "value"));
+  ASSERT_OK(db->Put(woptions, "key", "value"));
+  // snap seq = 3
+  const Snapshot* snap2 = db->GetSnapshot();
+  const SequenceNumber seq = 1;
+  // Evict seq out of commit cache
+  size_t overwrite_seq = wp_db->COMMIT_CACHE_SIZE + seq;
+  wp_db->AddCommitted(overwrite_seq, overwrite_seq);
+  SequenceNumber snap_seq;
+  uint64_t min_uncommitted = kMinUnCommittedSeq;
+  bool released;
+
+  released = false;
+  snap_seq = snap1->GetSequenceNumber();
+  ASSERT_LE(seq, snap_seq);
+  // Valid snapshot lower than max
+  ASSERT_LE(snap_seq, wp_db->max_evicted_seq_);
+  ASSERT_TRUE(wp_db->IsInSnapshot(seq, snap_seq, min_uncommitted, &released));
+  ASSERT_FALSE(released);
+
+  released = false;
+  snap_seq = snap1->GetSequenceNumber();
+  // Invaid snapshot lower than max
+  ASSERT_LE(snap_seq + 1, wp_db->max_evicted_seq_);
+  ASSERT_TRUE(
+      wp_db->IsInSnapshot(seq, snap_seq + 1, min_uncommitted, &released));
+  ASSERT_TRUE(released);
+
+  db->ReleaseSnapshot(snap1);
+
+  released = false;
+  // Released snapshot lower than max
+  ASSERT_TRUE(wp_db->IsInSnapshot(seq, snap_seq, min_uncommitted, &released));
+  // The release does not take affect until the next max advance
+  ASSERT_FALSE(released);
+
+  released = false;
+  // Invaid snapshot lower than max
+  ASSERT_TRUE(
+      wp_db->IsInSnapshot(seq, snap_seq + 1, min_uncommitted, &released));
+  ASSERT_TRUE(released);
+
+  // This make the snapshot release to reflect in txn db structures
+  wp_db->AdvanceMaxEvictedSeq(wp_db->max_evicted_seq_,
+                              wp_db->max_evicted_seq_ + 1);
+
+  released = false;
+  // Released snapshot lower than max
+  ASSERT_TRUE(wp_db->IsInSnapshot(seq, snap_seq, min_uncommitted, &released));
+  ASSERT_TRUE(released);
+
+  released = false;
+  // Invaid snapshot lower than max
+  ASSERT_TRUE(
+      wp_db->IsInSnapshot(seq, snap_seq + 1, min_uncommitted, &released));
+  ASSERT_TRUE(released);
+
+  snap_seq = snap2->GetSequenceNumber();
+
+  released = false;
+  // Unreleased snapshot lower than max
+  ASSERT_TRUE(wp_db->IsInSnapshot(seq, snap_seq, min_uncommitted, &released));
+  ASSERT_FALSE(released);
+
+  db->ReleaseSnapshot(snap2);
 }
 
 // Test WritePreparedTxnDB's IsInSnapshot against different ordering of
@@ -1364,8 +1781,9 @@ TEST_P(WritePreparedTransactionTest, IsInSnapshotTest) {
       // The set of commit seq numbers to be excluded from IsInSnapshot queries
       std::set<uint64_t> commit_seqs;
       DBImpl* mock_db = new DBImpl(options, dbname);
-      std::unique_ptr<WritePreparedTxnDBMock> wp_db(new WritePreparedTxnDBMock(
-          mock_db, txn_db_options, snapshot_cache_bits, commit_cache_bits));
+      UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
+      std::unique_ptr<WritePreparedTxnDBMock> wp_db(
+          new WritePreparedTxnDBMock(mock_db, txn_db_options));
       // We continue until max advances a bit beyond the snapshot.
       while (!snapshot || wp_db->max_evicted_seq_ < snapshot + 100) {
         // do prepare for a transaction
@@ -1785,6 +2203,259 @@ TEST_P(WritePreparedTransactionTest, CompactionShouldKeepSnapshotVisibleKeys) {
   db->ReleaseSnapshot(snapshot2);
 }
 
+TEST_P(WritePreparedTransactionTest, SmallestUncommittedOptimization) {
+  const size_t snapshot_cache_bits = 7;  // same as default
+  const size_t commit_cache_bits = 0;    // disable commit cache
+  for (bool has_recent_prepare : {true, false}) {
+    UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
+    ReOpen();
+
+    ASSERT_OK(db->Put(WriteOptions(), "key1", "value1"));
+    auto* transaction =
+        db->BeginTransaction(WriteOptions(), TransactionOptions(), nullptr);
+    ASSERT_OK(transaction->SetName("txn"));
+    ASSERT_OK(transaction->Delete("key1"));
+    ASSERT_OK(transaction->Prepare());
+    // snapshot1 should get min_uncommitted from prepared_txns_ heap.
+    auto snapshot1 = db->GetSnapshot();
+    ASSERT_EQ(transaction->GetId(),
+              ((SnapshotImpl*)snapshot1)->min_uncommitted_);
+    // Add a commit to advance max_evicted_seq and move the prepared transaction
+    // into delayed_prepared_ set.
+    ASSERT_OK(db->Put(WriteOptions(), "key2", "value2"));
+    Transaction* txn2 = nullptr;
+    if (has_recent_prepare) {
+      txn2 =
+          db->BeginTransaction(WriteOptions(), TransactionOptions(), nullptr);
+      ASSERT_OK(txn2->SetName("txn2"));
+      ASSERT_OK(txn2->Put("key3", "value3"));
+      ASSERT_OK(txn2->Prepare());
+    }
+    // snapshot2 should get min_uncommitted from delayed_prepared_ set.
+    auto snapshot2 = db->GetSnapshot();
+    ASSERT_EQ(transaction->GetId(),
+              ((SnapshotImpl*)snapshot1)->min_uncommitted_);
+    ASSERT_OK(transaction->Commit());
+    delete transaction;
+    if (has_recent_prepare) {
+      ASSERT_OK(txn2->Commit());
+      delete txn2;
+    }
+    VerifyKeys({{"key1", "NOT_FOUND"}});
+    VerifyKeys({{"key1", "value1"}}, snapshot1);
+    VerifyKeys({{"key1", "value1"}}, snapshot2);
+    db->ReleaseSnapshot(snapshot1);
+    db->ReleaseSnapshot(snapshot2);
+  }
+}
+
+// Insert two values, v1 and v2, for a key. Between prepare and commit of v2
+// take two snapshots, s1 and s2. Release s1 during compaction.
+// Test to make sure compaction doesn't get confused and think s1 can see both
+// values, and thus compact out the older value by mistake.
+TEST_P(WritePreparedTransactionTest, ReleaseSnapshotDuringCompaction) {
+  const size_t snapshot_cache_bits = 7;  // same as default
+  const size_t commit_cache_bits = 0;    // minimum commit cache
+  UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
+  ReOpen();
+
+  ASSERT_OK(db->Put(WriteOptions(), "key1", "value1_1"));
+  auto* transaction =
+      db->BeginTransaction(WriteOptions(), TransactionOptions(), nullptr);
+  ASSERT_OK(transaction->SetName("txn"));
+  ASSERT_OK(transaction->Put("key1", "value1_2"));
+  ASSERT_OK(transaction->Prepare());
+  auto snapshot1 = db->GetSnapshot();
+  // Increment sequence number.
+  ASSERT_OK(db->Put(WriteOptions(), "key2", "value2"));
+  auto snapshot2 = db->GetSnapshot();
+  ASSERT_OK(transaction->Commit());
+  delete transaction;
+  VerifyKeys({{"key1", "value1_2"}});
+  VerifyKeys({{"key1", "value1_1"}}, snapshot1);
+  VerifyKeys({{"key1", "value1_1"}}, snapshot2);
+  // Add a flush to avoid compaction to fallback to trivial move.
+
+  auto callback = [&](void*) {
+    // Release snapshot1 after CompactionIterator init.
+    // CompactionIterator need to figure out the earliest snapshot
+    // that can see key1:value1_2 is kMaxSequenceNumber, not
+    // snapshot1 or snapshot2.
+    db->ReleaseSnapshot(snapshot1);
+    // Add some keys to advance max_evicted_seq.
+    ASSERT_OK(db->Put(WriteOptions(), "key3", "value3"));
+    ASSERT_OK(db->Put(WriteOptions(), "key4", "value4"));
+  };
+  SyncPoint::GetInstance()->SetCallBack("CompactionIterator:AfterInit",
+                                        callback);
+  SyncPoint::GetInstance()->EnableProcessing();
+
+  ASSERT_OK(db->Flush(FlushOptions()));
+  VerifyKeys({{"key1", "value1_2"}});
+  VerifyKeys({{"key1", "value1_1"}}, snapshot2);
+  db->ReleaseSnapshot(snapshot2);
+  SyncPoint::GetInstance()->ClearAllCallBacks();
+}
+
+// Insert two values, v1 and v2, for a key. Take two snapshots, s1 and s2,
+// after committing v2. Release s1 during compaction, right after compaction
+// processes v2 and before processes v1. Test to make sure compaction doesn't
+// get confused and believe v1 and v2 are visible to different snapshot
+// (v1 by s2, v2 by s1) and refuse to compact out v1.
+TEST_P(WritePreparedTransactionTest, ReleaseSnapshotDuringCompaction2) {
+  const size_t snapshot_cache_bits = 7;  // same as default
+  const size_t commit_cache_bits = 0;    // minimum commit cache
+  UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
+  ReOpen();
+
+  ASSERT_OK(db->Put(WriteOptions(), "key1", "value1"));
+  ASSERT_OK(db->Put(WriteOptions(), "key1", "value2"));
+  SequenceNumber v2_seq = db->GetLatestSequenceNumber();
+  auto* s1 = db->GetSnapshot();
+  // Advance sequence number.
+  ASSERT_OK(db->Put(WriteOptions(), "key2", "dummy"));
+  auto* s2 = db->GetSnapshot();
+
+  int count_value = 0;
+  auto callback = [&](void* arg) {
+    auto* ikey = reinterpret_cast<ParsedInternalKey*>(arg);
+    if (ikey->user_key == "key1") {
+      count_value++;
+      if (count_value == 2) {
+        // Processing v1.
+        db->ReleaseSnapshot(s1);
+        // Add some keys to advance max_evicted_seq and update
+        // old_commit_map.
+        ASSERT_OK(db->Put(WriteOptions(), "key3", "dummy"));
+        ASSERT_OK(db->Put(WriteOptions(), "key4", "dummy"));
+      }
+    }
+  };
+  SyncPoint::GetInstance()->SetCallBack("CompactionIterator:ProcessKV",
+                                        callback);
+  SyncPoint::GetInstance()->EnableProcessing();
+
+  ASSERT_OK(db->Flush(FlushOptions()));
+  // value1 should be compact out.
+  VerifyInternalKeys({{"key1", "value2", v2_seq, kTypeValue}});
+
+  // cleanup
+  db->ReleaseSnapshot(s2);
+  SyncPoint::GetInstance()->ClearAllCallBacks();
+}
+
+// Insert two values, v1 and v2, for a key. Insert another dummy key
+// so to evict the commit cache for v2, while v1 is still in commit cache.
+// Take two snapshots, s1 and s2. Release s1 during compaction.
+// Since commit cache for v2 is evicted, and old_commit_map don't have
+// s1 (it is released),
+// TODO(myabandeh): how can we be sure that the v2's commit info is evicted
+// (and not v1's)? Instead of putting a dummy, we can directly call
+// AddCommitted(v2_seq + cache_size, ...) to evict v2's entry from commit cache.
+TEST_P(WritePreparedTransactionTest, ReleaseSnapshotDuringCompaction3) {
+  const size_t snapshot_cache_bits = 7;  // same as default
+  const size_t commit_cache_bits = 1;    // commit cache size = 2
+  UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
+  ReOpen();
+
+  // Add a dummy key to evict v2 commit cache, but keep v1 commit cache.
+  // It also advance max_evicted_seq and can trigger old_commit_map cleanup.
+  auto add_dummy = [&]() {
+    auto* txn_dummy =
+        db->BeginTransaction(WriteOptions(), TransactionOptions(), nullptr);
+    ASSERT_OK(txn_dummy->SetName("txn_dummy"));
+    ASSERT_OK(txn_dummy->Put("dummy", "dummy"));
+    ASSERT_OK(txn_dummy->Prepare());
+    ASSERT_OK(txn_dummy->Commit());
+    delete txn_dummy;
+  };
+
+  ASSERT_OK(db->Put(WriteOptions(), "key1", "value1"));
+  auto* txn =
+      db->BeginTransaction(WriteOptions(), TransactionOptions(), nullptr);
+  ASSERT_OK(txn->SetName("txn"));
+  ASSERT_OK(txn->Put("key1", "value2"));
+  ASSERT_OK(txn->Prepare());
+  // TODO(myabandeh): replace it with GetId()?
+  auto v2_seq = db->GetLatestSequenceNumber();
+  ASSERT_OK(txn->Commit());
+  delete txn;
+  auto* s1 = db->GetSnapshot();
+  // Dummy key to advance sequence number.
+  add_dummy();
+  auto* s2 = db->GetSnapshot();
+
+  auto callback = [&](void*) {
+    db->ReleaseSnapshot(s1);
+    // Add some dummy entries to trigger s1 being cleanup from old_commit_map.
+    add_dummy();
+    add_dummy();
+  };
+  SyncPoint::GetInstance()->SetCallBack("CompactionIterator:AfterInit",
+                                        callback);
+  SyncPoint::GetInstance()->EnableProcessing();
+
+  ASSERT_OK(db->Flush(FlushOptions()));
+  // value1 should be compact out.
+  VerifyInternalKeys({{"key1", "value2", v2_seq, kTypeValue}});
+
+  db->ReleaseSnapshot(s2);
+  SyncPoint::GetInstance()->ClearAllCallBacks();
+}
+
+TEST_P(WritePreparedTransactionTest, ReleaseEarliestSnapshotDuringCompaction) {
+  const size_t snapshot_cache_bits = 7;  // same as default
+  const size_t commit_cache_bits = 0;    // minimum commit cache
+  UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
+  ReOpen();
+
+  ASSERT_OK(db->Put(WriteOptions(), "key1", "value1"));
+  auto* transaction =
+      db->BeginTransaction(WriteOptions(), TransactionOptions(), nullptr);
+  ASSERT_OK(transaction->SetName("txn"));
+  ASSERT_OK(transaction->Delete("key1"));
+  ASSERT_OK(transaction->Prepare());
+  SequenceNumber del_seq = db->GetLatestSequenceNumber();
+  auto snapshot1 = db->GetSnapshot();
+  // Increment sequence number.
+  ASSERT_OK(db->Put(WriteOptions(), "key2", "value2"));
+  auto snapshot2 = db->GetSnapshot();
+  ASSERT_OK(transaction->Commit());
+  delete transaction;
+  VerifyKeys({{"key1", "NOT_FOUND"}});
+  VerifyKeys({{"key1", "value1"}}, snapshot1);
+  VerifyKeys({{"key1", "value1"}}, snapshot2);
+  ASSERT_OK(db->Flush(FlushOptions()));
+
+  auto callback = [&](void* compaction) {
+    // Release snapshot1 after CompactionIterator init.
+    // CompactionIterator need to double check and find out snapshot2 is now
+    // the earliest existing snapshot.
+    if (compaction != nullptr) {
+      db->ReleaseSnapshot(snapshot1);
+      // Add some keys to advance max_evicted_seq.
+      ASSERT_OK(db->Put(WriteOptions(), "key3", "value3"));
+      ASSERT_OK(db->Put(WriteOptions(), "key4", "value4"));
+    }
+  };
+  SyncPoint::GetInstance()->SetCallBack("CompactionIterator:AfterInit",
+                                        callback);
+  SyncPoint::GetInstance()->EnableProcessing();
+
+  // Dummy keys to avoid compaction trivially move files and get around actual
+  // compaction logic.
+  ASSERT_OK(db->Put(WriteOptions(), "a", "dummy"));
+  ASSERT_OK(db->Put(WriteOptions(), "z", "dummy"));
+  ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr));
+  // Only verify for key1. Both the put and delete for the key should be kept.
+  // Since the delete tombstone is not visible to snapshot2, we need to keep
+  // at least one version of the key, for write-conflict check.
+  VerifyInternalKeys({{"key1", "", del_seq, kTypeDeletion},
+                      {"key1", "value1", 0, kTypeValue}});
+  db->ReleaseSnapshot(snapshot2);
+  SyncPoint::GetInstance()->ClearAllCallBacks();
+}
+
 // A more complex test to verify compaction/flush should keep keys visible
 // to snapshots.
 TEST_P(WritePreparedTransactionTest,
@@ -1916,6 +2587,34 @@ TEST_P(WritePreparedTransactionTest,
   delete transaction;
 }
 
+TEST_P(WritePreparedTransactionTest, CommitAndSnapshotDuringCompaction) {
+  options.disable_auto_compactions = true;
+  ReOpen();
+
+  const Snapshot* snapshot = nullptr;
+  ASSERT_OK(db->Put(WriteOptions(), "key1", "value1"));
+  auto* txn = db->BeginTransaction(WriteOptions());
+  ASSERT_OK(txn->SetName("txn"));
+  ASSERT_OK(txn->Put("key1", "value2"));
+  ASSERT_OK(txn->Prepare());
+
+  auto callback = [&](void*) {
+    // Snapshot is taken after compaction start. It should be taken into
+    // consideration for whether to compact out value1.
+    snapshot = db->GetSnapshot();
+    ASSERT_OK(txn->Commit());
+    delete txn;
+  };
+  SyncPoint::GetInstance()->SetCallBack("CompactionIterator:AfterInit",
+                                        callback);
+  SyncPoint::GetInstance()->EnableProcessing();
+  ASSERT_OK(db->Flush(FlushOptions()));
+  ASSERT_NE(nullptr, snapshot);
+  VerifyKeys({{"key1", "value2"}});
+  VerifyKeys({{"key1", "value1"}}, snapshot);
+  db->ReleaseSnapshot(snapshot);
+}
+
 TEST_P(WritePreparedTransactionTest, Iterate) {
   auto verify_state = [](Iterator* iter, const std::string& key,
                          const std::string& value) {
@@ -1977,6 +2676,418 @@ TEST_P(WritePreparedTransactionTest, IteratorRefreshNotSupported) {
   delete iter;
 }
 
+// Committing an delayed prepared has two non-atomic steps: update commit cache,
+// remove seq from delayed_prepared_. The read in IsInSnapshot also involves two
+// non-atomic steps of checking these two data structures. This test breaks each
+// in the middle to ensure correctness in spite of non-atomic execution.
+// Note: This test is limitted to the case where snapshot is larger than the
+// max_evicted_seq_.
+TEST_P(WritePreparedTransactionTest, NonAtomicCommitOfDelayedPrepared) {
+  const size_t snapshot_cache_bits = 7;  // same as default
+  const size_t commit_cache_bits = 3;    // 8 entries
+  for (auto split_read : {true, false}) {
+    std::vector<bool> split_options = {false};
+    if (split_read) {
+      // Also test for break before mutex
+      split_options.push_back(true);
+    }
+    for (auto split_before_mutex : split_options) {
+      UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
+      ReOpen();
+      WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
+      DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
+      // Fill up the commit cache
+      std::string init_value("value1");
+      for (int i = 0; i < 10; i++) {
+        db->Put(WriteOptions(), Slice("key1"), Slice(init_value));
+      }
+      // Prepare a transaction but do not commit it
+      Transaction* txn =
+          db->BeginTransaction(WriteOptions(), TransactionOptions());
+      ASSERT_OK(txn->SetName("xid"));
+      ASSERT_OK(txn->Put(Slice("key1"), Slice("value2")));
+      ASSERT_OK(txn->Prepare());
+      // Commit a bunch of entries to advance max evicted seq and make the
+      // prepared a delayed prepared
+      for (int i = 0; i < 10; i++) {
+        db->Put(WriteOptions(), Slice("key3"), Slice("value3"));
+      }
+      // The snapshot should not see the delayed prepared entry
+      auto snap = db->GetSnapshot();
+
+      if (split_read) {
+        if (split_before_mutex) {
+          // split before acquiring prepare_mutex_
+          rocksdb::SyncPoint::GetInstance()->LoadDependency(
+              {{"WritePreparedTxnDB::IsInSnapshot:prepared_mutex_:pause",
+                "AtomicCommitOfDelayedPrepared:Commit:before"},
+               {"AtomicCommitOfDelayedPrepared:Commit:after",
+                "WritePreparedTxnDB::IsInSnapshot:prepared_mutex_:resume"}});
+        } else {
+          // split right after reading from the commit cache
+          rocksdb::SyncPoint::GetInstance()->LoadDependency(
+              {{"WritePreparedTxnDB::IsInSnapshot:GetCommitEntry:pause",
+                "AtomicCommitOfDelayedPrepared:Commit:before"},
+               {"AtomicCommitOfDelayedPrepared:Commit:after",
+                "WritePreparedTxnDB::IsInSnapshot:GetCommitEntry:resume"}});
+        }
+      } else {  // split commit
+        // split right before removing from delayed_prepared_
+        rocksdb::SyncPoint::GetInstance()->LoadDependency(
+            {{"WritePreparedTxnDB::RemovePrepared:pause",
+              "AtomicCommitOfDelayedPrepared:Read:before"},
+             {"AtomicCommitOfDelayedPrepared:Read:after",
+              "WritePreparedTxnDB::RemovePrepared:resume"}});
+      }
+      SyncPoint::GetInstance()->EnableProcessing();
+
+      rocksdb::port::Thread commit_thread([&]() {
+        TEST_SYNC_POINT("AtomicCommitOfDelayedPrepared:Commit:before");
+        ASSERT_OK(txn->Commit());
+        if (split_before_mutex) {
+          // Do bunch of inserts to evict the commit entry from the cache. This
+          // would prevent the 2nd look into commit cache under prepare_mutex_
+          // to see the commit entry.
+          auto seq = db_impl->TEST_GetLastVisibleSequence();
+          size_t tries = 0;
+          while (wp_db->max_evicted_seq_ < seq && tries < 50) {
+            db->Put(WriteOptions(), Slice("key3"), Slice("value3"));
+            tries++;
+          };
+          ASSERT_LT(tries, 50);
+        }
+        TEST_SYNC_POINT("AtomicCommitOfDelayedPrepared:Commit:after");
+        delete txn;
+      });
+
+      rocksdb::port::Thread read_thread([&]() {
+        TEST_SYNC_POINT("AtomicCommitOfDelayedPrepared:Read:before");
+        ReadOptions roptions;
+        roptions.snapshot = snap;
+        PinnableSlice value;
+        auto s = db->Get(roptions, db->DefaultColumnFamily(), "key1", &value);
+        ASSERT_OK(s);
+        // It should not see the commit of delayed prepared
+        ASSERT_TRUE(value == init_value);
+        TEST_SYNC_POINT("AtomicCommitOfDelayedPrepared:Read:after");
+        db->ReleaseSnapshot(snap);
+      });
+
+      read_thread.join();
+      commit_thread.join();
+      rocksdb::SyncPoint::GetInstance()->DisableProcessing();
+      rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();
+    }  // for split_before_mutex
+  }    // for split_read
+}
+
+// When max evicted seq advances a prepared seq, it involves two updates: i)
+// adding prepared seq to delayed_prepared_, ii) updating max_evicted_seq_.
+// ::IsInSnapshot also reads these two values in a non-atomic way. This test
+// ensures correctness if the update occurs after ::IsInSnapshot reads
+// delayed_prepared_empty_ and before it reads max_evicted_seq_.
+// Note: this test focuses on read snapshot larger than max_evicted_seq_.
+TEST_P(WritePreparedTransactionTest, NonAtomicUpdateOfDelayedPrepared) {
+  const size_t snapshot_cache_bits = 7;  // same as default
+  const size_t commit_cache_bits = 3;    // 8 entries
+  UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
+  ReOpen();
+  WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
+  // Fill up the commit cache
+  std::string init_value("value1");
+  for (int i = 0; i < 10; i++) {
+    db->Put(WriteOptions(), Slice("key1"), Slice(init_value));
+  }
+  // Prepare a transaction but do not commit it
+  Transaction* txn = db->BeginTransaction(WriteOptions(), TransactionOptions());
+  ASSERT_OK(txn->SetName("xid"));
+  ASSERT_OK(txn->Put(Slice("key1"), Slice("value2")));
+  ASSERT_OK(txn->Prepare());
+  // Create a gap between prepare seq and snapshot seq
+  db->Put(WriteOptions(), Slice("key3"), Slice("value3"));
+  db->Put(WriteOptions(), Slice("key3"), Slice("value3"));
+  // The snapshot should not see the delayed prepared entry
+  auto snap = db->GetSnapshot();
+  ASSERT_LT(txn->GetId(), snap->GetSequenceNumber());
+
+  // split right after reading delayed_prepared_empty_
+  rocksdb::SyncPoint::GetInstance()->LoadDependency(
+      {{"WritePreparedTxnDB::IsInSnapshot:delayed_prepared_empty_:pause",
+        "AtomicUpdateOfDelayedPrepared:before"},
+       {"AtomicUpdateOfDelayedPrepared:after",
+        "WritePreparedTxnDB::IsInSnapshot:delayed_prepared_empty_:resume"}});
+  SyncPoint::GetInstance()->EnableProcessing();
+
+  rocksdb::port::Thread commit_thread([&]() {
+    TEST_SYNC_POINT("AtomicUpdateOfDelayedPrepared:before");
+    // Commit a bunch of entries to advance max evicted seq and make the
+    // prepared a delayed prepared
+    size_t tries = 0;
+    while (wp_db->max_evicted_seq_ < txn->GetId() && tries < 50) {
+      db->Put(WriteOptions(), Slice("key3"), Slice("value3"));
+      tries++;
+    };
+    ASSERT_LT(tries, 50);
+    // This is the case on which the test focuses
+    ASSERT_LT(wp_db->max_evicted_seq_, snap->GetSequenceNumber());
+    TEST_SYNC_POINT("AtomicUpdateOfDelayedPrepared:after");
+  });
+
+  rocksdb::port::Thread read_thread([&]() {
+    ReadOptions roptions;
+    roptions.snapshot = snap;
+    PinnableSlice value;
+    auto s = db->Get(roptions, db->DefaultColumnFamily(), "key1", &value);
+    ASSERT_OK(s);
+    // It should not see the uncommitted value of delayed prepared
+    ASSERT_TRUE(value == init_value);
+    db->ReleaseSnapshot(snap);
+  });
+
+  read_thread.join();
+  commit_thread.join();
+  ASSERT_OK(txn->Commit());
+  delete txn;
+  rocksdb::SyncPoint::GetInstance()->DisableProcessing();
+  rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();
+}
+
+// Eviction from commit cache and update of max evicted seq are two non-atomic
+// steps. Similarly the read of max_evicted_seq_ in ::IsInSnapshot and reading
+// from commit cache are two non-atomic steps. This tests if the update occurs
+// after reading max_evicted_seq_ and before reading the commit cache.
+// Note: the test focuses on snapshot larger than max_evicted_seq_
+TEST_P(WritePreparedTransactionTest, NonAtomicUpdateOfMaxEvictedSeq) {
+  const size_t snapshot_cache_bits = 7;  // same as default
+  const size_t commit_cache_bits = 3;    // 8 entries
+  UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
+  ReOpen();
+  WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
+  // Fill up the commit cache
+  std::string init_value("value1");
+  std::string last_value("value_final");
+  for (int i = 0; i < 10; i++) {
+    db->Put(WriteOptions(), Slice("key1"), Slice(init_value));
+  }
+  // Do an uncommitted write to prevent min_uncommitted optimization
+  Transaction* txn1 =
+      db->BeginTransaction(WriteOptions(), TransactionOptions());
+  ASSERT_OK(txn1->SetName("xid1"));
+  ASSERT_OK(txn1->Put(Slice("key0"), last_value));
+  ASSERT_OK(txn1->Prepare());
+  // Do a write with prepare to get the prepare seq
+  Transaction* txn = db->BeginTransaction(WriteOptions(), TransactionOptions());
+  ASSERT_OK(txn->SetName("xid"));
+  ASSERT_OK(txn->Put(Slice("key1"), last_value));
+  ASSERT_OK(txn->Prepare());
+  ASSERT_OK(txn->Commit());
+  // Create a gap between commit entry and snapshot seq
+  db->Put(WriteOptions(), Slice("key3"), Slice("value3"));
+  db->Put(WriteOptions(), Slice("key3"), Slice("value3"));
+  // The snapshot should see the last commit
+  auto snap = db->GetSnapshot();
+  ASSERT_LE(txn->GetId(), snap->GetSequenceNumber());
+
+  // split right after reading max_evicted_seq_
+  rocksdb::SyncPoint::GetInstance()->LoadDependency(
+      {{"WritePreparedTxnDB::IsInSnapshot:max_evicted_seq_:pause",
+        "NonAtomicUpdateOfMaxEvictedSeq:before"},
+       {"NonAtomicUpdateOfMaxEvictedSeq:after",
+        "WritePreparedTxnDB::IsInSnapshot:max_evicted_seq_:resume"}});
+  SyncPoint::GetInstance()->EnableProcessing();
+
+  rocksdb::port::Thread commit_thread([&]() {
+    TEST_SYNC_POINT("NonAtomicUpdateOfMaxEvictedSeq:before");
+    // Commit a bunch of entries to advance max evicted seq beyond txn->GetId()
+    size_t tries = 0;
+    while (wp_db->max_evicted_seq_ < txn->GetId() && tries < 50) {
+      db->Put(WriteOptions(), Slice("key3"), Slice("value3"));
+      tries++;
+    };
+    ASSERT_LT(tries, 50);
+    // This is the case on which the test focuses
+    ASSERT_LT(wp_db->max_evicted_seq_, snap->GetSequenceNumber());
+    TEST_SYNC_POINT("NonAtomicUpdateOfMaxEvictedSeq:after");
+  });
+
+  rocksdb::port::Thread read_thread([&]() {
+    ReadOptions roptions;
+    roptions.snapshot = snap;
+    PinnableSlice value;
+    auto s = db->Get(roptions, db->DefaultColumnFamily(), "key1", &value);
+    ASSERT_OK(s);
+    // It should see the committed value of the evicted entry
+    ASSERT_TRUE(value == last_value);
+    db->ReleaseSnapshot(snap);
+  });
+
+  read_thread.join();
+  commit_thread.join();
+  delete txn;
+  txn1->Commit();
+  delete txn1;
+  rocksdb::SyncPoint::GetInstance()->DisableProcessing();
+  rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();
+}
+
+// Test when we add a prepared seq when the max_evicted_seq_ already goes beyond
+// that. The test focuses on a race condition between AddPrepared and
+// AdvanceMaxEvictedSeq functions.
+TEST_P(WritePreparedTransactionTest, AddPreparedBeforeMax) {
+  if (!options.two_write_queues) {
+    // This test is only for two write queues
+    return;
+  }
+  const size_t snapshot_cache_bits = 7;  // same as default
+  // 1 entry to advance max after the 2nd commit
+  const size_t commit_cache_bits = 0;
+  UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
+  ReOpen();
+  WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
+  std::string some_value("value_some");
+  std::string uncommitted_value("value_uncommitted");
+  // Prepare two uncommitted transactions
+  Transaction* txn1 =
+      db->BeginTransaction(WriteOptions(), TransactionOptions());
+  ASSERT_OK(txn1->SetName("xid1"));
+  ASSERT_OK(txn1->Put(Slice("key1"), some_value));
+  ASSERT_OK(txn1->Prepare());
+  Transaction* txn2 =
+      db->BeginTransaction(WriteOptions(), TransactionOptions());
+  ASSERT_OK(txn2->SetName("xid2"));
+  ASSERT_OK(txn2->Put(Slice("key2"), some_value));
+  ASSERT_OK(txn2->Prepare());
+  // Start the txn here so the other thread could get its id
+  Transaction* txn = db->BeginTransaction(WriteOptions(), TransactionOptions());
+  ASSERT_OK(txn->SetName("xid"));
+  ASSERT_OK(txn->Put(Slice("key0"), uncommitted_value));
+  port::Mutex txn_mutex_;
+
+  // t1) Insert prepared entry, t2) commit other entires to advance max
+  // evicted sec and finish checking the existing prepared entires, t1)
+  // AddPrepared, t2) update max_evicted_seq_
+  rocksdb::SyncPoint::GetInstance()->LoadDependency({
+      {"AddPrepared::begin:pause", "AddPreparedBeforeMax::read_thread:start"},
+      {"AdvanceMaxEvictedSeq::update_max:pause", "AddPrepared::begin:resume"},
+      {"AddPrepared::end", "AdvanceMaxEvictedSeq::update_max:resume"},
+  });
+  SyncPoint::GetInstance()->EnableProcessing();
+
+  rocksdb::port::Thread write_thread([&]() {
+    txn_mutex_.Lock();
+    ASSERT_OK(txn->Prepare());
+    txn_mutex_.Unlock();
+  });
+
+  rocksdb::port::Thread read_thread([&]() {
+    TEST_SYNC_POINT("AddPreparedBeforeMax::read_thread:start");
+    // Publish seq number with a commit
+    ASSERT_OK(txn1->Commit());
+    // Since the commit cache size is one the 2nd commit evict the 1st one and
+    // invokes AdcanceMaxEvictedSeq
+    ASSERT_OK(txn2->Commit());
+
+    ReadOptions roptions;
+    PinnableSlice value;
+    // The snapshot should not see the uncommitted value from write_thread
+    auto snap = db->GetSnapshot();
+    ASSERT_LT(wp_db->max_evicted_seq_, snap->GetSequenceNumber());
+    // This is the scenario that we test for
+    txn_mutex_.Lock();
+    ASSERT_GT(wp_db->max_evicted_seq_, txn->GetId());
+    txn_mutex_.Unlock();
+    roptions.snapshot = snap;
+    auto s = db->Get(roptions, db->DefaultColumnFamily(), "key0", &value);
+    ASSERT_TRUE(s.IsNotFound());
+    db->ReleaseSnapshot(snap);
+  });
+
+  read_thread.join();
+  write_thread.join();
+  delete txn1;
+  delete txn2;
+  ASSERT_OK(txn->Commit());
+  delete txn;
+  rocksdb::SyncPoint::GetInstance()->DisableProcessing();
+  rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();
+}
+
+// When an old prepared entry gets committed, there is a gap between the time
+// that it is published and when it is cleaned up from old_prepared_. This test
+// stresses such cases.
+TEST_P(WritePreparedTransactionTest, CommitOfDelayedPrepared) {
+  const size_t snapshot_cache_bits = 7;  // same as default
+  for (const size_t commit_cache_bits : {0, 2, 3}) {
+    for (const size_t sub_batch_cnt : {1, 2, 3}) {
+      UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
+      ReOpen();
+      std::atomic<const Snapshot*> snap = {nullptr};
+      std::atomic<SequenceNumber> exp_prepare = {0};
+      // Value is synchronized via snap
+      PinnableSlice value;
+      // Take a snapshot after publish and before RemovePrepared:Start
+      auto callback = [&](void* param) {
+        SequenceNumber prep_seq = *((SequenceNumber*)param);
+        if (prep_seq == exp_prepare.load()) {  // only for write_thread
+          ASSERT_EQ(nullptr, snap.load());
+          snap.store(db->GetSnapshot());
+          ReadOptions roptions;
+          roptions.snapshot = snap.load();
+          auto s = db->Get(roptions, db->DefaultColumnFamily(), "key", &value);
+          ASSERT_OK(s);
+        }
+      };
+      SyncPoint::GetInstance()->SetCallBack("RemovePrepared:Start", callback);
+      SyncPoint::GetInstance()->EnableProcessing();
+      // Thread to cause frequent evictions
+      rocksdb::port::Thread eviction_thread([&]() {
+        // Too many txns might cause commit_seq - prepare_seq in another thread
+        // to go beyond DELTA_UPPERBOUND
+        for (int i = 0; i < 25 * (1 << commit_cache_bits); i++) {
+          db->Put(WriteOptions(), Slice("key1"), Slice("value1"));
+        }
+      });
+      rocksdb::port::Thread write_thread([&]() {
+        for (int i = 0; i < 25 * (1 << commit_cache_bits); i++) {
+          Transaction* txn =
+              db->BeginTransaction(WriteOptions(), TransactionOptions());
+          ASSERT_OK(txn->SetName("xid"));
+          std::string val_str = "value" + ToString(i);
+          for (size_t b = 0; b < sub_batch_cnt; b++) {
+            ASSERT_OK(txn->Put(Slice("key"), val_str));
+          }
+          ASSERT_OK(txn->Prepare());
+          // Let an eviction to kick in
+          std::this_thread::yield();
+
+          exp_prepare.store(txn->GetId());
+          ASSERT_OK(txn->Commit());
+          delete txn;
+
+          // Read with the snapshot taken before delayed_prepared_ cleanup
+          ReadOptions roptions;
+          roptions.snapshot = snap.load();
+          ASSERT_NE(nullptr, roptions.snapshot);
+          PinnableSlice value2;
+          auto s = db->Get(roptions, db->DefaultColumnFamily(), "key", &value2);
+          ASSERT_OK(s);
+          // It should see its own write
+          ASSERT_TRUE(val_str == value2);
+          // The value read by snapshot should not change
+          ASSERT_STREQ(value2.ToString().c_str(), value.ToString().c_str());
+
+          db->ReleaseSnapshot(roptions.snapshot);
+          snap.store(nullptr);
+        }
+      });
+      write_thread.join();
+      eviction_thread.join();
+    }
+    rocksdb::SyncPoint::GetInstance()->DisableProcessing();
+    rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();
+  }
+}
+
 // Test that updating the commit map will not affect the existing snapshots
 TEST_P(WritePreparedTransactionTest, AtomicCommit) {
   for (bool skip_prepare : {true, false}) {