]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rocksdb/db/write_batch.cc
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / rocksdb / db / write_batch.cc
index dacd17ed488634bef2a254e8220c94da6f5aadee..295fba22ed0d6833899e8de2f5810ce2aaed767a 100644 (file)
@@ -1,7 +1,7 @@
 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
-//  This source code is licensed under the BSD-style license found in the
-//  LICENSE file in the root directory of this source tree. An additional grant
-//  of patent rights can be found in the PATENTS file in the same directory.
+//  This source code is licensed under both the GPLv2 (found in the
+//  COPYING file in the root directory) and Apache 2.0 License
+//  (found in the LICENSE.Apache file in the root directory).
 //
 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
 // Use of this source code is governed by a BSD-style license that can be
 //    kTypeValue varstring varstring
 //    kTypeDeletion varstring
 //    kTypeSingleDeletion varstring
+//    kTypeRangeDeletion varstring varstring
 //    kTypeMerge varstring varstring
 //    kTypeColumnFamilyValue varint32 varstring varstring
-//    kTypeColumnFamilyDeletion varint32 varstring varstring
-//    kTypeColumnFamilySingleDeletion varint32 varstring varstring
+//    kTypeColumnFamilyDeletion varint32 varstring
+//    kTypeColumnFamilySingleDeletion varint32 varstring
+//    kTypeColumnFamilyRangeDeletion varint32 varstring varstring
 //    kTypeColumnFamilyMerge varint32 varstring varstring
 //    kTypeBeginPrepareXID varstring
 //    kTypeEndPrepareXID
 //    kTypeCommitXID varstring
 //    kTypeRollbackXID varstring
+//    kTypeBeginPersistedPrepareXID varstring
+//    kTypeBeginUnprepareXID varstring
 //    kTypeNoop
 // varstring :=
 //    len: varint32
@@ -49,7 +53,9 @@
 #include "monitoring/statistics.h"
 #include "rocksdb/merge_operator.h"
 #include "util/coding.h"
+#include "util/duplicate_detector.h"
 #include "util/string_util.h"
