]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rocksdb/table/block_based/filter_policy.cc
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / rocksdb / table / block_based / filter_policy.cc
index a7ab907d4bc783ae74b6721da11d71e512fdaffe..f84f804dd6ccbbe948f7cda58bc4968a5f4ddc63 100644 (file)
 #include "rocksdb/filter_policy.h"
 
 #include <array>
+#include <climits>
+#include <cstring>
 #include <deque>
-
+#include <limits>
+#include <memory>
+
+#include "cache/cache_entry_roles.h"
+#include "cache/cache_reservation_manager.h"
+#include "logging/logging.h"
+#include "port/lang.h"
+#include "rocksdb/convenience.h"
+#include "rocksdb/rocksdb_namespace.h"
 #include "rocksdb/slice.h"
-#include "table/block_based/block_based_filter_block.h"
+#include "rocksdb/utilities/object_registry.h"
+#include "table/block_based/block_based_table_reader.h"
 #include "table/block_based/filter_policy_internal.h"
 #include "table/block_based/full_filter_block.h"
-#include "third-party/folly/folly/ConstexprMath.h"
 #include "util/bloom_impl.h"
 #include "util/coding.h"
 #include "util/hash.h"
+#include "util/math.h"
+#include "util/ribbon_config.h"
 #include "util/ribbon_impl.h"
