update ceph source to reef 18.1.2

diff --git a/ceph/src/rocksdb/db/memtable.cc b/ceph/src/rocksdb/db/memtable.cc
index 6a96a6fa5e0205fc4401ad867e5351a8f2d6956e..45b139e80989258ece7a506c97ebd775830f5fca 100644
--- a/ceph/src/rocksdb/db/memtable.cc
+++ b/ceph/src/rocksdb/db/memtable.cc
 #include <array>
 #include <limits>
 #include <memory>
+
 #include "db/dbformat.h"
+#include "db/kv_checksum.h"
 #include "db/merge_context.h"
 #include "db/merge_helper.h"
 #include "db/pinned_iterators_manager.h"
 #include "db/range_tombstone_fragmenter.h"
 #include "db/read_callback.h"
+#include "db/wide/wide_column_serialization.h"
+#include "logging/logging.h"
 #include "memory/arena.h"
 #include "memory/memory_usage.h"
 #include "monitoring/perf_context_imp.h"
@@ -30,6 +34,7 @@
 #include "rocksdb/iterator.h"
 #include "rocksdb/merge_operator.h"
 #include "rocksdb/slice_transform.h"
+#include "rocksdb/types.h"
 #include "rocksdb/write_buffer_manager.h"
 #include "table/internal_iterator.h"
 #include "table/iterator_wrapper.h"
@@ -41,7 +46,7 @@
 namespace ROCKSDB_NAMESPACE {
 
 ImmutableMemTableOptions::ImmutableMemTableOptions(
-    const ImmutableCFOptions& ioptions,
+    const ImmutableOptions& ioptions,
     const MutableCFOptions& mutable_cf_options)
     : arena_block_size(mutable_cf_options.arena_block_size),
       memtable_prefix_bloom_bits(
@@ -56,13 +61,15 @@ ImmutableMemTableOptions::ImmutableMemTableOptions(
       inplace_update_num_locks(mutable_cf_options.inplace_update_num_locks),
       inplace_callback(ioptions.inplace_callback),
       max_successive_merges(mutable_cf_options.max_successive_merges),
-      statistics(ioptions.statistics),
-      merge_operator(ioptions.merge_operator),
-      info_log(ioptions.info_log),
-      allow_data_in_errors(ioptions.allow_data_in_errors) {}
+      statistics(ioptions.stats),
+      merge_operator(ioptions.merge_operator.get()),
+      info_log(ioptions.logger),
+      allow_data_in_errors(ioptions.allow_data_in_errors),
+      protection_bytes_per_key(
+          mutable_cf_options.memtable_protection_bytes_per_key) {}
 
 MemTable::MemTable(const InternalKeyComparator& cmp,
-                   const ImmutableCFOptions& ioptions,
+                   const ImmutableOptions& ioptions,
                    const MutableCFOptions& mutable_cf_options,
                    WriteBufferManager* write_buffer_manager,
                    SequenceNumber latest_seq, uint32_t column_family_id)
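[Editor's note: the protection_bytes_per_key member added above is fed by the mutable column-family option memtable_protection_bytes_per_key. A minimal configuration sketch, assuming a RocksDB build that ships this option (0 disables protection):

#include "rocksdb/options.h"

rocksdb::Options MakeProtectedOptions() {
  rocksdb::Options options;
  // Append an 8-byte checksum to every memtable entry; 1, 2 and 4 are the
  // other supported widths, trading detection strength for memory.
  options.memtable_protection_bytes_per_key = 8;
  return options;
}
]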
@@ -80,9 +87,9 @@ MemTable::MemTable(const InternalKeyComparator& cmp,
              mutable_cf_options.memtable_huge_page_size),
       table_(ioptions.memtable_factory->CreateMemTableRep(
           comparator_, &arena_, mutable_cf_options.prefix_extractor.get(),
-          ioptions.info_log, column_family_id)),
+          ioptions.logger, column_family_id)),
       range_del_table_(SkipListFactory().CreateMemTableRep(
-          comparator_, &arena_, nullptr /* transform */, ioptions.info_log,
+          comparator_, &arena_, nullptr /* transform */, ioptions.logger,
           column_family_id)),
       is_range_del_table_empty_(true),
       data_size_(0),
@@ -102,9 +109,9 @@ MemTable::MemTable(const InternalKeyComparator& cmp,
                  : 0),
       prefix_extractor_(mutable_cf_options.prefix_extractor.get()),
       flush_state_(FLUSH_NOT_REQUESTED),
-      env_(ioptions.env),
+      clock_(ioptions.clock),
       insert_with_hint_prefix_extractor_(
-          ioptions.memtable_insert_with_hint_prefix_extractor),
+          ioptions.memtable_insert_with_hint_prefix_extractor.get()),
       oldest_key_time_(std::numeric_limits<uint64_t>::max()),
       atomic_flush_seqno_(kMaxSequenceNumber),
       approximate_memory_usage_(0) {
@@ -118,7 +125,23 @@ MemTable::MemTable(const InternalKeyComparator& cmp,
     bloom_filter_.reset(
         new DynamicBloom(&arena_, moptions_.memtable_prefix_bloom_bits,
                          6 /* hard coded 6 probes */,
-                         moptions_.memtable_huge_page_size, ioptions.info_log));
+                         moptions_.memtable_huge_page_size, ioptions.logger));
+  }
+  // Initialize cached_range_tombstone_ here since it could
+  // be read before it is constructed in MemTable::Add(), which could also lead
+  // to a data race on the global mutex table backing atomic shared_ptr.
+  auto new_cache = std::make_shared<FragmentedRangeTombstoneListCache>();
+  size_t size = cached_range_tombstone_.Size();
+  for (size_t i = 0; i < size; ++i) {
+    std::shared_ptr<FragmentedRangeTombstoneListCache>* local_cache_ref_ptr =
+        cached_range_tombstone_.AccessAtCore(i);
+    auto new_local_cache_ref = std::make_shared<
+        const std::shared_ptr<FragmentedRangeTombstoneListCache>>(new_cache);
+    std::atomic_store_explicit(
+        local_cache_ref_ptr,
+        std::shared_ptr<FragmentedRangeTombstoneListCache>(new_local_cache_ref,
+                                                           new_cache.get()),
+        std::memory_order_relaxed);
   }
 }
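[Editor's note: the per-core initialization above combines std::atomic_store_explicit on a shared_ptr with the shared_ptr aliasing constructor: the stored pointer shares ownership with the per-core outer reference, while get() returns the inner cache object. A reduced sketch of the aliasing trick, with illustrative names rather than RocksDB types:

#include <memory>

struct Cache {};  // stand-in for FragmentedRangeTombstoneListCache

int main() {
  auto inner = std::make_shared<Cache>();
  // Outer ref-count object; in the code above one of these lives per core.
  auto outer = std::make_shared<const std::shared_ptr<Cache>>(inner);
  // Aliasing constructor: `alias` keeps `outer` alive, yet alias.get()
  // yields the Cache* held by `inner`.
  std::shared_ptr<Cache> alias(outer, inner.get());
  return alias ? 0 : 1;
}
]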
 
@@ -136,8 +159,8 @@ size_t MemTable::ApproximateMemoryUsage() {
   for (size_t usage : usages) {
     // If usage + total_usage >= kMaxSizet, return kMaxSizet.
     // the following variation is to avoid numeric overflow.
-    if (usage >= port::kMaxSizet - total_usage) {
-      return port::kMaxSizet;
+    if (usage >= std::numeric_limits<size_t>::max() - total_usage) {
+      return std::numeric_limits<size_t>::max();
     }
     total_usage += usage;
   }
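[Editor's note: isolated from the loop, the overflow guard above is the usual saturating-add pattern. A sketch, not a RocksDB helper:

#include <cstddef>
#include <limits>

// Returns a + b, clamping at the maximum size_t instead of wrapping.
size_t SaturatingAdd(size_t a, size_t b) {
  if (b >= std::numeric_limits<size_t>::max() - a) {
    return std::numeric_limits<size_t>::max();
  }
  return a + b;
}
]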
@@ -221,7 +244,7 @@ void MemTable::UpdateOldestKeyTime() {
   uint64_t oldest_key_time = oldest_key_time_.load(std::memory_order_relaxed);
   if (oldest_key_time == std::numeric_limits<uint64_t>::max()) {
     int64_t current_time = 0;
-    auto s = env_->GetCurrentTime(&current_time);
+    auto s = clock_->GetCurrentTime(&current_time);
     if (s.ok()) {
       assert(current_time >= 0);
       // If fail, the timestamp is already set.
@@ -232,6 +255,73 @@ void MemTable::UpdateOldestKeyTime() {
   }
 }
 
+Status MemTable::VerifyEntryChecksum(const char* entry,
+                                     size_t protection_bytes_per_key,
+                                     bool allow_data_in_errors) {
+  if (protection_bytes_per_key == 0) {
+    return Status::OK();
+  }
+  uint32_t key_length;
+  const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length);
+  if (key_ptr == nullptr) {
+    return Status::Corruption("Unable to parse internal key length");
+  }
+  if (key_length < 8) {
+    return Status::Corruption("Memtable entry internal key length too short.");
+  }
+  Slice user_key = Slice(key_ptr, key_length - 8);
+
+  const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8);
+  ValueType type;
+  SequenceNumber seq;
+  UnPackSequenceAndType(tag, &seq, &type);
+
+  uint32_t value_length = 0;
+  const char* value_ptr = GetVarint32Ptr(
+      key_ptr + key_length, key_ptr + key_length + 5, &value_length);
+  if (value_ptr == nullptr) {
+    return Status::Corruption("Unable to parse internal key value");
+  }
+  Slice value = Slice(value_ptr, value_length);
+
+  const char* checksum_ptr = value_ptr + value_length;
+  uint64_t expected = ProtectionInfo64()
+                          .ProtectKVO(user_key, value, type)
+                          .ProtectS(seq)
+                          .GetVal();
+  bool match = true;
+  switch (protection_bytes_per_key) {
+    case 1:
+      match = static_cast<uint8_t>(checksum_ptr[0]) ==
+              static_cast<uint8_t>(expected);
+      break;
+    case 2:
+      match = DecodeFixed16(checksum_ptr) == static_cast<uint16_t>(expected);
+      break;
+    case 4:
+      match = DecodeFixed32(checksum_ptr) == static_cast<uint32_t>(expected);
+      break;
+    case 8:
+      match = DecodeFixed64(checksum_ptr) == expected;
+      break;
+    default:
+      assert(false);
+  }
+  if (!match) {
+    std::string msg(
+        "Corrupted memtable entry, per key-value checksum verification "
+        "failed.");
+    if (allow_data_in_errors) {
+      msg.append("Unrecognized value type: " +
+                 std::to_string(static_cast<int>(type)) + ". ");
+      msg.append("User key: " + user_key.ToString(/*hex=*/true) + ". ");
+      msg.append("seq: " + std::to_string(seq) + ".");
+    }
+    return Status::Corruption(msg.c_str());
+  }
+  return Status::OK();
+}
+
 int MemTable::KeyComparator::operator()(const char* prefix_len_key1,
                                         const char* prefix_len_key2) const {
   // Internal keys are encoded as length-prefixed strings.
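[Editor's note: UnPackSequenceAndType above splits the fixed 8-byte tag that follows the user key. RocksDB packs the 56-bit sequence number and the 8-bit value type into a single word; a standalone sketch of that layout, mirroring (not reusing) the helpers in db/dbformat.h:

#include <cstdint>

// tag = (sequence << 8) | type: the sequence number occupies the high 56 bits.
uint64_t Pack(uint64_t seq, uint8_t type) { return (seq << 8) | type; }

void Unpack(uint64_t tag, uint64_t* seq, uint8_t* type) {
  *seq = tag >> 8;
  *type = static_cast<uint8_t>(tag & 0xff);
}
]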
@@ -240,9 +330,8 @@ int MemTable::KeyComparator::operator()(const char* prefix_len_key1,
   return comparator.CompareKeySeq(k1, k2);
 }
 
-int MemTable::KeyComparator::operator()(const char* prefix_len_key,
-                                        const KeyComparator::DecodedType& key)
-    const {
+int MemTable::KeyComparator::operator()(
+    const char* prefix_len_key, const KeyComparator::DecodedType& key) const {
   // Internal keys are encoded as length-prefixed strings.
   Slice a = GetLengthPrefixedSlice(prefix_len_key);
   return comparator.CompareKeySeq(a, key);
@@ -286,7 +375,10 @@ class MemTableIterator : public InternalIterator {
         valid_(false),
         arena_mode_(arena != nullptr),
         value_pinned_(
-            !mem.GetImmutableMemTableOptions()->inplace_update_support) {
+            !mem.GetImmutableMemTableOptions()->inplace_update_support),
+        protection_bytes_per_key_(mem.moptions_.protection_bytes_per_key),
+        status_(Status::OK()),
+        logger_(mem.moptions_.info_log) {
     if (use_range_del_table) {
       iter_ = mem.range_del_table_->GetIterator(arena);
     } else if (prefix_extractor_ != nullptr && !read_options.total_order_seek &&
@@ -297,6 +389,7 @@ class MemTableIterator : public InternalIterator {
     } else {
       iter_ = mem.table_->GetIterator(arena);
     }
+    status_.PermitUncheckedError();
   }
   // No copying allowed
   MemTableIterator(const MemTableIterator&) = delete;
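[Editor's note: status_.PermitUncheckedError() in the constructor marks the freshly built OK status as inspected, so builds that track unchecked statuses (e.g. ASSERT_STATUS_CHECKED) do not complain if the iterator is destroyed without status() ever being read. The general pattern:

rocksdb::Status s = rocksdb::Status::OK();
// Deliberately discard the status; status-checked builds would otherwise
// flag it as unchecked when `s` is destroyed.
s.PermitUncheckedError();
]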
@@ -322,42 +415,50 @@ class MemTableIterator : public InternalIterator {
   PinnedIteratorsManager* pinned_iters_mgr_ = nullptr;
 #endif
 
-  bool Valid() const override { return valid_; }
+  bool Valid() const override { return valid_ && status_.ok(); }
   void Seek(const Slice& k) override {
     PERF_TIMER_GUARD(seek_on_memtable_time);
     PERF_COUNTER_ADD(seek_on_memtable_count, 1);
     if (bloom_) {
       // iterator should only use prefix bloom filter
-      Slice user_k(ExtractUserKey(k));
-      if (prefix_extractor_->InDomain(user_k) &&
-          !bloom_->MayContain(prefix_extractor_->Transform(user_k))) {
-        PERF_COUNTER_ADD(bloom_memtable_miss_count, 1);
-        valid_ = false;
-        return;
-      } else {
-        PERF_COUNTER_ADD(bloom_memtable_hit_count, 1);
+      auto ts_sz = comparator_.comparator.user_comparator()->timestamp_size();
+      Slice user_k_without_ts(ExtractUserKeyAndStripTimestamp(k, ts_sz));
+      if (prefix_extractor_->InDomain(user_k_without_ts)) {
+        if (!bloom_->MayContain(
+                prefix_extractor_->Transform(user_k_without_ts))) {
+          PERF_COUNTER_ADD(bloom_memtable_miss_count, 1);
+          valid_ = false;
+          return;
+        } else {
+          PERF_COUNTER_ADD(bloom_memtable_hit_count, 1);
+        }
       }
     }
     iter_->Seek(k, nullptr);
     valid_ = iter_->Valid();
+    VerifyEntryChecksum();
   }
   void SeekForPrev(const Slice& k) override {
     PERF_TIMER_GUARD(seek_on_memtable_time);
     PERF_COUNTER_ADD(seek_on_memtable_count, 1);
     if (bloom_) {
-      Slice user_k(ExtractUserKey(k));
-      if (prefix_extractor_->InDomain(user_k) &&
-          !bloom_->MayContain(prefix_extractor_->Transform(user_k))) {
-        PERF_COUNTER_ADD(bloom_memtable_miss_count, 1);
-        valid_ = false;
-        return;
-      } else {
-        PERF_COUNTER_ADD(bloom_memtable_hit_count, 1);
+      auto ts_sz = comparator_.comparator.user_comparator()->timestamp_size();
+      Slice user_k_without_ts(ExtractUserKeyAndStripTimestamp(k, ts_sz));
+      if (prefix_extractor_->InDomain(user_k_without_ts)) {
+        if (!bloom_->MayContain(
+                prefix_extractor_->Transform(user_k_without_ts))) {
+          PERF_COUNTER_ADD(bloom_memtable_miss_count, 1);
+          valid_ = false;
+          return;
+        } else {
+          PERF_COUNTER_ADD(bloom_memtable_hit_count, 1);
+        }
       }
     }
     iter_->Seek(k, nullptr);
     valid_ = iter_->Valid();
-    if (!Valid()) {
+    VerifyEntryChecksum();
+    if (!Valid() && status().ok()) {
       SeekToLast();
     }
     while (Valid() && comparator_.comparator.Compare(k, key()) < 0) {
@@ -367,10 +468,12 @@ class MemTableIterator : public InternalIterator {
   void SeekToFirst() override {
     iter_->SeekToFirst();
     valid_ = iter_->Valid();
+    VerifyEntryChecksum();
   }
   void SeekToLast() override {
     iter_->SeekToLast();
     valid_ = iter_->Valid();
+    VerifyEntryChecksum();
   }
   void Next() override {
     PERF_COUNTER_ADD(next_on_memtable_count, 1);
@@ -378,10 +481,11 @@ class MemTableIterator : public InternalIterator {
     iter_->Next();
     TEST_SYNC_POINT_CALLBACK("MemTableIterator::Next:0", iter_);
     valid_ = iter_->Valid();
+    VerifyEntryChecksum();
   }
   bool NextAndGetResult(IterateResult* result) override {
     Next();
-    bool is_valid = valid_;
+    bool is_valid = Valid();
     if (is_valid) {
       result->key = key();
       result->bound_check_result = IterBoundCheck::kUnknown;
@@ -394,6 +498,7 @@ class MemTableIterator : public InternalIterator {
     assert(Valid());
     iter_->Prev();
     valid_ = iter_->Valid();
+    VerifyEntryChecksum();
   }
   Slice key() const override {
     assert(Valid());
@@ -405,7 +510,7 @@ class MemTableIterator : public InternalIterator {
     return GetLengthPrefixedSlice(key_slice.data() + key_slice.size());
   }
 
-  Status status() const override { return Status::OK(); }
+  Status status() const override { return status_; }
 
   bool IsKeyPinned() const override {
     // memtable data is always pinned
@@ -425,6 +530,19 @@ class MemTableIterator : public InternalIterator {
   bool valid_;
   bool arena_mode_;
   bool value_pinned_;
+  size_t protection_bytes_per_key_;
+  Status status_;
+  Logger* logger_;
+
+  void VerifyEntryChecksum() {
+    if (protection_bytes_per_key_ > 0 && Valid()) {
+      status_ = MemTable::VerifyEntryChecksum(iter_->key(),
+                                              protection_bytes_per_key_);
+      if (!status_.ok()) {
+        ROCKS_LOG_ERROR(logger_, "In MemtableIterator: %s", status_.getState());
+      }
+    }
+  }
 };
 
 InternalIterator* MemTable::NewIterator(const ReadOptions& read_options,
@@ -435,26 +553,67 @@ InternalIterator* MemTable::NewIterator(const ReadOptions& read_options,
 }
 
 FragmentedRangeTombstoneIterator* MemTable::NewRangeTombstoneIterator(
-    const ReadOptions& read_options, SequenceNumber read_seq) {
+    const ReadOptions& read_options, SequenceNumber read_seq,
+    bool immutable_memtable) {
   if (read_options.ignore_range_deletions ||
       is_range_del_table_empty_.load(std::memory_order_relaxed)) {
     return nullptr;
   }
-  auto* unfragmented_iter = new MemTableIterator(
-      *this, read_options, nullptr /* arena */, true /* use_range_del_table */);
-  if (unfragmented_iter == nullptr) {
-    return nullptr;
+  return NewRangeTombstoneIteratorInternal(read_options, read_seq,
+                                           immutable_memtable);
+}
+
+FragmentedRangeTombstoneIterator* MemTable::NewRangeTombstoneIteratorInternal(
+    const ReadOptions& read_options, SequenceNumber read_seq,
+    bool immutable_memtable) {
+  if (immutable_memtable) {
+    // Note that caller should already have verified that
+    // !is_range_del_table_empty_
+    assert(IsFragmentedRangeTombstonesConstructed());
+    return new FragmentedRangeTombstoneIterator(
+        fragmented_range_tombstone_list_.get(), comparator_.comparator,
+        read_seq, read_options.timestamp);
   }
-  auto fragmented_tombstone_list =
-      std::make_shared<FragmentedRangeTombstoneList>(
+
+  // takes current cache
+  std::shared_ptr<FragmentedRangeTombstoneListCache> cache =
+      std::atomic_load_explicit(cached_range_tombstone_.Access(),
+                                std::memory_order_relaxed);
+  // construct fragmented tombstone list if necessary
+  if (!cache->initialized.load(std::memory_order_acquire)) {
+    cache->reader_mutex.lock();
+    if (!cache->tombstones) {
+      auto* unfragmented_iter =
+          new MemTableIterator(*this, read_options, nullptr /* arena */,
+                               true /* use_range_del_table */);
+      cache->tombstones.reset(new FragmentedRangeTombstoneList(
           std::unique_ptr<InternalIterator>(unfragmented_iter),
-          comparator_.comparator);
+          comparator_.comparator));
+      cache->initialized.store(true, std::memory_order_release);
+    }
+    cache->reader_mutex.unlock();
+  }
 
   auto* fragmented_iter = new FragmentedRangeTombstoneIterator(
-      fragmented_tombstone_list, comparator_.comparator, read_seq);
+      cache, comparator_.comparator, read_seq, read_options.timestamp);
   return fragmented_iter;
 }
 
+void MemTable::ConstructFragmentedRangeTombstones() {
+  assert(!IsFragmentedRangeTombstonesConstructed(false));
+  // There should be no concurrent construction.
+  if (!is_range_del_table_empty_.load(std::memory_order_relaxed)) {
+    auto* unfragmented_iter =
+        new MemTableIterator(*this, ReadOptions(), nullptr /* arena */,
+                             true /* use_range_del_table */);
+
+    fragmented_range_tombstone_list_ =
+        std::make_unique<FragmentedRangeTombstoneList>(
+            std::unique_ptr<InternalIterator>(unfragmented_iter),
+            comparator_.comparator);
+  }
+}
+
 port::RWMutex* MemTable::GetLock(const Slice& key) {
   return &locks_[GetSliceRangedNPHash(key, locks_.size())];
 }
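[Editor's note: NewRangeTombstoneIteratorInternal above builds the fragmented list lazily with double-checked locking: a cheap acquire-load of the flag, then the mutex, then a second check under the lock so only one thread constructs. The shape, reduced to its essentials with hypothetical names:

#include <atomic>
#include <memory>
#include <mutex>

struct LazyList {
  std::atomic<bool> initialized{false};
  std::mutex reader_mutex;
  std::unique_ptr<int> tombstones;  // stand-in for the fragmented list

  int* Get() {
    if (!initialized.load(std::memory_order_acquire)) {
      std::lock_guard<std::mutex> lock(reader_mutex);
      if (!tombstones) {
        tombstones = std::make_unique<int>(42);  // expensive build, done once
        initialized.store(true, std::memory_order_release);
      }
    }
    return tombstones.get();
  }
};
]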
@@ -480,21 +639,97 @@ MemTable::MemTableStats MemTable::ApproximateStats(const Slice& start_ikey,
   return {entry_count * (data_size / n), entry_count};
 }
 
-bool MemTable::Add(SequenceNumber s, ValueType type,
-                   const Slice& key, /* user key */
-                   const Slice& value, bool allow_concurrent,
-                   MemTablePostProcessInfo* post_process_info, void** hint) {
+Status MemTable::VerifyEncodedEntry(Slice encoded,
+                                    const ProtectionInfoKVOS64& kv_prot_info) {
+  uint32_t ikey_len = 0;
+  if (!GetVarint32(&encoded, &ikey_len)) {
+    return Status::Corruption("Unable to parse internal key length");
+  }
+  size_t ts_sz = GetInternalKeyComparator().user_comparator()->timestamp_size();
+  if (ikey_len < 8 + ts_sz) {
+    return Status::Corruption("Internal key length too short");
+  }
+  if (ikey_len > encoded.size()) {
+    return Status::Corruption("Internal key length too long");
+  }
+  uint32_t value_len = 0;
+  const size_t user_key_len = ikey_len - 8;
+  Slice key(encoded.data(), user_key_len);
+  encoded.remove_prefix(user_key_len);
+
+  uint64_t packed = DecodeFixed64(encoded.data());
+  ValueType value_type = kMaxValue;
+  SequenceNumber sequence_number = kMaxSequenceNumber;
+  UnPackSequenceAndType(packed, &sequence_number, &value_type);
+  encoded.remove_prefix(8);
+
+  if (!GetVarint32(&encoded, &value_len)) {
+    return Status::Corruption("Unable to parse value length");
+  }
+  if (value_len < encoded.size()) {
+    return Status::Corruption("Value length too short");
+  }
+  if (value_len > encoded.size()) {
+    return Status::Corruption("Value length too long");
+  }
+  Slice value(encoded.data(), value_len);
+
+  return kv_prot_info.StripS(sequence_number)
+      .StripKVO(key, value, value_type)
+      .GetStatus();
+}
+
+void MemTable::UpdateEntryChecksum(const ProtectionInfoKVOS64* kv_prot_info,
+                                   const Slice& key, const Slice& value,
+                                   ValueType type, SequenceNumber s,
+                                   char* checksum_ptr) {
+  if (moptions_.protection_bytes_per_key == 0) {
+    return;
+  }
+
+  uint64_t checksum = 0;
+  if (kv_prot_info == nullptr) {
+    checksum =
+        ProtectionInfo64().ProtectKVO(key, value, type).ProtectS(s).GetVal();
+  } else {
+    checksum = kv_prot_info->GetVal();
+  }
+  switch (moptions_.protection_bytes_per_key) {
+    case 1:
+      checksum_ptr[0] = static_cast<uint8_t>(checksum);
+      break;
+    case 2:
+      EncodeFixed16(checksum_ptr, static_cast<uint16_t>(checksum));
+      break;
+    case 4:
+      EncodeFixed32(checksum_ptr, static_cast<uint32_t>(checksum));
+      break;
+    case 8:
+      EncodeFixed64(checksum_ptr, checksum);
+      break;
+    default:
+      assert(false);
+  }
+}
+
+Status MemTable::Add(SequenceNumber s, ValueType type,
+                     const Slice& key, /* user key */
+                     const Slice& value,
+                     const ProtectionInfoKVOS64* kv_prot_info,
+                     bool allow_concurrent,
+                     MemTablePostProcessInfo* post_process_info, void** hint) {
   // Format of an entry is concatenation of:
   //  key_size     : varint32 of internal_key.size()
   //  key bytes    : char[internal_key.size()]
   //  value_size   : varint32 of value.size()
   //  value bytes  : char[value.size()]
+  //  checksum     : char[moptions_.protection_bytes_per_key]
   uint32_t key_size = static_cast<uint32_t>(key.size());
   uint32_t val_size = static_cast<uint32_t>(value.size());
   uint32_t internal_key_size = key_size + 8;
   const uint32_t encoded_len = VarintLength(internal_key_size) +
                                internal_key_size + VarintLength(val_size) +
-                               val_size;
+                               val_size + moptions_.protection_bytes_per_key;
   char* buf = nullptr;
   std::unique_ptr<MemTableRep>& table =
       type == kTypeRangeDeletion ? range_del_table_ : table_;
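[Editor's note: to make the size arithmetic above concrete, a worked example for a hypothetical 3-byte user key and 3-byte value with protection_bytes_per_key = 8:

// internal_key_size = 3 (user key) + 8 (tag)       = 11
// encoded_len       = 1  (varint32 of 11)
//                   + 11 (internal key bytes)
//                   + 1  (varint32 of 3)
//                   + 3  (value bytes)
//                   + 8  (checksum)                 = 24 bytes
]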
@@ -509,8 +744,22 @@ bool MemTable::Add(SequenceNumber s, ValueType type,
   p += 8;
   p = EncodeVarint32(p, val_size);
   memcpy(p, value.data(), val_size);
-  assert((unsigned)(p + val_size - buf) == (unsigned)encoded_len);
+  assert((unsigned)(p + val_size - buf + moptions_.protection_bytes_per_key) ==
+         (unsigned)encoded_len);
+
+  UpdateEntryChecksum(kv_prot_info, key, value, type, s,
+                      buf + encoded_len - moptions_.protection_bytes_per_key);
+  Slice encoded(buf, encoded_len - moptions_.protection_bytes_per_key);
+  if (kv_prot_info != nullptr) {
+    TEST_SYNC_POINT_CALLBACK("MemTable::Add:Encoded", &encoded);
+    Status status = VerifyEncodedEntry(encoded, *kv_prot_info);
+    if (!status.ok()) {
+      return status;
+    }
+  }
+
   size_t ts_sz = GetInternalKeyComparator().user_comparator()->timestamp_size();
+  Slice key_without_ts = StripTimestampFromUserKey(key, ts_sz);
 
   if (!allow_concurrent) {
     // Extract prefix for insert with hint.
@@ -519,12 +768,12 @@ bool MemTable::Add(SequenceNumber s, ValueType type,
       Slice prefix = insert_with_hint_prefix_extractor_->Transform(key_slice);
       bool res = table->InsertKeyWithHint(handle, &insert_hints_[prefix]);
       if (UNLIKELY(!res)) {
-        return res;
+        return Status::TryAgain("key+seq exists");
       }
     } else {
       bool res = table->InsertKey(handle);
       if (UNLIKELY(!res)) {
-        return res;
+        return Status::TryAgain("key+seq exists");
       }
     }
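[Editor's note: with this change a failed insert surfaces as Status::TryAgain instead of a bare false. A caller sketch against the internal API, with the surrounding plumbing assumed:

rocksdb::Status s =
    mem->Add(seq, rocksdb::kTypeValue, user_key, value,
             /*kv_prot_info=*/nullptr);
if (s.IsTryAgain()) {
  // The same key+seq pair already exists; the write path can retry with a
  // fresh sequence number rather than treating this as corruption.
}
]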
 
@@ -534,18 +783,18 @@ bool MemTable::Add(SequenceNumber s, ValueType type,
                        std::memory_order_relaxed);
     data_size_.store(data_size_.load(std::memory_order_relaxed) + encoded_len,
                      std::memory_order_relaxed);
-    if (type == kTypeDeletion) {
+    if (type == kTypeDeletion || type == kTypeSingleDeletion ||
+        type == kTypeDeletionWithTimestamp) {
       num_deletes_.store(num_deletes_.load(std::memory_order_relaxed) + 1,
                          std::memory_order_relaxed);
     }
 
     if (bloom_filter_ && prefix_extractor_ &&
-        prefix_extractor_->InDomain(key)) {
-      bloom_filter_->Add(
-          prefix_extractor_->Transform(StripTimestampFromUserKey(key, ts_sz)));
+        prefix_extractor_->InDomain(key_without_ts)) {
+      bloom_filter_->Add(prefix_extractor_->Transform(key_without_ts));
     }
     if (bloom_filter_ && moptions_.memtable_whole_key_filtering) {
-      bloom_filter_->Add(StripTimestampFromUserKey(key, ts_sz));
+      bloom_filter_->Add(key_without_ts);
     }
 
     // The first sequence number inserted into the memtable
@@ -566,7 +815,7 @@ bool MemTable::Add(SequenceNumber s, ValueType type,
                    ? table->InsertKeyConcurrently(handle)
                    : table->InsertKeyWithHintConcurrently(handle, hint);
     if (UNLIKELY(!res)) {
-      return res;
+      return Status::TryAgain("key+seq exists");
     }
 
     assert(post_process_info != nullptr);
@@ -577,11 +826,12 @@ bool MemTable::Add(SequenceNumber s, ValueType type,
     }
 
     if (bloom_filter_ && prefix_extractor_ &&
-        prefix_extractor_->InDomain(key)) {
-      bloom_filter_->AddConcurrently(prefix_extractor_->Transform(key));
+        prefix_extractor_->InDomain(key_without_ts)) {
+      bloom_filter_->AddConcurrently(
+          prefix_extractor_->Transform(key_without_ts));
     }
     if (bloom_filter_ && moptions_.memtable_whole_key_filtering) {
-      bloom_filter_->AddConcurrently(StripTimestampFromUserKey(key, ts_sz));
+      bloom_filter_->AddConcurrently(key_without_ts);
     }
 
     // atomically update first_seqno_ and earliest_seqno_.
@@ -597,10 +847,36 @@ bool MemTable::Add(SequenceNumber s, ValueType type,
     }
   }
   if (type == kTypeRangeDeletion) {
+    auto new_cache = std::make_shared<FragmentedRangeTombstoneListCache>();
+    size_t size = cached_range_tombstone_.Size();
+    if (allow_concurrent) {
+      range_del_mutex_.lock();
+    }
+    for (size_t i = 0; i < size; ++i) {
+      std::shared_ptr<FragmentedRangeTombstoneListCache>* local_cache_ref_ptr =
+          cached_range_tombstone_.AccessAtCore(i);
+      auto new_local_cache_ref = std::make_shared<
+          const std::shared_ptr<FragmentedRangeTombstoneListCache>>(new_cache);
+      // It is okay for some reader to load old cache during invalidation as
+      // the new sequence number is not published yet.
+      // Each core will have a shared_ptr to a shared_ptr to the cached
+      // fragmented range tombstones, so that ref count is maintained locally
+      // per-core using the per-core shared_ptr.
+      std::atomic_store_explicit(
+          local_cache_ref_ptr,
+          std::shared_ptr<FragmentedRangeTombstoneListCache>(
+              new_local_cache_ref, new_cache.get()),
+          std::memory_order_relaxed);
+    }
+    if (allow_concurrent) {
+      range_del_mutex_.unlock();
+    }
     is_range_del_table_empty_.store(false, std::memory_order_relaxed);
   }
   UpdateOldestKeyTime();
-  return true;
+
+  TEST_SYNC_POINT_CALLBACK("MemTable::Add:BeforeReturn:Encoded", &encoded);
+  return Status::OK();
 }
 
 // Callback from MemTable::Get()
@@ -612,6 +888,7 @@ struct Saver {
   bool* found_final_value;  // Is value set correctly? Used by KeyMayExist
   bool* merge_in_progress;
   std::string* value;
+  PinnableWideColumns* columns;
   SequenceNumber seq;
   std::string* timestamp;
   const MergeOperator* merge_operator;
@@ -623,10 +900,12 @@ struct Saver {
   Statistics* statistics;
   bool inplace_update_support;
   bool do_merge;
-  Env* env_;
+  SystemClock* clock;
+
   ReadCallback* callback_;
   bool* is_blob_index;
   bool allow_data_in_errors;
+  size_t protection_bytes_per_key;
   bool CheckCallback(SequenceNumber _seq) {
     if (callback_) {
       return callback_->IsVisible(_seq);
@@ -634,26 +913,32 @@ struct Saver {
     return true;
   }
 };
-}  // namespace
+}  // anonymous namespace
 
 static bool SaveValue(void* arg, const char* entry) {
+  TEST_SYNC_POINT_CALLBACK("Memtable::SaveValue:Begin:entry", &entry);
   Saver* s = reinterpret_cast<Saver*>(arg);
   assert(s != nullptr);
+  assert(!s->value || !s->columns);
+
+  if (s->protection_bytes_per_key > 0) {
+    *(s->status) = MemTable::VerifyEntryChecksum(
+        entry, s->protection_bytes_per_key, s->allow_data_in_errors);
+    if (!s->status->ok()) {
+      ROCKS_LOG_ERROR(s->logger, "In SaveValue: %s", s->status->getState());
+      // Memtable entry corrupted
+      return false;
+    }
+  }
+
   MergeContext* merge_context = s->merge_context;
   SequenceNumber max_covering_tombstone_seq = s->max_covering_tombstone_seq;
   const MergeOperator* merge_operator = s->merge_operator;
 
   assert(merge_context != nullptr);
 
-  // entry format is:
-  //    klength  varint32
-  //    userkey  char[klength-8]
-  //    tag      uint64
-  //    vlength  varint32f
-  //    value    char[vlength]
-  // Check that it belongs to same user key.  We do not check the
-  // sequence number since the Seek() call above should have skipped
-  // all entries with overly large sequence numbers.
+  // Refer to comments under MemTable::Add() for entry format.
+  // Check that it belongs to same user key.
   uint32_t key_length = 0;
   const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length);
   assert(key_length >= 8);
@@ -661,8 +946,12 @@ static bool SaveValue(void* arg, const char* entry) {
   const Comparator* user_comparator =
       s->mem->GetInternalKeyComparator().user_comparator();
   size_t ts_sz = user_comparator->timestamp_size();
-  if (user_comparator->CompareWithoutTimestamp(user_key_slice,
-                                               s->key->user_key()) == 0) {
+  if (ts_sz && s->timestamp && max_covering_tombstone_seq > 0) {
+    // timestamp should already be set to range tombstone timestamp
+    assert(s->timestamp->size() == ts_sz);
+  }
+  if (user_comparator->EqualWithoutTimestamp(user_key_slice,
+                                             s->key->user_key())) {
     // Correct user key
     const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8);
     ValueType type;
@@ -673,68 +962,212 @@ static bool SaveValue(void* arg, const char* entry) {
       return true;  // to continue to the next seq
     }
 
-    s->seq = seq;
+    if (s->seq == kMaxSequenceNumber) {
+      s->seq = seq;
+      if (s->seq > max_covering_tombstone_seq) {
+        if (ts_sz && s->timestamp != nullptr) {
+          // `timestamp` was set to range tombstone's timestamp before
+          // `SaveValue` is ever called. This key has a higher sequence number
+          // than range tombstone, and is the key with the highest seqno across
+          // all keys with this user_key, so we update timestamp here.
+          Slice ts = ExtractTimestampFromUserKey(user_key_slice, ts_sz);
+          s->timestamp->assign(ts.data(), ts_sz);
+        }
+      } else {
+        s->seq = max_covering_tombstone_seq;
+      }
+    }
+
+    if (ts_sz > 0 && s->timestamp != nullptr) {
+      if (!s->timestamp->empty()) {
+        assert(ts_sz == s->timestamp->size());
+      }
+      // TODO optimize for smaller size ts
+      const std::string kMaxTs(ts_sz, '\xff');
+      if (s->timestamp->empty() ||
+          user_comparator->CompareTimestamp(*(s->timestamp), kMaxTs) == 0) {
+        Slice ts = ExtractTimestampFromUserKey(user_key_slice, ts_sz);
+        s->timestamp->assign(ts.data(), ts_sz);
+      }
+    }
 
-    if ((type == kTypeValue || type == kTypeMerge || type == kTypeBlobIndex) &&
+    if ((type == kTypeValue || type == kTypeMerge || type == kTypeBlobIndex ||
+         type == kTypeWideColumnEntity || type == kTypeDeletion ||
+         type == kTypeSingleDeletion || type == kTypeDeletionWithTimestamp) &&
         max_covering_tombstone_seq > seq) {
       type = kTypeRangeDeletion;
     }
     switch (type) {
-      case kTypeBlobIndex:
-        if (s->is_blob_index == nullptr) {
-          ROCKS_LOG_ERROR(s->logger, "Encounter unexpected blob index.");
+      case kTypeBlobIndex: {
+        if (!s->do_merge) {
           *(s->status) = Status::NotSupported(
-              "Encounter unsupported blob value. Please open DB with "
-              "ROCKSDB_NAMESPACE::blob_db::BlobDB instead.");
-        } else if (*(s->merge_in_progress)) {
-          *(s->status) =
-              Status::NotSupported("Blob DB does not support merge operator.");
+              "GetMergeOperands not supported by stacked BlobDB");
+          *(s->found_final_value) = true;
+          return false;
+        }
+
+        if (*(s->merge_in_progress)) {
+          *(s->status) = Status::NotSupported(
+              "Merge operator not supported by stacked BlobDB");
+          *(s->found_final_value) = true;
+          return false;
         }
-        if (!s->status->ok()) {
+
+        if (s->is_blob_index == nullptr) {
+          ROCKS_LOG_ERROR(s->logger, "Encountered unexpected blob index.");
+          *(s->status) = Status::NotSupported(
+              "Encountered unexpected blob index. Please open DB with "
+              "ROCKSDB_NAMESPACE::blob_db::BlobDB.");
           *(s->found_final_value) = true;
           return false;
         }
-        FALLTHROUGH_INTENDED;
+
+        if (s->inplace_update_support) {
+          s->mem->GetLock(s->key->user_key())->ReadLock();
+        }
+
+        Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
+
+        *(s->status) = Status::OK();
+
+        if (s->value) {
+          s->value->assign(v.data(), v.size());
+        } else if (s->columns) {
+          s->columns->SetPlainValue(v);
+        }
+
+        if (s->inplace_update_support) {
+          s->mem->GetLock(s->key->user_key())->ReadUnlock();
+        }
+
+        *(s->found_final_value) = true;
+        *(s->is_blob_index) = true;
+
+        return false;
+      }
       case kTypeValue: {
         if (s->inplace_update_support) {
           s->mem->GetLock(s->key->user_key())->ReadLock();
         }
+
         Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
+
         *(s->status) = Status::OK();
-        if (*(s->merge_in_progress)) {
-          if (s->do_merge) {
-            if (s->value != nullptr) {
-              *(s->status) = MergeHelper::TimedFullMerge(
-                  merge_operator, s->key->user_key(), &v,
-                  merge_context->GetOperands(), s->value, s->logger,
-                  s->statistics, s->env_, nullptr /* result_operand */, true);
-            }
-          } else {
-            // Preserve the value with the goal of returning it as part of
-            // raw merge operands to the user
-            merge_context->PushOperand(
-                v, s->inplace_update_support == false /* operand_pinned */);
-          }
-        } else if (!s->do_merge) {
+
+        if (!s->do_merge) {
           // Preserve the value with the goal of returning it as part of
           // raw merge operands to the user
+          // TODO(yanqin) update MergeContext so that timestamps information
+          // can also be retained.
+
           merge_context->PushOperand(
               v, s->inplace_update_support == false /* operand_pinned */);
-        } else if (s->value != nullptr) {
+        } else if (*(s->merge_in_progress)) {
+          assert(s->do_merge);
+
+          if (s->value || s->columns) {
+            std::string result;
+            *(s->status) = MergeHelper::TimedFullMerge(
+                merge_operator, s->key->user_key(), &v,
+                merge_context->GetOperands(), &result, s->logger, s->statistics,
+                s->clock, /* result_operand */ nullptr,
+                /* update_num_ops_stats */ true);
+
+            if (s->status->ok()) {
+              if (s->value) {
+                *(s->value) = std::move(result);
+              } else {
+                assert(s->columns);
+                s->columns->SetPlainValue(result);
+              }
+            }
+          }
+        } else if (s->value) {
           s->value->assign(v.data(), v.size());
+        } else if (s->columns) {
+          s->columns->SetPlainValue(v);
         }
+
         if (s->inplace_update_support) {
           s->mem->GetLock(s->key->user_key())->ReadUnlock();
         }
+
         *(s->found_final_value) = true;
+
         if (s->is_blob_index != nullptr) {
-          *(s->is_blob_index) = (type == kTypeBlobIndex);
+          *(s->is_blob_index) = false;
         }
 
-        if (ts_sz > 0 && s->timestamp != nullptr) {
-          Slice ts = ExtractTimestampFromUserKey(user_key_slice, ts_sz);
-          s->timestamp->assign(ts.data(), ts.size());
+        return false;
+      }
+      case kTypeWideColumnEntity: {
+        if (s->inplace_update_support) {
+          s->mem->GetLock(s->key->user_key())->ReadLock();
+        }
+
+        Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
+
+        *(s->status) = Status::OK();
+
+        if (!s->do_merge) {
+          // Preserve the value with the goal of returning it as part of
+          // raw merge operands to the user
+
+          Slice value_of_default;
+          *(s->status) = WideColumnSerialization::GetValueOfDefaultColumn(
+              v, value_of_default);
+
+          if (s->status->ok()) {
+            merge_context->PushOperand(
+                value_of_default,
+                s->inplace_update_support == false /* operand_pinned */);
+          }
+        } else if (*(s->merge_in_progress)) {
+          assert(s->do_merge);
+
+          if (s->value) {
+            Slice value_of_default;
+            *(s->status) = WideColumnSerialization::GetValueOfDefaultColumn(
+                v, value_of_default);
+            if (s->status->ok()) {
+              *(s->status) = MergeHelper::TimedFullMerge(
+                  merge_operator, s->key->user_key(), &value_of_default,
+                  merge_context->GetOperands(), s->value, s->logger,
+                  s->statistics, s->clock, /* result_operand */ nullptr,
+                  /* update_num_ops_stats */ true);
+            }
+          } else if (s->columns) {
+            std::string result;
+            *(s->status) = MergeHelper::TimedFullMergeWithEntity(
+                merge_operator, s->key->user_key(), v,
+                merge_context->GetOperands(), &result, s->logger, s->statistics,
+                s->clock, /* update_num_ops_stats */ true);
+
+            if (s->status->ok()) {
+              *(s->status) = s->columns->SetWideColumnValue(result);
+            }
+          }
+        } else if (s->value) {
+          Slice value_of_default;
+          *(s->status) = WideColumnSerialization::GetValueOfDefaultColumn(
+              v, value_of_default);
+          if (s->status->ok()) {
+            s->value->assign(value_of_default.data(), value_of_default.size());
+          }
+        } else if (s->columns) {
+          *(s->status) = s->columns->SetWideColumnValue(v);
+        }
+
+        if (s->inplace_update_support) {
+          s->mem->GetLock(s->key->user_key())->ReadUnlock();
+        }
+
+        *(s->found_final_value) = true;
+
+        if (s->is_blob_index != nullptr) {
+          *(s->is_blob_index) = false;
         }
+
         return false;
       }
       case kTypeDeletion:
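[Editor's note: the kTypeWideColumnEntity handling above is what a point lookup hits after an entity write. At the public API level, entities are written and read roughly as follows; PutEntity/GetEntity exist in recent RocksDB releases, and their availability in this tree is an assumption:

#include "rocksdb/db.h"
#include "rocksdb/wide_columns.h"

void WideColumnRoundTrip(rocksdb::DB* db) {
  rocksdb::WideColumns columns{{rocksdb::kDefaultWideColumnName, "v0"},
                               {"attr", "v1"}};
  db->PutEntity(rocksdb::WriteOptions(), db->DefaultColumnFamily(), "key",
                columns)
      .PermitUncheckedError();

  rocksdb::PinnableWideColumns result;
  // A plain Get() of the same key returns only the default column, which is
  // what GetValueOfDefaultColumn extracts in the branch above.
  db->GetEntity(rocksdb::ReadOptions(), db->DefaultColumnFamily(), "key",
                &result)
      .PermitUncheckedError();
}
]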
@@ -742,11 +1175,22 @@ static bool SaveValue(void* arg, const char* entry) {
       case kTypeSingleDeletion:
       case kTypeRangeDeletion: {
         if (*(s->merge_in_progress)) {
-          if (s->value != nullptr) {
+          if (s->value || s->columns) {
+            std::string result;
             *(s->status) = MergeHelper::TimedFullMerge(
                 merge_operator, s->key->user_key(), nullptr,
-                merge_context->GetOperands(), s->value, s->logger,
-                s->statistics, s->env_, nullptr /* result_operand */, true);
+                merge_context->GetOperands(), &result, s->logger, s->statistics,
+                s->clock, /* result_operand */ nullptr,
+                /* update_num_ops_stats */ true);
+
+            if (s->status->ok()) {
+              if (s->value) {
+                *(s->value) = std::move(result);
+              } else {
+                assert(s->columns);
+                s->columns->SetPlainValue(result);
+              }
+            }
           }
         } else {
           *(s->status) = Status::NotFound();
@@ -771,10 +1215,24 @@ static bool SaveValue(void* arg, const char* entry) {
             v, s->inplace_update_support == false /* operand_pinned */);
         if (s->do_merge && merge_operator->ShouldMerge(
                                merge_context->GetOperandsDirectionBackward())) {
-          *(s->status) = MergeHelper::TimedFullMerge(
-              merge_operator, s->key->user_key(), nullptr,
-              merge_context->GetOperands(), s->value, s->logger, s->statistics,
-              s->env_, nullptr /* result_operand */, true);
+          if (s->value || s->columns) {
+            std::string result;
+            *(s->status) = MergeHelper::TimedFullMerge(
+                merge_operator, s->key->user_key(), nullptr,
+                merge_context->GetOperands(), &result, s->logger, s->statistics,
+                s->clock, /* result_operand */ nullptr,
+                /* update_num_ops_stats */ true);
+
+            if (s->status->ok()) {
+              if (s->value) {
+                *(s->value) = std::move(result);
+              } else {
+                assert(s->columns);
+                s->columns->SetPlainValue(result);
+              }
+            }
+          }
+
           *(s->found_final_value) = true;
           return false;
         }
@@ -800,11 +1258,12 @@ static bool SaveValue(void* arg, const char* entry) {
 }
 
 bool MemTable::Get(const LookupKey& key, std::string* value,
-                   std::string* timestamp, Status* s,
-                   MergeContext* merge_context,
+                   PinnableWideColumns* columns, std::string* timestamp,
+                   Status* s, MergeContext* merge_context,
                    SequenceNumber* max_covering_tombstone_seq,
                    SequenceNumber* seq, const ReadOptions& read_opts,
-                   ReadCallback* callback, bool* is_blob_index, bool do_merge) {
+                   bool immutable_memtable, ReadCallback* callback,
+                   bool* is_blob_index, bool do_merge) {
   // The sequence number is updated synchronously in version_set.h
   if (IsEmpty()) {
     // Avoiding recording stats for speed.
@@ -814,29 +1273,41 @@ bool MemTable::Get(const LookupKey& key, std::string* value,
 
   std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter(
       NewRangeTombstoneIterator(read_opts,
-                                GetInternalKeySeqno(key.internal_key())));
+                                GetInternalKeySeqno(key.internal_key()),
+                                immutable_memtable));
   if (range_del_iter != nullptr) {
-    *max_covering_tombstone_seq =
-        std::max(*max_covering_tombstone_seq,
-                 range_del_iter->MaxCoveringTombstoneSeqnum(key.user_key()));
+    SequenceNumber covering_seq =
+        range_del_iter->MaxCoveringTombstoneSeqnum(key.user_key());
+    if (covering_seq > *max_covering_tombstone_seq) {
+      *max_covering_tombstone_seq = covering_seq;
+      if (timestamp) {
+        // Will be overwritten in SaveValue() if there is a point key with
+        // a higher seqno.
+        timestamp->assign(range_del_iter->timestamp().data(),
+                          range_del_iter->timestamp().size());
+      }
+    }
   }
 
-  Slice user_key = key.user_key();
   bool found_final_value = false;
   bool merge_in_progress = s->IsMergeInProgress();
   bool may_contain = true;
   size_t ts_sz = GetInternalKeyComparator().user_comparator()->timestamp_size();
+  Slice user_key_without_ts = StripTimestampFromUserKey(key.user_key(), ts_sz);
+  bool bloom_checked = false;
   if (bloom_filter_) {
     // when both memtable_whole_key_filtering and prefix_extractor_ are set,
     // only do whole key filtering for Get() to save CPU
     if (moptions_.memtable_whole_key_filtering) {
-      may_contain =
-          bloom_filter_->MayContain(StripTimestampFromUserKey(user_key, ts_sz));
+      may_contain = bloom_filter_->MayContain(user_key_without_ts);
+      bloom_checked = true;
     } else {
       assert(prefix_extractor_);
-      may_contain =
-          !prefix_extractor_->InDomain(user_key) ||
-          bloom_filter_->MayContain(prefix_extractor_->Transform(user_key));
+      if (prefix_extractor_->InDomain(user_key_without_ts)) {
+        may_contain = bloom_filter_->MayContain(
+            prefix_extractor_->Transform(user_key_without_ts));
+        bloom_checked = true;
+      }
     }
   }
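[Editor's note: bloom_checked ensures the hit/miss perf counters fire only when the filter was actually consulted; previously an out-of-domain key under a prefix extractor still counted as a hit. For reference, the memtable Bloom filter probed here is configured through public options, e.g.:

#include "rocksdb/options.h"
#include "rocksdb/slice_transform.h"

rocksdb::Options MakeMemtableBloomOptions() {
  rocksdb::Options options;
  // The prefix extractor defines the InDomain()/Transform() calls above.
  options.prefix_extractor.reset(rocksdb::NewFixedPrefixTransform(4));
  // Fraction of write_buffer_size to spend on the memtable Bloom filter.
  options.memtable_prefix_bloom_size_ratio = 0.1;
  // When set, Get() probes whole keys instead of prefixes, as seen above.
  options.memtable_whole_key_filtering = true;
  return options;
}
]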
 
@@ -845,16 +1316,17 @@ bool MemTable::Get(const LookupKey& key, std::string* value,
     PERF_COUNTER_ADD(bloom_memtable_miss_count, 1);
     *seq = kMaxSequenceNumber;
   } else {
-    if (bloom_filter_) {
+    if (bloom_checked) {
       PERF_COUNTER_ADD(bloom_memtable_hit_count, 1);
     }
     GetFromTable(key, *max_covering_tombstone_seq, do_merge, callback,
-                 is_blob_index, value, timestamp, s, merge_context, seq,
-                 &found_final_value, &merge_in_progress);
+                 is_blob_index, value, columns, timestamp, s, merge_context,
+                 seq, &found_final_value, &merge_in_progress);
   }
 
   // No change to value, since we have not yet found a Put/Delete
-  if (!found_final_value && merge_in_progress) {
+  // Propagate corruption error
+  if (!found_final_value && merge_in_progress && !s->IsCorruption()) {
     *s = Status::MergeInProgress();
   }
   PERF_COUNTER_ADD(get_from_memtable_count, 1);
@@ -865,6 +1337,7 @@ void MemTable::GetFromTable(const LookupKey& key,
                             SequenceNumber max_covering_tombstone_seq,
                             bool do_merge, ReadCallback* callback,
                             bool* is_blob_index, std::string* value,
+                            PinnableWideColumns* columns,
                             std::string* timestamp, Status* s,
                             MergeContext* merge_context, SequenceNumber* seq,
                             bool* found_final_value, bool* merge_in_progress) {
@@ -874,6 +1347,7 @@ void MemTable::GetFromTable(const LookupKey& key,
   saver.merge_in_progress = merge_in_progress;
   saver.key = &key;
   saver.value = value;
+  saver.columns = columns;
   saver.timestamp = timestamp;
   saver.seq = kMaxSequenceNumber;
   saver.mem = this;
@@ -883,17 +1357,18 @@ void MemTable::GetFromTable(const LookupKey& key,
   saver.logger = moptions_.info_log;
   saver.inplace_update_support = moptions_.inplace_update_support;
   saver.statistics = moptions_.statistics;
-  saver.env_ = env_;
+  saver.clock = clock_;
   saver.callback_ = callback;
   saver.is_blob_index = is_blob_index;
   saver.do_merge = do_merge;
   saver.allow_data_in_errors = moptions_.allow_data_in_errors;
+  saver.protection_bytes_per_key = moptions_.protection_bytes_per_key;
   table_->Get(key, &saver, SaveValue);
   *seq = saver.seq;
 }
 
 void MemTable::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
-                        ReadCallback* callback, bool* is_blob) {
+                        ReadCallback* callback, bool immutable_memtable) {
   // The sequence number is updated synchronously in version_set.h
   if (IsEmpty()) {
     // Avoiding recording stats for speed.
@@ -901,53 +1376,64 @@ void MemTable::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
   }
   PERF_TIMER_GUARD(get_from_memtable_time);
 
+  // For now, memtable Bloom filter is effectively disabled if there are any
+  // range tombstones. This is the simplest way to ensure range tombstones are
+  // handled. TODO: allow Bloom checks where max_covering_tombstone_seq==0
+  bool no_range_del = read_options.ignore_range_deletions ||
+                      is_range_del_table_empty_.load(std::memory_order_relaxed);
   MultiGetRange temp_range(*range, range->begin(), range->end());
-  if (bloom_filter_) {
-    std::array<Slice*, MultiGetContext::MAX_BATCH_SIZE> keys;
-    std::array<bool, MultiGetContext::MAX_BATCH_SIZE> may_match = {{true}};
-    autovector<Slice, MultiGetContext::MAX_BATCH_SIZE> prefixes;
+  if (bloom_filter_ && no_range_del) {
+    bool whole_key =
+        !prefix_extractor_ || moptions_.memtable_whole_key_filtering;
+    std::array<Slice, MultiGetContext::MAX_BATCH_SIZE> bloom_keys;
+    std::array<bool, MultiGetContext::MAX_BATCH_SIZE> may_match;
+    std::array<size_t, MultiGetContext::MAX_BATCH_SIZE> range_indexes;
     int num_keys = 0;
     for (auto iter = temp_range.begin(); iter != temp_range.end(); ++iter) {
-      if (!prefix_extractor_) {
-        keys[num_keys++] = &iter->ukey_without_ts;
+      if (whole_key) {
+        bloom_keys[num_keys] = iter->ukey_without_ts;
+        range_indexes[num_keys++] = iter.index();
       } else if (prefix_extractor_->InDomain(iter->ukey_without_ts)) {
-        prefixes.emplace_back(
-            prefix_extractor_->Transform(iter->ukey_without_ts));
-        keys[num_keys++] = &prefixes.back();
+        bloom_keys[num_keys] =
+            prefix_extractor_->Transform(iter->ukey_without_ts);
+        range_indexes[num_keys++] = iter.index();
       }
     }
-    bloom_filter_->MayContain(num_keys, &keys[0], &may_match[0]);
-    int idx = 0;
-    for (auto iter = temp_range.begin(); iter != temp_range.end(); ++iter) {
-      if (prefix_extractor_ &&
-          !prefix_extractor_->InDomain(iter->ukey_without_ts)) {
-        PERF_COUNTER_ADD(bloom_memtable_hit_count, 1);
-        continue;
-      }
-      if (!may_match[idx]) {
-        temp_range.SkipKey(iter);
+    bloom_filter_->MayContain(num_keys, &bloom_keys[0], &may_match[0]);
+    for (int i = 0; i < num_keys; ++i) {
+      if (!may_match[i]) {
+        temp_range.SkipIndex(range_indexes[i]);
         PERF_COUNTER_ADD(bloom_memtable_miss_count, 1);
       } else {
         PERF_COUNTER_ADD(bloom_memtable_hit_count, 1);
       }
-      idx++;
     }
   }
   for (auto iter = temp_range.begin(); iter != temp_range.end(); ++iter) {
-    SequenceNumber seq = kMaxSequenceNumber;
     bool found_final_value{false};
     bool merge_in_progress = iter->s->IsMergeInProgress();
-    std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter(
-        NewRangeTombstoneIterator(
-            read_options, GetInternalKeySeqno(iter->lkey->internal_key())));
-    if (range_del_iter != nullptr) {
-      iter->max_covering_tombstone_seq = std::max(
-          iter->max_covering_tombstone_seq,
-          range_del_iter->MaxCoveringTombstoneSeqnum(iter->lkey->user_key()));
+    if (!no_range_del) {
+      std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter(
+          NewRangeTombstoneIteratorInternal(
+              read_options, GetInternalKeySeqno(iter->lkey->internal_key()),
+              immutable_memtable));
+      SequenceNumber covering_seq =
+          range_del_iter->MaxCoveringTombstoneSeqnum(iter->lkey->user_key());
+      if (covering_seq > iter->max_covering_tombstone_seq) {
+        iter->max_covering_tombstone_seq = covering_seq;
+        if (iter->timestamp) {
+          // Will be overwritten in SaveValue() if there is a point key with
+          // a higher seqno.
+          iter->timestamp->assign(range_del_iter->timestamp().data(),
+                                  range_del_iter->timestamp().size());
+        }
+      }
     }
+    SequenceNumber dummy_seq;
     GetFromTable(*(iter->lkey), iter->max_covering_tombstone_seq, true,
-                 callback, is_blob, iter->value->GetSelf(), iter->timestamp,
-                 iter->s, &(iter->merge_context), &seq, &found_final_value,
+                 callback, &iter->is_blob_index, iter->value->GetSelf(),
+                 /*columns=*/nullptr, iter->timestamp, iter->s,
+                 &(iter->merge_context), &dummy_seq, &found_final_value,
                  &merge_in_progress);
 
     if (!found_final_value && merge_in_progress) {
@@ -973,9 +1459,9 @@ void MemTable::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
   PERF_COUNTER_ADD(get_from_memtable_count, 1);
 }
 
-void MemTable::Update(SequenceNumber seq,
-                      const Slice& key,
-                      const Slice& value) {
+Status MemTable::Update(SequenceNumber seq, ValueType value_type,
+                        const Slice& key, const Slice& value,
+                        const ProtectionInfoKVOS64* kv_prot_info) {
   LookupKey lkey(key, seq);
   Slice mem_key = lkey.memtable_key();
 
@@ -984,12 +1470,7 @@ void MemTable::Update(SequenceNumber seq,
   iter->Seek(lkey.internal_key(), mem_key.data());
 
   if (iter->Valid()) {
-    // entry format is:
-    //    key_length  varint32
-    //    userkey  char[klength-8]
-    //    tag      uint64
-    //    vlength  varint32
-    //    value    char[vlength]
+    // Refer to comments under MemTable::Add() for entry format.
     // Check that it belongs to same user key.  We do not check the
     // sequence number since the Seek() call above should have skipped
     // all entries with overly large sequence numbers.
@@ -1004,7 +1485,7 @@ void MemTable::Update(SequenceNumber seq,
       SequenceNumber existing_seq;
       UnPackSequenceAndType(tag, &existing_seq, &type);
       assert(existing_seq != seq);
-      if (type == kTypeValue) {
+      if (type == value_type) {
         Slice prev_value = GetLengthPrefixedSlice(key_ptr + key_length);
         uint32_t prev_size = static_cast<uint32_t>(prev_value.size());
         uint32_t new_size = static_cast<uint32_t>(value.size());
@@ -1019,22 +1500,31 @@ void MemTable::Update(SequenceNumber seq,
                  (unsigned)(VarintLength(key_length) + key_length +
                             VarintLength(value.size()) + value.size()));
           RecordTick(moptions_.statistics, NUMBER_KEYS_UPDATED);
-          return;
+          if (kv_prot_info != nullptr) {
+            ProtectionInfoKVOS64 updated_kv_prot_info(*kv_prot_info);
+            // `seq` is swallowed and `existing_seq` prevails.
+            updated_kv_prot_info.UpdateS(seq, existing_seq);
+            UpdateEntryChecksum(&updated_kv_prot_info, key, value, type,
+                                existing_seq, p + value.size());
+            Slice encoded(entry, p + value.size() - entry);
+            return VerifyEncodedEntry(encoded, updated_kv_prot_info);
+          } else {
+            UpdateEntryChecksum(nullptr, key, value, type, existing_seq,
+                                p + value.size());
+          }
+          return Status::OK();
         }
       }
     }
   }
 
-  // key doesn't exist
-  bool add_res __attribute__((__unused__));
-  add_res = Add(seq, kTypeValue, key, value);
-  // We already checked unused != seq above. In that case, Add should not fail.
-  assert(add_res);
+  // The latest value is not value_type or key doesn't exist
+  return Add(seq, value_type, key, value, kv_prot_info);
 }
 
-bool MemTable::UpdateCallback(SequenceNumber seq,
-                              const Slice& key,
-                              const Slice& delta) {
+Status MemTable::UpdateCallback(SequenceNumber seq, const Slice& key,
+                                const Slice& delta,
+                                const ProtectionInfoKVOS64* kv_prot_info) {
   LookupKey lkey(key, seq);
   Slice memkey = lkey.memtable_key();
 
@@ -1043,12 +1533,7 @@ bool MemTable::UpdateCallback(SequenceNumber seq,
   iter->Seek(lkey.internal_key(), memkey.data());
 
   if (iter->Valid()) {
-    // entry format is:
-    //    key_length  varint32
-    //    userkey  char[klength-8]
-    //    tag      uint64
-    //    vlength  varint32
-    //    value    char[vlength]
+    // Refer to comments under MemTable::Add() for entry format.
     // Check that it belongs to same user key.  We do not check the
     // sequence number since the Seek() call above should have skipped
     // all entries with overly large sequence numbers.
@@ -1060,54 +1545,74 @@ bool MemTable::UpdateCallback(SequenceNumber seq,
       // Correct user key
       const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8);
       ValueType type;
-      uint64_t unused;
-      UnPackSequenceAndType(tag, &unused, &type);
-      switch (type) {
-        case kTypeValue: {
-          Slice prev_value = GetLengthPrefixedSlice(key_ptr + key_length);
-          uint32_t prev_size = static_cast<uint32_t>(prev_value.size());
-
-          char* prev_buffer = const_cast<char*>(prev_value.data());
-          uint32_t new_prev_size = prev_size;
+      uint64_t existing_seq;
+      UnPackSequenceAndType(tag, &existing_seq, &type);
+      if (type == kTypeValue) {
+        Slice prev_value = GetLengthPrefixedSlice(key_ptr + key_length);
+        uint32_t prev_size = static_cast<uint32_t>(prev_value.size());
 
-          std::string str_value;
-          WriteLock wl(GetLock(lkey.user_key()));
-          auto status = moptions_.inplace_callback(prev_buffer, &new_prev_size,
-                                                   delta, &str_value);
-          if (status == UpdateStatus::UPDATED_INPLACE) {
-            // Value already updated by callback.
-            assert(new_prev_size <= prev_size);
-            if (new_prev_size < prev_size) {
-              // overwrite the new prev_size
-              char* p = EncodeVarint32(const_cast<char*>(key_ptr) + key_length,
-                                       new_prev_size);
-              if (VarintLength(new_prev_size) < VarintLength(prev_size)) {
-                // shift the value buffer as well.
-                memcpy(p, prev_buffer, new_prev_size);
-              }
+        char* prev_buffer = const_cast<char*>(prev_value.data());
+        uint32_t new_prev_size = prev_size;
+
+        std::string str_value;
+        WriteLock wl(GetLock(lkey.user_key()));
+        auto status = moptions_.inplace_callback(prev_buffer, &new_prev_size,
+                                                 delta, &str_value);
+        if (status == UpdateStatus::UPDATED_INPLACE) {
+          // Value already updated by callback.
+          assert(new_prev_size <= prev_size);
+          if (new_prev_size < prev_size) {
+            // overwrite the new prev_size
+            char* p = EncodeVarint32(const_cast<char*>(key_ptr) + key_length,
+                                     new_prev_size);
+            if (VarintLength(new_prev_size) < VarintLength(prev_size)) {
+              // shift the value buffer as well.
+              memcpy(p, prev_buffer, new_prev_size);
+              prev_buffer = p;
             }
-            RecordTick(moptions_.statistics, NUMBER_KEYS_UPDATED);
-            UpdateFlushState();
-            return true;
-          } else if (status == UpdateStatus::UPDATED) {
-            Add(seq, kTypeValue, key, Slice(str_value));
-            RecordTick(moptions_.statistics, NUMBER_KEYS_WRITTEN);
-            UpdateFlushState();
-            return true;
-          } else if (status == UpdateStatus::UPDATE_FAILED) {
-            // No action required. Return.
-            UpdateFlushState();
-            return true;
           }
+          RecordTick(moptions_.statistics, NUMBER_KEYS_UPDATED);
+          UpdateFlushState();
+          Slice new_value(prev_buffer, new_prev_size);
+          if (kv_prot_info != nullptr) {
+            ProtectionInfoKVOS64 updated_kv_prot_info(*kv_prot_info);
+            // `seq` is swallowed and `existing_seq` prevails.
+            updated_kv_prot_info.UpdateS(seq, existing_seq);
+            updated_kv_prot_info.UpdateV(delta, new_value);
+            Slice encoded(entry, prev_buffer + new_prev_size - entry);
+            UpdateEntryChecksum(&updated_kv_prot_info, key, new_value, type,
+                                existing_seq, prev_buffer + new_prev_size);
+            return VerifyEncodedEntry(encoded, updated_kv_prot_info);
+          } else {
+            UpdateEntryChecksum(nullptr, key, new_value, type, existing_seq,
+                                prev_buffer + new_prev_size);
+          }
+          return Status::OK();
+        } else if (status == UpdateStatus::UPDATED) {
+          Status s;
+          if (kv_prot_info != nullptr) {
+            ProtectionInfoKVOS64 updated_kv_prot_info(*kv_prot_info);
+            updated_kv_prot_info.UpdateV(delta, str_value);
+            s = Add(seq, kTypeValue, key, Slice(str_value),
+                    &updated_kv_prot_info);
+          } else {
+            s = Add(seq, kTypeValue, key, Slice(str_value),
+                    nullptr /* kv_prot_info */);
+          }
+          RecordTick(moptions_.statistics, NUMBER_KEYS_WRITTEN);
+          UpdateFlushState();
+          return s;
+        } else if (status == UpdateStatus::UPDATE_FAILED) {
+          // `UPDATE_FAILED` is named incorrectly. It indicates no update
+          // happened. It does not indicate a failure happened.
+          UpdateFlushState();
+          return Status::OK();
         }
-        default:
-          break;
       }
     }
   }
-  // If the latest value is not kTypeValue
-  // or key doesn't exist
-  return false;
+  // The latest value is not `kTypeValue` or key doesn't exist
+  return Status::NotFound();
 }
 
 size_t MemTable::CountSuccessiveMergeEntries(const LookupKey& key) {
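[Editor's note: for completeness, the inplace_callback that UpdateCallback drives has the public signature shown below. A minimal sketch that always requests a full rewrite via merged_value; illustrative, not code from this tree:

#include <cstdint>
#include <string>
#include "rocksdb/options.h"

rocksdb::UpdateStatus AppendDelta(char* existing_value,
                                  uint32_t* existing_value_size,
                                  rocksdb::Slice delta_value,
                                  std::string* merged_value) {
  // UPDATED_INPLACE: existing_value was mutated (size may only shrink);
  // UPDATED: merged_value holds a replacement that gets Add()ed as a new
  // entry; UPDATE_FAILED: no update happened (not an error, as noted above).
  merged_value->assign(existing_value, *existing_value_size);
  merged_value->append(delta_value.data(), delta_value.size());
  return rocksdb::UpdateStatus::UPDATED;
}

// Wiring, alongside the options shown earlier:
//   options.inplace_update_support = true;
//   options.inplace_callback = AppendDelta;
]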