]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rocksdb/db/write_batch.cc
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / rocksdb / db / write_batch.cc
index 30480f64e16625ba67966072d44ead4979720421..d578db59bac0161e48074b5ec5f21176bf8284ae 100644 (file)
 #include <stack>
 #include <stdexcept>
 #include <type_traits>
+#include <unordered_map>
 #include <vector>
 
 #include "db/column_family.h"
-#include "db/db_impl.h"
+#include "db/db_impl/db_impl.h"
 #include "db/dbformat.h"
 #include "db/flush_scheduler.h"
 #include "db/memtable.h"
 #include "db/merge_context.h"
 #include "db/snapshot_impl.h"
+#include "db/trim_history_scheduler.h"
 #include "db/write_batch_internal.h"
 #include "monitoring/perf_context_imp.h"
 #include "monitoring/statistics.h"
 #include "rocksdb/merge_operator.h"
+#include "util/autovector.h"
+#include "util/cast_util.h"
 #include "util/coding.h"
 #include "util/duplicate_detector.h"
 #include "util/string_util.h"
 #include "util/util.h"
 
-namespace rocksdb {
+namespace ROCKSDB_NAMESPACE {
 
 // anon namespace for file-local types
 namespace {
@@ -134,44 +138,157 @@ struct BatchContentClassifier : public WriteBatch::Handler {
   }
 };
 
+class TimestampAssigner : public WriteBatch::Handler {
+ public:
+  explicit TimestampAssigner(const Slice& ts)
+      : timestamp_(ts), timestamps_(kEmptyTimestampList) {}
+  explicit TimestampAssigner(const std::vector<Slice>& ts_list)
+      : timestamps_(ts_list) {
+    SanityCheck();
+  }
+  ~TimestampAssigner() override {}
+
+  Status PutCF(uint32_t, const Slice& key, const Slice&) override {
+    AssignTimestamp(key);
+    ++idx_;
+    return Status::OK();
+  }
+
+  Status DeleteCF(uint32_t, const Slice& key) override {
+    AssignTimestamp(key);
+    ++idx_;
+    return Status::OK();
+  }
+
+  Status SingleDeleteCF(uint32_t, const Slice& key) override {
+    AssignTimestamp(key);
+    ++idx_;
+    return Status::OK();
+  }
+
+  Status DeleteRangeCF(uint32_t, const Slice& begin_key,
+                       const Slice& end_key) override {
+    AssignTimestamp(begin_key);
+    AssignTimestamp(end_key);
+    ++idx_;
+    return Status::OK();
+  }
+
+  Status MergeCF(uint32_t, const Slice& key, const Slice&) override {
+    AssignTimestamp(key);
+    ++idx_;
+    return Status::OK();
+  }
+
+  Status PutBlobIndexCF(uint32_t, const Slice&, const Slice&) override {
+    // TODO (yanqin): support blob db in the future.
+    return Status::OK();
+  }
+
+  Status MarkBeginPrepare(bool) override {
+    // TODO (yanqin): support in the future.
+    return Status::OK();
+  }
+
+  Status MarkEndPrepare(const Slice&) override {
+    // TODO (yanqin): support in the future.
+    return Status::OK();
+  }
+
+  Status MarkCommit(const Slice&) override {
+    // TODO (yanqin): support in the future.
+    return Status::OK();
+  }
+
+  Status MarkRollback(const Slice&) override {
+    // TODO (yanqin): support in the future.
+    return Status::OK();
+  }
+
+ private:
+  void SanityCheck() const {
+    assert(!timestamps_.empty());
+#ifndef NDEBUG
+    const size_t ts_sz = timestamps_[0].size();
+    for (size_t i = 1; i != timestamps_.size(); ++i) {
+      assert(ts_sz == timestamps_[i].size());
+    }
+#endif  // !NDEBUG
+  }
+
+  void AssignTimestamp(const Slice& key) {
+    assert(timestamps_.empty() || idx_ < timestamps_.size());
+    const Slice& ts = timestamps_.empty() ? timestamp_ : timestamps_[idx_];
+    size_t ts_sz = ts.size();
+    char* ptr = const_cast<char*>(key.data() + key.size() - ts_sz);
+    memcpy(ptr, ts.data(), ts_sz);
+  }
+
+  static const std::vector<Slice> kEmptyTimestampList;
+  const Slice timestamp_;
+  const std::vector<Slice>& timestamps_;
+  size_t idx_ = 0;
+
+  // No copy or move.
+  TimestampAssigner(const TimestampAssigner&) = delete;
+  TimestampAssigner(TimestampAssigner&&) = delete;
+  TimestampAssigner& operator=(const TimestampAssigner&) = delete;
+  TimestampAssigner&& operator=(TimestampAssigner&&) = delete;
+};
+const std::vector<Slice> TimestampAssigner::kEmptyTimestampList;
+
 }  // anon namespace
 
 struct SavePoints {
-  std::stack<SavePoint> stack;
+  std::stack<SavePoint, autovector<SavePoint>> stack;
 };
 
 WriteBatch::WriteBatch(size_t reserved_bytes, size_t max_bytes)
-    : save_points_(nullptr), content_flags_(0), max_bytes_(max_bytes), rep_() {
+    : content_flags_(0), max_bytes_(max_bytes), rep_(), timestamp_size_(0) {
+  rep_.reserve((reserved_bytes > WriteBatchInternal::kHeader)
+                   ? reserved_bytes
+                   : WriteBatchInternal::kHeader);
+  rep_.resize(WriteBatchInternal::kHeader);
+}
+
+WriteBatch::WriteBatch(size_t reserved_bytes, size_t max_bytes, size_t ts_sz)
+    : content_flags_(0), max_bytes_(max_bytes), rep_(), timestamp_size_(ts_sz) {
   rep_.reserve((reserved_bytes > WriteBatchInternal::kHeader) ?
     reserved_bytes : WriteBatchInternal::kHeader);
   rep_.resize(WriteBatchInternal::kHeader);
 }
 
 WriteBatch::WriteBatch(const std::string& rep)
-    : save_points_(nullptr),
-      content_flags_(ContentFlags::DEFERRED),
+    : content_flags_(ContentFlags::DEFERRED),
       max_bytes_(0),
-      rep_(rep) {}
+      rep_(rep),
+      timestamp_size_(0) {}
 
 WriteBatch::WriteBatch(std::string&& rep)
-    : save_points_(nullptr),
-      content_flags_(ContentFlags::DEFERRED),
+    : content_flags_(ContentFlags::DEFERRED),
       max_bytes_(0),
-      rep_(std::move(rep)) {}
+      rep_(std::move(rep)),
+      timestamp_size_(0) {}
 
 WriteBatch::WriteBatch(const WriteBatch& src)
-    : save_points_(src.save_points_),
-      wal_term_point_(src.wal_term_point_),
+    : wal_term_point_(src.wal_term_point_),
       content_flags_(src.content_flags_.load(std::memory_order_relaxed)),
       max_bytes_(src.max_bytes_),
-      rep_(src.rep_) {}
+      rep_(src.rep_),
+      timestamp_size_(src.timestamp_size_) {
+  if (src.save_points_ != nullptr) {
+    save_points_.reset(new SavePoints());
+    save_points_->stack = src.save_points_->stack;
+  }
+}
 
 WriteBatch::WriteBatch(WriteBatch&& src) noexcept
     : save_points_(std::move(src.save_points_)),
       wal_term_point_(std::move(src.wal_term_point_)),
       content_flags_(src.content_flags_.load(std::memory_order_relaxed)),
       max_bytes_(src.max_bytes_),
-      rep_(std::move(src.rep_)) {}
+      rep_(std::move(src.rep_)),
+      timestamp_size_(src.timestamp_size_) {}
 
 WriteBatch& WriteBatch::operator=(const WriteBatch& src) {
   if (&src != this) {
@@ -189,7 +306,7 @@ WriteBatch& WriteBatch::operator=(WriteBatch&& src) {
   return *this;
 }
 
-WriteBatch::~WriteBatch() { delete save_points_; }
+WriteBatch::~WriteBatch() { }
 
 WriteBatch::Handler::~Handler() { }
 
@@ -217,9 +334,7 @@ void WriteBatch::Clear() {
   wal_term_point_.clear();
 }
 
-int WriteBatch::Count() const {
-  return WriteBatchInternal::Count(this);
-}
+uint32_t WriteBatch::Count() const { return WriteBatchInternal::Count(this); }
 
 uint32_t WriteBatch::ComputeContentFlags() const {
   auto rv = content_flags_.load(std::memory_order_relaxed);
@@ -397,19 +512,32 @@ Status ReadRecordFromWriteBatch(Slice* input, char* tag,
 }
 
 Status WriteBatch::Iterate(Handler* handler) const {
-  Slice input(rep_);
-  if (input.size() < WriteBatchInternal::kHeader) {
+  if (rep_.size() < WriteBatchInternal::kHeader) {
     return Status::Corruption("malformed WriteBatch (too small)");
   }
 
-  input.remove_prefix(WriteBatchInternal::kHeader);
+  return WriteBatchInternal::Iterate(this, handler, WriteBatchInternal::kHeader,
+                                     rep_.size());
+}
+
+Status WriteBatchInternal::Iterate(const WriteBatch* wb,
+                                   WriteBatch::Handler* handler, size_t begin,
+                                   size_t end) {
+  if (begin > wb->rep_.size() || end > wb->rep_.size() || end < begin) {
+    return Status::Corruption("Invalid start/end bounds for Iterate");
+  }
+  assert(begin <= end);
+  Slice input(wb->rep_.data() + begin, static_cast<size_t>(end - begin));
+  bool whole_batch =
+      (begin == WriteBatchInternal::kHeader) && (end == wb->rep_.size());
+
   Slice key, value, blob, xid;
   // Sometimes a sub-batch starts with a Noop. We want to exclude such Noops as
   // the batch boundary symbols otherwise we would mis-count the number of
   // batches. We do that by checking whether the accumulated batch is empty
   // before seeing the next Noop.
   bool empty_batch = true;
-  int found = 0;
+  uint32_t found = 0;
   Status s;
   char tag = 0;
   uint32_t column_family = 0;  // default
@@ -433,7 +561,7 @@ Status WriteBatch::Iterate(Handler* handler) const {
       }
     } else {
       assert(s.IsTryAgain());
-      assert(!last_was_try_again); // to detect infinite loop bugs
+      assert(!last_was_try_again);  // to detect infinite loop bugs
       if (UNLIKELY(last_was_try_again)) {
         return Status::Corruption(
             "two consecutive TryAgain in WriteBatch handler; this is either a "
@@ -446,7 +574,7 @@ Status WriteBatch::Iterate(Handler* handler) const {
     switch (tag) {
       case kTypeColumnFamilyValue:
       case kTypeValue:
-        assert(content_flags_.load(std::memory_order_relaxed) &
+        assert(wb->content_flags_.load(std::memory_order_relaxed) &
                (ContentFlags::DEFERRED | ContentFlags::HAS_PUT));
         s = handler->PutCF(column_family, key, value);
         if (LIKELY(s.ok())) {
@@ -456,7 +584,7 @@ Status WriteBatch::Iterate(Handler* handler) const {
         break;
       case kTypeColumnFamilyDeletion:
       case kTypeDeletion:
-        assert(content_flags_.load(std::memory_order_relaxed) &
+        assert(wb->content_flags_.load(std::memory_order_relaxed) &
                (ContentFlags::DEFERRED | ContentFlags::HAS_DELETE));
         s = handler->DeleteCF(column_family, key);
         if (LIKELY(s.ok())) {
@@ -466,7 +594,7 @@ Status WriteBatch::Iterate(Handler* handler) const {
         break;
       case kTypeColumnFamilySingleDeletion:
       case kTypeSingleDeletion:
-        assert(content_flags_.load(std::memory_order_relaxed) &
+        assert(wb->content_flags_.load(std::memory_order_relaxed) &
                (ContentFlags::DEFERRED | ContentFlags::HAS_SINGLE_DELETE));
         s = handler->SingleDeleteCF(column_family, key);
         if (LIKELY(s.ok())) {
@@ -476,7 +604,7 @@ Status WriteBatch::Iterate(Handler* handler) const {
         break;
       case kTypeColumnFamilyRangeDeletion:
       case kTypeRangeDeletion:
-        assert(content_flags_.load(std::memory_order_relaxed) &
+        assert(wb->content_flags_.load(std::memory_order_relaxed) &
                (ContentFlags::DEFERRED | ContentFlags::HAS_DELETE_RANGE));
         s = handler->DeleteRangeCF(column_family, key, value);
         if (LIKELY(s.ok())) {
@@ -486,7 +614,7 @@ Status WriteBatch::Iterate(Handler* handler) const {
         break;
       case kTypeColumnFamilyMerge:
       case kTypeMerge:
-        assert(content_flags_.load(std::memory_order_relaxed) &
+        assert(wb->content_flags_.load(std::memory_order_relaxed) &
                (ContentFlags::DEFERRED | ContentFlags::HAS_MERGE));
         s = handler->MergeCF(column_family, key, value);
         if (LIKELY(s.ok())) {
@@ -496,7 +624,7 @@ Status WriteBatch::Iterate(Handler* handler) const {
         break;
       case kTypeColumnFamilyBlobIndex:
       case kTypeBlobIndex:
-        assert(content_flags_.load(std::memory_order_relaxed) &
+        assert(wb->content_flags_.load(std::memory_order_relaxed) &
                (ContentFlags::DEFERRED | ContentFlags::HAS_BLOB_INDEX));
         s = handler->PutBlobIndexCF(column_family, key, value);
         if (LIKELY(s.ok())) {
@@ -509,7 +637,7 @@ Status WriteBatch::Iterate(Handler* handler) const {
         empty_batch = false;
         break;
       case kTypeBeginPrepareXID:
-        assert(content_flags_.load(std::memory_order_relaxed) &
+        assert(wb->content_flags_.load(std::memory_order_relaxed) &
                (ContentFlags::DEFERRED | ContentFlags::HAS_BEGIN_PREPARE));
         handler->MarkBeginPrepare();
         empty_batch = false;
@@ -528,7 +656,7 @@ Status WriteBatch::Iterate(Handler* handler) const {
         }
         break;
       case kTypeBeginPersistedPrepareXID:
-        assert(content_flags_.load(std::memory_order_relaxed) &
+        assert(wb->content_flags_.load(std::memory_order_relaxed) &
                (ContentFlags::DEFERRED | ContentFlags::HAS_BEGIN_PREPARE));
         handler->MarkBeginPrepare();
         empty_batch = false;
@@ -541,7 +669,7 @@ Status WriteBatch::Iterate(Handler* handler) const {
         }
         break;
       case kTypeBeginUnprepareXID:
-        assert(content_flags_.load(std::memory_order_relaxed) &
+        assert(wb->content_flags_.load(std::memory_order_relaxed) &
                (ContentFlags::DEFERRED | ContentFlags::HAS_BEGIN_UNPREPARE));
         handler->MarkBeginPrepare(true /* unprepared */);
         empty_batch = false;
@@ -560,19 +688,19 @@ Status WriteBatch::Iterate(Handler* handler) const {
         }
         break;
       case kTypeEndPrepareXID:
-        assert(content_flags_.load(std::memory_order_relaxed) &
+        assert(wb->content_flags_.load(std::memory_order_relaxed) &
                (ContentFlags::DEFERRED | ContentFlags::HAS_END_PREPARE));
         handler->MarkEndPrepare(xid);
         empty_batch = true;
         break;
       case kTypeCommitXID:
-        assert(content_flags_.load(std::memory_order_relaxed) &
+        assert(wb->content_flags_.load(std::memory_order_relaxed) &
                (ContentFlags::DEFERRED | ContentFlags::HAS_COMMIT));
         handler->MarkCommit(xid);
         empty_batch = true;
         break;
       case kTypeRollbackXID:
-        assert(content_flags_.load(std::memory_order_relaxed) &
+        assert(wb->content_flags_.load(std::memory_order_relaxed) &
                (ContentFlags::DEFERRED | ContentFlags::HAS_ROLLBACK));
         handler->MarkRollback(xid);
         empty_batch = true;
@@ -588,7 +716,8 @@ Status WriteBatch::Iterate(Handler* handler) const {
   if (!s.ok()) {
     return s;
   }
-  if (handler_continue && found != WriteBatchInternal::Count(this)) {
+  if (handler_continue && whole_batch &&
+      found != WriteBatchInternal::Count(wb)) {
     return Status::Corruption("WriteBatch has wrong count");
   } else {
     return Status::OK();
@@ -603,11 +732,11 @@ void WriteBatchInternal::SetAsLastestPersistentState(WriteBatch* b) {
   b->is_latest_persistent_state_ = true;
 }
 
-int WriteBatchInternal::Count(const WriteBatch* b) {
+uint32_t WriteBatchInternal::Count(const WriteBatch* b) {
   return DecodeFixed32(b->rep_.data() + 8);
 }
 
-void WriteBatchInternal::SetCount(WriteBatch* b, int n) {
+void WriteBatchInternal::SetCount(WriteBatch* b, uint32_t n) {
   EncodeFixed32(&b->rep_[8], n);
 }
 
@@ -640,7 +769,14 @@ Status WriteBatchInternal::Put(WriteBatch* b, uint32_t column_family_id,
     b->rep_.push_back(static_cast<char>(kTypeColumnFamilyValue));
     PutVarint32(&b->rep_, column_family_id);
   }
-  PutLengthPrefixedSlice(&b->rep_, key);
+  if (0 == b->timestamp_size_) {
+    PutLengthPrefixedSlice(&b->rep_, key);
+  } else {
+    PutVarint32(&b->rep_,
+                static_cast<uint32_t>(key.size() + b->timestamp_size_));
+    b->rep_.append(key.data(), key.size());
+    b->rep_.append(b->timestamp_size_, '\0');
+  }
   PutLengthPrefixedSlice(&b->rep_, value);
   b->content_flags_.store(
       b->content_flags_.load(std::memory_order_relaxed) | ContentFlags::HAS_PUT,
@@ -689,7 +825,11 @@ Status WriteBatchInternal::Put(WriteBatch* b, uint32_t column_family_id,
     b->rep_.push_back(static_cast<char>(kTypeColumnFamilyValue));
     PutVarint32(&b->rep_, column_family_id);
   }
-  PutLengthPrefixedSliceParts(&b->rep_, key);
+  if (0 == b->timestamp_size_) {
+    PutLengthPrefixedSliceParts(&b->rep_, key);
+  } else {
+    PutLengthPrefixedSlicePartsWithPadding(&b->rep_, key, b->timestamp_size_);
+  }
   PutLengthPrefixedSliceParts(&b->rep_, value);
   b->content_flags_.store(
       b->content_flags_.load(std::memory_order_relaxed) | ContentFlags::HAS_PUT,
@@ -991,7 +1131,7 @@ Status WriteBatch::PutLogData(const Slice& blob) {
 
 void WriteBatch::SetSavePoint() {
   if (save_points_ == nullptr) {
-    save_points_ = new SavePoints();
+    save_points_.reset(new SavePoints());
   }
   // Record length and count of current batch of writes.
   save_points_->stack.push(SavePoint(
@@ -1008,7 +1148,7 @@ Status WriteBatch::RollbackToSavePoint() {
   save_points_->stack.pop();
 
   assert(savepoint.size <= rep_.size());
-  assert(savepoint.count <= Count());
+  assert(static_cast<uint32_t>(savepoint.count) <= Count());
 
   if (savepoint.size == rep_.size()) {
     // No changes to rollback
@@ -1035,11 +1175,22 @@ Status WriteBatch::PopSavePoint() {
   return Status::OK();
 }
 
+Status WriteBatch::AssignTimestamp(const Slice& ts) {
+  TimestampAssigner ts_assigner(ts);
+  return Iterate(&ts_assigner);
+}
+
+Status WriteBatch::AssignTimestamps(const std::vector<Slice>& ts_list) {
+  TimestampAssigner ts_assigner(ts_list);
+  return Iterate(&ts_assigner);
+}
+
 class MemTableInserter : public WriteBatch::Handler {
 
   SequenceNumber sequence_;
   ColumnFamilyMemTables* const cf_mems_;
   FlushScheduler* const flush_scheduler_;
+  TrimHistoryScheduler* const trim_history_scheduler_;
   const bool ignore_missing_column_families_;
   const uint64_t recovering_log_number_;
   // log number that all Memtables inserted into should reference
@@ -1073,6 +1224,22 @@ class MemTableInserter : public WriteBatch::Handler {
   DupDetector       duplicate_detector_;
   bool              dup_dectector_on_;
 
+  bool hint_per_batch_;
+  bool hint_created_;
+  // Hints for this batch
+  using HintMap = std::unordered_map<MemTable*, void*>;
+  using HintMapType = std::aligned_storage<sizeof(HintMap)>::type;
+  HintMapType hint_;
+
+  HintMap& GetHintMap() {
+    assert(hint_per_batch_);
+    if (!hint_created_) {
+      new (&hint_) HintMap();
+      hint_created_ = true;
+    }
+    return *reinterpret_cast<HintMap*>(&hint_);
+  }
+
   MemPostInfoMap& GetPostMap() {
     assert(concurrent_memtable_writes_);
     if(!post_info_created_) {
@@ -1101,18 +1268,20 @@ class MemTableInserter : public WriteBatch::Handler {
   // cf_mems should not be shared with concurrent inserters
   MemTableInserter(SequenceNumber _sequence, ColumnFamilyMemTables* cf_mems,
                    FlushScheduler* flush_scheduler,
+                   TrimHistoryScheduler* trim_history_scheduler,
                    bool ignore_missing_column_families,
                    uint64_t recovering_log_number, DB* db,
                    bool concurrent_memtable_writes,
                    bool* has_valid_writes = nullptr, bool seq_per_batch = false,
-                   bool batch_per_txn = true)
+                   bool batch_per_txn = true, bool hint_per_batch = false)
       : sequence_(_sequence),
         cf_mems_(cf_mems),
         flush_scheduler_(flush_scheduler),
+        trim_history_scheduler_(trim_history_scheduler),
         ignore_missing_column_families_(ignore_missing_column_families),
         recovering_log_number_(recovering_log_number),
         log_number_ref_(0),
-        db_(reinterpret_cast<DBImpl*>(db)),
+        db_(static_cast_with_check<DBImpl, DB>(db)),
         concurrent_memtable_writes_(concurrent_memtable_writes),
         post_info_created_(false),
         has_valid_writes_(has_valid_writes),
@@ -1128,7 +1297,9 @@ class MemTableInserter : public WriteBatch::Handler {
         write_before_prepare_(!batch_per_txn),
         unprepared_batch_(false),
         duplicate_detector_(),
-        dup_dectector_on_(false) {
+        dup_dectector_on_(false),
+        hint_per_batch_(hint_per_batch),
+        hint_created_(false) {
     assert(cf_mems_);
   }
 
@@ -1141,6 +1312,12 @@ class MemTableInserter : public WriteBatch::Handler {
       reinterpret_cast<MemPostInfoMap*>
         (&mem_post_info_map_)->~MemPostInfoMap();
     }
+    if (hint_created_) {
+      for (auto iter : GetHintMap()) {
+        delete[] reinterpret_cast<char*>(iter.second);
+      }
+      reinterpret_cast<HintMap*>(&hint_)->~HintMap();
+    }
     delete rebuilding_trx_;
   }
 
@@ -1250,7 +1427,8 @@ class MemTableInserter : public WriteBatch::Handler {
     if (!moptions->inplace_update_support) {
       bool mem_res =
           mem->Add(sequence_, value_type, key, value,
-                   concurrent_memtable_writes_, get_post_process_info(mem));
+                   concurrent_memtable_writes_, get_post_process_info(mem),
+                   hint_per_batch_ ? &GetHintMap()[mem] : nullptr);
       if (UNLIKELY(!mem_res)) {
         assert(seq_per_batch_);
         ret_status = Status::TryAgain("key+seq exists");
@@ -1333,7 +1511,8 @@ class MemTableInserter : public WriteBatch::Handler {
     MemTable* mem = cf_mems_->GetMemTable();
     bool mem_res =
         mem->Add(sequence_, delete_type, key, value,
-                 concurrent_memtable_writes_, get_post_process_info(mem));
+                 concurrent_memtable_writes_, get_post_process_info(mem),
+                 hint_per_batch_ ? &GetHintMap()[mem] : nullptr);
     if (UNLIKELY(!mem_res)) {
       assert(seq_per_batch_);
       ret_status = Status::TryAgain("key+seq exists");
@@ -1468,7 +1647,6 @@ class MemTableInserter : public WriteBatch::Handler {
 
   Status MergeCF(uint32_t column_family_id, const Slice& key,
                  const Slice& value) override {
-    assert(!concurrent_memtable_writes_);
     // optimize for non-recovery mode
     if (UNLIKELY(write_after_commit_ && rebuilding_trx_ != nullptr)) {
       WriteBatchInternal::Merge(rebuilding_trx_, column_family_id, key, value);
@@ -1495,6 +1673,8 @@ class MemTableInserter : public WriteBatch::Handler {
     MemTable* mem = cf_mems_->GetMemTable();
     auto* moptions = mem->GetImmutableMemTableOptions();
     bool perform_merge = false;
+    assert(!concurrent_memtable_writes_ ||
+           moptions->max_successive_merges == 0);
 
     // If we pass DB through and options.max_successive_merges is hit
     // during recovery, Get() will be issued which will try to acquire
@@ -1502,6 +1682,7 @@ class MemTableInserter : public WriteBatch::Handler {
     // So we disable merge in recovery
     if (moptions->max_successive_merges > 0 && db_ != nullptr &&
         recovering_log_number_ == 0) {
+      assert(!concurrent_memtable_writes_);
       LookupKey lkey(key, sequence_);
 
       // Count the number of successive merges at the head
@@ -1547,6 +1728,7 @@ class MemTableInserter : public WriteBatch::Handler {
         perform_merge = false;
       } else {
         // 3) Add value to memtable
+        assert(!concurrent_memtable_writes_);
         bool mem_res = mem->Add(sequence_, kTypeValue, key, new_value);
         if (UNLIKELY(!mem_res)) {
           assert(seq_per_batch_);
@@ -1559,7 +1741,9 @@ class MemTableInserter : public WriteBatch::Handler {
 
     if (!perform_merge) {
       // Add merge operator to memtable
-      bool mem_res = mem->Add(sequence_, kTypeMerge, key, value);
+      bool mem_res =
+          mem->Add(sequence_, kTypeMerge, key, value,
+                   concurrent_memtable_writes_, get_post_process_info(mem));
       if (UNLIKELY(!mem_res)) {
         assert(seq_per_batch_);
         ret_status = Status::TryAgain("key+seq exists");
@@ -1594,7 +1778,34 @@ class MemTableInserter : public WriteBatch::Handler {
           cfd->mem()->MarkFlushScheduled()) {
         // MarkFlushScheduled only returns true if we are the one that
         // should take action, so no need to dedup further
-        flush_scheduler_->ScheduleFlush(cfd);
+        flush_scheduler_->ScheduleWork(cfd);
+      }
+    }
+    // check if memtable_list size exceeds max_write_buffer_size_to_maintain
+    if (trim_history_scheduler_ != nullptr) {
+      auto* cfd = cf_mems_->current();
+
+      assert(cfd);
+      assert(cfd->ioptions());
+
+      const size_t size_to_maintain = static_cast<size_t>(
+          cfd->ioptions()->max_write_buffer_size_to_maintain);
+
+      if (size_to_maintain > 0) {
+        MemTableList* const imm = cfd->imm();
+        assert(imm);
+
+        if (imm->HasHistory()) {
+          const MemTable* const mem = cfd->mem();
+          assert(mem);
+
+          if (mem->ApproximateMemoryUsageFast() +
+                      imm->ApproximateMemoryUsageExcludingLast() >=
+                  size_to_maintain &&
+              imm->MarkTrimHistoryNeeded()) {
+            trim_history_scheduler_->ScheduleWork(cfd);
+          }
+        }
       }
     }
   }
@@ -1617,8 +1828,9 @@ class MemTableInserter : public WriteBatch::Handler {
       // we are now iterating through a prepared section
       rebuilding_trx_ = new WriteBatch();
       rebuilding_trx_seq_ = sequence_;
-      // We only call MarkBeginPrepare once per batch, and unprepared_batch_
-      // is initialized to false by default.
+      // Verify that we have matching MarkBeginPrepare/MarkEndPrepare markers.
+      // unprepared_batch_ should be false because it is false by default, and
+      // gets reset to false in MarkEndPrepare.
       assert(!unprepared_batch_);
       unprepared_batch_ = unprepare;
 
@@ -1643,6 +1855,7 @@ class MemTableInserter : public WriteBatch::Handler {
       db_->InsertRecoveredTransaction(recovering_log_number_, name.ToString(),
                                       rebuilding_trx_, rebuilding_trx_seq_,
                                       batch_cnt, unprepared_batch_);
+      unprepared_batch_ = false;
       rebuilding_trx_ = nullptr;
     } else {
       assert(rebuilding_trx_ == nullptr);
@@ -1754,12 +1967,14 @@ class MemTableInserter : public WriteBatch::Handler {
 Status WriteBatchInternal::InsertInto(
     WriteThread::WriteGroup& write_group, SequenceNumber sequence,
     ColumnFamilyMemTables* memtables, FlushScheduler* flush_scheduler,
+    TrimHistoryScheduler* trim_history_scheduler,
     bool ignore_missing_column_families, uint64_t recovery_log_number, DB* db,
     bool concurrent_memtable_writes, bool seq_per_batch, bool batch_per_txn) {
   MemTableInserter inserter(
-      sequence, memtables, flush_scheduler, ignore_missing_column_families,
-      recovery_log_number, db, concurrent_memtable_writes,
-      nullptr /*has_valid_writes*/, seq_per_batch, batch_per_txn);
+      sequence, memtables, flush_scheduler, trim_history_scheduler,
+      ignore_missing_column_families, recovery_log_number, db,
+      concurrent_memtable_writes, nullptr /*has_valid_writes*/, seq_per_batch,
+      batch_per_txn);
   for (auto w : write_group) {
     if (w->CallbackFailed()) {
       continue;
@@ -1785,17 +2000,19 @@ Status WriteBatchInternal::InsertInto(
 Status WriteBatchInternal::InsertInto(
     WriteThread::Writer* writer, SequenceNumber sequence,
     ColumnFamilyMemTables* memtables, FlushScheduler* flush_scheduler,
+    TrimHistoryScheduler* trim_history_scheduler,
     bool ignore_missing_column_families, uint64_t log_number, DB* db,
     bool concurrent_memtable_writes, bool seq_per_batch, size_t batch_cnt,
-    bool batch_per_txn) {
+    bool batch_per_txn, bool hint_per_batch) {
 #ifdef NDEBUG
   (void)batch_cnt;
 #endif
   assert(writer->ShouldWriteToMemtable());
   MemTableInserter inserter(
-      sequence, memtables, flush_scheduler, ignore_missing_column_families,
-      log_number, db, concurrent_memtable_writes, nullptr /*has_valid_writes*/,
-      seq_per_batch, batch_per_txn);
+      sequence, memtables, flush_scheduler, trim_history_scheduler,
+      ignore_missing_column_families, log_number, db,
+      concurrent_memtable_writes, nullptr /*has_valid_writes*/, seq_per_batch,
+      batch_per_txn, hint_per_batch);
   SetSequence(writer->batch, sequence);
   inserter.set_log_number_ref(writer->log_ref);
   Status s = writer->batch->Iterate(&inserter);
@@ -1809,11 +2026,13 @@ Status WriteBatchInternal::InsertInto(
 
 Status WriteBatchInternal::InsertInto(
     const WriteBatch* batch, ColumnFamilyMemTables* memtables,
-    FlushScheduler* flush_scheduler, bool ignore_missing_column_families,
-    uint64_t log_number, DB* db, bool concurrent_memtable_writes,
-    SequenceNumber* next_seq, bool* has_valid_writes, bool seq_per_batch,
-    bool batch_per_txn) {
+    FlushScheduler* flush_scheduler,
+    TrimHistoryScheduler* trim_history_scheduler,
+    bool ignore_missing_column_families, uint64_t log_number, DB* db,
+    bool concurrent_memtable_writes, SequenceNumber* next_seq,
+    bool* has_valid_writes, bool seq_per_batch, bool batch_per_txn) {
   MemTableInserter inserter(Sequence(batch), memtables, flush_scheduler,
+                            trim_history_scheduler,
                             ignore_missing_column_families, log_number, db,
                             concurrent_memtable_writes, has_valid_writes,
                             seq_per_batch, batch_per_txn);
@@ -1870,4 +2089,4 @@ size_t WriteBatchInternal::AppendedByteSize(size_t leftByteSize,
   }
 }
 
-}  // namespace rocksdb
+}  // namespace ROCKSDB_NAMESPACE