+#include "util/string_util.h"
 
 namespace ROCKSDB_NAMESPACE {
 
-int BuiltinFilterBitsBuilder::CalculateNumEntry(const uint32_t bytes) {
-  int cur = 1;
-  // Find overestimate
-  while (CalculateSpace(cur) <= bytes && cur * 2 > cur) {
-    cur *= 2;
-  }
-  // Change to underestimate less than factor of two from answer
-  cur /= 2;
-  // Binary search
-  int delta = cur / 2;
-  while (delta > 0) {
-    if (CalculateSpace(cur + delta) <= bytes) {
-      cur += delta;
-    }
-    delta /= 2;
-  }
-  return cur;
-}
-
 namespace {
 
+// Metadata trailer size for built-in filters. (This is separate from
+// block-based table block trailer.)
+//
+// Originally this was 1 byte for num_probes and 4 bytes for number of
+// cache lines in the Bloom filter, but now the first trailer byte is
+// usually an implementation marker and remaining 4 bytes have various
+// meanings.
+static constexpr uint32_t kMetadataLen = 5;
+
 Slice FinishAlwaysFalse(std::unique_ptr<const char[]>* /*buf*/) {
   // Missing metadata, treated as zero entries
   return Slice(nullptr, 0);
 }
 
+Slice FinishAlwaysTrue(std::unique_ptr<const char[]>* /*buf*/) {
+  return Slice("\0\0\0\0\0\0", 6);
+}
+
 // Base class for filter builders using the XXH3 preview hash,
 // also known as Hash64 or GetSliceHash64.
-class XXH3pFilterBitsBuilder : public BuiltinFilterBitsBuilder {
+class XXPH3FilterBitsBuilder : public BuiltinFilterBitsBuilder {
  public:
-  ~XXH3pFilterBitsBuilder() override {}
+  explicit XXPH3FilterBitsBuilder(
+      std::atomic<int64_t>* aggregate_rounding_balance,
+      std::shared_ptr<CacheReservationManager> cache_res_mgr,
+      bool detect_filter_construct_corruption)
+      : aggregate_rounding_balance_(aggregate_rounding_balance),
+        cache_res_mgr_(cache_res_mgr),
+        detect_filter_construct_corruption_(
+            detect_filter_construct_corruption) {}
+
+  ~XXPH3FilterBitsBuilder() override {}
 
   virtual void AddKey(const Slice& key) override {
     uint64_t hash = GetSliceHash64(key);
@@ -62,113 +78,58 @@ class XXH3pFilterBitsBuilder : public BuiltinFilterBitsBuilder {
     // though only adjacent repetition, which we want to immediately
     // recognize and collapse for estimating true filter space
     // requirements.
-    if (hash_entries_.empty() || hash != hash_entries_.back()) {
-      hash_entries_.push_back(hash);
+    if (hash_entries_info_.entries.empty() ||
+        hash != hash_entries_info_.entries.back()) {
+      if (detect_filter_construct_corruption_) {
+        hash_entries_info_.xor_checksum ^= hash;
+      }
+      hash_entries_info_.entries.push_back(hash);
+      if (cache_res_mgr_ &&
+          // Traditional rounding to whole bucket size
+          ((hash_entries_info_.entries.size() %
+            kUint64tHashEntryCacheResBucketSize) ==
+           kUint64tHashEntryCacheResBucketSize / 2)) {
+        hash_entries_info_.cache_res_bucket_handles.emplace_back(nullptr);
+        Status s = cache_res_mgr_->MakeCacheReservation(
+            kUint64tHashEntryCacheResBucketSize * sizeof(hash),
+            &hash_entries_info_.cache_res_bucket_handles.back());
+        s.PermitUncheckedError();
+      }
     }
   }
 
- protected:
-  // For delegating between XXH3pFilterBitsBuilders
-  void SwapEntriesWith(XXH3pFilterBitsBuilder* other) {
-    std::swap(hash_entries_, other->hash_entries_);
-  }
-
-  // A deque avoids unnecessary copying of already-saved values
-  // and has near-minimal peak memory use.
-  std::deque<uint64_t> hash_entries_;
-};
-
-// #################### FastLocalBloom implementation ################## //
-// ############## also known as format_version=5 Bloom filter ########## //
-
-// See description in FastLocalBloomImpl
-class FastLocalBloomBitsBuilder : public XXH3pFilterBitsBuilder {
- public:
-  // Non-null aggregate_rounding_balance implies optimize_filters_for_memory
-  explicit FastLocalBloomBitsBuilder(
-      const int millibits_per_key,
-      std::atomic<int64_t>* aggregate_rounding_balance)
-      : millibits_per_key_(millibits_per_key),
-        aggregate_rounding_balance_(aggregate_rounding_balance) {
-    assert(millibits_per_key >= 1000);
+  virtual size_t EstimateEntriesAdded() override {
+    return hash_entries_info_.entries.size();
   }
 
-  // No Copy allowed
-  FastLocalBloomBitsBuilder(const FastLocalBloomBitsBuilder&) = delete;
-  void operator=(const FastLocalBloomBitsBuilder&) = delete;
-
-  ~FastLocalBloomBitsBuilder() override {}
-
-  virtual Slice Finish(std::unique_ptr<const char[]>* buf) override {
-    size_t num_entry = hash_entries_.size();
-    std::unique_ptr<char[]> mutable_buf;
-    uint32_t len_with_metadata =
-        CalculateAndAllocate(num_entry, &mutable_buf, /*update_balance*/ true);
-
-    assert(mutable_buf);
-    assert(len_with_metadata >= 5);
-
-    // Compute num_probes after any rounding / adjustments
-    int num_probes = GetNumProbes(num_entry, len_with_metadata);
+  virtual Status MaybePostVerify(const Slice& filter_content) override;
 
-    uint32_t len = len_with_metadata - 5;
-    if (len > 0) {
-      AddAllEntries(mutable_buf.get(), len, num_probes);
-    }
-
-    assert(hash_entries_.empty());
-
-    // See BloomFilterPolicy::GetBloomBitsReader re: metadata
-    // -1 = Marker for newer Bloom implementations
-    mutable_buf[len] = static_cast<char>(-1);
-    // 0 = Marker for this sub-implementation
-    mutable_buf[len + 1] = static_cast<char>(0);
-    // num_probes (and 0 in upper bits for 64-byte block size)
-    mutable_buf[len + 2] = static_cast<char>(num_probes);
-    // rest of metadata stays zero
-
-    Slice rv(mutable_buf.get(), len_with_metadata);
-    *buf = std::move(mutable_buf);
-    return rv;
+ protected:
+  static constexpr uint32_t kMetadataLen = 5;
+
+  // Number of hash entries to accumulate before charging their memory usage to
+  // the cache when cache charging is available
+  static const std::size_t kUint64tHashEntryCacheResBucketSize =
+      CacheReservationManagerImpl<
+          CacheEntryRole::kFilterConstruction>::GetDummyEntrySize() /
+      sizeof(uint64_t);
+
+  // For delegating between XXPH3FilterBitsBuilders
+  void SwapEntriesWith(XXPH3FilterBitsBuilder* other) {
+    assert(other != nullptr);
+    hash_entries_info_.Swap(&(other->hash_entries_info_));
   }
 
-  int CalculateNumEntry(const uint32_t bytes) override {
-    uint32_t bytes_no_meta = bytes >= 5u ? bytes - 5u : 0;
-    return static_cast<int>(uint64_t{8000} * bytes_no_meta /
-                            millibits_per_key_);
-  }
+  void ResetEntries() { hash_entries_info_.Reset(); }
 
-  uint32_t CalculateSpace(const int num_entry) override {
-    // NB: the BuiltinFilterBitsBuilder API presumes len fits in uint32_t.
-    return static_cast<uint32_t>(
-        CalculateAndAllocate(static_cast<size_t>(num_entry),
-                             /* buf */ nullptr,
-                             /*update_balance*/ false));
-  }
+  virtual size_t RoundDownUsableSpace(size_t available_size) = 0;
 
   // To choose size using malloc_usable_size, we have to actually allocate.
-  uint32_t CalculateAndAllocate(size_t num_entry, std::unique_ptr<char[]>* buf,
-                                bool update_balance) {
-    std::unique_ptr<char[]> tmpbuf;
-
-    // If not for cache line blocks in the filter, what would the target
-    // length in bytes be?
-    size_t raw_target_len = static_cast<size_t>(
-        (uint64_t{num_entry} * millibits_per_key_ + 7999) / 8000);
-
-    if (raw_target_len >= size_t{0xffffffc0}) {
-      // Max supported for this data structure implementation
-      raw_target_len = size_t{0xffffffc0};
-    }
-
-    // Round up to nearest multiple of 64 (block size). This adjustment is
-    // used for target FP rate only so that we don't receive complaints about
-    // lower FP rate vs. historic Bloom filter behavior.
-    uint32_t target_len =
-        static_cast<uint32_t>(raw_target_len + 63) & ~uint32_t{63};
-
+  size_t AllocateMaybeRounding(size_t target_len_with_metadata,
+                               size_t num_entries,
+                               std::unique_ptr<char[]>* buf) {
     // Return value set to a default; overwritten in some cases
-    uint32_t rv = target_len + /* metadata */ 5;
+    size_t rv = target_len_with_metadata;
 #ifdef ROCKSDB_MALLOC_USABLE_SIZE
     if (aggregate_rounding_balance_ != nullptr) {
       // Do optimize_filters_for_memory, using malloc_usable_size.
@@ -180,7 +141,7 @@ class FastLocalBloomBitsBuilder : public XXH3pFilterBitsBuilder {
       // allocation size.
 
       // Although it can be considered bad practice to use malloc_usable_size
-      // to access an object beyond its original size, this approach should
+      // to access an object beyond its original size, this approach should be
       // quite general: working for all allocators that properly support
       // malloc_usable_size.
 
@@ -189,7 +150,8 @@ class FastLocalBloomBitsBuilder : public XXH3pFilterBitsBuilder {
       // and relative.
       int64_t balance = aggregate_rounding_balance_->load();
 
-      double target_fp_rate = EstimatedFpRate(num_entry, target_len + 5);
+      double target_fp_rate =
+          EstimatedFpRate(num_entries, target_len_with_metadata);
       double rv_fp_rate = target_fp_rate;
 
       if (balance < 0) {
@@ -200,14 +162,17 @@ class FastLocalBloomBitsBuilder : public XXH3pFilterBitsBuilder {
         // To simplify, we just try a few modified smaller sizes. This also
         // caps how much we vary filter size vs. target, to avoid outlier
         // behavior from excessive variance.
-        for (uint64_t maybe_len64 :
+        size_t target_len = target_len_with_metadata - kMetadataLen;
+        assert(target_len < target_len_with_metadata);  // check underflow
+        for (uint64_t maybe_len_rough :
              {uint64_t{3} * target_len / 4, uint64_t{13} * target_len / 16,
               uint64_t{7} * target_len / 8, uint64_t{15} * target_len / 16}) {
-          uint32_t maybe_len =
-              static_cast<uint32_t>(maybe_len64) & ~uint32_t{63};
-          double maybe_fp_rate = EstimatedFpRate(num_entry, maybe_len + 5);
+          size_t maybe_len_with_metadata =
+              RoundDownUsableSpace(maybe_len_rough + kMetadataLen);
+          double maybe_fp_rate =
+              EstimatedFpRate(num_entries, maybe_len_with_metadata);
           if (maybe_fp_rate <= for_balance_fp_rate) {
-            rv = maybe_len + /* metadata */ 5;
+            rv = maybe_len_with_metadata;
             rv_fp_rate = maybe_fp_rate;
             break;
           }
@@ -217,12 +182,12 @@ class FastLocalBloomBitsBuilder : public XXH3pFilterBitsBuilder {
       // Filter blocks are loaded into block cache with their block trailer.
       // We need to make sure that's accounted for in choosing a
       // fragmentation-friendly size.
-      const uint32_t kExtraPadding = kBlockTrailerSize;
+      const size_t kExtraPadding = BlockBasedTable::kBlockTrailerSize;
       size_t requested = rv + kExtraPadding;
 
       // Allocate and get usable size
-      tmpbuf.reset(new char[requested]);
-      size_t usable = malloc_usable_size(tmpbuf.get());
+      buf->reset(new char[requested]);
+      size_t usable = malloc_usable_size(buf->get());
 
       if (usable - usable / 4 > requested) {
         // Ratio greater than 4/3 is too much for utilizing, if it's
@@ -232,53 +197,257 @@ class FastLocalBloomBitsBuilder : public XXH3pFilterBitsBuilder {
         // storage on disk.
         // Nothing to do, except assert that the result is accurate about
         // the usable size. (Assignment never used.)
-        assert((tmpbuf[usable - 1] = 'x'));
+        assert(((*buf)[usable - 1] = 'x'));
       } else if (usable > requested) {
-        // Adjust for reasonably larger usable size
-        size_t usable_len = (usable - kExtraPadding - /* metadata */ 5);
-        if (usable_len >= size_t{0xffffffc0}) {
-          // Max supported for this data structure implementation
-          usable_len = size_t{0xffffffc0};
-        }
-
-        rv = (static_cast<uint32_t>(usable_len) & ~uint32_t{63}) +
-             /* metadata */ 5;
-        rv_fp_rate = EstimatedFpRate(num_entry, rv);
+        rv = RoundDownUsableSpace(usable - kExtraPadding);
+        assert(rv <= usable - kExtraPadding);
+        rv_fp_rate = EstimatedFpRate(num_entries, rv);
       } else {
         // Too small means bad malloc_usable_size
         assert(usable == requested);
       }
-      memset(tmpbuf.get(), 0, rv);
+      memset(buf->get(), 0, rv);
 
-      if (update_balance) {
-        int64_t diff = static_cast<int64_t>((rv_fp_rate - target_fp_rate) *
-                                            double{0x100000000});
-        *aggregate_rounding_balance_ += diff;
-      }
+      // Update balance
+      int64_t diff = static_cast<int64_t>((rv_fp_rate - target_fp_rate) *
+                                          double{0x100000000});
+      *aggregate_rounding_balance_ += diff;
+    } else {
+      buf->reset(new char[rv]());
     }
 #else
-    (void)update_balance;
+    (void)num_entries;
+    buf->reset(new char[rv]());
 #endif  // ROCKSDB_MALLOC_USABLE_SIZE
-    if (buf) {
-      if (tmpbuf) {
-        *buf = std::move(tmpbuf);
-      } else {
-        buf->reset(new char[rv]());
+    return rv;
+  }
+
+  // TODO: Ideally we want to verify the hash entry
+  // as it is added to the filter and eliminate this function
+  // for speeding up and leaving fewer spaces for undetected memory/CPU
+  // corruption. For Ribbon Filter, it's bit harder.
+  // Possible solution:
+  // pass a custom iterator that tracks the xor checksum as
+  // it iterates to ResetAndFindSeedToSolve
+  Status MaybeVerifyHashEntriesChecksum() {
+    if (!detect_filter_construct_corruption_) {
+      return Status::OK();
+    }
+
+    uint64_t actual_hash_entries_xor_checksum = 0;
+    for (uint64_t h : hash_entries_info_.entries) {
+      actual_hash_entries_xor_checksum ^= h;
+    }
+
+    if (actual_hash_entries_xor_checksum == hash_entries_info_.xor_checksum) {
+      return Status::OK();
+    } else {
+      // Since these hash entries are corrupted and they will not be used
+      // anymore, we can reset them and release memory.
+      ResetEntries();
+      return Status::Corruption("Filter's hash entries checksum mismatched");
+    }
+  }
+
+  // See BloomFilterPolicy::aggregate_rounding_balance_. If nullptr,
+  // always "round up" like historic behavior.
+  std::atomic<int64_t>* aggregate_rounding_balance_;
+
+  // For reserving memory used in (new) Bloom and Ribbon Filter construction
+  std::shared_ptr<CacheReservationManager> cache_res_mgr_;
+
+  // For managing cache charge for final filter in (new) Bloom and Ribbon
+  // Filter construction
+  std::deque<std::unique_ptr<CacheReservationManager::CacheReservationHandle>>
+      final_filter_cache_res_handles_;
+
+  bool detect_filter_construct_corruption_;
+
+  struct HashEntriesInfo {
+    // A deque avoids unnecessary copying of already-saved values
+    // and has near-minimal peak memory use.
+    std::deque<uint64_t> entries;
+
+    // If cache_res_mgr_ != nullptr,
+    // it manages cache charge for buckets of hash entries in (new) Bloom
+    // or Ribbon Filter construction.
+    // Otherwise, it is empty.
+    std::deque<std::unique_ptr<CacheReservationManager::CacheReservationHandle>>
+        cache_res_bucket_handles;
+
+    // If detect_filter_construct_corruption_ == true,
+    // it records the xor checksum of hash entries.
+    // Otherwise, it is 0.
+    uint64_t xor_checksum = 0;
+
+    void Swap(HashEntriesInfo* other) {
+      assert(other != nullptr);
+      std::swap(entries, other->entries);
+      std::swap(cache_res_bucket_handles, other->cache_res_bucket_handles);
+      std::swap(xor_checksum, other->xor_checksum);
+    }
+
+    void Reset() {
+      entries.clear();
+      cache_res_bucket_handles.clear();
+      xor_checksum = 0;
+    }
+  };
+
+  HashEntriesInfo hash_entries_info_;
+};
+
+// #################### FastLocalBloom implementation ################## //
+// ############## also known as format_version=5 Bloom filter ########## //
+
+// See description in FastLocalBloomImpl
+class FastLocalBloomBitsBuilder : public XXPH3FilterBitsBuilder {
+ public:
+  // Non-null aggregate_rounding_balance implies optimize_filters_for_memory
+  explicit FastLocalBloomBitsBuilder(
+      const int millibits_per_key,
+      std::atomic<int64_t>* aggregate_rounding_balance,
+      std::shared_ptr<CacheReservationManager> cache_res_mgr,
+      bool detect_filter_construct_corruption)
+      : XXPH3FilterBitsBuilder(aggregate_rounding_balance, cache_res_mgr,
+                               detect_filter_construct_corruption),
+        millibits_per_key_(millibits_per_key) {
+    assert(millibits_per_key >= 1000);
+  }
+
+  // No Copy allowed
+  FastLocalBloomBitsBuilder(const FastLocalBloomBitsBuilder&) = delete;
+  void operator=(const FastLocalBloomBitsBuilder&) = delete;
+
+  ~FastLocalBloomBitsBuilder() override {}
+
+  using FilterBitsBuilder::Finish;
+
+  virtual Slice Finish(std::unique_ptr<const char[]>* buf) override {
+    return Finish(buf, nullptr);
+  }
+
+  virtual Slice Finish(std::unique_ptr<const char[]>* buf,
+                       Status* status) override {
+    size_t num_entries = hash_entries_info_.entries.size();
+    size_t len_with_metadata = CalculateSpace(num_entries);
+
+    std::unique_ptr<char[]> mutable_buf;
+    std::unique_ptr<CacheReservationManager::CacheReservationHandle>
+        final_filter_cache_res_handle;
+    len_with_metadata =
+        AllocateMaybeRounding(len_with_metadata, num_entries, &mutable_buf);
+    // Cache charging for mutable_buf
+    if (cache_res_mgr_) {
+      Status s = cache_res_mgr_->MakeCacheReservation(
+          len_with_metadata * sizeof(char), &final_filter_cache_res_handle);
+      s.PermitUncheckedError();
+    }
+
+    assert(mutable_buf);
+    assert(len_with_metadata >= kMetadataLen);
+
+    // Max size supported by implementation
+    assert(len_with_metadata <= 0xffffffffU);
+
+    // Compute num_probes after any rounding / adjustments
+    int num_probes = GetNumProbes(num_entries, len_with_metadata);
+
+    uint32_t len = static_cast<uint32_t>(len_with_metadata - kMetadataLen);
+    if (len > 0) {
+      TEST_SYNC_POINT_CALLBACK(
+          "XXPH3FilterBitsBuilder::Finish::"
+          "TamperHashEntries",
+          &hash_entries_info_.entries);
+      AddAllEntries(mutable_buf.get(), len, num_probes);
+      Status verify_hash_entries_checksum_status =
+          MaybeVerifyHashEntriesChecksum();
+      if (!verify_hash_entries_checksum_status.ok()) {
+        if (status) {
+          *status = verify_hash_entries_checksum_status;
+        }
+        return FinishAlwaysTrue(buf);
       }
     }
+
+    bool keep_entries_for_postverify = detect_filter_construct_corruption_;
+    if (!keep_entries_for_postverify) {
+      ResetEntries();
+    }
+
+    // See BloomFilterPolicy::GetBloomBitsReader re: metadata
+    // -1 = Marker for newer Bloom implementations
+    mutable_buf[len] = static_cast<char>(-1);
+    // 0 = Marker for this sub-implementation
+    mutable_buf[len + 1] = static_cast<char>(0);
+    // num_probes (and 0 in upper bits for 64-byte block size)
+    mutable_buf[len + 2] = static_cast<char>(num_probes);
+    // rest of metadata stays zero
+
+    auto TEST_arg_pair __attribute__((__unused__)) =
+        std::make_pair(&mutable_buf, len_with_metadata);
+    TEST_SYNC_POINT_CALLBACK("XXPH3FilterBitsBuilder::Finish::TamperFilter",
+                             &TEST_arg_pair);
+
+    Slice rv(mutable_buf.get(), len_with_metadata);
+    *buf = std::move(mutable_buf);
+    final_filter_cache_res_handles_.push_back(
+        std::move(final_filter_cache_res_handle));
+    if (status) {
+      *status = Status::OK();
+    }
     return rv;
   }
 
+  size_t ApproximateNumEntries(size_t bytes) override {
+    size_t bytes_no_meta =
+        bytes >= kMetadataLen ? RoundDownUsableSpace(bytes) - kMetadataLen : 0;
+    return static_cast<size_t>(uint64_t{8000} * bytes_no_meta /
+                               millibits_per_key_);
+  }
+
+  size_t CalculateSpace(size_t num_entries) override {
+    // If not for cache line blocks in the filter, what would the target
+    // length in bytes be?
+    size_t raw_target_len = static_cast<size_t>(
+        (uint64_t{num_entries} * millibits_per_key_ + 7999) / 8000);
+
+    if (raw_target_len >= size_t{0xffffffc0}) {
+      // Max supported for this data structure implementation
+      raw_target_len = size_t{0xffffffc0};
+    }
+
+    // Round up to nearest multiple of 64 (block size). This adjustment is
+    // used for target FP rate only so that we don't receive complaints about
+    // lower FP rate vs. historic Bloom filter behavior.
+    return ((raw_target_len + 63) & ~size_t{63}) + kMetadataLen;
+  }
+
   double EstimatedFpRate(size_t keys, size_t len_with_metadata) override {
     int num_probes = GetNumProbes(keys, len_with_metadata);
     return FastLocalBloomImpl::EstimatedFpRate(
-        keys, len_with_metadata - /*metadata*/ 5, num_probes, /*hash bits*/ 64);
+        keys, len_with_metadata - kMetadataLen, num_probes, /*hash bits*/ 64);
+  }
+
+ protected:
+  size_t RoundDownUsableSpace(size_t available_size) override {
+    size_t rv = available_size - kMetadataLen;
+
+    if (rv >= size_t{0xffffffc0}) {
+      // Max supported for this data structure implementation
+      rv = size_t{0xffffffc0};
+    }
+
+    // round down to multiple of 64 (block size)
+    rv &= ~size_t{63};
+
+    return rv + kMetadataLen;
   }
 
  private:
   // Compute num_probes after any rounding / adjustments
   int GetNumProbes(size_t keys, size_t len_with_metadata) {
-    uint64_t millibits = uint64_t{len_with_metadata - 5} * 8000;
+    uint64_t millibits = uint64_t{len_with_metadata - kMetadataLen} * 8000;
     int actual_millibits_per_key =
         static_cast<int>(millibits / std::max(keys, size_t{1}));
     // BEGIN XXX/TODO(peterd): preserving old/default behavior for now to
@@ -293,12 +462,12 @@ class FastLocalBloomBitsBuilder : public XXH3pFilterBitsBuilder {
   void AddAllEntries(char* data, uint32_t len, int num_probes) {
     // Simple version without prefetching:
     //
-    // for (auto h : hash_entries_) {
+    // for (auto h : hash_entries_info_.entries) {
     //   FastLocalBloomImpl::AddHash(Lower32of64(h), Upper32of64(h), len,
     //                               num_probes, data);
     // }
 
-    const size_t num_entries = hash_entries_.size();
+    const size_t num_entries = hash_entries_info_.entries.size();
     constexpr size_t kBufferMask = 7;
     static_assert(((kBufferMask + 1) & kBufferMask) == 0,
                   "Must be power of 2 minus 1");
@@ -308,12 +477,14 @@ class FastLocalBloomBitsBuilder : public XXH3pFilterBitsBuilder {
 
     // Prime the buffer
     size_t i = 0;
+    std::deque<uint64_t>::iterator hash_entries_it =
+        hash_entries_info_.entries.begin();
     for (; i <= kBufferMask && i < num_entries; ++i) {
-      uint64_t h = hash_entries_.front();
-      hash_entries_.pop_front();
+      uint64_t h = *hash_entries_it;
       FastLocalBloomImpl::PrepareHash(Lower32of64(h), len, data,
                                       /*out*/ &byte_offsets[i]);
       hashes[i] = Upper32of64(h);
+      ++hash_entries_it;
     }
 
     // Process and buffer
@@ -324,11 +495,11 @@ class FastLocalBloomBitsBuilder : public XXH3pFilterBitsBuilder {
       FastLocalBloomImpl::AddHashPrepared(hash_ref, num_probes,
                                           data + byte_offset_ref);
       // And buffer
-      uint64_t h = hash_entries_.front();
-      hash_entries_.pop_front();
+      uint64_t h = *hash_entries_it;
       FastLocalBloomImpl::PrepareHash(Lower32of64(h), len, data,
                                       /*out*/ &byte_offset_ref);
       hash_ref = Upper32of64(h);
+      ++hash_entries_it;
     }
 
     // Finish processing
@@ -340,13 +511,10 @@ class FastLocalBloomBitsBuilder : public XXH3pFilterBitsBuilder {
 
   // Target allocation per added key, in thousandths of a bit.
   int millibits_per_key_;
-  // See BloomFilterPolicy::aggregate_rounding_balance_. If nullptr,
-  // always "round up" like historic behavior.
-  std::atomic<int64_t>* aggregate_rounding_balance_;
 };
 
 // See description in FastLocalBloomImpl
-class FastLocalBloomBitsReader : public FilterBitsReader {
+class FastLocalBloomBitsReader : public BuiltinFilterBitsReader {
  public:
   FastLocalBloomBitsReader(const char* data, int num_probes, uint32_t len_bytes)
       : data_(data), num_probes_(num_probes), len_bytes_(len_bytes) {}
@@ -381,6 +549,11 @@ class FastLocalBloomBitsReader : public FilterBitsReader {
     }
   }
 
+  bool HashMayMatch(const uint64_t h) override {
+    return FastLocalBloomImpl::HashMayMatch(Lower32of64(h), Upper32of64(h),
+                                            len_bytes_, num_probes_, data_);
+  }
+
  private:
   const char* data_;
   const int num_probes_;
@@ -394,6 +567,7 @@ struct Standard128RibbonRehasherTypesAndSettings {
   // These are schema-critical. Any change almost certainly changes
   // underlying data.
   static constexpr bool kIsFilter = true;
+  static constexpr bool kHomogeneous = false;
   static constexpr bool kFirstCoeffAlwaysOne = true;
   static constexpr bool kUseSmash = false;
   using CoeffRow = ROCKSDB_NAMESPACE::Unsigned128;
@@ -410,14 +584,19 @@ struct Standard128RibbonRehasherTypesAndSettings {
 using Standard128RibbonTypesAndSettings =
     ribbon::StandardRehasherAdapter<Standard128RibbonRehasherTypesAndSettings>;
 
-class Standard128RibbonBitsBuilder : public XXH3pFilterBitsBuilder {
+class Standard128RibbonBitsBuilder : public XXPH3FilterBitsBuilder {
  public:
-  explicit Standard128RibbonBitsBuilder(double desired_one_in_fp_rate,
-                                        int bloom_millibits_per_key,
-                                        Logger* info_log)
-      : desired_one_in_fp_rate_(desired_one_in_fp_rate),
+  explicit Standard128RibbonBitsBuilder(
+      double desired_one_in_fp_rate, int bloom_millibits_per_key,
+      std::atomic<int64_t>* aggregate_rounding_balance,
+      std::shared_ptr<CacheReservationManager> cache_res_mgr,
+      bool detect_filter_construct_corruption, Logger* info_log)
+      : XXPH3FilterBitsBuilder(aggregate_rounding_balance, cache_res_mgr,
+                               detect_filter_construct_corruption),
+        desired_one_in_fp_rate_(desired_one_in_fp_rate),
         info_log_(info_log),
-        bloom_fallback_(bloom_millibits_per_key, nullptr) {
+        bloom_fallback_(bloom_millibits_per_key, aggregate_rounding_balance,
+                        cache_res_mgr, detect_filter_construct_corruption) {
     assert(desired_one_in_fp_rate >= 1.0);
   }
 
@@ -427,59 +606,122 @@ class Standard128RibbonBitsBuilder : public XXH3pFilterBitsBuilder {
 
   ~Standard128RibbonBitsBuilder() override {}
 
+  using FilterBitsBuilder::Finish;
+
   virtual Slice Finish(std::unique_ptr<const char[]>* buf) override {
-    // More than 2^30 entries (~1 billion) not supported
-    if (hash_entries_.size() >= (size_t{1} << 30)) {
-      ROCKS_LOG_WARN(info_log_, "Too many keys for Ribbon filter: %llu",
-                     static_cast<unsigned long long>(hash_entries_.size()));
+    return Finish(buf, nullptr);
+  }
+
+  virtual Slice Finish(std::unique_ptr<const char[]>* buf,
+                       Status* status) override {
+    if (hash_entries_info_.entries.size() > kMaxRibbonEntries) {
+      ROCKS_LOG_WARN(
+          info_log_, "Too many keys for Ribbon filter: %llu",
+          static_cast<unsigned long long>(hash_entries_info_.entries.size()));
       SwapEntriesWith(&bloom_fallback_);
-      assert(hash_entries_.empty());
-      return bloom_fallback_.Finish(buf);
+      assert(hash_entries_info_.entries.empty());
+      return bloom_fallback_.Finish(buf, status);
     }
-    if (hash_entries_.size() == 0) {
+    if (hash_entries_info_.entries.size() == 0) {
       // Save a conditional in Ribbon queries by using alternate reader
       // for zero entries added.
+      if (status) {
+        *status = Status::OK();
+      }
       return FinishAlwaysFalse(buf);
     }
-    uint32_t num_entries = static_cast<uint32_t>(hash_entries_.size());
-    uint32_t num_slots = BandingType::GetNumSlotsFor95PctSuccess(num_entries);
-    num_slots = SolnType::RoundUpNumSlots(num_slots);
+    uint32_t num_entries =
+        static_cast<uint32_t>(hash_entries_info_.entries.size());
+    uint32_t num_slots;
+    size_t len_with_metadata;
+
+    CalculateSpaceAndSlots(num_entries, &len_with_metadata, &num_slots);
+
+    // Bloom fall-back indicator
+    if (num_slots == 0) {
+      SwapEntriesWith(&bloom_fallback_);
+      assert(hash_entries_info_.entries.empty());
+      return bloom_fallback_.Finish(buf, status);
+    }
 
     uint32_t entropy = 0;
-    if (num_entries > 0) {
-      entropy = Lower32of64(hash_entries_.front());
+    if (!hash_entries_info_.entries.empty()) {
+      entropy = Lower32of64(hash_entries_info_.entries.front());
+    }
+
+    BandingType banding;
+    std::size_t bytes_banding = ribbon::StandardBanding<
+        Standard128RibbonTypesAndSettings>::EstimateMemoryUsage(num_slots);
+    Status status_banding_cache_res = Status::OK();
+
+    // Cache charging for banding
+    std::unique_ptr<CacheReservationManager::CacheReservationHandle>
+        banding_res_handle;
+    if (cache_res_mgr_) {
+      status_banding_cache_res = cache_res_mgr_->MakeCacheReservation(
+          bytes_banding, &banding_res_handle);
     }
-    size_t len = SolnType::GetBytesForOneInFpRate(
-        num_slots, desired_one_in_fp_rate_, /*rounding*/ entropy);
-    size_t len_with_metadata = len + 5;
 
-    // Use Bloom filter when it's better for small filters
-    if (num_slots < 1024 && bloom_fallback_.CalculateSpace(static_cast<int>(
-                                num_entries)) < len_with_metadata) {
+    if (status_banding_cache_res.IsMemoryLimit()) {
+      ROCKS_LOG_WARN(info_log_,
+                     "Cache charging for Ribbon filter banding failed due "
+                     "to cache full");
       SwapEntriesWith(&bloom_fallback_);
-      assert(hash_entries_.empty());
-      return bloom_fallback_.Finish(buf);
+      assert(hash_entries_info_.entries.empty());
+      // Release cache for banding since the banding won't be allocated
+      banding_res_handle.reset();
+      return bloom_fallback_.Finish(buf, status);
     }
 
-    BandingType banding;
+    TEST_SYNC_POINT_CALLBACK(
+        "XXPH3FilterBitsBuilder::Finish::"
+        "TamperHashEntries",
+        &hash_entries_info_.entries);
+
     bool success = banding.ResetAndFindSeedToSolve(
-        num_slots, hash_entries_.begin(), hash_entries_.end(),
+        num_slots, hash_entries_info_.entries.begin(),
+        hash_entries_info_.entries.end(),
         /*starting seed*/ entropy & 255, /*seed mask*/ 255);
     if (!success) {
-      ROCKS_LOG_WARN(info_log_,
-                     "Too many re-seeds (256) for Ribbon filter, %llu / %llu",
-                     static_cast<unsigned long long>(hash_entries_.size()),
-                     static_cast<unsigned long long>(num_slots));
+      ROCKS_LOG_WARN(
+          info_log_, "Too many re-seeds (256) for Ribbon filter, %llu / %llu",
+          static_cast<unsigned long long>(hash_entries_info_.entries.size()),
+          static_cast<unsigned long long>(num_slots));
       SwapEntriesWith(&bloom_fallback_);
-      assert(hash_entries_.empty());
-      return bloom_fallback_.Finish(buf);
+      assert(hash_entries_info_.entries.empty());
+      return bloom_fallback_.Finish(buf, status);
+    }
+
+    Status verify_hash_entries_checksum_status =
+        MaybeVerifyHashEntriesChecksum();
+    if (!verify_hash_entries_checksum_status.ok()) {
+      ROCKS_LOG_WARN(info_log_, "Verify hash entries checksum error: %s",
+                     verify_hash_entries_checksum_status.getState());
+      if (status) {
+        *status = verify_hash_entries_checksum_status;
+      }
+      return FinishAlwaysTrue(buf);
+    }
+
+    bool keep_entries_for_postverify = detect_filter_construct_corruption_;
+    if (!keep_entries_for_postverify) {
+      ResetEntries();
     }
-    hash_entries_.clear();
 
     uint32_t seed = banding.GetOrdinalSeed();
     assert(seed < 256);
 
-    std::unique_ptr<char[]> mutable_buf(new char[len_with_metadata]);
+    std::unique_ptr<char[]> mutable_buf;
+    std::unique_ptr<CacheReservationManager::CacheReservationHandle>
+        final_filter_cache_res_handle;
+    len_with_metadata =
+        AllocateMaybeRounding(len_with_metadata, num_entries, &mutable_buf);
+    // Cache charging for mutable_buf
+    if (cache_res_mgr_) {
+      Status s = cache_res_mgr_->MakeCacheReservation(
+          len_with_metadata * sizeof(char), &final_filter_cache_res_handle);
+      s.PermitUncheckedError();
+    }
 
     SolnType soln(mutable_buf.get(), len_with_metadata);
     soln.BackSubstFrom(banding);
@@ -494,39 +736,165 @@ class Standard128RibbonBitsBuilder : public XXH3pFilterBitsBuilder {
 
     // See BloomFilterPolicy::GetBloomBitsReader re: metadata
     // -2 = Marker for Standard128 Ribbon
-    mutable_buf[len] = static_cast<char>(-2);
+    mutable_buf[len_with_metadata - 5] = static_cast<char>(-2);
     // Hash seed
-    mutable_buf[len + 1] = static_cast<char>(seed);
+    mutable_buf[len_with_metadata - 4] = static_cast<char>(seed);
     // Number of blocks, in 24 bits
     // (Along with bytes, we can derive other settings)
-    mutable_buf[len + 2] = static_cast<char>(num_blocks & 255);
-    mutable_buf[len + 3] = static_cast<char>((num_blocks >> 8) & 255);
-    mutable_buf[len + 4] = static_cast<char>((num_blocks >> 16) & 255);
+    mutable_buf[len_with_metadata - 3] = static_cast<char>(num_blocks & 255);
+    mutable_buf[len_with_metadata - 2] =
+        static_cast<char>((num_blocks >> 8) & 255);
+    mutable_buf[len_with_metadata - 1] =
+        static_cast<char>((num_blocks >> 16) & 255);
+
+    auto TEST_arg_pair __attribute__((__unused__)) =
+        std::make_pair(&mutable_buf, len_with_metadata);
+    TEST_SYNC_POINT_CALLBACK("XXPH3FilterBitsBuilder::Finish::TamperFilter",
+                             &TEST_arg_pair);
 
     Slice rv(mutable_buf.get(), len_with_metadata);
     *buf = std::move(mutable_buf);
+    final_filter_cache_res_handles_.push_back(
+        std::move(final_filter_cache_res_handle));
+    if (status) {
+      *status = Status::OK();
+    }
     return rv;
   }
 
-  uint32_t CalculateSpace(const int num_entries) override {
-    // NB: the BuiltinFilterBitsBuilder API presumes len fits in uint32_t.
-    uint32_t num_slots =
-        NumEntriesToNumSlots(static_cast<uint32_t>(num_entries));
-    uint32_t ribbon = static_cast<uint32_t>(
-        SolnType::GetBytesForOneInFpRate(num_slots, desired_one_in_fp_rate_,
-                                         /*rounding*/ 0) +
-        /*metadata*/ 5);
+  // Setting num_slots to 0 means "fall back on Bloom filter."
+  // And note this implementation does not support num_entries or num_slots
+  // beyond uint32_t; see kMaxRibbonEntries.
+  void CalculateSpaceAndSlots(size_t num_entries,
+                              size_t* target_len_with_metadata,
+                              uint32_t* num_slots) {
+    if (num_entries > kMaxRibbonEntries) {
+      // More entries than supported by this Ribbon
+      *num_slots = 0;  // use Bloom
+      *target_len_with_metadata = bloom_fallback_.CalculateSpace(num_entries);
+      return;
+    }
+    uint32_t entropy = 0;
+    if (!hash_entries_info_.entries.empty()) {
+      entropy = Upper32of64(hash_entries_info_.entries.front());
+    }
+
+    *num_slots = NumEntriesToNumSlots(static_cast<uint32_t>(num_entries));
+    *target_len_with_metadata =
+        SolnType::GetBytesForOneInFpRate(*num_slots, desired_one_in_fp_rate_,
+                                         /*rounding*/ entropy) +
+        kMetadataLen;
+
     // Consider possible Bloom fallback for small filters
-    if (num_slots < 1024) {
-      uint32_t bloom = bloom_fallback_.CalculateSpace(num_entries);
-      return std::min(bloom, ribbon);
+    if (*num_slots < 1024) {
+      size_t bloom = bloom_fallback_.CalculateSpace(num_entries);
+      if (bloom < *target_len_with_metadata) {
+        *num_slots = 0;  // use Bloom
+        *target_len_with_metadata = bloom;
+        return;
+      }
+    }
+  }
+
+  size_t CalculateSpace(size_t num_entries) override {
+    if (num_entries == 0) {
+      // See FinishAlwaysFalse
+      return 0;
+    }
+    size_t target_len_with_metadata;
+    uint32_t num_slots;
+    CalculateSpaceAndSlots(num_entries, &target_len_with_metadata, &num_slots);
+    (void)num_slots;
+    return target_len_with_metadata;
+  }
+
+  // This is a somewhat ugly but reasonably fast and reasonably accurate
+  // reversal of CalculateSpace.
+  size_t ApproximateNumEntries(size_t bytes) override {
+    size_t len_no_metadata =
+        RoundDownUsableSpace(std::max(bytes, size_t{kMetadataLen})) -
+        kMetadataLen;
+
+    if (!(desired_one_in_fp_rate_ > 1.0)) {
+      // Effectively asking for 100% FP rate, or NaN etc.
+      // Note that NaN is neither < 1.0 nor > 1.0
+      return kMaxRibbonEntries;
+    }
+
+    // Find a slight under-estimate for actual average bits per slot
+    double min_real_bits_per_slot;
+    if (desired_one_in_fp_rate_ >= 1.0 + std::numeric_limits<uint32_t>::max()) {
+      // Max of 32 solution columns (result bits)
+      min_real_bits_per_slot = 32.0;
     } else {
-      return ribbon;
+      // Account for mix of b and b+1 solution columns being slightly
+      // suboptimal vs. ideal log2(1/fp_rate) bits.
+      uint32_t rounded = static_cast<uint32_t>(desired_one_in_fp_rate_);
+      int upper_bits_per_key = 1 + FloorLog2(rounded);
+      double fp_rate_for_upper = std::pow(2.0, -upper_bits_per_key);
+      double portion_lower =
+          (1.0 / desired_one_in_fp_rate_ - fp_rate_for_upper) /
+          fp_rate_for_upper;
+      min_real_bits_per_slot = upper_bits_per_key - portion_lower;
+      assert(min_real_bits_per_slot > 0.0);
+      assert(min_real_bits_per_slot <= 32.0);
+    }
+
+    // An overestimate, but this should only be O(1) slots away from truth.
+    double max_slots = len_no_metadata * 8.0 / min_real_bits_per_slot;
+
+    // Let's not bother accounting for overflow to Bloom filter
+    // (Includes NaN case)
+    if (!(max_slots < ConfigHelper::GetNumSlots(kMaxRibbonEntries))) {
+      return kMaxRibbonEntries;
+    }
+
+    // Set up for short iteration
+    uint32_t slots = static_cast<uint32_t>(max_slots);
+    slots = SolnType::RoundUpNumSlots(slots);
+
+    // Assert that we have a valid upper bound on slots
+    assert(SolnType::GetBytesForOneInFpRate(
+               SolnType::RoundUpNumSlots(slots + 1), desired_one_in_fp_rate_,
+               /*rounding*/ 0) > len_no_metadata);
+
+    // Iterate up to a few times to rather precisely account for small effects
+    for (int i = 0; slots > 0; ++i) {
+      size_t reqd_bytes =
+          SolnType::GetBytesForOneInFpRate(slots, desired_one_in_fp_rate_,
+                                           /*rounding*/ 0);
+      if (reqd_bytes <= len_no_metadata) {
+        break;  // done
+      }
+      if (i >= 2) {
+        // should have been enough iterations
+        assert(false);
+        break;
+      }
+      slots = SolnType::RoundDownNumSlots(slots - 1);
+    }
+
+    uint32_t num_entries = ConfigHelper::GetNumToAdd(slots);
+
+    // Consider possible Bloom fallback for small filters
+    if (slots < 1024) {
+      size_t bloom = bloom_fallback_.ApproximateNumEntries(bytes);
+      if (bloom > num_entries) {
+        return bloom;
+      } else {
+        return num_entries;
+      }
+    } else {
+      return std::min(num_entries, kMaxRibbonEntries);
     }
   }
 
   double EstimatedFpRate(size_t num_entries,
                          size_t len_with_metadata) override {
+    if (num_entries > kMaxRibbonEntries) {
+      // More entries than supported by this Ribbon
+      return bloom_fallback_.EstimatedFpRate(num_entries, len_with_metadata);
+    }
     uint32_t num_slots =
         NumEntriesToNumSlots(static_cast<uint32_t>(num_entries));
     SolnType fake_soln(nullptr, len_with_metadata);
@@ -534,16 +902,42 @@ class Standard128RibbonBitsBuilder : public XXH3pFilterBitsBuilder {
     return fake_soln.ExpectedFpRate();
   }
 
+  Status MaybePostVerify(const Slice& filter_content) override {
+    bool fall_back = (bloom_fallback_.EstimateEntriesAdded() > 0);
+    return fall_back ? bloom_fallback_.MaybePostVerify(filter_content)
+                     : XXPH3FilterBitsBuilder::MaybePostVerify(filter_content);
+  }
+
+ protected:
+  size_t RoundDownUsableSpace(size_t available_size) override {
+    size_t rv = available_size - kMetadataLen;
+
+    // round down to multiple of 16 (segment size)
+    rv &= ~size_t{15};
+
+    return rv + kMetadataLen;
+  }
+
  private:
   using TS = Standard128RibbonTypesAndSettings;
   using SolnType = ribbon::SerializableInterleavedSolution<TS>;
   using BandingType = ribbon::StandardBanding<TS>;
+  using ConfigHelper = ribbon::BandingConfigHelper1TS<ribbon::kOneIn20, TS>;
 
   static uint32_t NumEntriesToNumSlots(uint32_t num_entries) {
-    uint32_t num_slots1 = BandingType::GetNumSlotsFor95PctSuccess(num_entries);
+    uint32_t num_slots1 = ConfigHelper::GetNumSlots(num_entries);
     return SolnType::RoundUpNumSlots(num_slots1);
   }
 
+  // Approximate num_entries to ensure number of bytes fits in 32 bits, which
+  // is not an inherent limitation but does ensure somewhat graceful Bloom
+  // fallback for crazy high number of entries, since the Bloom implementation
+  // does not support number of bytes bigger than fits in 32 bits. This is
+  // within an order of magnitude of implementation limit on num_slots
+  // fitting in 32 bits, and even closer for num_blocks fitting in 24 bits
+  // (for filter metadata).
+  static constexpr uint32_t kMaxRibbonEntries = 950000000;  // ~ 1 billion
+
   // A desired value for 1/fp_rate. For example, 100 -> 1% fp rate.
   double desired_one_in_fp_rate_;
 
@@ -555,7 +949,10 @@ class Standard128RibbonBitsBuilder : public XXH3pFilterBitsBuilder {
   FastLocalBloomBitsBuilder bloom_fallback_;
 };
 
-class Standard128RibbonBitsReader : public FilterBitsReader {
+// for the linker, at least with DEBUG_LEVEL=2
+constexpr uint32_t Standard128RibbonBitsBuilder::kMaxRibbonEntries;
+
+class Standard128RibbonBitsReader : public BuiltinFilterBitsReader {
  public:
   Standard128RibbonBitsReader(const char* data, size_t len_bytes,
                               uint32_t num_blocks, uint32_t seed)
@@ -576,16 +973,29 @@ class Standard128RibbonBitsReader : public FilterBitsReader {
   }
 
   virtual void MayMatch(int num_keys, Slice** keys, bool* may_match) override {
-    std::array<uint64_t, MultiGetContext::MAX_BATCH_SIZE> hashes;
+    struct SavedData {
+      uint64_t seeded_hash;
+      uint32_t segment_num;
+      uint32_t num_columns;
+      uint32_t start_bits;
+    };
+    std::array<SavedData, MultiGetContext::MAX_BATCH_SIZE> saved;
     for (int i = 0; i < num_keys; ++i) {
-      hashes[i] = GetSliceHash64(*keys[i]);
-      // FIXME: batched get optimization
+      ribbon::InterleavedPrepareQuery(
+          GetSliceHash64(*keys[i]), hasher_, soln_, &saved[i].seeded_hash,
+          &saved[i].segment_num, &saved[i].num_columns, &saved[i].start_bits);
     }
     for (int i = 0; i < num_keys; ++i) {
-      may_match[i] = soln_.FilterQuery(hashes[i], hasher_);
+      may_match[i] = ribbon::InterleavedFilterQuery(
+          saved[i].seeded_hash, saved[i].segment_num, saved[i].num_columns,
+          saved[i].start_bits, hasher_, soln_);
     }
   }
 
+  bool HashMayMatch(const uint64_t h) override {
+    return soln_.FilterQuery(h, hasher_);
+  }
+
  private:
   using TS = Standard128RibbonTypesAndSettings;
   ribbon::SerializableInterleavedSolution<TS> soln_;
@@ -608,21 +1018,27 @@ class LegacyBloomBitsBuilder : public BuiltinFilterBitsBuilder {
 
   void AddKey(const Slice& key) override;
 
-  Slice Finish(std::unique_ptr<const char[]>* buf) override;
+  virtual size_t EstimateEntriesAdded() override {
+    return hash_entries_.size();
+  }
 
-  int CalculateNumEntry(const uint32_t bytes) override;
+  using FilterBitsBuilder::Finish;
 
-  uint32_t CalculateSpace(const int num_entry) override {
+  Slice Finish(std::unique_ptr<const char[]>* buf) override;
+
+  size_t CalculateSpace(size_t num_entries) override {
     uint32_t dont_care1;
     uint32_t dont_care2;
-    return CalculateSpace(num_entry, &dont_care1, &dont_care2);
+    return CalculateSpace(num_entries, &dont_care1, &dont_care2);
   }
 
   double EstimatedFpRate(size_t keys, size_t bytes) override {
-    return LegacyBloomImpl::EstimatedFpRate(keys, bytes - /*metadata*/ 5,
+    return LegacyBloomImpl::EstimatedFpRate(keys, bytes - kMetadataLen,
                                             num_probes_);
   }
 
+  size_t ApproximateNumEntries(size_t bytes) override;
+
  private:
   int bits_per_key_;
   int num_probes_;
@@ -633,11 +1049,11 @@ class LegacyBloomBitsBuilder : public BuiltinFilterBitsBuilder {
   uint32_t GetTotalBitsForLocality(uint32_t total_bits);
 
   // Reserve space for new filter
-  char* ReserveSpace(const int num_entry, uint32_t* total_bits,
+  char* ReserveSpace(size_t num_entries, uint32_t* total_bits,
                      uint32_t* num_lines);
 
   // Implementation-specific variant of public CalculateSpace
-  uint32_t CalculateSpace(const int num_entry, uint32_t* total_bits,
+  uint32_t CalculateSpace(size_t num_entries, uint32_t* total_bits,
                           uint32_t* num_lines);
 
   // Assuming single threaded access to this function.
@@ -705,7 +1121,29 @@ Slice LegacyBloomBitsBuilder::Finish(std::unique_ptr<const char[]>* buf) {
   buf->reset(const_data);
   hash_entries_.clear();
 
-  return Slice(data, total_bits / 8 + 5);
+  return Slice(data, total_bits / 8 + kMetadataLen);
+}
+
+size_t LegacyBloomBitsBuilder::ApproximateNumEntries(size_t bytes) {
+  assert(bits_per_key_);
+  assert(bytes > 0);
+
+  uint64_t total_bits_tmp = bytes * 8;
+  // total bits, including temporary computations, cannot exceed 2^32
+  // for compatibility
+  total_bits_tmp = std::min(total_bits_tmp, uint64_t{0xffff0000});
+
+  uint32_t high = static_cast<uint32_t>(total_bits_tmp) /
+                      static_cast<uint32_t>(bits_per_key_) +
+                  1;
+  uint32_t low = 1;
+  uint32_t n = high;
+  for (; n >= low; n--) {
+    if (CalculateSpace(n) <= bytes) {
+      break;
+    }
+  }
+  return n;
 }
 
 uint32_t LegacyBloomBitsBuilder::GetTotalBitsForLocality(uint32_t total_bits) {
@@ -720,14 +1158,18 @@ uint32_t LegacyBloomBitsBuilder::GetTotalBitsForLocality(uint32_t total_bits) {
   return num_lines * (CACHE_LINE_SIZE * 8);
 }
 
-uint32_t LegacyBloomBitsBuilder::CalculateSpace(const int num_entry,
+uint32_t LegacyBloomBitsBuilder::CalculateSpace(size_t num_entries,
                                                 uint32_t* total_bits,
                                                 uint32_t* num_lines) {
   assert(bits_per_key_);
-  if (num_entry != 0) {
-    uint32_t total_bits_tmp = static_cast<uint32_t>(num_entry * bits_per_key_);
-
-    *total_bits = GetTotalBitsForLocality(total_bits_tmp);
+  if (num_entries != 0) {
+    size_t total_bits_tmp = num_entries * bits_per_key_;
+    // total bits, including temporary computations, cannot exceed 2^32
+    // for compatibility
+    total_bits_tmp = std::min(total_bits_tmp, size_t{0xffff0000});
+
+    *total_bits =
+        GetTotalBitsForLocality(static_cast<uint32_t>(total_bits_tmp));
     *num_lines = *total_bits / (CACHE_LINE_SIZE * 8);
     assert(*total_bits > 0 && *total_bits % 8 == 0);
   } else {
@@ -738,34 +1180,19 @@ uint32_t LegacyBloomBitsBuilder::CalculateSpace(const int num_entry,
 
   // Reserve space for Filter
   uint32_t sz = *total_bits / 8;
-  sz += 5;  // 4 bytes for num_lines, 1 byte for num_probes
+  sz += kMetadataLen;  // 4 bytes for num_lines, 1 byte for num_probes
   return sz;
 }
 
-char* LegacyBloomBitsBuilder::ReserveSpace(const int num_entry,
+char* LegacyBloomBitsBuilder::ReserveSpace(size_t num_entries,
                                            uint32_t* total_bits,
                                            uint32_t* num_lines) {
-  uint32_t sz = CalculateSpace(num_entry, total_bits, num_lines);
+  uint32_t sz = CalculateSpace(num_entries, total_bits, num_lines);
   char* data = new char[sz];
   memset(data, 0, sz);
   return data;
 }
 
-int LegacyBloomBitsBuilder::CalculateNumEntry(const uint32_t bytes) {
-  assert(bits_per_key_);
-  assert(bytes > 0);
-  int high = static_cast<int>(bytes * 8 / bits_per_key_ + 1);
-  int low = 1;
-  int n = high;
-  for (; n >= low; n--) {
-    if (CalculateSpace(n) <= bytes) {
-      break;
-    }
-  }
-  assert(n < high);  // High should be an overestimation
-  return n;
-}
-
 inline void LegacyBloomBitsBuilder::AddHash(uint32_t h, char* data,
                                             uint32_t num_lines,
                                             uint32_t total_bits) {
@@ -775,10 +1202,10 @@ inline void LegacyBloomBitsBuilder::AddHash(uint32_t h, char* data,
   assert(num_lines > 0 && total_bits > 0);
 
   LegacyBloomImpl::AddHash(h, num_lines, num_probes_, data,
-                           folly::constexpr_log2(CACHE_LINE_SIZE));
+                           ConstexprFloorLog2(CACHE_LINE_SIZE));
 }
 
-class LegacyBloomBitsReader : public FilterBitsReader {
+class LegacyBloomBitsReader : public BuiltinFilterBitsReader {
  public:
   LegacyBloomBitsReader(const char* data, int num_probes, uint32_t num_lines,
                         uint32_t log2_cache_line_size)
@@ -823,6 +1250,8 @@ class LegacyBloomBitsReader : public FilterBitsReader {
     }
   }
 
+  bool HashMayMatch(const uint64_t /* h */) override { return false; }
+
  private:
   const char* data_;
   const int num_probes_;
@@ -830,37 +1259,79 @@ class LegacyBloomBitsReader : public FilterBitsReader {
   const uint32_t log2_cache_line_size_;
 };
 
-class AlwaysTrueFilter : public FilterBitsReader {
+class AlwaysTrueFilter : public BuiltinFilterBitsReader {
  public:
   bool MayMatch(const Slice&) override { return true; }
   using FilterBitsReader::MayMatch;  // inherit overload
+  bool HashMayMatch(const uint64_t) override { return true; }
+  using BuiltinFilterBitsReader::HashMayMatch;  // inherit overload
 };
 
-class AlwaysFalseFilter : public FilterBitsReader {
+class AlwaysFalseFilter : public BuiltinFilterBitsReader {
  public:
   bool MayMatch(const Slice&) override { return false; }
   using FilterBitsReader::MayMatch;  // inherit overload
+  bool HashMayMatch(const uint64_t) override { return false; }
+  using BuiltinFilterBitsReader::HashMayMatch;  // inherit overload
 };
 
+Status XXPH3FilterBitsBuilder::MaybePostVerify(const Slice& filter_content) {
+  Status s = Status::OK();
+
+  if (!detect_filter_construct_corruption_) {
+    return s;
+  }
+
+  std::unique_ptr<BuiltinFilterBitsReader> bits_reader(
+      BuiltinFilterPolicy::GetBuiltinFilterBitsReader(filter_content));
+
+  for (uint64_t h : hash_entries_info_.entries) {
+    // The current approach will not detect corruption from XXPH3Filter to
+    // AlwaysTrueFilter, which can lead to performance cost later due to
+    // AlwaysTrueFilter not filtering anything. But this cost is acceptable
+    // given the extra implementation complexity to detect such a case.
+    bool may_match = bits_reader->HashMayMatch(h);
+    if (!may_match) {
+      s = Status::Corruption("Corrupted filter content");
+      break;
+    }
+  }
+
+  ResetEntries();
+  return s;
+}
 }  // namespace
 
-const std::vector<BloomFilterPolicy::Mode> BloomFilterPolicy::kAllFixedImpls = {
-    kLegacyBloom,
-    kDeprecatedBlock,
-    kFastLocalBloom,
-    kStandard128Ribbon,
-};
+const char* BuiltinFilterPolicy::kClassName() {
+  return "rocksdb.internal.BuiltinFilter";
+}
 
-const std::vector<BloomFilterPolicy::Mode> BloomFilterPolicy::kAllUserModes = {
-    kDeprecatedBlock,
-    kAutoBloom,
-    kStandard128Ribbon,
-};
+bool BuiltinFilterPolicy::IsInstanceOf(const std::string& name) const {
+  if (name == kClassName()) {
+    return true;
+  } else {
+    return FilterPolicy::IsInstanceOf(name);
+  }
+}
+
+static const char* kBuiltinFilterMetadataName = "rocksdb.BuiltinBloomFilter";
 
-BloomFilterPolicy::BloomFilterPolicy(double bits_per_key, Mode mode)
-    : mode_(mode), warned_(false), aggregate_rounding_balance_(0) {
+const char* BuiltinFilterPolicy::kCompatibilityName() {
+  return kBuiltinFilterMetadataName;
+}
+
+const char* BuiltinFilterPolicy::CompatibilityName() const {
+  return kBuiltinFilterMetadataName;
+}
+
+BloomLikeFilterPolicy::BloomLikeFilterPolicy(double bits_per_key)
+    : warned_(false), aggregate_rounding_balance_(0) {
   // Sanitize bits_per_key
-  if (bits_per_key < 1.0) {
+  if (bits_per_key < 0.5) {
+    // Round down to no filter
+    bits_per_key = 0;
+  } else if (bits_per_key < 1.0) {
+    // Minimum 1 bit per key (equiv) when creating filter
     bits_per_key = 1.0;
   } else if (!(bits_per_key < 100.0)) {  // including NaN
     bits_per_key = 100.0;
@@ -886,127 +1357,146 @@ BloomFilterPolicy::BloomFilterPolicy(double bits_per_key, Mode mode)
   whole_bits_per_key_ = (millibits_per_key_ + 500) / 1000;
 }
 
-BloomFilterPolicy::~BloomFilterPolicy() {}
-
-const char* BloomFilterPolicy::Name() const {
-  return "rocksdb.BuiltinBloomFilter";
+BloomLikeFilterPolicy::~BloomLikeFilterPolicy() {}
+const char* BloomLikeFilterPolicy::kClassName() {
+  return "rocksdb.internal.BloomLikeFilter";
 }
 
-void BloomFilterPolicy::CreateFilter(const Slice* keys, int n,
-                                     std::string* dst) const {
-  // We should ideally only be using this deprecated interface for
-  // appropriately constructed BloomFilterPolicy
-  assert(mode_ == kDeprecatedBlock);
-
-  // Compute bloom filter size (in both bits and bytes)
-  uint32_t bits = static_cast<uint32_t>(n * whole_bits_per_key_);
+bool BloomLikeFilterPolicy::IsInstanceOf(const std::string& name) const {
+  if (name == kClassName()) {
+    return true;
+  } else {
+    return BuiltinFilterPolicy::IsInstanceOf(name);
+  }
+}
 
-  // For small n, we can see a very high false positive rate.  Fix it
-  // by enforcing a minimum bloom filter length.
-  if (bits < 64) bits = 64;
+const char* ReadOnlyBuiltinFilterPolicy::kClassName() {
+  return kBuiltinFilterMetadataName;
+}
 
-  uint32_t bytes = (bits + 7) / 8;
-  bits = bytes * 8;
+std::string BloomLikeFilterPolicy::GetId() const {
+  return Name() + GetBitsPerKeySuffix();
+}
 
-  int num_probes =
-      LegacyNoLocalityBloomImpl::ChooseNumProbes(whole_bits_per_key_);
+BloomFilterPolicy::BloomFilterPolicy(double bits_per_key)
+    : BloomLikeFilterPolicy(bits_per_key) {}
 
-  const size_t init_size = dst->size();
-  dst->resize(init_size + bytes, 0);
-  dst->push_back(static_cast<char>(num_probes));  // Remember # of probes
-  char* array = &(*dst)[init_size];
-  for (int i = 0; i < n; i++) {
-    LegacyNoLocalityBloomImpl::AddHash(BloomHash(keys[i]), bits, num_probes,
-                                       array);
+FilterBitsBuilder* BloomFilterPolicy::GetBuilderWithContext(
+    const FilterBuildingContext& context) const {
+  if (GetMillibitsPerKey() == 0) {
+    // "No filter" special case
+    return nullptr;
+  } else if (context.table_options.format_version < 5) {
+    return GetLegacyBloomBuilderWithContext(context);
+  } else {
+    return GetFastLocalBloomBuilderWithContext(context);
   }
 }
 
-bool BloomFilterPolicy::KeyMayMatch(const Slice& key,
-                                    const Slice& bloom_filter) const {
-  const size_t len = bloom_filter.size();
-  if (len < 2 || len > 0xffffffffU) {
-    return false;
-  }
+const char* BloomFilterPolicy::kClassName() { return "bloomfilter"; }
+const char* BloomFilterPolicy::kNickName() { return "rocksdb.BloomFilter"; }
 
-  const char* array = bloom_filter.data();
-  const uint32_t bits = static_cast<uint32_t>(len - 1) * 8;
+std::string BloomFilterPolicy::GetId() const {
+  // Including ":false" for better forward-compatibility with 6.29 and earlier
+  // which required a boolean `use_block_based_builder` parameter
+  return BloomLikeFilterPolicy::GetId() + ":false";
+}
 
-  // Use the encoded k so that we can read filters generated by
-  // bloom filters created using different parameters.
-  const int k = static_cast<uint8_t>(array[len - 1]);
-  if (k > 30) {
-    // Reserved for potentially new encodings for short bloom filters.
-    // Consider it a match.
-    return true;
+FilterBitsBuilder* BloomLikeFilterPolicy::GetFastLocalBloomBuilderWithContext(
+    const FilterBuildingContext& context) const {
+  bool offm = context.table_options.optimize_filters_for_memory;
+  const auto options_overrides_iter =
+      context.table_options.cache_usage_options.options_overrides.find(
+          CacheEntryRole::kFilterConstruction);
+  const auto filter_construction_charged =
+      options_overrides_iter !=
+              context.table_options.cache_usage_options.options_overrides.end()
+          ? options_overrides_iter->second.charged
+          : context.table_options.cache_usage_options.options.charged;
+
+  std::shared_ptr<CacheReservationManager> cache_res_mgr;
+  if (context.table_options.block_cache &&
+      filter_construction_charged ==
+          CacheEntryRoleOptions::Decision::kEnabled) {
+    cache_res_mgr = std::make_shared<
+        CacheReservationManagerImpl<CacheEntryRole::kFilterConstruction>>(
+        context.table_options.block_cache);
   }
-  // NB: using stored k not num_probes for whole_bits_per_key_
-  return LegacyNoLocalityBloomImpl::HashMayMatch(BloomHash(key), bits, k,
-                                                 array);
+  return new FastLocalBloomBitsBuilder(
+      millibits_per_key_, offm ? &aggregate_rounding_balance_ : nullptr,
+      cache_res_mgr, context.table_options.detect_filter_construct_corruption);
 }
 
-FilterBitsBuilder* BloomFilterPolicy::GetFilterBitsBuilder() const {
-  // This code path should no longer be used, for the built-in
-  // BloomFilterPolicy. Internal to RocksDB and outside
-  // BloomFilterPolicy, only get a FilterBitsBuilder with
-  // BloomFilterPolicy::GetBuilderFromContext(), which will call
-  // BloomFilterPolicy::GetBuilderWithContext(). RocksDB users have
-  // been warned (HISTORY.md) that they can no longer call this on
-  // the built-in BloomFilterPolicy (unlikely).
-  assert(false);
-  return GetBuilderWithContext(FilterBuildingContext(BlockBasedTableOptions()));
+FilterBitsBuilder* BloomLikeFilterPolicy::GetLegacyBloomBuilderWithContext(
+    const FilterBuildingContext& context) const {
+  if (whole_bits_per_key_ >= 14 && context.info_log &&
+      !warned_.load(std::memory_order_relaxed)) {
+    warned_ = true;
+    const char* adjective;
+    if (whole_bits_per_key_ >= 20) {
+      adjective = "Dramatic";
+    } else {
+      adjective = "Significant";
+    }
+    // For more details, see
+    // https://github.com/facebook/rocksdb/wiki/RocksDB-Bloom-Filter
+    ROCKS_LOG_WARN(context.info_log,
+                   "Using legacy Bloom filter with high (%d) bits/key. "
+                   "%s filter space and/or accuracy improvement is available "
+                   "with format_version>=5.",
+                   whole_bits_per_key_, adjective);
+  }
+  return new LegacyBloomBitsBuilder(whole_bits_per_key_, context.info_log);
 }
 
-FilterBitsBuilder* BloomFilterPolicy::GetBuilderWithContext(
+FilterBitsBuilder*
+BloomLikeFilterPolicy::GetStandard128RibbonBuilderWithContext(
     const FilterBuildingContext& context) const {
-  Mode cur = mode_;
+  // FIXME: code duplication with GetFastLocalBloomBuilderWithContext
   bool offm = context.table_options.optimize_filters_for_memory;
-  // Unusual code construction so that we can have just
-  // one exhaustive switch without (risky) recursion
-  for (int i = 0; i < 2; ++i) {
-    switch (cur) {
-      case kAutoBloom:
-        if (context.table_options.format_version < 5) {
-          cur = kLegacyBloom;
-        } else {
-          cur = kFastLocalBloom;
-        }
-        break;
-      case kDeprecatedBlock:
-        return nullptr;
-      case kFastLocalBloom:
-        return new FastLocalBloomBitsBuilder(
-            millibits_per_key_, offm ? &aggregate_rounding_balance_ : nullptr);
-      case kLegacyBloom:
-        if (whole_bits_per_key_ >= 14 && context.info_log &&
-            !warned_.load(std::memory_order_relaxed)) {
-          warned_ = true;
-          const char* adjective;
-          if (whole_bits_per_key_ >= 20) {
-            adjective = "Dramatic";
-          } else {
-            adjective = "Significant";
-          }
-          // For more details, see
-          // https://github.com/facebook/rocksdb/wiki/RocksDB-Bloom-Filter
-          ROCKS_LOG_WARN(
-              context.info_log,
-              "Using legacy Bloom filter with high (%d) bits/key. "
-              "%s filter space and/or accuracy improvement is available "
-              "with format_version>=5.",
-              whole_bits_per_key_, adjective);
-        }
-        return new LegacyBloomBitsBuilder(whole_bits_per_key_,
-                                          context.info_log);
-      case kStandard128Ribbon:
-        return new Standard128RibbonBitsBuilder(
-            desired_one_in_fp_rate_, millibits_per_key_, context.info_log);
+  const auto options_overrides_iter =
+      context.table_options.cache_usage_options.options_overrides.find(
+          CacheEntryRole::kFilterConstruction);
+  const auto filter_construction_charged =
+      options_overrides_iter !=
+              context.table_options.cache_usage_options.options_overrides.end()
+          ? options_overrides_iter->second.charged
+          : context.table_options.cache_usage_options.options.charged;
+
+  std::shared_ptr<CacheReservationManager> cache_res_mgr;
+  if (context.table_options.block_cache &&
+      filter_construction_charged ==
+          CacheEntryRoleOptions::Decision::kEnabled) {
+    cache_res_mgr = std::make_shared<
+        CacheReservationManagerImpl<CacheEntryRole::kFilterConstruction>>(
+        context.table_options.block_cache);
+  }
+  return new Standard128RibbonBitsBuilder(
+      desired_one_in_fp_rate_, millibits_per_key_,
+      offm ? &aggregate_rounding_balance_ : nullptr, cache_res_mgr,
+      context.table_options.detect_filter_construct_corruption,
+      context.info_log);
+}
+
+std::string BloomLikeFilterPolicy::GetBitsPerKeySuffix() const {
+  std::string rv = ":" + std::to_string(millibits_per_key_ / 1000);
+  int frac = millibits_per_key_ % 1000;
+  if (frac > 0) {
+    rv.push_back('.');
+    rv.push_back(static_cast<char>('0' + (frac / 100)));
+    frac %= 100;
+    if (frac > 0) {
+      rv.push_back(static_cast<char>('0' + (frac / 10)));
+      frac %= 10;
+      if (frac > 0) {
+        rv.push_back(static_cast<char>('0' + frac));
+      }
     }
   }
-  assert(false);
-  return nullptr;  // something legal
+  return rv;
 }
 
-FilterBitsBuilder* BloomFilterPolicy::GetBuilderFromContext(
+FilterBitsBuilder* BuiltinFilterPolicy::GetBuilderFromContext(
     const FilterBuildingContext& context) {
   if (context.table_options.filter_policy) {
     return context.table_options.filter_policy->GetBuilderWithContext(context);
@@ -1015,12 +1505,54 @@ FilterBitsBuilder* BloomFilterPolicy::GetBuilderFromContext(
   }
 }
 
-// Read metadata to determine what kind of FilterBitsReader is needed
-// and return a new one.
-FilterBitsReader* BloomFilterPolicy::GetFilterBitsReader(
-    const Slice& contents) const {
+// For testing only, but always constructable with internal names
+namespace test {
+
+const char* LegacyBloomFilterPolicy::kClassName() {
+  return "rocksdb.internal.LegacyBloomFilter";
+}
+
+FilterBitsBuilder* LegacyBloomFilterPolicy::GetBuilderWithContext(
+    const FilterBuildingContext& context) const {
+  if (GetMillibitsPerKey() == 0) {
+    // "No filter" special case
+    return nullptr;
+  }
+  return GetLegacyBloomBuilderWithContext(context);
+}
+
+const char* FastLocalBloomFilterPolicy::kClassName() {
+  return "rocksdb.internal.FastLocalBloomFilter";
+}
+
+FilterBitsBuilder* FastLocalBloomFilterPolicy::GetBuilderWithContext(
+    const FilterBuildingContext& context) const {
+  if (GetMillibitsPerKey() == 0) {
+    // "No filter" special case
+    return nullptr;
+  }
+  return GetFastLocalBloomBuilderWithContext(context);
+}
+
+const char* Standard128RibbonFilterPolicy::kClassName() {
+  return "rocksdb.internal.Standard128RibbonFilter";
+}
+
+FilterBitsBuilder* Standard128RibbonFilterPolicy::GetBuilderWithContext(
+    const FilterBuildingContext& context) const {
+  if (GetMillibitsPerKey() == 0) {
+    // "No filter" special case
+    return nullptr;
+  }
+  return GetStandard128RibbonBuilderWithContext(context);
+}
+
+}  // namespace test
+
+BuiltinFilterBitsReader* BuiltinFilterPolicy::GetBuiltinFilterBitsReader(
+    const Slice& contents) {
   uint32_t len_with_meta = static_cast<uint32_t>(contents.size());
-  if (len_with_meta <= 5) {
+  if (len_with_meta <= kMetadataLen) {
     // filter is empty or broken. Treat like zero keys added.
     return new AlwaysFalseFilter();
   }
@@ -1038,7 +1570,7 @@ FilterBitsReader* BloomFilterPolicy::GetFilterBitsReader(
   // len_with_meta +-----------------------------------+
 
   int8_t raw_num_probes =
-      static_cast<int8_t>(contents.data()[len_with_meta - 5]);
+      static_cast<int8_t>(contents.data()[len_with_meta - kMetadataLen]);
   // NB: *num_probes > 30 and < 128 probably have not been used, because of
   // BloomFilterPolicy::initialize, unless directly calling
   // LegacyBloomBitsBuilder as an API, but we are leaving those cases in
@@ -1068,7 +1600,7 @@ FilterBitsReader* BloomFilterPolicy::GetFilterBitsReader(
   assert(num_probes >= 1);
   assert(num_probes <= 127);
 
-  uint32_t len = len_with_meta - 5;
+  uint32_t len = len_with_meta - kMetadataLen;
   assert(len > 0);
 
   uint32_t num_lines = DecodeFixed32(contents.data() + len_with_meta - 4);
@@ -1076,7 +1608,7 @@ FilterBitsReader* BloomFilterPolicy::GetFilterBitsReader(
 
   if (num_lines * CACHE_LINE_SIZE == len) {
     // Common case
-    log2_cache_line_size = folly::constexpr_log2(CACHE_LINE_SIZE);
+    log2_cache_line_size = ConstexprFloorLog2(CACHE_LINE_SIZE);
   } else if (num_lines == 0 || len % num_lines != 0) {
     // Invalid (no solution to num_lines * x == len)
     // Treat as zero probes (always FP) for now.
@@ -1098,10 +1630,17 @@ FilterBitsReader* BloomFilterPolicy::GetFilterBitsReader(
                                    log2_cache_line_size);
 }
 
-FilterBitsReader* BloomFilterPolicy::GetRibbonBitsReader(
+// Read metadata to determine what kind of FilterBitsReader is needed
+// and return a new one.
+FilterBitsReader* BuiltinFilterPolicy::GetFilterBitsReader(
     const Slice& contents) const {
+  return BuiltinFilterPolicy::GetBuiltinFilterBitsReader(contents);
+}
+
+BuiltinFilterBitsReader* BuiltinFilterPolicy::GetRibbonBitsReader(
+    const Slice& contents) {
   uint32_t len_with_meta = static_cast<uint32_t>(contents.size());
-  uint32_t len = len_with_meta - 5;
+  uint32_t len = len_with_meta - kMetadataLen;
 
   assert(len > 0);  // precondition
 
@@ -1122,10 +1661,10 @@ FilterBitsReader* BloomFilterPolicy::GetRibbonBitsReader(
 }
 
 // For newer Bloom filter implementations
-FilterBitsReader* BloomFilterPolicy::GetBloomBitsReader(
-    const Slice& contents) const {
+BuiltinFilterBitsReader* BuiltinFilterPolicy::GetBloomBitsReader(
+    const Slice& contents) {
   uint32_t len_with_meta = static_cast<uint32_t>(contents.size());
-  uint32_t len = len_with_meta - 5;
+  uint32_t len = len_with_meta - kMetadataLen;
 
   assert(len > 0);  // precondition
 
@@ -1182,65 +1721,253 @@ FilterBitsReader* BloomFilterPolicy::GetBloomBitsReader(
 }
 
 const FilterPolicy* NewBloomFilterPolicy(double bits_per_key,
-                                         bool use_block_based_builder) {
-  BloomFilterPolicy::Mode m;
-  if (use_block_based_builder) {
-    m = BloomFilterPolicy::kDeprecatedBlock;
+                                         bool /*use_block_based_builder*/) {
+  // NOTE: use_block_based_builder now ignored so block-based filter is no
+  // longer accessible in public API.
+  return new BloomFilterPolicy(bits_per_key);
+}
+
+RibbonFilterPolicy::RibbonFilterPolicy(double bloom_equivalent_bits_per_key,
+                                       int bloom_before_level)
+    : BloomLikeFilterPolicy(bloom_equivalent_bits_per_key),
+      bloom_before_level_(bloom_before_level) {}
+
+FilterBitsBuilder* RibbonFilterPolicy::GetBuilderWithContext(
+    const FilterBuildingContext& context) const {
+  if (GetMillibitsPerKey() == 0) {
+    // "No filter" special case
+    return nullptr;
+  }
+  // Treat unknown same as bottommost
+  int levelish = INT_MAX;
+
+  switch (context.compaction_style) {
+    case kCompactionStyleLevel:
+    case kCompactionStyleUniversal: {
+      if (context.reason == TableFileCreationReason::kFlush) {
+        // Treat flush as level -1
+        assert(context.level_at_creation == 0);
+        levelish = -1;
+      } else if (context.level_at_creation == -1) {
+        // Unknown level
+        assert(levelish == INT_MAX);
+      } else {
+        levelish = context.level_at_creation;
+      }
+      break;
+    }
+    case kCompactionStyleFIFO:
+    case kCompactionStyleNone:
+      // Treat as bottommost
+      assert(levelish == INT_MAX);
+      break;
+  }
+  if (levelish < bloom_before_level_) {
+    return GetFastLocalBloomBuilderWithContext(context);
   } else {
-    m = BloomFilterPolicy::kAutoBloom;
+    return GetStandard128RibbonBuilderWithContext(context);
   }
-  assert(std::find(BloomFilterPolicy::kAllUserModes.begin(),
-                   BloomFilterPolicy::kAllUserModes.end(),
-                   m) != BloomFilterPolicy::kAllUserModes.end());
-  return new BloomFilterPolicy(bits_per_key, m);
 }
 
-extern const FilterPolicy* NewExperimentalRibbonFilterPolicy(
-    double bloom_equivalent_bits_per_key) {
-  return new BloomFilterPolicy(bloom_equivalent_bits_per_key,
-                               BloomFilterPolicy::kStandard128Ribbon);
+const char* RibbonFilterPolicy::kClassName() { return "ribbonfilter"; }
+const char* RibbonFilterPolicy::kNickName() { return "rocksdb.RibbonFilter"; }
+
+std::string RibbonFilterPolicy::GetId() const {
+  return BloomLikeFilterPolicy::GetId() + ":" +
+         std::to_string(bloom_before_level_);
+}
+
+const FilterPolicy* NewRibbonFilterPolicy(double bloom_equivalent_bits_per_key,
+                                          int bloom_before_level) {
+  return new RibbonFilterPolicy(bloom_equivalent_bits_per_key,
+                                bloom_before_level);
 }
 
 FilterBuildingContext::FilterBuildingContext(
     const BlockBasedTableOptions& _table_options)
     : table_options(_table_options) {}
 
-FilterPolicy::~FilterPolicy() { }
+FilterPolicy::~FilterPolicy() {}
+
+std::shared_ptr<const FilterPolicy> BloomLikeFilterPolicy::Create(
+    const std::string& name, double bits_per_key) {
+  if (name == test::LegacyBloomFilterPolicy::kClassName()) {
+    return std::make_shared<test::LegacyBloomFilterPolicy>(bits_per_key);
+  } else if (name == test::FastLocalBloomFilterPolicy::kClassName()) {
+    return std::make_shared<test::FastLocalBloomFilterPolicy>(bits_per_key);
+  } else if (name == test::Standard128RibbonFilterPolicy::kClassName()) {
+    return std::make_shared<test::Standard128RibbonFilterPolicy>(bits_per_key);
+  } else if (name == BloomFilterPolicy::kClassName()) {
+    // For testing
+    return std::make_shared<BloomFilterPolicy>(bits_per_key);
+  } else if (name == RibbonFilterPolicy::kClassName()) {
+    // For testing
+    return std::make_shared<RibbonFilterPolicy>(bits_per_key,
+                                                /*bloom_before_level*/ 0);
+  } else {
+    return nullptr;
+  }
+}
+
+#ifndef ROCKSDB_LITE
+namespace {
+static ObjectLibrary::PatternEntry FilterPatternEntryWithBits(
+    const char* name) {
+  return ObjectLibrary::PatternEntry(name, false).AddNumber(":", false);
+}
+
+template <typename T>
+T* NewBuiltinFilterPolicyWithBits(const std::string& uri) {
+  const std::vector<std::string> vals = StringSplit(uri, ':');
+  double bits_per_key = ParseDouble(vals[1]);
+  return new T(bits_per_key);
+}
+static int RegisterBuiltinFilterPolicies(ObjectLibrary& library,
+                                         const std::string& /*arg*/) {
+  library.AddFactory<const FilterPolicy>(
+      ReadOnlyBuiltinFilterPolicy::kClassName(),
+      [](const std::string& /*uri*/, std::unique_ptr<const FilterPolicy>* guard,
+         std::string* /* errmsg */) {
+        guard->reset(new ReadOnlyBuiltinFilterPolicy());
+        return guard->get();
+      });
+
+  library.AddFactory<const FilterPolicy>(
+      FilterPatternEntryWithBits(BloomFilterPolicy::kClassName())
+          .AnotherName(BloomFilterPolicy::kNickName()),
+      [](const std::string& uri, std::unique_ptr<const FilterPolicy>* guard,
+         std::string* /* errmsg */) {
+        guard->reset(NewBuiltinFilterPolicyWithBits<BloomFilterPolicy>(uri));
+        return guard->get();
+      });
+  library.AddFactory<const FilterPolicy>(
+      FilterPatternEntryWithBits(BloomFilterPolicy::kClassName())
+          .AnotherName(BloomFilterPolicy::kNickName())
+          .AddSuffix(":false"),
+      [](const std::string& uri, std::unique_ptr<const FilterPolicy>* guard,
+         std::string* /* errmsg */) {
+        guard->reset(NewBuiltinFilterPolicyWithBits<BloomFilterPolicy>(uri));
+        return guard->get();
+      });
+  library.AddFactory<const FilterPolicy>(
+      FilterPatternEntryWithBits(BloomFilterPolicy::kClassName())
+          .AnotherName(BloomFilterPolicy::kNickName())
+          .AddSuffix(":true"),
+      [](const std::string& uri, std::unique_ptr<const FilterPolicy>* guard,
+         std::string* /* errmsg */) {
+        const std::vector<std::string> vals = StringSplit(uri, ':');
+        double bits_per_key = ParseDouble(vals[1]);
+        // NOTE: This case previously configured the deprecated block-based
+        // filter, but old ways of configuring that now map to full filter. We
+        // defer to the corresponding API to ensure consistency in case that
+        // change is reverted.
+        guard->reset(NewBloomFilterPolicy(bits_per_key, true));
+        return guard->get();
+      });
+  library.AddFactory<const FilterPolicy>(
+      FilterPatternEntryWithBits(RibbonFilterPolicy::kClassName())
+          .AnotherName(RibbonFilterPolicy::kNickName()),
+      [](const std::string& uri, std::unique_ptr<const FilterPolicy>* guard,
+         std::string* /* errmsg */) {
+        const std::vector<std::string> vals = StringSplit(uri, ':');
+        double bits_per_key = ParseDouble(vals[1]);
+        guard->reset(NewRibbonFilterPolicy(bits_per_key));
+        return guard->get();
+      });
+  library.AddFactory<const FilterPolicy>(
+      FilterPatternEntryWithBits(RibbonFilterPolicy::kClassName())
+          .AnotherName(RibbonFilterPolicy::kNickName())
+          .AddNumber(":", true),
+      [](const std::string& uri, std::unique_ptr<const FilterPolicy>* guard,
+         std::string* /* errmsg */) {
+        const std::vector<std::string> vals = StringSplit(uri, ':');
+        double bits_per_key = ParseDouble(vals[1]);
+        int bloom_before_level = ParseInt(vals[2]);
+        guard->reset(NewRibbonFilterPolicy(bits_per_key, bloom_before_level));
+        return guard->get();
+      });
+  library.AddFactory<const FilterPolicy>(
+      FilterPatternEntryWithBits(test::LegacyBloomFilterPolicy::kClassName()),
+      [](const std::string& uri, std::unique_ptr<const FilterPolicy>* guard,
+         std::string* /* errmsg */) {
+        guard->reset(
+            NewBuiltinFilterPolicyWithBits<test::LegacyBloomFilterPolicy>(uri));
+        return guard->get();
+      });
+  library.AddFactory<const FilterPolicy>(
+      FilterPatternEntryWithBits(
+          test::FastLocalBloomFilterPolicy::kClassName()),
+      [](const std::string& uri, std::unique_ptr<const FilterPolicy>* guard,
+         std::string* /* errmsg */) {
+        guard->reset(
+            NewBuiltinFilterPolicyWithBits<test::FastLocalBloomFilterPolicy>(
+                uri));
+        return guard->get();
+      });
+  library.AddFactory<const FilterPolicy>(
+      FilterPatternEntryWithBits(
+          test::Standard128RibbonFilterPolicy::kClassName()),
+      [](const std::string& uri, std::unique_ptr<const FilterPolicy>* guard,
+         std::string* /* errmsg */) {
+        guard->reset(
+            NewBuiltinFilterPolicyWithBits<test::Standard128RibbonFilterPolicy>(
+                uri));
+        return guard->get();
+      });
+  size_t num_types;
+  return static_cast<int>(library.GetFactoryCount(&num_types));
+}
+}  // namespace
+#endif  // ROCKSDB_LITE
 
 Status FilterPolicy::CreateFromString(
-    const ConfigOptions& /*options*/, const std::string& value,
+    const ConfigOptions& options, const std::string& value,
     std::shared_ptr<const FilterPolicy>* policy) {
-  const std::string kBloomName = "bloomfilter:";
-  const std::string kExpRibbonName = "experimental_ribbon:";
-  if (value == kNullptrString || value == "rocksdb.BuiltinBloomFilter") {
+  if (value == kNullptrString || value.empty()) {
     policy->reset();
-#ifndef ROCKSDB_LITE
-  } else if (value.compare(0, kBloomName.size(), kBloomName) == 0) {
-    size_t pos = value.find(':', kBloomName.size());
-    if (pos == std::string::npos) {
-      return Status::InvalidArgument(
-          "Invalid filter policy config, missing bits_per_key");
-    } else {
-      double bits_per_key = ParseDouble(
-          trim(value.substr(kBloomName.size(), pos - kBloomName.size())));
-      bool use_block_based_builder =
-          ParseBoolean("use_block_based_builder", trim(value.substr(pos + 1)));
-      policy->reset(
-          NewBloomFilterPolicy(bits_per_key, use_block_based_builder));
-    }
-  } else if (value.compare(0, kExpRibbonName.size(), kExpRibbonName) == 0) {
-    double bloom_equivalent_bits_per_key =
-        ParseDouble(trim(value.substr(kExpRibbonName.size())));
-    policy->reset(
-        NewExperimentalRibbonFilterPolicy(bloom_equivalent_bits_per_key));
+    return Status::OK();
+  } else if (value == ReadOnlyBuiltinFilterPolicy::kClassName()) {
+    *policy = std::make_shared<ReadOnlyBuiltinFilterPolicy>();
+    return Status::OK();
+  }
+
+  std::string id;
+  std::unordered_map<std::string, std::string> opt_map;
+  Status status =
+      Customizable::GetOptionsMap(options, policy->get(), value, &id, &opt_map);
+  if (!status.ok()) {  // GetOptionsMap failed
+    return status;
+  } else if (id.empty()) {  // We have no Id but have options.  Not good
+    return Status::NotSupported("Cannot reset object ", id);
   } else {
-    return Status::NotFound("Invalid filter policy name ", value);
+#ifndef ROCKSDB_LITE
+    static std::once_flag loaded;
+    std::call_once(loaded, [&]() {
+      RegisterBuiltinFilterPolicies(*(ObjectLibrary::Default().get()), "");
+    });
+    status = options.registry->NewSharedObject(id, policy);
 #else
-  } else {
-    return Status::NotSupported("Cannot load filter policy in LITE mode ",
-                                value);
+    status =
+        Status::NotSupported("Cannot load filter policy in LITE mode ", value);
 #endif  // ROCKSDB_LITE
   }
-  return Status::OK();
+  if (options.ignore_unsupported_options && status.IsNotSupported()) {
+    return Status::OK();
+  } else if (status.ok()) {
+    status = Customizable::ConfigureNewObject(
+        options, const_cast<FilterPolicy*>(policy->get()), opt_map);
+  }
+  return status;
 }
+
+const std::vector<std::string>& BloomLikeFilterPolicy::GetAllFixedImpls() {
+  STATIC_AVOID_DESTRUCTION(std::vector<std::string>, impls){
+      // Match filter_bench -impl=x ordering
+      test::LegacyBloomFilterPolicy::kClassName(),
+      test::FastLocalBloomFilterPolicy::kClassName(),
+      test::Standard128RibbonFilterPolicy::kClassName(),
+  };
+  return impls;
+}
+
 }  // namespace ROCKSDB_NAMESPACE