update ceph source to reef 18.1.2

diff --git a/ceph/src/rocksdb/cache/sharded_cache.h b/ceph/src/rocksdb/cache/sharded_cache.h
index ce9e459dc141529fdc25739ac4d6e1ed9bd29231..e3271cc7bd3beef0f18ee6bafc6484746c33c26d 100644
 #pragma once
 
 #include <atomic>
+#include <cstdint>
 #include <string>
 
+#include "port/lang.h"
 #include "port/port.h"
 #include "rocksdb/cache.h"
 #include "util/hash.h"
+#include "util/mutexlock.h"
 
 namespace ROCKSDB_NAMESPACE {
 
-// Single cache shard interface.
-class CacheShard {
+// Optional base class for classes implementing the CacheShard concept
+class CacheShardBase {
  public:
-  CacheShard() = default;
-  virtual ~CacheShard() = default;
-
-  virtual Status Insert(const Slice& key, uint32_t hash, void* value,
-                        size_t charge,
-                        void (*deleter)(const Slice& key, void* value),
-                        Cache::Handle** handle, Cache::Priority priority) = 0;
-  virtual Cache::Handle* Lookup(const Slice& key, uint32_t hash) = 0;
-  virtual bool Ref(Cache::Handle* handle) = 0;
-  virtual bool Release(Cache::Handle* handle, bool force_erase = false) = 0;
-  virtual void Erase(const Slice& key, uint32_t hash) = 0;
-  virtual void SetCapacity(size_t capacity) = 0;
-  virtual void SetStrictCapacityLimit(bool strict_capacity_limit) = 0;
-  virtual size_t GetUsage() const = 0;
-  virtual size_t GetPinnedUsage() const = 0;
-  virtual void ApplyToAllCacheEntries(void (*callback)(void*, size_t),
-                                      bool thread_safe) = 0;
-  virtual void EraseUnRefEntries() = 0;
-  virtual std::string GetPrintableOptions() const { return ""; }
-  void set_metadata_charge_policy(
-      CacheMetadataChargePolicy metadata_charge_policy) {
-    metadata_charge_policy_ = metadata_charge_policy;
+  explicit CacheShardBase(CacheMetadataChargePolicy metadata_charge_policy)
+      : metadata_charge_policy_(metadata_charge_policy) {}
+
+  using DeleterFn = Cache::DeleterFn;
+
+  // Expected by concept CacheShard (TODO with C++20 support)
+  // Some Defaults
+  std::string GetPrintableOptions() const { return ""; }
+  using HashVal = uint64_t;
+  using HashCref = uint64_t;
+  static inline HashVal ComputeHash(const Slice& key) {
+    return GetSliceNPHash64(key);
   }
+  static inline uint32_t HashPieceForSharding(HashCref hash) {
+    return Lower32of64(hash);
+  }
+  void AppendPrintableOptions(std::string& /*str*/) const {}
+
+  // Must be provided for concept CacheShard (TODO with C++20 support)
+  /*
+  struct HandleImpl {  // for concept HandleImpl
+    HashVal hash;
+    HashCref GetHash() const;
+    ...
+  };
+  Status Insert(const Slice& key, HashCref hash, void* value, size_t charge,
+                DeleterFn deleter, HandleImpl** handle,
+                Cache::Priority priority) = 0;
+  Status Insert(const Slice& key, HashCref hash, void* value,
+                const Cache::CacheItemHelper* helper, size_t charge,
+                HandleImpl** handle, Cache::Priority priority) = 0;
+  HandleImpl* Lookup(const Slice& key, HashCref hash) = 0;
+  HandleImpl* Lookup(const Slice& key, HashCref hash,
+                     const Cache::CacheItemHelper* helper,
+                     const Cache::CreateCallback& create_cb,
+                     Cache::Priority priority, bool wait,
+                     Statistics* stats) = 0;
+  bool Release(HandleImpl* handle, bool useful, bool erase_if_last_ref) = 0;
+  bool IsReady(HandleImpl* handle) = 0;
+  void Wait(HandleImpl* handle) = 0;
+  bool Ref(HandleImpl* handle) = 0;
+  void Erase(const Slice& key, HashCref hash) = 0;
+  void SetCapacity(size_t capacity) = 0;
+  void SetStrictCapacityLimit(bool strict_capacity_limit) = 0;
+  size_t GetUsage() const = 0;
+  size_t GetPinnedUsage() const = 0;
+  size_t GetOccupancyCount() const = 0;
+  size_t GetTableAddressCount() const = 0;
+  // Handles iterating over roughly `average_entries_per_lock` entries, using
+  // `state` to record an implementation-defined resume point. The caller
+  // initially passes *state == 0, and the implementation sets
+  // *state = SIZE_MAX to indicate completion.
+  void ApplyToSomeEntries(
+      const std::function<void(const Slice& key, void* value, size_t charge,
+                               DeleterFn deleter)>& callback,
+      size_t average_entries_per_lock, size_t* state) = 0;
+  void EraseUnRefEntries() = 0;
+  */
 
  protected:
-  CacheMetadataChargePolicy metadata_charge_policy_ = kDontChargeCacheMetadata;
+  const CacheMetadataChargePolicy metadata_charge_policy_;
+};
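As a quick orientation to the commented-out checklist above: a conforming shard class derives from CacheShardBase, nests a HandleImpl that exposes its hash, and supplies the listed methods as plain (non-virtual) members, since ShardedCache below binds to them statically through its template parameter. A hypothetical skeleton (ToyShard and its elided members are illustrative, not part of this patch):

    // Illustrative only: the shape of a CacheShard implementation.
    class ToyShard : public CacheShardBase {
     public:
      explicit ToyShard(CacheMetadataChargePolicy policy)
          : CacheShardBase(policy) {}

      struct HandleImpl {
        HashVal hash;
        HashCref GetHash() const { return hash; }  // routes Release/Ref/etc.
      };

      // ...plus Insert/Lookup/Release/Erase/ApplyToSomeEntries and the
      // accessors listed in the comment above, as non-virtual members.
    };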
+
+// Portions of ShardedCache that do not depend on the template parameter
+class ShardedCacheBase : public Cache {
+ public:
+  ShardedCacheBase(size_t capacity, int num_shard_bits,
+                   bool strict_capacity_limit,
+                   std::shared_ptr<MemoryAllocator> memory_allocator);
+  virtual ~ShardedCacheBase() = default;
+
+  int GetNumShardBits() const;
+  uint32_t GetNumShards() const;
+
+  uint64_t NewId() override;
+
+  bool HasStrictCapacityLimit() const override;
+  size_t GetCapacity() const override;
+
+  using Cache::GetUsage;
+  size_t GetUsage(Handle* handle) const override;
+  std::string GetPrintableOptions() const override;
+
+ protected:  // fns
+  virtual void AppendPrintableOptions(std::string& str) const = 0;
+  size_t GetPerShardCapacity() const;
+  size_t ComputePerShardCapacity(size_t capacity) const;
+
+ protected:                        // data
+  std::atomic<uint64_t> last_id_;  // For NewId
+  const uint32_t shard_mask_;
+
+  // Dynamic configuration parameters, guarded by config_mutex_
+  bool strict_capacity_limit_;
+  size_t capacity_;
+  mutable port::Mutex config_mutex_;
 };
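The definitions of these members live in sharded_cache.cc, which is outside this hunk. A plausible sketch of the core ones, consistent with the declarations above (treat the bodies as assumptions, not the shipped code):

    // Sketch: shard_mask_ encodes the shard count as (2^num_shard_bits)-1,
    // so GetNumShards() can return shard_mask_ + 1 and routing is a mask.
    uint64_t ShardedCacheBase::NewId() {
      return last_id_.fetch_add(1, std::memory_order_relaxed);
    }

    size_t ShardedCacheBase::ComputePerShardCapacity(size_t capacity) const {
      uint32_t num_shards = GetNumShards();
      return (capacity + (num_shards - 1)) / num_shards;  // round up
    }

Note that NewId needs only an atomic increment, while capacity_ and strict_capacity_limit_ are read and written under config_mutex_, as the comment on the data members indicates.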
 
-// Generic cache interface which shards cache by hash of keys. 2^num_shard_bits
+// Generic cache interface that shards cache by hash of keys. 2^num_shard_bits
 // shards will be created, with capacity split evenly to each of the shards.
-// Keys are sharded by the highest num_shard_bits bits of hash value.
-class ShardedCache : public Cache {
+// Keys are typically sharded by the lowest num_shard_bits bits of hash value
+// so that the upper bits of the hash value can keep a stable ordering of
+// table entries even as the table grows (using more upper hash bits).
+// See CacheShardBase above for what is expected of the CacheShard parameter.
+template <class CacheShard>
+class ShardedCache : public ShardedCacheBase {
  public:
+  using HashVal = typename CacheShard::HashVal;
+  using HashCref = typename CacheShard::HashCref;
+  using HandleImpl = typename CacheShard::HandleImpl;
+
   ShardedCache(size_t capacity, int num_shard_bits, bool strict_capacity_limit,
-               std::shared_ptr<MemoryAllocator> memory_allocator = nullptr);
-  virtual ~ShardedCache() = default;
-  virtual const char* Name() const override = 0;
-  virtual CacheShard* GetShard(int shard) = 0;
-  virtual const CacheShard* GetShard(int shard) const = 0;
-  virtual void* Value(Handle* handle) override = 0;
-  virtual size_t GetCharge(Handle* handle) const override = 0;
-
-  virtual uint32_t GetHash(Handle* handle) const = 0;
-  virtual void DisownData() override = 0;
-
-  virtual void SetCapacity(size_t capacity) override;
-  virtual void SetStrictCapacityLimit(bool strict_capacity_limit) override;
-
-  virtual Status Insert(const Slice& key, void* value, size_t charge,
-                        void (*deleter)(const Slice& key, void* value),
-                        Handle** handle, Priority priority) override;
-  virtual Handle* Lookup(const Slice& key, Statistics* stats) override;
-  virtual bool Ref(Handle* handle) override;
-  virtual bool Release(Handle* handle, bool force_erase = false) override;
-  virtual void Erase(const Slice& key) override;
-  virtual uint64_t NewId() override;
-  virtual size_t GetCapacity() const override;
-  virtual bool HasStrictCapacityLimit() const override;
-  virtual size_t GetUsage() const override;
-  virtual size_t GetUsage(Handle* handle) const override;
-  virtual size_t GetPinnedUsage() const override;
-  virtual void ApplyToAllCacheEntries(void (*callback)(void*, size_t),
-                                      bool thread_safe) override;
-  virtual void EraseUnRefEntries() override;
-  virtual std::string GetPrintableOptions() const override;
-
-  int GetNumShardBits() const { return num_shard_bits_; }
+               std::shared_ptr<MemoryAllocator> allocator)
+      : ShardedCacheBase(capacity, num_shard_bits, strict_capacity_limit,
+                         allocator),
+        shards_(reinterpret_cast<CacheShard*>(port::cacheline_aligned_alloc(
+            sizeof(CacheShard) * GetNumShards()))),
+        destroy_shards_in_dtor_(false) {}
 
- private:
-  static inline uint32_t HashSlice(const Slice& s) {
-    return static_cast<uint32_t>(GetSliceNPHash64(s));
+  virtual ~ShardedCache() {
+    if (destroy_shards_in_dtor_) {
+      ForEachShard([](CacheShard* cs) { cs->~CacheShard(); });
+    }
+    port::cacheline_aligned_free(shards_);
   }
 
-  uint32_t Shard(uint32_t hash) {
-    // Note, hash >> 32 yields hash in gcc, not the zero we expect!
-    return (num_shard_bits_ > 0) ? (hash >> (32 - num_shard_bits_)) : 0;
+  CacheShard& GetShard(HashCref hash) {
+    return shards_[CacheShard::HashPieceForSharding(hash) & shard_mask_];
   }
 
-  int num_shard_bits_;
-  mutable port::Mutex capacity_mutex_;
-  size_t capacity_;
-  bool strict_capacity_limit_;
-  std::atomic<uint64_t> last_id_;
+  const CacheShard& GetShard(HashCref hash) const {
+    return shards_[CacheShard::HashPieceForSharding(hash) & shard_mask_];
+  }
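Concretely, with num_shard_bits = 4 the base class stores shard_mask_ == 0xF, and shard selection masks the low 32 bits of the 64-bit hash (values below are made up for illustration):

    uint64_t hash = 0x123456789abcdef0ULL;     // CacheShard::ComputeHash(key)
    uint32_t shard = Lower32of64(hash) & 0xF;  // 0x9abcdef0 & 0xF == 0

The entry is served by shards_[0]; the upper 32 bits remain free to give a stable intra-shard ordering, per the class comment above.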
+
+  void SetCapacity(size_t capacity) override {
+    MutexLock l(&config_mutex_);
+    capacity_ = capacity;
+    auto per_shard = ComputePerShardCapacity(capacity);
+    ForEachShard([=](CacheShard* cs) { cs->SetCapacity(per_shard); });
+  }
+
+  void SetStrictCapacityLimit(bool s_c_l) override {
+    MutexLock l(&config_mutex_);
+    strict_capacity_limit_ = s_c_l;
+    ForEachShard(
+        [s_c_l](CacheShard* cs) { cs->SetStrictCapacityLimit(s_c_l); });
+  }
+
+  Status Insert(const Slice& key, void* value, size_t charge, DeleterFn deleter,
+                Handle** handle, Priority priority) override {
+    HashVal hash = CacheShard::ComputeHash(key);
+    auto h_out = reinterpret_cast<HandleImpl**>(handle);
+    return GetShard(hash).Insert(key, hash, value, charge, deleter, h_out,
+                                 priority);
+  }
+  Status Insert(const Slice& key, void* value, const CacheItemHelper* helper,
+                size_t charge, Handle** handle = nullptr,
+                Priority priority = Priority::LOW) override {
+    if (!helper) {
+      return Status::InvalidArgument();
+    }
+    HashVal hash = CacheShard::ComputeHash(key);
+    auto h_out = reinterpret_cast<HandleImpl**>(handle);
+    return GetShard(hash).Insert(key, hash, value, helper, charge, h_out,
+                                 priority);
+  }
+
+  Handle* Lookup(const Slice& key, Statistics* /*stats*/) override {
+    HashVal hash = CacheShard::ComputeHash(key);
+    HandleImpl* result = GetShard(hash).Lookup(key, hash);
+    return reinterpret_cast<Handle*>(result);
+  }
+  Handle* Lookup(const Slice& key, const CacheItemHelper* helper,
+                 const CreateCallback& create_cb, Priority priority, bool wait,
+                 Statistics* stats = nullptr) override {
+    HashVal hash = CacheShard::ComputeHash(key);
+    HandleImpl* result = GetShard(hash).Lookup(key, hash, helper, create_cb,
+                                               priority, wait, stats);
+    return reinterpret_cast<Handle*>(result);
+  }
+
+  void Erase(const Slice& key) override {
+    HashVal hash = CacheShard::ComputeHash(key);
+    GetShard(hash).Erase(key, hash);
+  }
+
+  bool Release(Handle* handle, bool useful,
+               bool erase_if_last_ref = false) override {
+    auto h = reinterpret_cast<HandleImpl*>(handle);
+    return GetShard(h->GetHash()).Release(h, useful, erase_if_last_ref);
+  }
+  bool IsReady(Handle* handle) override {
+    auto h = reinterpret_cast<HandleImpl*>(handle);
+    return GetShard(h->GetHash()).IsReady(h);
+  }
+  void Wait(Handle* handle) override {
+    auto h = reinterpret_cast<HandleImpl*>(handle);
+    GetShard(h->GetHash()).Wait(h);
+  }
+  bool Ref(Handle* handle) override {
+    auto h = reinterpret_cast<HandleImpl*>(handle);
+    return GetShard(h->GetHash()).Ref(h);
+  }
+  bool Release(Handle* handle, bool erase_if_last_ref = false) override {
+    return Release(handle, true /*useful*/, erase_if_last_ref);
+  }
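The two Release overloads differ only in the `useful` hint: the two-argument form forwards useful=true, while the three-argument form lets callers flag entries that turned out not to be worth keeping (for example, to inform secondary-cache admission). A usage sketch of the async-capable lookup path, with hypothetical caller-side names (key, helper, create_cb, Use):

    // Hypothetical usage: non-blocking lookup, then explicit wait.
    Cache::Handle* h = cache->Lookup(key, helper, create_cb,
                                     Cache::Priority::LOW, /*wait=*/false);
    if (h != nullptr) {
      if (!cache->IsReady(h)) {
        cache->Wait(h);  // block until any pending fetch completes
      }
      Use(cache->Value(h));
      cache->Release(h, /*useful=*/true);  // erase_if_last_ref defaults false
    }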
+  using ShardedCacheBase::GetUsage;
+  size_t GetUsage() const override {
+    return SumOverShards2(&CacheShard::GetUsage);
+  }
+  size_t GetPinnedUsage() const override {
+    return SumOverShards2(&CacheShard::GetPinnedUsage);
+  }
+  size_t GetOccupancyCount() const override {
+    return SumOverShards2(&CacheShard::GetOccupancyCount);
+  }
+  size_t GetTableAddressCount() const override {
+    return SumOverShards2(&CacheShard::GetTableAddressCount);
+  }
+  void ApplyToAllEntries(
+      const std::function<void(const Slice& key, void* value, size_t charge,
+                               DeleterFn deleter)>& callback,
+      const ApplyToAllEntriesOptions& opts) override {
+    uint32_t num_shards = GetNumShards();
+    // Iterate over part of each shard, rotating between shards, to
+    // minimize impact on latency of concurrent operations.
+    std::unique_ptr<size_t[]> states(new size_t[num_shards]{});
+
+    size_t aepl = opts.average_entries_per_lock;
+    aepl = std::max(aepl, size_t{1});  // at least 1, so each pass progresses
+
+    bool remaining_work;
+    do {
+      remaining_work = false;
+      for (uint32_t i = 0; i < num_shards; i++) {
+        if (states[i] != SIZE_MAX) {
+          shards_[i].ApplyToSomeEntries(callback, aepl, &states[i]);
+          remaining_work |= states[i] != SIZE_MAX;
+        }
+      }
+    } while (remaining_work);
+  }
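The other side of this contract is each shard's ApplyToSomeEntries: it visits about average_entries_per_lock entries under its own lock, records a resume point in *state, and stores SIZE_MAX when done. A minimal sketch against a hypothetical shard whose entries sit in an indexable table (ToyShard, entries_, and mutex_ are illustrative):

    void ToyShard::ApplyToSomeEntries(
        const std::function<void(const Slice& key, void* value, size_t charge,
                                 DeleterFn deleter)>& callback,
        size_t average_entries_per_lock, size_t* state) {
      MutexLock l(&mutex_);
      size_t begin = *state;
      size_t end = std::min(begin + average_entries_per_lock, entries_.size());
      for (size_t i = begin; i < end; i++) {
        callback(entries_[i].key, entries_[i].value, entries_[i].charge,
                 entries_[i].deleter);
      }
      // SIZE_MAX tells ApplyToAllEntries this shard is finished.
      *state = (end == entries_.size()) ? SIZE_MAX : end;
    }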
+
+  virtual void EraseUnRefEntries() override {
+    ForEachShard([](CacheShard* cs) { cs->EraseUnRefEntries(); });
+  }
+
+  void DisownData() override {
+    // Leak data only if that won't generate an ASAN/valgrind warning.
+    if (!kMustFreeHeapAllocations) {
+      destroy_shards_in_dtor_ = false;
+    }
+  }
+
+ protected:
+  inline void ForEachShard(const std::function<void(CacheShard*)>& fn) {
+    uint32_t num_shards = GetNumShards();
+    for (uint32_t i = 0; i < num_shards; i++) {
+      fn(shards_ + i);
+    }
+  }
+
+  inline size_t SumOverShards(
+      const std::function<size_t(CacheShard&)>& fn) const {
+    uint32_t num_shards = GetNumShards();
+    size_t result = 0;
+    for (uint32_t i = 0; i < num_shards; i++) {
+      result += fn(shards_[i]);
+    }
+    return result;
+  }
+
+  inline size_t SumOverShards2(size_t (CacheShard::*fn)() const) const {
+    return SumOverShards([fn](CacheShard& cs) { return (cs.*fn)(); });
+  }
+
+  // Must be called exactly once by derived class constructor
+  void InitShards(const std::function<void(CacheShard*)>& placement_new) {
+    ForEachShard(placement_new);
+    destroy_shards_in_dtor_ = true;
+  }
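A concrete cache's constructor therefore allocates nothing itself: the raw, cacheline-aligned storage already exists, and InitShards placement-news one shard into each slot (placement new requires <new>). A hedged sketch with hypothetical MyCache/MyShard types:

    MyCache::MyCache(size_t capacity, int num_shard_bits,
                     bool strict_capacity_limit)
        : ShardedCache<MyShard>(capacity, num_shard_bits,
                                strict_capacity_limit, /*allocator=*/nullptr) {
      size_t per_shard = GetPerShardCapacity();
      InitShards([=](MyShard* cs) {
        new (cs) MyShard(per_shard, strict_capacity_limit,
                         kDontChargeCacheMetadata);
      });
    }

If a derived constructor throws before calling InitShards, destroy_shards_in_dtor_ stays false and the destructor frees only the raw allocation, never running shard destructors on uninitialized memory.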
+
+  void AppendPrintableOptions(std::string& str) const override {
+    shards_[0].AppendPrintableOptions(str);
+  }
+
+ private:
+  CacheShard* const shards_;
+  bool destroy_shards_in_dtor_;
 };
 
-extern int GetDefaultCacheShardBits(size_t capacity);
+// 512KB is traditional minimum shard size.
+int GetDefaultCacheShardBits(size_t capacity,
+                             size_t min_shard_size = 512U * 1024U);
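The definition (in sharded_cache.cc) is not part of this hunk; conceptually it picks the largest power-of-two shard count such that each shard still gets at least min_shard_size. A sketch under that assumption (the 64-shard cap is assumed, not shown in this diff), with a worked example:

    int GetDefaultCacheShardBits(size_t capacity, size_t min_shard_size) {
      int num_shard_bits = 0;
      size_t num_shards = capacity / min_shard_size;
      while (num_shards >>= 1) {
        if (++num_shard_bits >= 6) {
          return num_shard_bits;  // assumed cap: at most 2^6 = 64 shards
        }
      }
      return num_shard_bits;
    }
    // e.g. capacity = 8 MiB: 8 MiB / 512 KiB = 16 => 4 shard bits (16 shards)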
 
 }  // namespace ROCKSDB_NAMESPACE