]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rocksdb/table/block_based/filter_policy.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / rocksdb / table / block_based / filter_policy.cc
index c8f23ee33382bb677ecd8175abcec07a6572955d..a7ab907d4bc783ae74b6721da11d71e512fdaffe 100644 (file)
@@ -7,30 +7,89 @@
 // Use of this source code is governed by a BSD-style license that can be
 // found in the LICENSE file. See the AUTHORS file for names of contributors.
 
+#include "rocksdb/filter_policy.h"
+
 #include <array>
 #include <deque>
 
-#include "rocksdb/filter_policy.h"
-
 #include "rocksdb/slice.h"
 #include "table/block_based/block_based_filter_block.h"
-#include "table/block_based/full_filter_block.h"
 #include "table/block_based/filter_policy_internal.h"
+#include "table/block_based/full_filter_block.h"
 #include "third-party/folly/folly/ConstexprMath.h"
 #include "util/bloom_impl.h"
 #include "util/coding.h"
 #include "util/hash.h"
+#include "util/ribbon_impl.h"
 
 namespace ROCKSDB_NAMESPACE {
 
+int BuiltinFilterBitsBuilder::CalculateNumEntry(const uint32_t bytes) {
+  int cur = 1;
+  // Find overestimate
+  while (CalculateSpace(cur) <= bytes && cur * 2 > cur) {
+    cur *= 2;
+  }
+  // Change to underestimate less than factor of two from answer
+  cur /= 2;
+  // Binary search
+  int delta = cur / 2;
+  while (delta > 0) {
+    if (CalculateSpace(cur + delta) <= bytes) {
+      cur += delta;
+    }
+    delta /= 2;
+  }
+  return cur;
+}
+
 namespace {
 
+Slice FinishAlwaysFalse(std::unique_ptr<const char[]>* /*buf*/) {
+  // Missing metadata, treated as zero entries
+  return Slice(nullptr, 0);
+}
+
+// Base class for filter builders using the XXH3 preview hash,
+// also known as Hash64 or GetSliceHash64.
+class XXH3pFilterBitsBuilder : public BuiltinFilterBitsBuilder {
+ public:
+  ~XXH3pFilterBitsBuilder() override {}
+
+  virtual void AddKey(const Slice& key) override {
+    uint64_t hash = GetSliceHash64(key);
+    // Especially with prefixes, it is common to have repetition,
+    // though only adjacent repetition, which we want to immediately
+    // recognize and collapse for estimating true filter space
+    // requirements.
+    if (hash_entries_.empty() || hash != hash_entries_.back()) {
+      hash_entries_.push_back(hash);
+    }
+  }
+
+ protected:
+  // For delegating between XXH3pFilterBitsBuilders
+  void SwapEntriesWith(XXH3pFilterBitsBuilder* other) {
+    std::swap(hash_entries_, other->hash_entries_);
+  }
+
+  // A deque avoids unnecessary copying of already-saved values
+  // and has near-minimal peak memory use.
+  std::deque<uint64_t> hash_entries_;
+};
+
+// #################### FastLocalBloom implementation ################## //
+// ############## also known as format_version=5 Bloom filter ########## //
+
 // See description in FastLocalBloomImpl
-class FastLocalBloomBitsBuilder : public BuiltinFilterBitsBuilder {
+class FastLocalBloomBitsBuilder : public XXH3pFilterBitsBuilder {
  public:
-  explicit FastLocalBloomBitsBuilder(const int millibits_per_key)
+  // Non-null aggregate_rounding_balance implies optimize_filters_for_memory
+  explicit FastLocalBloomBitsBuilder(
+      const int millibits_per_key,
+      std::atomic<int64_t>* aggregate_rounding_balance)
       : millibits_per_key_(millibits_per_key),
-        num_probes_(FastLocalBloomImpl::ChooseNumProbes(millibits_per_key_)) {
+        aggregate_rounding_balance_(aggregate_rounding_balance) {
     assert(millibits_per_key >= 1000);
   }
 
@@ -40,41 +99,37 @@ class FastLocalBloomBitsBuilder : public BuiltinFilterBitsBuilder {
 
   ~FastLocalBloomBitsBuilder() override {}
 
-  virtual void AddKey(const Slice& key) override {
-    uint64_t hash = GetSliceHash64(key);
-    if (hash_entries_.empty() || hash != hash_entries_.back()) {
-      hash_entries_.push_back(hash);
-    }
-  }
-
   virtual Slice Finish(std::unique_ptr<const char[]>* buf) override {
+    size_t num_entry = hash_entries_.size();
+    std::unique_ptr<char[]> mutable_buf;
     uint32_t len_with_metadata =
-        CalculateSpace(static_cast<uint32_t>(hash_entries_.size()));
-    char* data = new char[len_with_metadata];
-    memset(data, 0, len_with_metadata);
+        CalculateAndAllocate(num_entry, &mutable_buf, /*update_balance*/ true);
 
-    assert(data);
+    assert(mutable_buf);
     assert(len_with_metadata >= 5);
 
+    // Compute num_probes after any rounding / adjustments
+    int num_probes = GetNumProbes(num_entry, len_with_metadata);
+
     uint32_t len = len_with_metadata - 5;
     if (len > 0) {
-      AddAllEntries(data, len);
+      AddAllEntries(mutable_buf.get(), len, num_probes);
     }
 
+    assert(hash_entries_.empty());
+
     // See BloomFilterPolicy::GetBloomBitsReader re: metadata
     // -1 = Marker for newer Bloom implementations
-    data[len] = static_cast<char>(-1);
+    mutable_buf[len] = static_cast<char>(-1);
     // 0 = Marker for this sub-implementation
-    data[len + 1] = static_cast<char>(0);
+    mutable_buf[len + 1] = static_cast<char>(0);
     // num_probes (and 0 in upper bits for 64-byte block size)
-    data[len + 2] = static_cast<char>(num_probes_);
+    mutable_buf[len + 2] = static_cast<char>(num_probes);
     // rest of metadata stays zero
 
-    const char* const_data = data;
-    buf->reset(const_data);
-    assert(hash_entries_.empty());
-
-    return Slice(data, len_with_metadata);
+    Slice rv(mutable_buf.get(), len_with_metadata);
+    *buf = std::move(mutable_buf);
+    return rv;
   }
 
   int CalculateNumEntry(const uint32_t bytes) override {
@@ -84,26 +139,163 @@ class FastLocalBloomBitsBuilder : public BuiltinFilterBitsBuilder {
   }
 
   uint32_t CalculateSpace(const int num_entry) override {
-    uint32_t num_cache_lines = 0;
-    if (millibits_per_key_ > 0 && num_entry > 0) {
-      num_cache_lines = static_cast<uint32_t>(
-          (int64_t{num_entry} * millibits_per_key_ + 511999) / 512000);
+    // NB: the BuiltinFilterBitsBuilder API presumes len fits in uint32_t.
+    return static_cast<uint32_t>(
+        CalculateAndAllocate(static_cast<size_t>(num_entry),
+                             /* buf */ nullptr,
+                             /*update_balance*/ false));
+  }
+
+  // To choose size using malloc_usable_size, we have to actually allocate.
+  uint32_t CalculateAndAllocate(size_t num_entry, std::unique_ptr<char[]>* buf,
+                                bool update_balance) {
+    std::unique_ptr<char[]> tmpbuf;
+
+    // If not for cache line blocks in the filter, what would the target
+    // length in bytes be?
+    size_t raw_target_len = static_cast<size_t>(
+        (uint64_t{num_entry} * millibits_per_key_ + 7999) / 8000);
+
+    if (raw_target_len >= size_t{0xffffffc0}) {
+      // Max supported for this data structure implementation
+      raw_target_len = size_t{0xffffffc0};
+    }
+
+    // Round up to nearest multiple of 64 (block size). This adjustment is
+    // used for target FP rate only so that we don't receive complaints about
+    // lower FP rate vs. historic Bloom filter behavior.
+    uint32_t target_len =
+        static_cast<uint32_t>(raw_target_len + 63) & ~uint32_t{63};
+
+    // Return value set to a default; overwritten in some cases
+    uint32_t rv = target_len + /* metadata */ 5;
+#ifdef ROCKSDB_MALLOC_USABLE_SIZE
+    if (aggregate_rounding_balance_ != nullptr) {
+      // Do optimize_filters_for_memory, using malloc_usable_size.
+      // Approach: try to keep FP rate balance better than or on
+      // target (negative aggregate_rounding_balance_). We can then select a
+      // lower bound filter size (within reasonable limits) that gets us as
+      // close to on target as possible. We request allocation for that filter
+      // size and use malloc_usable_size to "round up" to the actual
+      // allocation size.
+
+      // Although it can be considered bad practice to use malloc_usable_size
+      // to access an object beyond its original size, this approach should
+      // quite general: working for all allocators that properly support
+      // malloc_usable_size.
+
+      // Race condition on balance is OK because it can only cause temporary
+      // skew in rounding up vs. rounding down, as long as updates are atomic
+      // and relative.
+      int64_t balance = aggregate_rounding_balance_->load();
+
+      double target_fp_rate = EstimatedFpRate(num_entry, target_len + 5);
+      double rv_fp_rate = target_fp_rate;
+
+      if (balance < 0) {
+        // See formula for BloomFilterPolicy::aggregate_rounding_balance_
+        double for_balance_fp_rate =
+            -balance / double{0x100000000} + target_fp_rate;
+
+        // To simplify, we just try a few modified smaller sizes. This also
+        // caps how much we vary filter size vs. target, to avoid outlier
+        // behavior from excessive variance.
+        for (uint64_t maybe_len64 :
+             {uint64_t{3} * target_len / 4, uint64_t{13} * target_len / 16,
+              uint64_t{7} * target_len / 8, uint64_t{15} * target_len / 16}) {
+          uint32_t maybe_len =
+              static_cast<uint32_t>(maybe_len64) & ~uint32_t{63};
+          double maybe_fp_rate = EstimatedFpRate(num_entry, maybe_len + 5);
+          if (maybe_fp_rate <= for_balance_fp_rate) {
+            rv = maybe_len + /* metadata */ 5;
+            rv_fp_rate = maybe_fp_rate;
+            break;
+          }
+        }
+      }
+
+      // Filter blocks are loaded into block cache with their block trailer.
+      // We need to make sure that's accounted for in choosing a
+      // fragmentation-friendly size.
+      const uint32_t kExtraPadding = kBlockTrailerSize;
+      size_t requested = rv + kExtraPadding;
+
+      // Allocate and get usable size
+      tmpbuf.reset(new char[requested]);
+      size_t usable = malloc_usable_size(tmpbuf.get());
+
+      if (usable - usable / 4 > requested) {
+        // Ratio greater than 4/3 is too much for utilizing, if it's
+        // not a buggy or mislinked malloc_usable_size implementation.
+        // Non-linearity of FP rates with bits/key means rapidly
+        // diminishing returns in overall accuracy for additional
+        // storage on disk.
+        // Nothing to do, except assert that the result is accurate about
+        // the usable size. (Assignment never used.)
+        assert((tmpbuf[usable - 1] = 'x'));
+      } else if (usable > requested) {
+        // Adjust for reasonably larger usable size
+        size_t usable_len = (usable - kExtraPadding - /* metadata */ 5);
+        if (usable_len >= size_t{0xffffffc0}) {
+          // Max supported for this data structure implementation
+          usable_len = size_t{0xffffffc0};
+        }
+
+        rv = (static_cast<uint32_t>(usable_len) & ~uint32_t{63}) +
+             /* metadata */ 5;
+        rv_fp_rate = EstimatedFpRate(num_entry, rv);
+      } else {
+        // Too small means bad malloc_usable_size
+        assert(usable == requested);
+      }
+      memset(tmpbuf.get(), 0, rv);
+
+      if (update_balance) {
+        int64_t diff = static_cast<int64_t>((rv_fp_rate - target_fp_rate) *
+                                            double{0x100000000});
+        *aggregate_rounding_balance_ += diff;
+      }
     }
-    return num_cache_lines * 64 + /*metadata*/ 5;
+#else
+    (void)update_balance;
+#endif  // ROCKSDB_MALLOC_USABLE_SIZE
+    if (buf) {
+      if (tmpbuf) {
+        *buf = std::move(tmpbuf);
+      } else {
+        buf->reset(new char[rv]());
+      }
+    }
+    return rv;
   }
 
-  double EstimatedFpRate(size_t keys, size_t bytes) override {
-    return FastLocalBloomImpl::EstimatedFpRate(keys, bytes - /*metadata*/ 5,
-                                               num_probes_, /*hash bits*/ 64);
+  double EstimatedFpRate(size_t keys, size_t len_with_metadata) override {
+    int num_probes = GetNumProbes(keys, len_with_metadata);
+    return FastLocalBloomImpl::EstimatedFpRate(
+        keys, len_with_metadata - /*metadata*/ 5, num_probes, /*hash bits*/ 64);
   }
 
  private:
-  void AddAllEntries(char* data, uint32_t len) {
+  // Compute num_probes after any rounding / adjustments
+  int GetNumProbes(size_t keys, size_t len_with_metadata) {
+    uint64_t millibits = uint64_t{len_with_metadata - 5} * 8000;
+    int actual_millibits_per_key =
+        static_cast<int>(millibits / std::max(keys, size_t{1}));
+    // BEGIN XXX/TODO(peterd): preserving old/default behavior for now to
+    // minimize unit test churn. Remove this some time.
+    if (!aggregate_rounding_balance_) {
+      actual_millibits_per_key = millibits_per_key_;
+    }
+    // END XXX/TODO
+    return FastLocalBloomImpl::ChooseNumProbes(actual_millibits_per_key);
+  }
+
+  void AddAllEntries(char* data, uint32_t len, int num_probes) {
     // Simple version without prefetching:
     //
     // for (auto h : hash_entries_) {
     //   FastLocalBloomImpl::AddHash(Lower32of64(h), Upper32of64(h), len,
-    //                               num_probes_, data);
+    //                               num_probes, data);
     // }
 
     const size_t num_entries = hash_entries_.size();
@@ -129,7 +321,7 @@ class FastLocalBloomBitsBuilder : public BuiltinFilterBitsBuilder {
       uint32_t& hash_ref = hashes[i & kBufferMask];
       uint32_t& byte_offset_ref = byte_offsets[i & kBufferMask];
       // Process (add)
-      FastLocalBloomImpl::AddHashPrepared(hash_ref, num_probes_,
+      FastLocalBloomImpl::AddHashPrepared(hash_ref, num_probes,
                                           data + byte_offset_ref);
       // And buffer
       uint64_t h = hash_entries_.front();
@@ -141,16 +333,16 @@ class FastLocalBloomBitsBuilder : public BuiltinFilterBitsBuilder {
 
     // Finish processing
     for (i = 0; i <= kBufferMask && i < num_entries; ++i) {
-      FastLocalBloomImpl::AddHashPrepared(hashes[i], num_probes_,
+      FastLocalBloomImpl::AddHashPrepared(hashes[i], num_probes,
                                           data + byte_offsets[i]);
     }
   }
 
+  // Target allocation per added key, in thousandths of a bit.
   int millibits_per_key_;
-  int num_probes_;
-  // A deque avoids unnecessary copying of already-saved values
-  // and has near-minimal peak memory use.
-  std::deque<uint64_t> hash_entries_;
+  // See BloomFilterPolicy::aggregate_rounding_balance_. If nullptr,
+  // always "round up" like historic behavior.
+  std::atomic<int64_t>* aggregate_rounding_balance_;
 };
 
 // See description in FastLocalBloomImpl
@@ -195,6 +387,213 @@ class FastLocalBloomBitsReader : public FilterBitsReader {
   const uint32_t len_bytes_;
 };
 
+// ##################### Ribbon filter implementation ################### //
+
+// Implements concept RehasherTypesAndSettings in ribbon_impl.h
+struct Standard128RibbonRehasherTypesAndSettings {
+  // These are schema-critical. Any change almost certainly changes
+  // underlying data.
+  static constexpr bool kIsFilter = true;
+  static constexpr bool kFirstCoeffAlwaysOne = true;
+  static constexpr bool kUseSmash = false;
+  using CoeffRow = ROCKSDB_NAMESPACE::Unsigned128;
+  using Hash = uint64_t;
+  using Seed = uint32_t;
+  // Changing these doesn't necessarily change underlying data,
+  // but might affect supported scalability of those dimensions.
+  using Index = uint32_t;
+  using ResultRow = uint32_t;
+  // Save a conditional in Ribbon queries
+  static constexpr bool kAllowZeroStarts = false;
+};
+
+using Standard128RibbonTypesAndSettings =
+    ribbon::StandardRehasherAdapter<Standard128RibbonRehasherTypesAndSettings>;
+
+class Standard128RibbonBitsBuilder : public XXH3pFilterBitsBuilder {
+ public:
+  explicit Standard128RibbonBitsBuilder(double desired_one_in_fp_rate,
+                                        int bloom_millibits_per_key,
+                                        Logger* info_log)
+      : desired_one_in_fp_rate_(desired_one_in_fp_rate),
+        info_log_(info_log),
+        bloom_fallback_(bloom_millibits_per_key, nullptr) {
+    assert(desired_one_in_fp_rate >= 1.0);
+  }
+
+  // No Copy allowed
+  Standard128RibbonBitsBuilder(const Standard128RibbonBitsBuilder&) = delete;
+  void operator=(const Standard128RibbonBitsBuilder&) = delete;
+
+  ~Standard128RibbonBitsBuilder() override {}
+
+  virtual Slice Finish(std::unique_ptr<const char[]>* buf) override {
+    // More than 2^30 entries (~1 billion) not supported
+    if (hash_entries_.size() >= (size_t{1} << 30)) {
+      ROCKS_LOG_WARN(info_log_, "Too many keys for Ribbon filter: %llu",
+                     static_cast<unsigned long long>(hash_entries_.size()));
+      SwapEntriesWith(&bloom_fallback_);
+      assert(hash_entries_.empty());
+      return bloom_fallback_.Finish(buf);
+    }
+    if (hash_entries_.size() == 0) {
+      // Save a conditional in Ribbon queries by using alternate reader
+      // for zero entries added.
+      return FinishAlwaysFalse(buf);
+    }
+    uint32_t num_entries = static_cast<uint32_t>(hash_entries_.size());
+    uint32_t num_slots = BandingType::GetNumSlotsFor95PctSuccess(num_entries);
+    num_slots = SolnType::RoundUpNumSlots(num_slots);
+
+    uint32_t entropy = 0;
+    if (num_entries > 0) {
+      entropy = Lower32of64(hash_entries_.front());
+    }
+    size_t len = SolnType::GetBytesForOneInFpRate(
+        num_slots, desired_one_in_fp_rate_, /*rounding*/ entropy);
+    size_t len_with_metadata = len + 5;
+
+    // Use Bloom filter when it's better for small filters
+    if (num_slots < 1024 && bloom_fallback_.CalculateSpace(static_cast<int>(
+                                num_entries)) < len_with_metadata) {
+      SwapEntriesWith(&bloom_fallback_);
+      assert(hash_entries_.empty());
+      return bloom_fallback_.Finish(buf);
+    }
+
+    BandingType banding;
+    bool success = banding.ResetAndFindSeedToSolve(
+        num_slots, hash_entries_.begin(), hash_entries_.end(),
+        /*starting seed*/ entropy & 255, /*seed mask*/ 255);
+    if (!success) {
+      ROCKS_LOG_WARN(info_log_,
+                     "Too many re-seeds (256) for Ribbon filter, %llu / %llu",
+                     static_cast<unsigned long long>(hash_entries_.size()),
+                     static_cast<unsigned long long>(num_slots));
+      SwapEntriesWith(&bloom_fallback_);
+      assert(hash_entries_.empty());
+      return bloom_fallback_.Finish(buf);
+    }
+    hash_entries_.clear();
+
+    uint32_t seed = banding.GetOrdinalSeed();
+    assert(seed < 256);
+
+    std::unique_ptr<char[]> mutable_buf(new char[len_with_metadata]);
+
+    SolnType soln(mutable_buf.get(), len_with_metadata);
+    soln.BackSubstFrom(banding);
+    uint32_t num_blocks = soln.GetNumBlocks();
+    // This should be guaranteed:
+    // num_entries < 2^30
+    // => (overhead_factor < 2.0)
+    // num_entries * overhead_factor == num_slots < 2^31
+    // => (num_blocks = num_slots / 128)
+    // num_blocks < 2^24
+    assert(num_blocks < 0x1000000U);
+
+    // See BloomFilterPolicy::GetBloomBitsReader re: metadata
+    // -2 = Marker for Standard128 Ribbon
+    mutable_buf[len] = static_cast<char>(-2);
+    // Hash seed
+    mutable_buf[len + 1] = static_cast<char>(seed);
+    // Number of blocks, in 24 bits
+    // (Along with bytes, we can derive other settings)
+    mutable_buf[len + 2] = static_cast<char>(num_blocks & 255);
+    mutable_buf[len + 3] = static_cast<char>((num_blocks >> 8) & 255);
+    mutable_buf[len + 4] = static_cast<char>((num_blocks >> 16) & 255);
+
+    Slice rv(mutable_buf.get(), len_with_metadata);
+    *buf = std::move(mutable_buf);
+    return rv;
+  }
+
+  uint32_t CalculateSpace(const int num_entries) override {
+    // NB: the BuiltinFilterBitsBuilder API presumes len fits in uint32_t.
+    uint32_t num_slots =
+        NumEntriesToNumSlots(static_cast<uint32_t>(num_entries));
+    uint32_t ribbon = static_cast<uint32_t>(
+        SolnType::GetBytesForOneInFpRate(num_slots, desired_one_in_fp_rate_,
+                                         /*rounding*/ 0) +
+        /*metadata*/ 5);
+    // Consider possible Bloom fallback for small filters
+    if (num_slots < 1024) {
+      uint32_t bloom = bloom_fallback_.CalculateSpace(num_entries);
+      return std::min(bloom, ribbon);
+    } else {
+      return ribbon;
+    }
+  }
+
+  double EstimatedFpRate(size_t num_entries,
+                         size_t len_with_metadata) override {
+    uint32_t num_slots =
+        NumEntriesToNumSlots(static_cast<uint32_t>(num_entries));
+    SolnType fake_soln(nullptr, len_with_metadata);
+    fake_soln.ConfigureForNumSlots(num_slots);
+    return fake_soln.ExpectedFpRate();
+  }
+
+ private:
+  using TS = Standard128RibbonTypesAndSettings;
+  using SolnType = ribbon::SerializableInterleavedSolution<TS>;
+  using BandingType = ribbon::StandardBanding<TS>;
+
+  static uint32_t NumEntriesToNumSlots(uint32_t num_entries) {
+    uint32_t num_slots1 = BandingType::GetNumSlotsFor95PctSuccess(num_entries);
+    return SolnType::RoundUpNumSlots(num_slots1);
+  }
+
+  // A desired value for 1/fp_rate. For example, 100 -> 1% fp rate.
+  double desired_one_in_fp_rate_;
+
+  // For warnings, or can be nullptr
+  Logger* info_log_;
+
+  // For falling back on Bloom filter in some exceptional cases and
+  // very small filter cases
+  FastLocalBloomBitsBuilder bloom_fallback_;
+};
+
+class Standard128RibbonBitsReader : public FilterBitsReader {
+ public:
+  Standard128RibbonBitsReader(const char* data, size_t len_bytes,
+                              uint32_t num_blocks, uint32_t seed)
+      : soln_(const_cast<char*>(data), len_bytes) {
+    soln_.ConfigureForNumBlocks(num_blocks);
+    hasher_.SetOrdinalSeed(seed);
+  }
+
+  // No Copy allowed
+  Standard128RibbonBitsReader(const Standard128RibbonBitsReader&) = delete;
+  void operator=(const Standard128RibbonBitsReader&) = delete;
+
+  ~Standard128RibbonBitsReader() override {}
+
+  bool MayMatch(const Slice& key) override {
+    uint64_t h = GetSliceHash64(key);
+    return soln_.FilterQuery(h, hasher_);
+  }
+
+  virtual void MayMatch(int num_keys, Slice** keys, bool* may_match) override {
+    std::array<uint64_t, MultiGetContext::MAX_BATCH_SIZE> hashes;
+    for (int i = 0; i < num_keys; ++i) {
+      hashes[i] = GetSliceHash64(*keys[i]);
+      // FIXME: batched get optimization
+    }
+    for (int i = 0; i < num_keys; ++i) {
+      may_match[i] = soln_.FilterQuery(hashes[i], hasher_);
+    }
+  }
+
+ private:
+  using TS = Standard128RibbonTypesAndSettings;
+  ribbon::SerializableInterleavedSolution<TS> soln_;
+  ribbon::StandardHasher<TS> hasher_;
+};
+
+// ##################### Legacy Bloom implementation ################### //
+
 using LegacyBloomImpl = LegacyLocalityBloomImpl</*ExtraRotates*/ false>;
 
 class LegacyBloomBitsBuilder : public BuiltinFilterBitsBuilder {
@@ -449,15 +848,17 @@ const std::vector<BloomFilterPolicy::Mode> BloomFilterPolicy::kAllFixedImpls = {
     kLegacyBloom,
     kDeprecatedBlock,
     kFastLocalBloom,
+    kStandard128Ribbon,
 };
 
 const std::vector<BloomFilterPolicy::Mode> BloomFilterPolicy::kAllUserModes = {
     kDeprecatedBlock,
-    kAuto,
+    kAutoBloom,
+    kStandard128Ribbon,
 };
 
 BloomFilterPolicy::BloomFilterPolicy(double bits_per_key, Mode mode)
-    : mode_(mode), warned_(false) {
+    : mode_(mode), warned_(false), aggregate_rounding_balance_(0) {
   // Sanitize bits_per_key
   if (bits_per_key < 1.0) {
     bits_per_key = 1.0;
@@ -470,6 +871,15 @@ BloomFilterPolicy::BloomFilterPolicy(double bits_per_key, Mode mode)
   // point are interpreted accurately.
   millibits_per_key_ = static_cast<int>(bits_per_key * 1000.0 + 0.500001);
 
+  // For now configure Ribbon filter to match Bloom FP rate and save
+  // memory. (Ribbon bits per key will be ~30% less than Bloom bits per key
+  // for same FP rate.)
+  desired_one_in_fp_rate_ =
+      1.0 / BloomMath::CacheLocalFpRate(
+                bits_per_key,
+                FastLocalBloomImpl::ChooseNumProbes(millibits_per_key_),
+                /*cache_line_bits*/ 512);
+
   // For better or worse, this is a rounding up of a nudged rounding up,
   // e.g. 7.4999999999999 will round up to 8, but that provides more
   // predictability against small arithmetic errors in floating point.
@@ -549,11 +959,12 @@ FilterBitsBuilder* BloomFilterPolicy::GetFilterBitsBuilder() const {
 FilterBitsBuilder* BloomFilterPolicy::GetBuilderWithContext(
     const FilterBuildingContext& context) const {
   Mode cur = mode_;
+  bool offm = context.table_options.optimize_filters_for_memory;
   // Unusual code construction so that we can have just
   // one exhaustive switch without (risky) recursion
   for (int i = 0; i < 2; ++i) {
     switch (cur) {
-      case kAuto:
+      case kAutoBloom:
         if (context.table_options.format_version < 5) {
           cur = kLegacyBloom;
         } else {
@@ -563,7 +974,8 @@ FilterBitsBuilder* BloomFilterPolicy::GetBuilderWithContext(
       case kDeprecatedBlock:
         return nullptr;
       case kFastLocalBloom:
-        return new FastLocalBloomBitsBuilder(millibits_per_key_);
+        return new FastLocalBloomBitsBuilder(
+            millibits_per_key_, offm ? &aggregate_rounding_balance_ : nullptr);
       case kLegacyBloom:
         if (whole_bits_per_key_ >= 14 && context.info_log &&
             !warned_.load(std::memory_order_relaxed)) {
@@ -585,6 +997,9 @@ FilterBitsBuilder* BloomFilterPolicy::GetBuilderWithContext(
         }
         return new LegacyBloomBitsBuilder(whole_bits_per_key_,
                                           context.info_log);
+      case kStandard128Ribbon:
+        return new Standard128RibbonBitsBuilder(
+            desired_one_in_fp_rate_, millibits_per_key_, context.info_log);
     }
   }
   assert(false);
@@ -632,13 +1047,20 @@ FilterBitsReader* BloomFilterPolicy::GetFilterBitsReader(
   if (raw_num_probes < 1) {
     // Note: < 0 (or unsigned > 127) indicate special new implementations
     // (or reserved for future use)
-    if (raw_num_probes == -1) {
-      // Marker for newer Bloom implementations
-      return GetBloomBitsReader(contents);
+    switch (raw_num_probes) {
+      case 0:
+        // Treat as zero probes (always FP)
+        return new AlwaysTrueFilter();
+      case -1:
+        // Marker for newer Bloom implementations
+        return GetBloomBitsReader(contents);
+      case -2:
+        // Marker for Ribbon implementations
+        return GetRibbonBitsReader(contents);
+      default:
+        // Reserved (treat as zero probes, always FP, for now)
+        return new AlwaysTrueFilter();
     }
-    // otherwise
-    // Treat as zero probes (always FP) for now.
-    return new AlwaysTrueFilter();
   }
   // else attempt decode for LegacyBloomBitsReader
 
@@ -676,6 +1098,29 @@ FilterBitsReader* BloomFilterPolicy::GetFilterBitsReader(
                                    log2_cache_line_size);
 }
 
+FilterBitsReader* BloomFilterPolicy::GetRibbonBitsReader(
+    const Slice& contents) const {
+  uint32_t len_with_meta = static_cast<uint32_t>(contents.size());
+  uint32_t len = len_with_meta - 5;
+
+  assert(len > 0);  // precondition
+
+  uint32_t seed = static_cast<uint8_t>(contents.data()[len + 1]);
+  uint32_t num_blocks = static_cast<uint8_t>(contents.data()[len + 2]);
+  num_blocks |= static_cast<uint8_t>(contents.data()[len + 3]) << 8;
+  num_blocks |= static_cast<uint8_t>(contents.data()[len + 4]) << 16;
+  if (num_blocks < 2) {
+    // Not supported
+    // num_blocks == 1 is not used because num_starts == 1 is problematic
+    // for the hashing scheme. num_blocks == 0 is unused because there's
+    // already a concise encoding of an "always false" filter.
+    // Return something safe:
+    return new AlwaysTrueFilter();
+  }
+  return new Standard128RibbonBitsReader(contents.data(), len, num_blocks,
+                                         seed);
+}
+
 // For newer Bloom filter implementations
 FilterBitsReader* BloomFilterPolicy::GetBloomBitsReader(
     const Slice& contents) const {
@@ -742,7 +1187,7 @@ const FilterPolicy* NewBloomFilterPolicy(double bits_per_key,
   if (use_block_based_builder) {
     m = BloomFilterPolicy::kDeprecatedBlock;
   } else {
-    m = BloomFilterPolicy::kAuto;
+    m = BloomFilterPolicy::kAutoBloom;
   }
   assert(std::find(BloomFilterPolicy::kAllUserModes.begin(),
                    BloomFilterPolicy::kAllUserModes.end(),
@@ -750,10 +1195,52 @@ const FilterPolicy* NewBloomFilterPolicy(double bits_per_key,
   return new BloomFilterPolicy(bits_per_key, m);
 }
 
+extern const FilterPolicy* NewExperimentalRibbonFilterPolicy(
+    double bloom_equivalent_bits_per_key) {
+  return new BloomFilterPolicy(bloom_equivalent_bits_per_key,
+                               BloomFilterPolicy::kStandard128Ribbon);
+}
+
 FilterBuildingContext::FilterBuildingContext(
     const BlockBasedTableOptions& _table_options)
     : table_options(_table_options) {}
 
 FilterPolicy::~FilterPolicy() { }
 
+Status FilterPolicy::CreateFromString(
+    const ConfigOptions& /*options*/, const std::string& value,
+    std::shared_ptr<const FilterPolicy>* policy) {
+  const std::string kBloomName = "bloomfilter:";
+  const std::string kExpRibbonName = "experimental_ribbon:";
+  if (value == kNullptrString || value == "rocksdb.BuiltinBloomFilter") {
+    policy->reset();
+#ifndef ROCKSDB_LITE
+  } else if (value.compare(0, kBloomName.size(), kBloomName) == 0) {
+    size_t pos = value.find(':', kBloomName.size());
+    if (pos == std::string::npos) {
+      return Status::InvalidArgument(
+          "Invalid filter policy config, missing bits_per_key");
+    } else {
+      double bits_per_key = ParseDouble(
+          trim(value.substr(kBloomName.size(), pos - kBloomName.size())));
+      bool use_block_based_builder =
+          ParseBoolean("use_block_based_builder", trim(value.substr(pos + 1)));
+      policy->reset(
+          NewBloomFilterPolicy(bits_per_key, use_block_based_builder));
+    }
+  } else if (value.compare(0, kExpRibbonName.size(), kExpRibbonName) == 0) {
+    double bloom_equivalent_bits_per_key =
+        ParseDouble(trim(value.substr(kExpRibbonName.size())));
+    policy->reset(
+        NewExperimentalRibbonFilterPolicy(bloom_equivalent_bits_per_key));
+  } else {
+    return Status::NotFound("Invalid filter policy name ", value);
+#else
+  } else {
+    return Status::NotSupported("Cannot load filter policy in LITE mode ",
+                                value);
+#endif  // ROCKSDB_LITE
+  }
+  return Status::OK();
+}
 }  // namespace ROCKSDB_NAMESPACE