+#include "util/util.h"
 
 namespace rocksdb {
 
@@ -67,6 +73,8 @@ enum ContentFlags : uint32_t {
   HAS_COMMIT = 1 << 7,
   HAS_ROLLBACK = 1 << 8,
   HAS_DELETE_RANGE = 1 << 9,
+  HAS_BLOB_INDEX = 1 << 10,
+  HAS_BEGIN_UNPREPARE = 1 << 11,
 };
 
 struct BatchContentClassifier : public WriteBatch::Handler {
@@ -97,8 +105,16 @@ struct BatchContentClassifier : public WriteBatch::Handler {
     return Status::OK();
   }
 
-  Status MarkBeginPrepare() override {
+  Status PutBlobIndexCF(uint32_t, const Slice&, const Slice&) override {
+    content_flags |= ContentFlags::HAS_BLOB_INDEX;
+    return Status::OK();
+  }
+
+  Status MarkBeginPrepare(bool unprepare) override {
     content_flags |= ContentFlags::HAS_BEGIN_PREPARE;
+    if (unprepare) {
+      content_flags |= ContentFlags::HAS_BEGIN_UNPREPARE;
+    }
     return Status::OK();
   }
 
@@ -137,6 +153,12 @@ WriteBatch::WriteBatch(const std::string& rep)
       max_bytes_(0),
       rep_(rep) {}
 
+WriteBatch::WriteBatch(std::string&& rep)
+    : save_points_(nullptr),
+      content_flags_(ContentFlags::DEFERRED),
+      max_bytes_(0),
+      rep_(std::move(rep)) {}
+
 WriteBatch::WriteBatch(const WriteBatch& src)
     : save_points_(src.save_points_),
       wal_term_point_(src.wal_term_point_),
@@ -144,7 +166,7 @@ WriteBatch::WriteBatch(const WriteBatch& src)
       max_bytes_(src.max_bytes_),
       rep_(src.rep_) {}
 
-WriteBatch::WriteBatch(WriteBatch&& src)
+WriteBatch::WriteBatch(WriteBatch&& src) noexcept
     : save_points_(std::move(src.save_points_)),
       wal_term_point_(std::move(src.wal_term_point_)),
       content_flags_(src.content_flags_.load(std::memory_order_relaxed)),
@@ -171,7 +193,7 @@ WriteBatch::~WriteBatch() { delete save_points_; }
 
 WriteBatch::Handler::~Handler() { }
 
-void WriteBatch::Handler::LogData(const Slice& blob) {
+void WriteBatch::Handler::LogData(const Slice& /*blob*/) {
   // If the user has not specified something to do with blobs, then we ignore
   // them.
 }
@@ -286,7 +308,7 @@ Status ReadRecordFromWriteBatch(Slice* input, char* tag,
       if (!GetVarint32(input, column_family)) {
         return Status::Corruption("bad WriteBatch Put");
       }
-    // intentional fallthrough
+      FALLTHROUGH_INTENDED;
     case kTypeValue:
       if (!GetLengthPrefixedSlice(input, key) ||
           !GetLengthPrefixedSlice(input, value)) {
@@ -298,7 +320,7 @@ Status ReadRecordFromWriteBatch(Slice* input, char* tag,
       if (!GetVarint32(input, column_family)) {
         return Status::Corruption("bad WriteBatch Delete");
       }
-    // intentional fallthrough
+      FALLTHROUGH_INTENDED;
     case kTypeDeletion:
     case kTypeSingleDeletion:
       if (!GetLengthPrefixedSlice(input, key)) {
@@ -309,7 +331,7 @@ Status ReadRecordFromWriteBatch(Slice* input, char* tag,
       if (!GetVarint32(input, column_family)) {
         return Status::Corruption("bad WriteBatch DeleteRange");
       }
-    // intentional fallthrough
+      FALLTHROUGH_INTENDED;
     case kTypeRangeDeletion:
       // for range delete, "key" is begin_key, "value" is end_key
       if (!GetLengthPrefixedSlice(input, key) ||
@@ -321,13 +343,24 @@ Status ReadRecordFromWriteBatch(Slice* input, char* tag,
       if (!GetVarint32(input, column_family)) {
         return Status::Corruption("bad WriteBatch Merge");
       }
-    // intentional fallthrough
+      FALLTHROUGH_INTENDED;
     case kTypeMerge:
       if (!GetLengthPrefixedSlice(input, key) ||
           !GetLengthPrefixedSlice(input, value)) {
         return Status::Corruption("bad WriteBatch Merge");
       }
       break;
+    case kTypeColumnFamilyBlobIndex:
+      if (!GetVarint32(input, column_family)) {
+        return Status::Corruption("bad WriteBatch BlobIndex");
+      }
+      FALLTHROUGH_INTENDED;
+    case kTypeBlobIndex:
+      if (!GetLengthPrefixedSlice(input, key) ||
+          !GetLengthPrefixedSlice(input, value)) {
+        return Status::Corruption("bad WriteBatch BlobIndex");
+      }
+      break;
     case kTypeLogData:
       assert(blob != nullptr);
       if (!GetLengthPrefixedSlice(input, blob)) {
@@ -336,6 +369,11 @@ Status ReadRecordFromWriteBatch(Slice* input, char* tag,
       break;
     case kTypeNoop:
     case kTypeBeginPrepareXID:
+      // This indicates that the prepared batch is also persisted in the db.
+      // This is used in WritePreparedTxn
+    case kTypeBeginPersistedPrepareXID:
+      // This is used in WriteUnpreparedTxn
+    case kTypeBeginUnprepareXID:
       break;
     case kTypeEndPrepareXID:
       if (!GetLengthPrefixedSlice(input, xid)) {
@@ -366,16 +404,38 @@ Status WriteBatch::Iterate(Handler* handler) const {
 
   input.remove_prefix(WriteBatchInternal::kHeader);
   Slice key, value, blob, xid;
+  // Sometimes a sub-batch starts with a Noop. We want to exclude such Noops as
+  // the batch boundary symbols otherwise we would mis-count the number of
+  // batches. We do that by checking whether the accumulated batch is empty
+  // before seeing the next Noop.
+  bool empty_batch = true;
   int found = 0;
   Status s;
-  while (s.ok() && !input.empty() && handler->Continue()) {
-    char tag = 0;
-    uint32_t column_family = 0;  // default
-
-    s = ReadRecordFromWriteBatch(&input, &tag, &column_family, &key, &value,
-                                 &blob, &xid);
-    if (!s.ok()) {
-      return s;
+  char tag = 0;
+  uint32_t column_family = 0;  // default
+  bool last_was_try_again = false;
+  while (((s.ok() && !input.empty()) || UNLIKELY(s.IsTryAgain())) &&
+         handler->Continue()) {
+    if (LIKELY(!s.IsTryAgain())) {
+      last_was_try_again = false;
+      tag = 0;
+      column_family = 0;  // default
+
+      s = ReadRecordFromWriteBatch(&input, &tag, &column_family, &key, &value,
+                                   &blob, &xid);
+      if (!s.ok()) {
+        return s;
+      }
+    } else {
+      assert(s.IsTryAgain());
+      assert(!last_was_try_again); // to detect infinite loop bugs
+      if (UNLIKELY(last_was_try_again)) {
+        return Status::Corruption(
+            "two consecutive TryAgain in WriteBatch handler; this is either a "
+            "software bug or data corruption.");
+      }
+      last_was_try_again = true;
+      s = Status::OK();
     }
 
     switch (tag) {
@@ -384,60 +444,137 @@ Status WriteBatch::Iterate(Handler* handler) const {
         assert(content_flags_.load(std::memory_order_relaxed) &
                (ContentFlags::DEFERRED | ContentFlags::HAS_PUT));
         s = handler->PutCF(column_family, key, value);
-        found++;
+        if (LIKELY(s.ok())) {
+          empty_batch = false;
+          found++;
+        }
         break;
       case kTypeColumnFamilyDeletion:
       case kTypeDeletion:
         assert(content_flags_.load(std::memory_order_relaxed) &
                (ContentFlags::DEFERRED | ContentFlags::HAS_DELETE));
         s = handler->DeleteCF(column_family, key);
-        found++;
+        if (LIKELY(s.ok())) {
+          empty_batch = false;
+          found++;
+        }
         break;
       case kTypeColumnFamilySingleDeletion:
       case kTypeSingleDeletion:
         assert(content_flags_.load(std::memory_order_relaxed) &
                (ContentFlags::DEFERRED | ContentFlags::HAS_SINGLE_DELETE));
         s = handler->SingleDeleteCF(column_family, key);
-        found++;
+        if (LIKELY(s.ok())) {
+          empty_batch = false;
+          found++;
+        }
         break;
       case kTypeColumnFamilyRangeDeletion:
       case kTypeRangeDeletion:
         assert(content_flags_.load(std::memory_order_relaxed) &
                (ContentFlags::DEFERRED | ContentFlags::HAS_DELETE_RANGE));
         s = handler->DeleteRangeCF(column_family, key, value);
-        found++;
+        if (LIKELY(s.ok())) {
+          empty_batch = false;
+          found++;
+        }
         break;
       case kTypeColumnFamilyMerge:
       case kTypeMerge:
         assert(content_flags_.load(std::memory_order_relaxed) &
                (ContentFlags::DEFERRED | ContentFlags::HAS_MERGE));
         s = handler->MergeCF(column_family, key, value);
-        found++;
+        if (LIKELY(s.ok())) {
+          empty_batch = false;
+          found++;
+        }
+        break;
+      case kTypeColumnFamilyBlobIndex:
+      case kTypeBlobIndex:
+        assert(content_flags_.load(std::memory_order_relaxed) &
+               (ContentFlags::DEFERRED | ContentFlags::HAS_BLOB_INDEX));
+        s = handler->PutBlobIndexCF(column_family, key, value);
+        if (LIKELY(s.ok())) {
+          found++;
+        }
         break;
       case kTypeLogData:
         handler->LogData(blob);
+        // A batch might have nothing but LogData. It is still a batch.
+        empty_batch = false;
         break;
       case kTypeBeginPrepareXID:
         assert(content_flags_.load(std::memory_order_relaxed) &
                (ContentFlags::DEFERRED | ContentFlags::HAS_BEGIN_PREPARE));
         handler->MarkBeginPrepare();
+        empty_batch = false;
+        if (!handler->WriteAfterCommit()) {
+          s = Status::NotSupported(
+              "WriteCommitted txn tag when write_after_commit_ is disabled (in "
+              "WritePrepared/WriteUnprepared mode). If it is not due to "
+              "corruption, the WAL must be emptied before changing the "
+              "WritePolicy.");
+        }
+        if (handler->WriteBeforePrepare()) {
+          s = Status::NotSupported(
+              "WriteCommitted txn tag when write_before_prepare_ is enabled "
+              "(in WriteUnprepared mode). If it is not due to corruption, the "
+              "WAL must be emptied before changing the WritePolicy.");
+        }
+        break;
+      case kTypeBeginPersistedPrepareXID:
+        assert(content_flags_.load(std::memory_order_relaxed) &
+               (ContentFlags::DEFERRED | ContentFlags::HAS_BEGIN_PREPARE));
+        handler->MarkBeginPrepare();
+        empty_batch = false;
+        if (handler->WriteAfterCommit()) {
+          s = Status::NotSupported(
+              "WritePrepared/WriteUnprepared txn tag when write_after_commit_ "
+              "is enabled (in default WriteCommitted mode). If it is not due "
+              "to corruption, the WAL must be emptied before changing the "
+              "WritePolicy.");
+        }
+        break;
+      case kTypeBeginUnprepareXID:
+        assert(content_flags_.load(std::memory_order_relaxed) &
+               (ContentFlags::DEFERRED | ContentFlags::HAS_BEGIN_UNPREPARE));
+        handler->MarkBeginPrepare(true /* unprepared */);
+        empty_batch = false;
+        if (handler->WriteAfterCommit()) {
+          s = Status::NotSupported(
+              "WriteUnprepared txn tag when write_after_commit_ is enabled (in "
+              "default WriteCommitted mode). If it is not due to corruption, "
+              "the WAL must be emptied before changing the WritePolicy.");
+        }
+        if (!handler->WriteBeforePrepare()) {
+          s = Status::NotSupported(
+              "WriteUnprepared txn tag when write_before_prepare_ is disabled "
+              "(in WriteCommitted/WritePrepared mode). If it is not due to "
+              "corruption, the WAL must be emptied before changing the "
+              "WritePolicy.");
+        }
         break;
       case kTypeEndPrepareXID:
         assert(content_flags_.load(std::memory_order_relaxed) &
                (ContentFlags::DEFERRED | ContentFlags::HAS_END_PREPARE));
         handler->MarkEndPrepare(xid);
+        empty_batch = true;
         break;
       case kTypeCommitXID:
         assert(content_flags_.load(std::memory_order_relaxed) &
                (ContentFlags::DEFERRED | ContentFlags::HAS_COMMIT));
         handler->MarkCommit(xid);
+        empty_batch = true;
         break;
       case kTypeRollbackXID:
         assert(content_flags_.load(std::memory_order_relaxed) &
                (ContentFlags::DEFERRED | ContentFlags::HAS_ROLLBACK));
         handler->MarkRollback(xid);
+        empty_batch = true;
         break;
       case kTypeNoop:
+        handler->MarkNoop(empty_batch);
+        empty_batch = true;
         break;
       default:
         return Status::Corruption("unknown WriteBatch tag");
@@ -453,6 +590,14 @@ Status WriteBatch::Iterate(Handler* handler) const {
   }
 }
 
+bool WriteBatchInternal::IsLatestPersistentState(const WriteBatch* b) {
+  return b->is_latest_persistent_state_;
+}
+
+void WriteBatchInternal::SetAsLastestPersistentState(WriteBatch* b) {
+  b->is_latest_persistent_state_ = true;
+}
+
 int WriteBatchInternal::Count(const WriteBatch* b) {
   return DecodeFixed32(b->rep_.data() + 8);
 }
@@ -469,12 +614,19 @@ void WriteBatchInternal::SetSequence(WriteBatch* b, SequenceNumber seq) {
   EncodeFixed64(&b->rep_[0], seq);
 }
 
-size_t WriteBatchInternal::GetFirstOffset(WriteBatch* b) {
+size_t WriteBatchInternal::GetFirstOffset(WriteBatch* /*b*/) {
   return WriteBatchInternal::kHeader;
 }
 
 Status WriteBatchInternal::Put(WriteBatch* b, uint32_t column_family_id,
                                const Slice& key, const Slice& value) {
+  if (key.size() > size_t{port::kMaxUint32}) {
+    return Status::InvalidArgument("key is too large");
+  }
+  if (value.size() > size_t{port::kMaxUint32}) {
+    return Status::InvalidArgument("value is too large");
+  }
+
   LocalSavePoint save(b);
   WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
   if (column_family_id == 0) {
@@ -497,8 +649,33 @@ Status WriteBatch::Put(ColumnFamilyHandle* column_family, const Slice& key,
                                  value);
 }
 
+Status WriteBatchInternal::CheckSlicePartsLength(const SliceParts& key,
+                                                 const SliceParts& value) {
+  size_t total_key_bytes = 0;
+  for (int i = 0; i < key.num_parts; ++i) {
+    total_key_bytes += key.parts[i].size();
+  }
+  if (total_key_bytes >= size_t{port::kMaxUint32}) {
+    return Status::InvalidArgument("key is too large");
+  }
+
+  size_t total_value_bytes = 0;
+  for (int i = 0; i < value.num_parts; ++i) {
+    total_value_bytes += value.parts[i].size();
+  }
+  if (total_value_bytes >= size_t{port::kMaxUint32}) {
+    return Status::InvalidArgument("value is too large");
+  }
+  return Status::OK();
+}
+
 Status WriteBatchInternal::Put(WriteBatch* b, uint32_t column_family_id,
                                const SliceParts& key, const SliceParts& value) {
+  Status s = CheckSlicePartsLength(key, value);
+  if (!s.ok()) {
+    return s;
+  }
+
   LocalSavePoint save(b);
   WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
   if (column_family_id == 0) {
@@ -526,7 +703,9 @@ Status WriteBatchInternal::InsertNoop(WriteBatch* b) {
   return Status::OK();
 }
 
-Status WriteBatchInternal::MarkEndPrepare(WriteBatch* b, const Slice& xid) {
+Status WriteBatchInternal::MarkEndPrepare(WriteBatch* b, const Slice& xid,
+                                          bool write_after_commit,
+                                          bool unprepared_batch) {
   // a manually constructed batch can only contain one prepare section
   assert(b->rep_[12] == static_cast<char>(kTypeNoop));
 
@@ -538,13 +717,21 @@ Status WriteBatchInternal::MarkEndPrepare(WriteBatch* b, const Slice& xid) {
   }
 
   // rewrite noop as begin marker
-  b->rep_[12] = static_cast<char>(kTypeBeginPrepareXID);
+  b->rep_[12] = static_cast<char>(
+      write_after_commit ? kTypeBeginPrepareXID
+                         : (unprepared_batch ? kTypeBeginUnprepareXID
+                                             : kTypeBeginPersistedPrepareXID));
   b->rep_.push_back(static_cast<char>(kTypeEndPrepareXID));
   PutLengthPrefixedSlice(&b->rep_, xid);
   b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                               ContentFlags::HAS_END_PREPARE |
                               ContentFlags::HAS_BEGIN_PREPARE,
                           std::memory_order_relaxed);
+  if (unprepared_batch) {
+    b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
+                                ContentFlags::HAS_BEGIN_UNPREPARE,
+                            std::memory_order_relaxed);
+  }
   return Status::OK();
 }
 
@@ -712,6 +899,13 @@ Status WriteBatch::DeleteRange(ColumnFamilyHandle* column_family,
 
 Status WriteBatchInternal::Merge(WriteBatch* b, uint32_t column_family_id,
                                  const Slice& key, const Slice& value) {
+  if (key.size() > size_t{port::kMaxUint32}) {
+    return Status::InvalidArgument("key is too large");
+  }
+  if (value.size() > size_t{port::kMaxUint32}) {
+    return Status::InvalidArgument("value is too large");
+  }
+
   LocalSavePoint save(b);
   WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
   if (column_family_id == 0) {
@@ -737,6 +931,11 @@ Status WriteBatch::Merge(ColumnFamilyHandle* column_family, const Slice& key,
 Status WriteBatchInternal::Merge(WriteBatch* b, uint32_t column_family_id,
                                  const SliceParts& key,
                                  const SliceParts& value) {
+  Status s = CheckSlicePartsLength(key, value);
+  if (!s.ok()) {
+    return s;
+  }
+
   LocalSavePoint save(b);
   WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
   if (column_family_id == 0) {
@@ -759,6 +958,25 @@ Status WriteBatch::Merge(ColumnFamilyHandle* column_family,
                                    value);
 }
 
+Status WriteBatchInternal::PutBlobIndex(WriteBatch* b,
+                                        uint32_t column_family_id,
+                                        const Slice& key, const Slice& value) {
+  LocalSavePoint save(b);
+  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
+  if (column_family_id == 0) {
+    b->rep_.push_back(static_cast<char>(kTypeBlobIndex));
+  } else {
+    b->rep_.push_back(static_cast<char>(kTypeColumnFamilyBlobIndex));
+    PutVarint32(&b->rep_, column_family_id);
+  }
+  PutLengthPrefixedSlice(&b->rep_, key);
+  PutLengthPrefixedSlice(&b->rep_, value);
+  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
+                              ContentFlags::HAS_BLOB_INDEX,
+                          std::memory_order_relaxed);
+  return save.commit();
+}
+
 Status WriteBatch::PutLogData(const Slice& blob) {
   LocalSavePoint save(this);
   rep_.push_back(static_cast<char>(kTypeLogData));
@@ -801,6 +1019,17 @@ Status WriteBatch::RollbackToSavePoint() {
   return Status::OK();
 }
 
+Status WriteBatch::PopSavePoint() {
+  if (save_points_ == nullptr || save_points_->stack.size() == 0) {
+    return Status::NotFound();
+  }
+
+  // Pop the most recent savepoint off the stack
+  save_points_->stack.pop();
+
+  return Status::OK();
+}
+
 class MemTableInserter : public WriteBatch::Handler {
 
   SequenceNumber sequence_;
@@ -820,13 +1049,24 @@ class MemTableInserter : public WriteBatch::Handler {
   // cause memory allocations though unused.
   // Make creation optional but do not incur
   // unique_ptr additional allocation
-  using 
-  MemPostInfoMap = std::map<MemTable*, MemTablePostProcessInfo>;
-  using
-  PostMapType = std::aligned_storage<sizeof(MemPostInfoMap)>::type;
+  using MemPostInfoMap = std::map<MemTable*, MemTablePostProcessInfo>;
+  using PostMapType = std::aligned_storage<sizeof(MemPostInfoMap)>::type;
   PostMapType mem_post_info_map_;
   // current recovered transaction we are rebuilding (recovery)
   WriteBatch* rebuilding_trx_;
+  SequenceNumber rebuilding_trx_seq_;
+  // Increase seq number once per each write batch. Otherwise increase it once
+  // per key.
+  bool seq_per_batch_;
+  // Whether the memtable write will be done only after the commit
+  bool write_after_commit_;
+  // Whether memtable write can be done before prepare
+  bool write_before_prepare_;
+  // Whether this batch was unprepared or not
+  bool unprepared_batch_;
+  using DupDetector = std::aligned_storage<sizeof(DuplicateDetector)>::type;
+  DupDetector       duplicate_detector_;
+  bool              dup_dectector_on_;
 
   MemPostInfoMap& GetPostMap() {
     assert(concurrent_memtable_writes_);
@@ -837,15 +1077,33 @@ class MemTableInserter : public WriteBatch::Handler {
     return *reinterpret_cast<MemPostInfoMap*>(&mem_post_info_map_);
   }
 
-public:
+  bool IsDuplicateKeySeq(uint32_t column_family_id, const Slice& key) {
+    assert(!write_after_commit_);
+    assert(rebuilding_trx_ != nullptr);
+    if (!dup_dectector_on_) {
+      new (&duplicate_detector_) DuplicateDetector(db_);
+      dup_dectector_on_ = true;
+    }
+    return reinterpret_cast<DuplicateDetector*>
+      (&duplicate_detector_)->IsDuplicateKeySeq(column_family_id, key, sequence_);
+  }
+
+ protected:
+  virtual bool WriteBeforePrepare() const override {
+    return write_before_prepare_;
+  }
+  virtual bool WriteAfterCommit() const override { return write_after_commit_; }
+
+ public:
   // cf_mems should not be shared with concurrent inserters
-  MemTableInserter(SequenceNumber sequence, ColumnFamilyMemTables* cf_mems,
+  MemTableInserter(SequenceNumber _sequence, ColumnFamilyMemTables* cf_mems,
                    FlushScheduler* flush_scheduler,
                    bool ignore_missing_column_families,
                    uint64_t recovering_log_number, DB* db,
                    bool concurrent_memtable_writes,
-                   bool* has_valid_writes = nullptr)
-      : sequence_(sequence),
+                   bool* has_valid_writes = nullptr, bool seq_per_batch = false,
+                   bool batch_per_txn = true)
+      : sequence_(_sequence),
         cf_mems_(cf_mems),
         flush_scheduler_(flush_scheduler),
         ignore_missing_column_families_(ignore_missing_column_families),
@@ -855,23 +1113,56 @@ public:
         concurrent_memtable_writes_(concurrent_memtable_writes),
         post_info_created_(false),
         has_valid_writes_(has_valid_writes),
-        rebuilding_trx_(nullptr) {
+        rebuilding_trx_(nullptr),
+        rebuilding_trx_seq_(0),
+        seq_per_batch_(seq_per_batch),
+        // Write after commit currently uses one seq per key (instead of per
+        // batch). So seq_per_batch being false indicates write_after_commit
+        // approach.
+        write_after_commit_(!seq_per_batch),
+        // WriteUnprepared can write WriteBatches per transaction, so
+        // batch_per_txn being false indicates write_before_prepare.
+        write_before_prepare_(!batch_per_txn),
+        unprepared_batch_(false),
+        duplicate_detector_(),
+        dup_dectector_on_(false) {
     assert(cf_mems_);
   }
 
   ~MemTableInserter() {
+    if (dup_dectector_on_) {
+      reinterpret_cast<DuplicateDetector*>
+        (&duplicate_detector_)->~DuplicateDetector();
+    }
     if (post_info_created_) {
       reinterpret_cast<MemPostInfoMap*>
         (&mem_post_info_map_)->~MemPostInfoMap();
     }
+    delete rebuilding_trx_;
   }
 
   MemTableInserter(const MemTableInserter&) = delete;
   MemTableInserter& operator=(const MemTableInserter&) = delete;
 
+  // The batch seq is regularly restarted; In normal mode it is set when
+  // MemTableInserter is constructed in the write thread and in recovery mode it
+  // is set when a batch, which is tagged with seq, is read from the WAL.
+  // Within a sequenced batch, which could be a merge of multiple batches, we
+  // have two policies to advance the seq: i) seq_per_key (default) and ii)
+  // seq_per_batch. To implement the latter we need to mark the boundary between
+  // the individual batches. The approach is this: 1) Use the terminating
+  // markers to indicate the boundary (kTypeEndPrepareXID, kTypeCommitXID,
+  // kTypeRollbackXID) 2) Terminate a batch with kTypeNoop in the absence of a
+  // natural boundary marker.
+  void MaybeAdvanceSeq(bool batch_boundry = false) {
+    if (batch_boundry == seq_per_batch_) {
+      sequence_++;
+    }
+  }
+
   void set_log_number_ref(uint64_t log) { log_number_ref_ = log; }
 
-  SequenceNumber get_final_sequence() const { return sequence_; }
+  SequenceNumber sequence() const { return sequence_; }
 
   void PostProcess() {
     assert(concurrent_memtable_writes_);
@@ -924,28 +1215,48 @@ public:
     return true;
   }
 
-  virtual Status PutCF(uint32_t column_family_id, const Slice& key,
-                       const Slice& value) override {
-    if (rebuilding_trx_ != nullptr) {
+  Status PutCFImpl(uint32_t column_family_id, const Slice& key,
+                   const Slice& value, ValueType value_type) {
+    // optimize for non-recovery mode
+    if (UNLIKELY(write_after_commit_ && rebuilding_trx_ != nullptr)) {
       WriteBatchInternal::Put(rebuilding_trx_, column_family_id, key, value);
       return Status::OK();
+      // else insert the values to the memtable right away
     }
 
     Status seek_status;
-    if (!SeekToColumnFamily(column_family_id, &seek_status)) {
-      ++sequence_;
+    if (UNLIKELY(!SeekToColumnFamily(column_family_id, &seek_status))) {
+      bool batch_boundry = false;
+      if (rebuilding_trx_ != nullptr) {
+        assert(!write_after_commit_);
+        // The CF is probably flushed and hence no need for insert but we still
+        // need to keep track of the keys for upcoming rollback/commit.
+        WriteBatchInternal::Put(rebuilding_trx_, column_family_id, key, value);
+        batch_boundry = IsDuplicateKeySeq(column_family_id, key);
+      }
+      MaybeAdvanceSeq(batch_boundry);
       return seek_status;
     }
+    Status ret_status;
 
     MemTable* mem = cf_mems_->GetMemTable();
-    auto* moptions = mem->GetMemTableOptions();
+    auto* moptions = mem->GetImmutableMemTableOptions();
+    // inplace_update_support is inconsistent with snapshots, and therefore with
+    // any kind of transactions including the ones that use seq_per_batch
+    assert(!seq_per_batch_ || !moptions->inplace_update_support);
     if (!moptions->inplace_update_support) {
-      mem->Add(sequence_, kTypeValue, key, value, concurrent_memtable_writes_,
-               get_post_process_info(mem));
+      bool mem_res =
+          mem->Add(sequence_, value_type, key, value,
+                   concurrent_memtable_writes_, get_post_process_info(mem));
+      if (UNLIKELY(!mem_res)) {
+        assert(seq_per_batch_);
+        ret_status = Status::TryAgain("key+seq exists");
+        const bool BATCH_BOUNDRY = true;
+        MaybeAdvanceSeq(BATCH_BOUNDRY);
+      }
     } else if (moptions->inplace_callback == nullptr) {
       assert(!concurrent_memtable_writes_);
       mem->Update(sequence_, key, value);
-      RecordTick(moptions->statistics, NUMBER_KEYS_UPDATED);
     } else {
       assert(!concurrent_memtable_writes_);
       if (mem->UpdateCallback(sequence_, key, value)) {
@@ -954,6 +1265,9 @@ public:
         SnapshotImpl read_from_snapshot;
         read_from_snapshot.number_ = sequence_;
         ReadOptions ropts;
+        // it's going to be overwritten for sure, so no point caching data block
+        // containing the old version
+        ropts.fill_cache = false;
         ropts.snapshot = &read_from_snapshot;
 
         std::string prev_value;
@@ -975,77 +1289,154 @@ public:
                                                  value, &merged_value);
         if (status == UpdateStatus::UPDATED_INPLACE) {
           // prev_value is updated in-place with final value.
-          mem->Add(sequence_, kTypeValue, key, Slice(prev_buffer, prev_size));
+          bool mem_res __attribute__((__unused__));
+          mem_res = mem->Add(
+              sequence_, value_type, key, Slice(prev_buffer, prev_size));
+          assert(mem_res);
           RecordTick(moptions->statistics, NUMBER_KEYS_WRITTEN);
         } else if (status == UpdateStatus::UPDATED) {
           // merged_value contains the final value.
-          mem->Add(sequence_, kTypeValue, key, Slice(merged_value));
+          bool mem_res __attribute__((__unused__));
+          mem_res =
+              mem->Add(sequence_, value_type, key, Slice(merged_value));
+          assert(mem_res);
           RecordTick(moptions->statistics, NUMBER_KEYS_WRITTEN);
         }
       }
     }
-    // Since all Puts are logged in trasaction logs (if enabled), always bump
+    // optimize for non-recovery mode
+    if (UNLIKELY(!ret_status.IsTryAgain() && rebuilding_trx_ != nullptr)) {
+      assert(!write_after_commit_);
+      // If the ret_status is TryAgain then let the next try to add the ky to
+      // the rebuilding transaction object.
+      WriteBatchInternal::Put(rebuilding_trx_, column_family_id, key, value);
+    }
+    // Since all Puts are logged in transaction logs (if enabled), always bump
     // sequence number. Even if the update eventually fails and does not result
     // in memtable add/update.
-    sequence_++;
+    MaybeAdvanceSeq();
     CheckMemtableFull();
-    return Status::OK();
+    return ret_status;
+  }
+
+  virtual Status PutCF(uint32_t column_family_id, const Slice& key,
+                       const Slice& value) override {
+    return PutCFImpl(column_family_id, key, value, kTypeValue);
   }
 
-  Status DeleteImpl(uint32_t column_family_id, const Slice& key,
+  Status DeleteImpl(uint32_t /*column_family_id*/, const Slice& key,
                     const Slice& value, ValueType delete_type) {
+    Status ret_status;
     MemTable* mem = cf_mems_->GetMemTable();
-    mem->Add(sequence_, delete_type, key, value, concurrent_memtable_writes_,
-             get_post_process_info(mem));
-    sequence_++;
+    bool mem_res =
+        mem->Add(sequence_, delete_type, key, value,
+                 concurrent_memtable_writes_, get_post_process_info(mem));
+    if (UNLIKELY(!mem_res)) {
+      assert(seq_per_batch_);
+      ret_status = Status::TryAgain("key+seq exists");
+      const bool BATCH_BOUNDRY = true;
+      MaybeAdvanceSeq(BATCH_BOUNDRY);
+    }
+    MaybeAdvanceSeq();
     CheckMemtableFull();
-    return Status::OK();
+    return ret_status;
   }
 
   virtual Status DeleteCF(uint32_t column_family_id,
                           const Slice& key) override {
-    if (rebuilding_trx_ != nullptr) {
+    // optimize for non-recovery mode
+    if (UNLIKELY(write_after_commit_ && rebuilding_trx_ != nullptr)) {
       WriteBatchInternal::Delete(rebuilding_trx_, column_family_id, key);
       return Status::OK();
+      // else insert the values to the memtable right away
     }
 
     Status seek_status;
-    if (!SeekToColumnFamily(column_family_id, &seek_status)) {
-      ++sequence_;
+    if (UNLIKELY(!SeekToColumnFamily(column_family_id, &seek_status))) {
+      bool batch_boundry = false;
+      if (rebuilding_trx_ != nullptr) {
+        assert(!write_after_commit_);
+        // The CF is probably flushed and hence no need for insert but we still
+        // need to keep track of the keys for upcoming rollback/commit.
+        WriteBatchInternal::Delete(rebuilding_trx_, column_family_id, key);
+        batch_boundry = IsDuplicateKeySeq(column_family_id, key);
+      }
+      MaybeAdvanceSeq(batch_boundry);
       return seek_status;
     }
 
-    return DeleteImpl(column_family_id, key, Slice(), kTypeDeletion);
+    auto ret_status = DeleteImpl(column_family_id, key, Slice(), kTypeDeletion);
+    // optimize for non-recovery mode
+    if (UNLIKELY(!ret_status.IsTryAgain() && rebuilding_trx_ != nullptr)) {
+      assert(!write_after_commit_);
+      // If the ret_status is TryAgain then let the next try to add the ky to
+      // the rebuilding transaction object.
+      WriteBatchInternal::Delete(rebuilding_trx_, column_family_id, key);
+    }
+    return ret_status;
   }
 
   virtual Status SingleDeleteCF(uint32_t column_family_id,
                                 const Slice& key) override {
-    if (rebuilding_trx_ != nullptr) {
+    // optimize for non-recovery mode
+    if (UNLIKELY(write_after_commit_ && rebuilding_trx_ != nullptr)) {
       WriteBatchInternal::SingleDelete(rebuilding_trx_, column_family_id, key);
       return Status::OK();
+      // else insert the values to the memtable right away
     }
 
     Status seek_status;
-    if (!SeekToColumnFamily(column_family_id, &seek_status)) {
-      ++sequence_;
+    if (UNLIKELY(!SeekToColumnFamily(column_family_id, &seek_status))) {
+      bool batch_boundry = false;
+      if (rebuilding_trx_ != nullptr) {
+        assert(!write_after_commit_);
+        // The CF is probably flushed and hence no need for insert but we still
+        // need to keep track of the keys for upcoming rollback/commit.
+        WriteBatchInternal::SingleDelete(rebuilding_trx_, column_family_id,
+                                         key);
+        batch_boundry = IsDuplicateKeySeq(column_family_id, key);
+      }
+      MaybeAdvanceSeq(batch_boundry);
       return seek_status;
     }
 
-    return DeleteImpl(column_family_id, key, Slice(), kTypeSingleDeletion);
+    auto ret_status =
+        DeleteImpl(column_family_id, key, Slice(), kTypeSingleDeletion);
+    // optimize for non-recovery mode
+    if (UNLIKELY(!ret_status.IsTryAgain() && rebuilding_trx_ != nullptr)) {
+      assert(!write_after_commit_);
+      // If the ret_status is TryAgain then let the next try to add the ky to
+      // the rebuilding transaction object.
+      WriteBatchInternal::SingleDelete(rebuilding_trx_, column_family_id, key);
+    }
+    return ret_status;
   }
 
   virtual Status DeleteRangeCF(uint32_t column_family_id,
                                const Slice& begin_key,
                                const Slice& end_key) override {
-    if (rebuilding_trx_ != nullptr) {
+    // optimize for non-recovery mode
+    if (UNLIKELY(write_after_commit_ && rebuilding_trx_ != nullptr)) {
       WriteBatchInternal::DeleteRange(rebuilding_trx_, column_family_id,
                                       begin_key, end_key);
       return Status::OK();
+      // else insert the values to the memtable right away
     }
 
     Status seek_status;
-    if (!SeekToColumnFamily(column_family_id, &seek_status)) {
-      ++sequence_;
+    if (UNLIKELY(!SeekToColumnFamily(column_family_id, &seek_status))) {
+      bool batch_boundry = false;
+      if (rebuilding_trx_ != nullptr) {
+        assert(!write_after_commit_);
+        // The CF is probably flushed and hence no need for insert but we still
+        // need to keep track of the keys for upcoming rollback/commit.
+        WriteBatchInternal::DeleteRange(rebuilding_trx_, column_family_id,
+                                        begin_key, end_key);
+        // TODO(myabandeh): when transactional DeleteRange support is added,
+        // check if end_key must also be added.
+        batch_boundry = IsDuplicateKeySeq(column_family_id, begin_key);
+      }
+      MaybeAdvanceSeq(batch_boundry);
       return seek_status;
     }
     if (db_ != nullptr) {
@@ -1062,25 +1453,47 @@ public:
       }
     }
 
-    return DeleteImpl(column_family_id, begin_key, end_key, kTypeRangeDeletion);
+    auto ret_status =
+        DeleteImpl(column_family_id, begin_key, end_key, kTypeRangeDeletion);
+    // optimize for non-recovery mode
+    if (UNLIKELY(!ret_status.IsTryAgain() && rebuilding_trx_ != nullptr)) {
+      assert(!write_after_commit_);
+      // If the ret_status is TryAgain then let the next try to add the ky to
+      // the rebuilding transaction object.
+      WriteBatchInternal::DeleteRange(rebuilding_trx_, column_family_id,
+                                      begin_key, end_key);
+    }
+    return ret_status;
   }
 
   virtual Status MergeCF(uint32_t column_family_id, const Slice& key,
                          const Slice& value) override {
     assert(!concurrent_memtable_writes_);
-    if (rebuilding_trx_ != nullptr) {
+    // optimize for non-recovery mode
+    if (UNLIKELY(write_after_commit_ && rebuilding_trx_ != nullptr)) {
       WriteBatchInternal::Merge(rebuilding_trx_, column_family_id, key, value);
       return Status::OK();
+      // else insert the values to the memtable right away
     }
 
     Status seek_status;
-    if (!SeekToColumnFamily(column_family_id, &seek_status)) {
-      ++sequence_;
+    if (UNLIKELY(!SeekToColumnFamily(column_family_id, &seek_status))) {
+      bool batch_boundry = false;
+      if (rebuilding_trx_ != nullptr) {
+        assert(!write_after_commit_);
+        // The CF is probably flushed and hence no need for insert but we still
+        // need to keep track of the keys for upcoming rollback/commit.
+        WriteBatchInternal::Merge(rebuilding_trx_, column_family_id, key,
+                                  value);
+        batch_boundry = IsDuplicateKeySeq(column_family_id, key);
+      }
+      MaybeAdvanceSeq(batch_boundry);
       return seek_status;
     }
 
+    Status ret_status;
     MemTable* mem = cf_mems_->GetMemTable();
-    auto* moptions = mem->GetMemTableOptions();
+    auto* moptions = mem->GetImmutableMemTableOptions();
     bool perform_merge = false;
 
     // If we pass DB through and options.max_successive_merges is hit
@@ -1134,18 +1547,43 @@ public:
         perform_merge = false;
       } else {
         // 3) Add value to memtable
-        mem->Add(sequence_, kTypeValue, key, new_value);
+        bool mem_res = mem->Add(sequence_, kTypeValue, key, new_value);
+        if (UNLIKELY(!mem_res)) {
+          assert(seq_per_batch_);
+          ret_status = Status::TryAgain("key+seq exists");
+          const bool BATCH_BOUNDRY = true;
+          MaybeAdvanceSeq(BATCH_BOUNDRY);
+        }
       }
     }
 
     if (!perform_merge) {
       // Add merge operator to memtable
-      mem->Add(sequence_, kTypeMerge, key, value);
+      bool mem_res = mem->Add(sequence_, kTypeMerge, key, value);
+      if (UNLIKELY(!mem_res)) {
+        assert(seq_per_batch_);
+        ret_status = Status::TryAgain("key+seq exists");
+        const bool BATCH_BOUNDRY = true;
+        MaybeAdvanceSeq(BATCH_BOUNDRY);
+      }
     }
 
-    sequence_++;
+    // optimize for non-recovery mode
+    if (UNLIKELY(!ret_status.IsTryAgain() && rebuilding_trx_ != nullptr)) {
+      assert(!write_after_commit_);
+      // If the ret_status is TryAgain then let the next try to add the ky to
+      // the rebuilding transaction object.
+      WriteBatchInternal::Merge(rebuilding_trx_, column_family_id, key, value);
+    }
+    MaybeAdvanceSeq();
     CheckMemtableFull();
-    return Status::OK();
+    return ret_status;
+  }
+
+  virtual Status PutBlobIndexCF(uint32_t column_family_id, const Slice& key,
+                                const Slice& value) override {
+    // Same as PutCF except for value type.
+    return PutCFImpl(column_family_id, key, value, kTypeBlobIndex);
   }
 
   void CheckMemtableFull() {
@@ -1161,7 +1599,9 @@ public:
     }
   }
 
-  Status MarkBeginPrepare() override {
+  // The write batch handler calls MarkBeginPrepare with unprepare set to true
+  // if it encounters the kTypeBeginUnprepareXID marker.
+  Status MarkBeginPrepare(bool unprepare) override {
     assert(rebuilding_trx_ == nullptr);
     assert(db_);
 
@@ -1176,14 +1616,15 @@ public:
 
       // we are now iterating through a prepared section
       rebuilding_trx_ = new WriteBatch();
+      rebuilding_trx_seq_ = sequence_;
+      // We only call MarkBeginPrepare once per batch, and unprepared_batch_
+      // is initialized to false by default.
+      assert(!unprepared_batch_);
+      unprepared_batch_ = unprepare;
+
       if (has_valid_writes_ != nullptr) {
         *has_valid_writes_ = true;
       }
-    } else {
-      // in non-recovery we ignore prepare markers
-      // and insert the values directly. making sure we have a
-      // log for each insertion to reference.
-      assert(log_number_ref_ > 0);
     }
 
     return Status::OK();
@@ -1195,17 +1636,36 @@ public:
 
     if (recovering_log_number_ != 0) {
       assert(db_->allow_2pc());
+      size_t batch_cnt =
+          write_after_commit_
+              ? 0  // 0 will disable further checks
+              : static_cast<size_t>(sequence_ - rebuilding_trx_seq_ + 1);
       db_->InsertRecoveredTransaction(recovering_log_number_, name.ToString(),
-                                      rebuilding_trx_);
+                                      rebuilding_trx_, rebuilding_trx_seq_,
+                                      batch_cnt, unprepared_batch_);
       rebuilding_trx_ = nullptr;
     } else {
       assert(rebuilding_trx_ == nullptr);
-      assert(log_number_ref_ > 0);
     }
+    const bool batch_boundry = true;
+    MaybeAdvanceSeq(batch_boundry);
 
     return Status::OK();
   }
 
+  Status MarkNoop(bool empty_batch) override {
+    // A hack in pessimistic transaction could result into a noop at the start
+    // of the write batch, that should be ignored.
+    if (!empty_batch) {
+      // In the absence of Prepare markers, a kTypeNoop tag indicates the end of
+      // a batch. This happens when write batch commits skipping the prepare
+      // phase.
+      const bool batch_boundry = true;
+      MaybeAdvanceSeq(batch_boundry);
+    }
+    return Status::OK();
+  }
+
   Status MarkCommit(const Slice& name) override {
     assert(db_);
 
@@ -1217,17 +1677,23 @@ public:
       // and commit.
       auto trx = db_->GetRecoveredTransaction(name.ToString());
 
-      // the log contaiting the prepared section may have
+      // the log containing the prepared section may have
       // been released in the last incarnation because the
       // data was flushed to L0
       if (trx != nullptr) {
         // at this point individual CF lognumbers will prevent
         // duplicate re-insertion of values.
         assert(log_number_ref_ == 0);
-        // all insertes must reference this trx log number
-        log_number_ref_ = trx->log_number_;
-        s = trx->batch_->Iterate(this);
-        log_number_ref_ = 0;
+        if (write_after_commit_) {
+          // write_after_commit_ can only have one batch in trx.
+          assert(trx->batches_.size() == 1);
+          const auto& batch_info = trx->batches_.begin()->second;
+          // all inserts must reference this trx log number
+          log_number_ref_ = batch_info.log_number_;
+          s = batch_info.batch_->Iterate(this);
+          log_number_ref_ = 0;
+        }
+        // else the values are already inserted before the commit
 
         if (s.ok()) {
           db_->DeleteRecoveredTransaction(name.ToString());
@@ -1237,8 +1703,13 @@ public:
         }
       }
     } else {
-      // in non recovery we simply ignore this tag
+      // When writes are not delayed until commit, there is no disconnect
+      // between a memtable write and the WAL that supports it. So the commit
+      // need not reference any log as the only log to which it depends.
+      assert(!write_after_commit_ || log_number_ref_ > 0);
     }
+    const bool batch_boundry = true;
+    MaybeAdvanceSeq(batch_boundry);
 
     return s;
   }
@@ -1259,6 +1730,9 @@ public:
       // in non recovery we simply ignore this tag
     }
 
+    const bool batch_boundry = true;
+    MaybeAdvanceSeq(batch_boundry);
+
     return Status::OK();
   }
 
@@ -1278,40 +1752,55 @@ public:
 // 3) During Write(), in a concurrent context where memtables has been cloned
 // The reason is that it calls memtables->Seek(), which has a stateful cache
 Status WriteBatchInternal::InsertInto(
-    const autovector<WriteThread::Writer*>& writers, SequenceNumber sequence,
+    WriteThread::WriteGroup& write_group, SequenceNumber sequence,
     ColumnFamilyMemTables* memtables, FlushScheduler* flush_scheduler,
     bool ignore_missing_column_families, uint64_t recovery_log_number, DB* db,
-    bool concurrent_memtable_writes) {
-  MemTableInserter inserter(sequence, memtables, flush_scheduler,
-                            ignore_missing_column_families, recovery_log_number,
-                            db, concurrent_memtable_writes);
-  for (size_t i = 0; i < writers.size(); i++) {
-    auto w = writers[i];
+    bool concurrent_memtable_writes, bool seq_per_batch, bool batch_per_txn) {
+  MemTableInserter inserter(
+      sequence, memtables, flush_scheduler, ignore_missing_column_families,
+      recovery_log_number, db, concurrent_memtable_writes,
+      nullptr /*has_valid_writes*/, seq_per_batch, batch_per_txn);
+  for (auto w : write_group) {
+    if (w->CallbackFailed()) {
+      continue;
+    }
+    w->sequence = inserter.sequence();
     if (!w->ShouldWriteToMemtable()) {
+      // In seq_per_batch_ mode this advances the seq by one.
+      inserter.MaybeAdvanceSeq(true);
       continue;
     }
+    SetSequence(w->batch, inserter.sequence());
     inserter.set_log_number_ref(w->log_ref);
     w->status = w->batch->Iterate(&inserter);
     if (!w->status.ok()) {
       return w->status;
     }
+    assert(!seq_per_batch || w->batch_cnt != 0);
+    assert(!seq_per_batch || inserter.sequence() - w->sequence == w->batch_cnt);
   }
   return Status::OK();
 }
 
-Status WriteBatchInternal::InsertInto(WriteThread::Writer* writer,
-                                      ColumnFamilyMemTables* memtables,
-                                      FlushScheduler* flush_scheduler,
-                                      bool ignore_missing_column_families,
-                                      uint64_t log_number, DB* db,
-                                      bool concurrent_memtable_writes) {
-  MemTableInserter inserter(WriteBatchInternal::Sequence(writer->batch),
-                            memtables, flush_scheduler,
-                            ignore_missing_column_families, log_number, db,
-                            concurrent_memtable_writes);
+Status WriteBatchInternal::InsertInto(
+    WriteThread::Writer* writer, SequenceNumber sequence,
+    ColumnFamilyMemTables* memtables, FlushScheduler* flush_scheduler,
+    bool ignore_missing_column_families, uint64_t log_number, DB* db,
+    bool concurrent_memtable_writes, bool seq_per_batch, size_t batch_cnt,
+    bool batch_per_txn) {
+#ifdef NDEBUG
+  (void)batch_cnt;
+#endif
   assert(writer->ShouldWriteToMemtable());
+  MemTableInserter inserter(
+      sequence, memtables, flush_scheduler, ignore_missing_column_families,
+      log_number, db, concurrent_memtable_writes, nullptr /*has_valid_writes*/,
+      seq_per_batch, batch_per_txn);
+  SetSequence(writer->batch, sequence);
   inserter.set_log_number_ref(writer->log_ref);
   Status s = writer->batch->Iterate(&inserter);
+  assert(!seq_per_batch || batch_cnt != 0);
+  assert(!seq_per_batch || inserter.sequence() - sequence == batch_cnt);
   if (concurrent_memtable_writes) {
     inserter.PostProcess();
   }
@@ -1322,14 +1811,15 @@ Status WriteBatchInternal::InsertInto(
     const WriteBatch* batch, ColumnFamilyMemTables* memtables,
     FlushScheduler* flush_scheduler, bool ignore_missing_column_families,
     uint64_t log_number, DB* db, bool concurrent_memtable_writes,
-    SequenceNumber* last_seq_used, bool* has_valid_writes) {
-  MemTableInserter inserter(WriteBatchInternal::Sequence(batch), memtables,
-                            flush_scheduler, ignore_missing_column_families,
-                            log_number, db, concurrent_memtable_writes,
-                            has_valid_writes);
+    SequenceNumber* next_seq, bool* has_valid_writes, bool seq_per_batch,
+    bool batch_per_txn) {
+  MemTableInserter inserter(Sequence(batch), memtables, flush_scheduler,
+                            ignore_missing_column_families, log_number, db,
+                            concurrent_memtable_writes, has_valid_writes,
+                            seq_per_batch, batch_per_txn);
   Status s = batch->Iterate(&inserter);
-  if (last_seq_used != nullptr) {
-    *last_seq_used = inserter.get_final_sequence();
+  if (next_seq != nullptr) {
+    *next_seq = inserter.sequence();
   }
   if (concurrent_memtable_writes) {
     inserter.PostProcess();