]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rocksdb/utilities/transactions/write_prepared_txn_db.h
import 14.2.4 nautilus point release
[ceph.git] / ceph / src / rocksdb / utilities / transactions / write_prepared_txn_db.h
index ec76e271634cb4b25d1cadb88a805a1ef225609c..10d1dbf607ea6c274512395c4cd3d1cdcdea0d54 100644 (file)
 
 namespace rocksdb {
 
-#define ROCKS_LOG_DETAILS(LGR, FMT, ...) \
-  ;  // due to overhead by default skip such lines
-// ROCKS_LOG_DEBUG(LGR, FMT, ##__VA_ARGS__)
-
 // A PessimisticTransactionDB that writes data to DB after prepare phase of 2PC.
 // In this way some data in the DB might not be committed. The DB provides
 // mechanisms to tell such data apart from committed data.
 class WritePreparedTxnDB : public PessimisticTransactionDB {
  public:
-  explicit WritePreparedTxnDB(
-      DB* db, const TransactionDBOptions& txn_db_options,
-      size_t snapshot_cache_bits = DEF_SNAPSHOT_CACHE_BITS,
-      size_t commit_cache_bits = DEF_COMMIT_CACHE_BITS)
+  explicit WritePreparedTxnDB(DB* db,
+                              const TransactionDBOptions& txn_db_options)
       : PessimisticTransactionDB(db, txn_db_options),
-        SNAPSHOT_CACHE_BITS(snapshot_cache_bits),
+        SNAPSHOT_CACHE_BITS(txn_db_options.wp_snapshot_cache_bits),
         SNAPSHOT_CACHE_SIZE(static_cast<size_t>(1ull << SNAPSHOT_CACHE_BITS)),
-        COMMIT_CACHE_BITS(commit_cache_bits),
+        COMMIT_CACHE_BITS(txn_db_options.wp_commit_cache_bits),
         COMMIT_CACHE_SIZE(static_cast<size_t>(1ull << COMMIT_CACHE_BITS)),
         FORMAT(COMMIT_CACHE_BITS) {
     Init(txn_db_options);
   }
 
-  explicit WritePreparedTxnDB(
-      StackableDB* db, const TransactionDBOptions& txn_db_options,
-      size_t snapshot_cache_bits = DEF_SNAPSHOT_CACHE_BITS,
-      size_t commit_cache_bits = DEF_COMMIT_CACHE_BITS)
+  explicit WritePreparedTxnDB(StackableDB* db,
+                              const TransactionDBOptions& txn_db_options)
       : PessimisticTransactionDB(db, txn_db_options),
-        SNAPSHOT_CACHE_BITS(snapshot_cache_bits),
+        SNAPSHOT_CACHE_BITS(txn_db_options.wp_snapshot_cache_bits),
         SNAPSHOT_CACHE_SIZE(static_cast<size_t>(1ull << SNAPSHOT_CACHE_BITS)),
-        COMMIT_CACHE_BITS(commit_cache_bits),
+        COMMIT_CACHE_BITS(txn_db_options.wp_commit_cache_bits),
         COMMIT_CACHE_SIZE(static_cast<size_t>(1ull << COMMIT_CACHE_BITS)),
         FORMAT(COMMIT_CACHE_BITS) {
     Init(txn_db_options);
@@ -112,17 +104,21 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
       const std::vector<ColumnFamilyHandle*>& column_families,
       std::vector<Iterator*>* iterators) override;
 
-  virtual void ReleaseSnapshot(const Snapshot* snapshot) override;
-
   // Check whether the transaction that wrote the value with sequence number seq
   // is visible to the snapshot with sequence number snapshot_seq.
   // Returns true if commit_seq <= snapshot_seq
+  // If the snapshot_seq is already released and snapshot_seq <= max, sets
+  // *snap_released to true and returns true as well.
   inline bool IsInSnapshot(uint64_t prep_seq, uint64_t snapshot_seq,
-                           uint64_t min_uncommitted = 0) const {
+                           uint64_t min_uncommitted = kMinUnCommittedSeq,
+                           bool* snap_released = nullptr) const {
     ROCKS_LOG_DETAILS(info_log_,
                       "IsInSnapshot %" PRIu64 " in %" PRIu64
                       " min_uncommitted %" PRIu64,
                       prep_seq, snapshot_seq, min_uncommitted);
+    assert(min_uncommitted >= kMinUnCommittedSeq);
+    // Caller is responsible to initialize snap_released.
+    assert(snap_released == nullptr || *snap_released == false);
     // Here we try to infer the return value without looking into prepare list.
     // This would help avoiding synchronization over a shared map.
     // TODO(myabandeh): optimize this. This sequence of checks must be correct
@@ -142,24 +138,6 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
           prep_seq, snapshot_seq, 0);
       return false;
     }
-    if (!delayed_prepared_empty_.load(std::memory_order_acquire)) {
-      // We should not normally reach here
-      WPRecordTick(TXN_PREPARE_MUTEX_OVERHEAD);
-      ReadLock rl(&prepared_mutex_);
-      ROCKS_LOG_WARN(info_log_, "prepared_mutex_ overhead %" PRIu64,
-                     static_cast<uint64_t>(delayed_prepared_.size()));
-      if (delayed_prepared_.find(prep_seq) != delayed_prepared_.end()) {
-        // Then it is not committed yet
-        ROCKS_LOG_DETAILS(info_log_,
-                          "IsInSnapshot %" PRIu64 " in %" PRIu64
-                          " returns %" PRId32,
-                          prep_seq, snapshot_seq, 0);
-        return false;
-      }
-    }
-    // Note: since min_uncommitted does not include the delayed_prepared_ we
-    // should check delayed_prepared_ first before applying this optimization.
-    // TODO(myabandeh): include delayed_prepared_ in min_uncommitted
     if (prep_seq < min_uncommitted) {
       ROCKS_LOG_DETAILS(info_log_,
                         "IsInSnapshot %" PRIu64 " in %" PRIu64
@@ -168,38 +146,119 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
                         prep_seq, snapshot_seq, 1, min_uncommitted);
       return true;
     }
-    auto indexed_seq = prep_seq % COMMIT_CACHE_SIZE;
+    // Commit of delayed prepared has two non-atomic steps: add to commit cache,
+    // remove from delayed prepared. Our reads from these two is also
+    // non-atomic. By looking into commit cache first thus we might not find the
+    // prep_seq neither in commit cache not in delayed_prepared_. To fix that i)
+    // we check if there was any delayed prepared BEFORE looking into commit
+    // cache, ii) if there was, we complete the search steps to be these: i)
+    // commit cache, ii) delayed prepared, commit cache again. In this way if
+    // the first query to commit cache missed the commit, the 2nd will catch it.
+    bool was_empty;
+    SequenceNumber max_evicted_seq_lb, max_evicted_seq_ub;
     CommitEntry64b dont_care;
-    CommitEntry cached;
-    bool exist = GetCommitEntry(indexed_seq, &dont_care, &cached);
-    if (exist && prep_seq == cached.prep_seq) {
-      // It is committed and also not evicted from commit cache
-      ROCKS_LOG_DETAILS(
-          info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
-          prep_seq, snapshot_seq, cached.commit_seq <= snapshot_seq);
-      return cached.commit_seq <= snapshot_seq;
-    }
-    // else it could be committed but not inserted in the map which could happen
-    // after recovery, or it could be committed and evicted by another commit,
-    // or never committed.
-
-    // At this point we dont know if it was committed or it is still prepared
-    auto max_evicted_seq = max_evicted_seq_.load(std::memory_order_acquire);
-    // max_evicted_seq_ when we did GetCommitEntry <= max_evicted_seq now
-    if (max_evicted_seq < prep_seq) {
-      // Not evicted from cache and also not present, so must be still prepared
-      ROCKS_LOG_DETAILS(
-          info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
-          prep_seq, snapshot_seq, 0);
-      return false;
-    }
+    auto indexed_seq = prep_seq % COMMIT_CACHE_SIZE;
+    size_t repeats = 0;
+    do {
+      repeats++;
+      assert(repeats < 100);
+      if (UNLIKELY(repeats >= 100)) {
+        throw std::runtime_error(
+            "The read was intrupted 100 times by update to max_evicted_seq_. "
+            "This is unexpected in all setups");
+      }
+      max_evicted_seq_lb = max_evicted_seq_.load(std::memory_order_acquire);
+      TEST_SYNC_POINT(
+          "WritePreparedTxnDB::IsInSnapshot:max_evicted_seq_:pause");
+      TEST_SYNC_POINT(
+          "WritePreparedTxnDB::IsInSnapshot:max_evicted_seq_:resume");
+      was_empty = delayed_prepared_empty_.load(std::memory_order_acquire);
+      TEST_SYNC_POINT(
+          "WritePreparedTxnDB::IsInSnapshot:delayed_prepared_empty_:pause");
+      TEST_SYNC_POINT(
+          "WritePreparedTxnDB::IsInSnapshot:delayed_prepared_empty_:resume");
+      CommitEntry cached;
+      bool exist = GetCommitEntry(indexed_seq, &dont_care, &cached);
+      TEST_SYNC_POINT("WritePreparedTxnDB::IsInSnapshot:GetCommitEntry:pause");
+      TEST_SYNC_POINT("WritePreparedTxnDB::IsInSnapshot:GetCommitEntry:resume");
+      if (exist && prep_seq == cached.prep_seq) {
+        // It is committed and also not evicted from commit cache
+        ROCKS_LOG_DETAILS(
+            info_log_,
+            "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
+            prep_seq, snapshot_seq, cached.commit_seq <= snapshot_seq);
+        return cached.commit_seq <= snapshot_seq;
+      }
+      // else it could be committed but not inserted in the map which could
+      // happen after recovery, or it could be committed and evicted by another
+      // commit, or never committed.
+
+      // At this point we dont know if it was committed or it is still prepared
+      max_evicted_seq_ub = max_evicted_seq_.load(std::memory_order_acquire);
+      if (UNLIKELY(max_evicted_seq_lb != max_evicted_seq_ub)) {
+        continue;
+      }
+      // Note: max_evicted_seq_ when we did GetCommitEntry <= max_evicted_seq_ub
+      if (max_evicted_seq_ub < prep_seq) {
+        // Not evicted from cache and also not present, so must be still
+        // prepared
+        ROCKS_LOG_DETAILS(info_log_,
+                          "IsInSnapshot %" PRIu64 " in %" PRIu64
+                          " returns %" PRId32,
+                          prep_seq, snapshot_seq, 0);
+        return false;
+      }
+      TEST_SYNC_POINT("WritePreparedTxnDB::IsInSnapshot:prepared_mutex_:pause");
+      TEST_SYNC_POINT(
+          "WritePreparedTxnDB::IsInSnapshot:prepared_mutex_:resume");
+      if (!was_empty) {
+        // We should not normally reach here
+        WPRecordTick(TXN_PREPARE_MUTEX_OVERHEAD);
+        ReadLock rl(&prepared_mutex_);
+        ROCKS_LOG_WARN(
+            info_log_, "prepared_mutex_ overhead %" PRIu64 " for %" PRIu64,
+            static_cast<uint64_t>(delayed_prepared_.size()), prep_seq);
+        if (delayed_prepared_.find(prep_seq) != delayed_prepared_.end()) {
+          // This is the order: 1) delayed_prepared_commits_ update, 2) publish
+          // 3) delayed_prepared_ clean up. So check if it is the case of a late
+          // clenaup.
+          auto it = delayed_prepared_commits_.find(prep_seq);
+          if (it == delayed_prepared_commits_.end()) {
+            // Then it is not committed yet
+            ROCKS_LOG_DETAILS(info_log_,
+                              "IsInSnapshot %" PRIu64 " in %" PRIu64
+                              " returns %" PRId32,
+                              prep_seq, snapshot_seq, 0);
+            return false;
+          } else {
+            ROCKS_LOG_DETAILS(info_log_,
+                              "IsInSnapshot %" PRIu64 " in %" PRIu64
+                              " commit: %" PRIu64 " returns %" PRId32,
+                              prep_seq, snapshot_seq, it->second,
+                              snapshot_seq <= it->second);
+            return it->second <= snapshot_seq;
+          }
+        } else {
+          // 2nd query to commit cache. Refer to was_empty comment above.
+          exist = GetCommitEntry(indexed_seq, &dont_care, &cached);
+          if (exist && prep_seq == cached.prep_seq) {
+            ROCKS_LOG_DETAILS(
+                info_log_,
+                "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
+                prep_seq, snapshot_seq, cached.commit_seq <= snapshot_seq);
+            return cached.commit_seq <= snapshot_seq;
+          }
+          max_evicted_seq_ub = max_evicted_seq_.load(std::memory_order_acquire);
+        }
+      }
+    } while (UNLIKELY(max_evicted_seq_lb != max_evicted_seq_ub));
     // When advancing max_evicted_seq_, we move older entires from prepared to
     // delayed_prepared_. Also we move evicted entries from commit cache to
     // old_commit_map_ if it overlaps with any snapshot. Since prep_seq <=
     // max_evicted_seq_, we have three cases: i) in delayed_prepared_, ii) in
     // old_commit_map_, iii) committed with no conflict with any snapshot. Case
     // (i) delayed_prepared_ is checked above
-    if (max_evicted_seq < snapshot_seq) {  // then (ii) cannot be the case
+    if (max_evicted_seq_ub < snapshot_seq) {  // then (ii) cannot be the case
       // only (iii) is the case: committed
       // commit_seq <= max_evicted_seq_ < snapshot_seq => commit_seq <
       // snapshot_seq
@@ -212,9 +271,15 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
     // snapshot. If there was no overlapping commit entry, then it is committed
     // with a commit_seq lower than any live snapshot, including snapshot_seq.
     if (old_commit_map_empty_.load(std::memory_order_acquire)) {
-      ROCKS_LOG_DETAILS(
-          info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
-          prep_seq, snapshot_seq, 1);
+      ROCKS_LOG_DETAILS(info_log_,
+                        "IsInSnapshot %" PRIu64 " in %" PRIu64
+                        " returns %" PRId32 " released=1",
+                        prep_seq, snapshot_seq, 0);
+      assert(snap_released);
+      // This snapshot is not valid anymore. We cannot tell if prep_seq is
+      // committed before or after the snapshot. Return true but also set
+      // snap_released to true.
+      *snap_released = true;
       return true;
     }
     {
@@ -222,14 +287,26 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
       // rare case and it is ok to pay the cost of mutex ReadLock for such old,
       // reading transactions.
       WPRecordTick(TXN_OLD_COMMIT_MAP_MUTEX_OVERHEAD);
-      ROCKS_LOG_WARN(info_log_, "old_commit_map_mutex_ overhead");
       ReadLock rl(&old_commit_map_mutex_);
       auto prep_set_entry = old_commit_map_.find(snapshot_seq);
       bool found = prep_set_entry != old_commit_map_.end();
       if (found) {
         auto& vec = prep_set_entry->second;
         found = std::binary_search(vec.begin(), vec.end(), prep_seq);
+      } else {
+        // coming from compaction
+        ROCKS_LOG_DETAILS(info_log_,
+                          "IsInSnapshot %" PRIu64 " in %" PRIu64
+                          " returns %" PRId32 " released=1",
+                          prep_seq, snapshot_seq, 0);
+        // This snapshot is not valid anymore. We cannot tell if prep_seq is
+        // committed before or after the snapshot. Return true but also set
+        // snap_released to true.
+        assert(snap_released);
+        *snap_released = true;
+        return true;
       }
+
       if (!found) {
         ROCKS_LOG_DETAILS(info_log_,
                           "IsInSnapshot %" PRIu64 " in %" PRIu64
@@ -245,12 +322,17 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
     return false;
   }
 
-  // Add the transaction with prepare sequence seq to the prepared list
+  // Add the transaction with prepare sequence seq to the prepared list.
+  // Note: must be called serially with increasing seq on each call.
   void AddPrepared(uint64_t seq);
+  // Check if any of the prepared txns are less than new max_evicted_seq_. Must
+  // be called with prepared_mutex_ write locked.
+  void CheckPreparedAgainstMax(SequenceNumber new_max);
   // Remove the transaction with prepare sequence seq from the prepared list
   void RemovePrepared(const uint64_t seq, const size_t batch_cnt = 1);
   // Add the transaction with prepare sequence prepare_seq and commit sequence
   // commit_seq to the commit map. loop_cnt is to detect infinite loops.
+  // Note: must be called serially.
   void AddCommitted(uint64_t prepare_seq, uint64_t commit_seq,
                     uint8_t loop_cnt = 0);
 
@@ -358,29 +440,43 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
   void UpdateCFComparatorMap(ColumnFamilyHandle* handle) override;
 
   virtual const Snapshot* GetSnapshot() override;
+  SnapshotImpl* GetSnapshotInternal(bool for_ww_conflict_check);
 
  protected:
   virtual Status VerifyCFOptions(
       const ColumnFamilyOptions& cf_options) override;
 
  private:
-  friend class WritePreparedTransactionTest_IsInSnapshotTest_Test;
-  friend class WritePreparedTransactionTest_CheckAgainstSnapshotsTest_Test;
-  friend class WritePreparedTransactionTest_CommitMapTest_Test;
-  friend class
-      WritePreparedTransactionTest_ConflictDetectionAfterRecoveryTest_Test;
-  friend class SnapshotConcurrentAccessTest_SnapshotConcurrentAccessTest_Test;
-  friend class WritePreparedTransactionTestBase;
   friend class PreparedHeap_BasicsTest_Test;
-  friend class PreparedHeap_EmptyAtTheEnd_Test;
   friend class PreparedHeap_Concurrent_Test;
+  friend class PreparedHeap_EmptyAtTheEnd_Test;
+  friend class SnapshotConcurrentAccessTest_SnapshotConcurrentAccessTest_Test;
+  friend class WritePreparedCommitEntryPreReleaseCallback;
+  friend class WritePreparedTransactionTestBase;
   friend class WritePreparedTxn;
   friend class WritePreparedTxnDBMock;
+  friend class WritePreparedTransactionTest_AddPreparedBeforeMax_Test;
   friend class WritePreparedTransactionTest_AdvanceMaxEvictedSeqBasicTest_Test;
   friend class
       WritePreparedTransactionTest_AdvanceMaxEvictedSeqWithDuplicatesTest_Test;
+  friend class WritePreparedTransactionTest_AdvanceSeqByOne_Test;
   friend class WritePreparedTransactionTest_BasicRecoveryTest_Test;
+  friend class WritePreparedTransactionTest_CheckAgainstSnapshotsTest_Test;
+  friend class WritePreparedTransactionTest_CleanupSnapshotEqualToMax_Test;
+  friend class
+      WritePreparedTransactionTest_ConflictDetectionAfterRecoveryTest_Test;
+  friend class WritePreparedTransactionTest_CommitMapTest_Test;
+  friend class WritePreparedTransactionTest_DoubleSnapshot_Test;
   friend class WritePreparedTransactionTest_IsInSnapshotEmptyMapTest_Test;
+  friend class WritePreparedTransactionTest_IsInSnapshotReleased_Test;
+  friend class WritePreparedTransactionTest_IsInSnapshotTest_Test;
+  friend class WritePreparedTransactionTest_NewSnapshotLargerThanMax_Test;
+  friend class WritePreparedTransactionTest_MaxCatchupWithNewSnapshot_Test;
+  friend class
+      WritePreparedTransactionTest_NonAtomicCommitOfDelayedPrepared_Test;
+  friend class
+      WritePreparedTransactionTest_NonAtomicUpdateOfDelayedPrepared_Test;
+  friend class WritePreparedTransactionTest_NonAtomicUpdateOfMaxEvictedSeq_Test;
   friend class WritePreparedTransactionTest_OldCommitMapGC_Test;
   friend class WritePreparedTransactionTest_RollbackTest_Test;
   friend class WriteUnpreparedTxnDB;
@@ -491,6 +587,10 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
     // reflect any uncommitted data that is not added to prepared_txns_ yet.
     // Otherwise, if there is no concurrent txn, this value simply reflects that
     // latest value in the memtable.
+    if (!delayed_prepared_.empty()) {
+      assert(!delayed_prepared_empty_.load());
+      return *delayed_prepared_.begin();
+    }
     if (prepared_txns_.empty()) {
       return db_impl_->GetLatestSequenceNumber() + 1;
     } else {
@@ -519,6 +619,11 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
   // version value.
   void UpdateSnapshots(const std::vector<SequenceNumber>& snapshots,
                        const SequenceNumber& version);
+  // Check the new list of new snapshots against the old one to see  if any of
+  // the snapshots are released and to do the cleanup for the released snapshot.
+  void CleanupReleasedSnapshots(
+      const std::vector<SequenceNumber>& new_snapshots,
+      const std::vector<SequenceNumber>& old_snapshots);
 
   // Check an evicted entry against live snapshots to see if it should be kept
   // around or it can be safely discarded (and hence assume committed for all
@@ -537,6 +642,10 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
                                const uint64_t& snapshot_seq,
                                const bool next_is_larger);
 
+  // A trick to increase the last visible sequence number by one and also wait
+  // for the in-flight commits to be visible.
+  void AdvanceSeqByOne();
+
   // The list of live snapshots at the last time that max_evicted_seq_ advanced.
   // The list stored into two data structures: in snapshot_cache_ that is
   // efficient for concurrent reads, and in snapshots_ if the data does not fit
@@ -545,14 +654,17 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
   // The list sorted in ascending order. Thread-safety for writes is provided
   // with snapshots_mutex_ and concurrent reads are safe due to std::atomic for
   // each entry. In x86_64 architecture such reads are compiled to simple read
-  // instructions. 128 entries
-  static const size_t DEF_SNAPSHOT_CACHE_BITS = static_cast<size_t>(7);
+  // instructions.
   const size_t SNAPSHOT_CACHE_BITS;
   const size_t SNAPSHOT_CACHE_SIZE;
-  unique_ptr<std::atomic<SequenceNumber>[]> snapshot_cache_;
+  std::unique_ptr<std::atomic<SequenceNumber>[]> snapshot_cache_;
   // 2nd list for storing snapshots. The list sorted in ascending order.
   // Thread-safety is provided with snapshots_mutex_.
   std::vector<SequenceNumber> snapshots_;
+  // The list of all snapshots: snapshots_ + snapshot_cache_. This list although
+  // redundant but simplifies CleanupOldSnapshots implementation.
+  // Thread-safety is provided with snapshots_mutex_.
+  std::vector<SequenceNumber> snapshots_all_;
   // The version of the latest list of snapshots. This can be used to avoid
   // rewriting a list that is concurrently updated with a more recent version.
   SequenceNumber snapshots_version_ = 0;
@@ -560,19 +672,23 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
   // A heap of prepared transactions. Thread-safety is provided with
   // prepared_mutex_.
   PreparedHeap prepared_txns_;
-  // 8m entry, 64MB size
-  static const size_t DEF_COMMIT_CACHE_BITS = static_cast<size_t>(23);
   const size_t COMMIT_CACHE_BITS;
   const size_t COMMIT_CACHE_SIZE;
   const CommitEntry64bFormat FORMAT;
   // commit_cache_ must be initialized to zero to tell apart an empty index from
   // a filled one. Thread-safety is provided with commit_cache_mutex_.
-  unique_ptr<std::atomic<CommitEntry64b>[]> commit_cache_;
+  std::unique_ptr<std::atomic<CommitEntry64b>[]> commit_cache_;
   // The largest evicted *commit* sequence number from the commit_cache_. If a
   // seq is smaller than max_evicted_seq_ is might or might not be present in
   // commit_cache_. So commit_cache_ must first be checked before consulting
   // with max_evicted_seq_.
   std::atomic<uint64_t> max_evicted_seq_ = {};
+  // Order: 1) update future_max_evicted_seq_ = new_max, 2)
+  // GetSnapshotListFromDB(new_max), max_evicted_seq_ = new_max. Since
+  // GetSnapshotInternal guarantess that the snapshot seq is larger than
+  // future_max_evicted_seq_, this guarantes that if a snapshot is not larger
+  // than max has already being looked at via a GetSnapshotListFromDB(new_max).
+  std::atomic<uint64_t> future_max_evicted_seq_ = {};
   // Advance max_evicted_seq_ by this value each time it needs an update. The
   // larger the value, the less frequent advances we would have. We do not want
   // it to be too large either as it would cause stalls by doing too much
@@ -590,6 +706,11 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
   // time max_evicted_seq_ advances their sequence number. This is expected to
   // be empty normally. Thread-safety is provided with prepared_mutex_.
   std::set<uint64_t> delayed_prepared_;
+  // Commit of a delayed prepared: 1) update commit cache, 2) update
+  // delayed_prepared_commits_, 3) publish seq, 3) clean up delayed_prepared_.
+  // delayed_prepared_commits_ will help us tell apart the unprepared txns from
+  // the ones that are committed but not cleaned up yet.
+  std::unordered_map<SequenceNumber, SequenceNumber> delayed_prepared_commits_;
   // Update when delayed_prepared_.empty() changes. Expected to be true
   // normally.
   std::atomic<bool> delayed_prepared_empty_ = {true};
@@ -610,76 +731,93 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
 
 class WritePreparedTxnReadCallback : public ReadCallback {
  public:
+  WritePreparedTxnReadCallback(WritePreparedTxnDB* db, SequenceNumber snapshot)
+      : ReadCallback(snapshot), db_(db) {}
   WritePreparedTxnReadCallback(WritePreparedTxnDB* db, SequenceNumber snapshot,
                                SequenceNumber min_uncommitted)
-      : db_(db), snapshot_(snapshot), min_uncommitted_(min_uncommitted) {}
+      : ReadCallback(snapshot, min_uncommitted), db_(db) {}
 
   // Will be called to see if the seq number visible; if not it moves on to
   // the next seq number.
-  inline virtual bool IsVisible(SequenceNumber seq) override {
-    return db_->IsInSnapshot(seq, snapshot_, min_uncommitted_);
+  inline virtual bool IsVisibleFullCheck(SequenceNumber seq) override {
+    auto snapshot = max_visible_seq_;
+    return db_->IsInSnapshot(seq, snapshot, min_uncommitted_);
   }
 
+  // TODO(myabandeh): override Refresh when Iterator::Refresh is supported
  private:
   WritePreparedTxnDB* db_;
-  SequenceNumber snapshot_;
-  SequenceNumber min_uncommitted_;
 };
 
 class AddPreparedCallback : public PreReleaseCallback {
  public:
-  AddPreparedCallback(WritePreparedTxnDB* db, size_t sub_batch_cnt,
-                      bool two_write_queues)
+  AddPreparedCallback(WritePreparedTxnDB* db, DBImpl* db_impl,
+                      size_t sub_batch_cnt, bool two_write_queues,
+                      bool first_prepare_batch)
       : db_(db),
+        db_impl_(db_impl),
         sub_batch_cnt_(sub_batch_cnt),
-        two_write_queues_(two_write_queues) {
+        two_write_queues_(two_write_queues),
+        first_prepare_batch_(first_prepare_batch) {
     (void)two_write_queues_;  // to silence unused private field warning
   }
   virtual Status Callback(SequenceNumber prepare_seq,
-                          bool is_mem_disabled) override {
-#ifdef NDEBUG
-    (void)is_mem_disabled;
-#endif
+                          bool is_mem_disabled __attribute__((__unused__)),
+                          uint64_t log_number) override {
+    // Always Prepare from the main queue
     assert(!two_write_queues_ || !is_mem_disabled);  // implies the 1st queue
     for (size_t i = 0; i < sub_batch_cnt_; i++) {
       db_->AddPrepared(prepare_seq + i);
     }
+    if (first_prepare_batch_) {
+      assert(log_number != 0);
+      db_impl_->logs_with_prep_tracker()->MarkLogAsContainingPrepSection(
+          log_number);
+    }
     return Status::OK();
   }
 
  private:
   WritePreparedTxnDB* db_;
+  DBImpl* db_impl_;
   size_t sub_batch_cnt_;
   bool two_write_queues_;
+  // It is 2PC and this is the first prepare batch. Always the case in 2PC
+  // unless it is WriteUnPrepared.
+  bool first_prepare_batch_;
 };
 
 class WritePreparedCommitEntryPreReleaseCallback : public PreReleaseCallback {
  public:
   // includes_data indicates that the commit also writes non-empty
   // CommitTimeWriteBatch to memtable, which needs to be committed separately.
-  WritePreparedCommitEntryPreReleaseCallback(WritePreparedTxnDB* db,
-                                             DBImpl* db_impl,
-                                             SequenceNumber prep_seq,
-                                             size_t prep_batch_cnt,
-                                             size_t data_batch_cnt = 0,
-                                             bool publish_seq = true)
+  WritePreparedCommitEntryPreReleaseCallback(
+      WritePreparedTxnDB* db, DBImpl* db_impl, SequenceNumber prep_seq,
+      size_t prep_batch_cnt, size_t data_batch_cnt = 0,
+      SequenceNumber aux_seq = kMaxSequenceNumber, size_t aux_batch_cnt = 0)
       : db_(db),
         db_impl_(db_impl),
         prep_seq_(prep_seq),
         prep_batch_cnt_(prep_batch_cnt),
         data_batch_cnt_(data_batch_cnt),
         includes_data_(data_batch_cnt_ > 0),
-        publish_seq_(publish_seq) {
+        aux_seq_(aux_seq),
+        aux_batch_cnt_(aux_batch_cnt),
+        includes_aux_batch_(aux_batch_cnt > 0) {
     assert((prep_batch_cnt_ > 0) != (prep_seq == kMaxSequenceNumber));  // xor
     assert(prep_batch_cnt_ > 0 || data_batch_cnt_ > 0);
+    assert((aux_batch_cnt_ > 0) != (aux_seq == kMaxSequenceNumber));  // xor
   }
 
   virtual Status Callback(SequenceNumber commit_seq,
-                          bool is_mem_disabled) override {
-#ifdef NDEBUG
-    (void)is_mem_disabled;
-#endif
+                          bool is_mem_disabled __attribute__((__unused__)),
+                          uint64_t) override {
+    // Always commit from the 2nd queue
+    assert(!db_impl_->immutable_db_options().two_write_queues ||
+           is_mem_disabled);
     assert(includes_data_ || prep_seq_ != kMaxSequenceNumber);
+    // Data batch is what accompanied with the commit marker and affects the
+    // last seq in the commit batch.
     const uint64_t last_commit_seq = LIKELY(data_batch_cnt_ <= 1)
                                          ? commit_seq
                                          : commit_seq + data_batch_cnt_ - 1;
@@ -688,6 +826,11 @@ class WritePreparedCommitEntryPreReleaseCallback : public PreReleaseCallback {
         db_->AddCommitted(prep_seq_ + i, last_commit_seq);
       }
     }  // else there was no prepare phase
+    if (includes_aux_batch_) {
+      for (size_t i = 0; i < aux_batch_cnt_; i++) {
+        db_->AddCommitted(aux_seq_ + i, last_commit_seq);
+      }
+    }
     if (includes_data_) {
       assert(data_batch_cnt_);
       // Commit the data that is accompanied with the commit request
@@ -698,7 +841,7 @@ class WritePreparedCommitEntryPreReleaseCallback : public PreReleaseCallback {
         db_->AddCommitted(commit_seq + i, last_commit_seq);
       }
     }
-    if (db_impl_->immutable_db_options().two_write_queues && publish_seq_) {
+    if (db_impl_->immutable_db_options().two_write_queues) {
       assert(is_mem_disabled);  // implies the 2nd queue
       // Publish the sequence number. We can do that here assuming the callback
       // is invoked only from one write queue, which would guarantee that the
@@ -718,11 +861,60 @@ class WritePreparedCommitEntryPreReleaseCallback : public PreReleaseCallback {
   SequenceNumber prep_seq_;
   size_t prep_batch_cnt_;
   size_t data_batch_cnt_;
-  // Either because it is commit without prepare or it has a
-  // CommitTimeWriteBatch
+  // Data here is the batch that is written with the commit marker, either
+  // because it is commit without prepare or commit has a CommitTimeWriteBatch.
   bool includes_data_;
-  // Should the callback also publishes the commit seq number
-  bool publish_seq_;
+  // Auxiliary batch (if there is any) is a batch that is written before, but
+  // gets the same commit seq as prepare batch or data batch. This is used in
+  // two write queues where the CommitTimeWriteBatch becomes the aux batch and
+  // we do a separate write to actually commit everything.
+  SequenceNumber aux_seq_;
+  size_t aux_batch_cnt_;
+  bool includes_aux_batch_;
+};
+
+// For two_write_queues commit both the aborted batch and the cleanup batch and
+// then published the seq
+class WritePreparedRollbackPreReleaseCallback : public PreReleaseCallback {
+ public:
+  WritePreparedRollbackPreReleaseCallback(WritePreparedTxnDB* db,
+                                          DBImpl* db_impl,
+                                          SequenceNumber prep_seq,
+                                          SequenceNumber rollback_seq,
+                                          size_t prep_batch_cnt)
+      : db_(db),
+        db_impl_(db_impl),
+        prep_seq_(prep_seq),
+        rollback_seq_(rollback_seq),
+        prep_batch_cnt_(prep_batch_cnt) {
+    assert(prep_seq != kMaxSequenceNumber);
+    assert(rollback_seq != kMaxSequenceNumber);
+    assert(prep_batch_cnt_ > 0);
+  }
+
+  Status Callback(SequenceNumber commit_seq, bool is_mem_disabled,
+                  uint64_t) override {
+    // Always commit from the 2nd queue
+    assert(is_mem_disabled);  // implies the 2nd queue
+    assert(db_impl_->immutable_db_options().two_write_queues);
+#ifdef NDEBUG
+    (void)is_mem_disabled;
+#endif
+    const uint64_t last_commit_seq = commit_seq;
+    db_->AddCommitted(rollback_seq_, last_commit_seq);
+    for (size_t i = 0; i < prep_batch_cnt_; i++) {
+      db_->AddCommitted(prep_seq_ + i, last_commit_seq);
+    }
+    db_impl_->SetLastPublishedSequence(last_commit_seq);
+    return Status::OK();
+  }
+
+ private:
+  WritePreparedTxnDB* db_;
+  DBImpl* db_impl_;
+  SequenceNumber prep_seq_;
+  SequenceNumber rollback_seq_;
+  size_t prep_batch_cnt_;
 };
 
 // Count the number of sub-batches inside a batch. A sub-batch does not have