]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rocksdb/include/rocksdb/write_buffer_manager.h
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / rocksdb / include / rocksdb / write_buffer_manager.h
index ae1c98cafb2d666930bf0d72e6e04594c4d7a960..7fb18196d7e571915dd24c2f951a3c8a0c308836 100644 (file)
 #pragma once
 
 #include <atomic>
+#include <condition_variable>
 #include <cstddef>
+#include <list>
+#include <mutex>
+
 #include "rocksdb/cache.h"
 
 namespace ROCKSDB_NAMESPACE {
+class CacheReservationManager;
+
+// Interface to block and signal DB instances, intended for RocksDB
+// internal use only. Each DB instance contains ptr to StallInterface.
+class StallInterface {
+ public:
+  virtual ~StallInterface() {}
+
+  virtual void Block() = 0;
 
-class WriteBufferManager {
+  virtual void Signal() = 0;
+};
+
+class WriteBufferManager final {
  public:
-  // _buffer_size = 0 indicates no limit. Memory won't be capped.
+  // Parameters:
+  // _buffer_size: _buffer_size = 0 indicates no limit. Memory won't be capped.
   // memory_usage() won't be valid and ShouldFlush() will always return true.
-  // if `cache` is provided, we'll put dummy entries in the cache and cost
-  // the memory allocated to the cache. It can be used even if _buffer_size = 0.
+  //
+  // cache_: if `cache` is provided, we'll put dummy entries in the cache and
+  // cost the memory allocated to the cache. It can be used even if _buffer_size
+  // = 0.
+  //
+  // allow_stall: if set true, it will enable stalling of writes when
+  // memory_usage() exceeds buffer_size. It will wait for flush to complete and
+  // memory usage to drop down.
   explicit WriteBufferManager(size_t _buffer_size,
-                              std::shared_ptr<Cache> cache = {});
+                              std::shared_ptr<Cache> cache = {},
+                              bool allow_stall = false);
   // No copying allowed
   WriteBufferManager(const WriteBufferManager&) = delete;
   WriteBufferManager& operator=(const WriteBufferManager&) = delete;
 
   ~WriteBufferManager();
 
-  bool enabled() const { return buffer_size_ != 0; }
+  // Returns true if buffer_limit is passed to limit the total memory usage and
+  // is greater than 0.
+  bool enabled() const { return buffer_size() > 0; }
 
-  bool cost_to_cache() const { return cache_rep_ != nullptr; }
+  // Returns true if pointer to cache is passed.
+  bool cost_to_cache() const { return cache_res_mgr_ != nullptr; }
 
+  // Returns the total memory used by memtables.
   // Only valid if enabled()
   size_t memory_usage() const {
     return memory_used_.load(std::memory_order_relaxed);
   }
+
+  // Returns the total memory used by active memtables.
   size_t mutable_memtable_memory_usage() const {
     return memory_active_.load(std::memory_order_relaxed);
   }
-  size_t buffer_size() const { return buffer_size_; }
+
+  size_t dummy_entries_in_cache_usage() const;
+
+  // Returns the buffer_size.
+  size_t buffer_size() const {
+    return buffer_size_.load(std::memory_order_relaxed);
+  }
+
+  void SetBufferSize(size_t new_size) {
+    buffer_size_.store(new_size, std::memory_order_relaxed);
+    mutable_limit_.store(new_size * 7 / 8, std::memory_order_relaxed);
+    // Check if stall is active and can be ended.
+    MaybeEndWriteStall();
+  }
+
+  // Below functions should be called by RocksDB internally.
 
   // Should only be called from write thread
   bool ShouldFlush() const {
     if (enabled()) {
-      if (mutable_memtable_memory_usage() > mutable_limit_) {
+      if (mutable_memtable_memory_usage() >
+          mutable_limit_.load(std::memory_order_relaxed)) {
         return true;
       }
-      if (memory_usage() >= buffer_size_ &&
-          mutable_memtable_memory_usage() >= buffer_size_ / 2) {
+      size_t local_size = buffer_size();
+      if (memory_usage() >= local_size &&
+          mutable_memtable_memory_usage() >= local_size / 2) {
         // If the memory exceeds the buffer size, we trigger more aggressive
         // flush. But if already more than half memory is being flushed,
         // triggering more flush may not help. We will hold it instead.
@@ -62,39 +109,66 @@ class WriteBufferManager {
     return false;
   }
 
-  void ReserveMem(size_t mem) {
-    if (cache_rep_ != nullptr) {
-      ReserveMemWithCache(mem);
-    } else if (enabled()) {
-      memory_used_.fetch_add(mem, std::memory_order_relaxed);
-    }
-    if (enabled()) {
-      memory_active_.fetch_add(mem, std::memory_order_relaxed);
+  // Returns true if total memory usage exceeded buffer_size.
+  // We stall the writes untill memory_usage drops below buffer_size. When the
+  // function returns true, all writer threads (including one checking this
+  // condition) across all DBs will be stalled. Stall is allowed only if user
+  // pass allow_stall = true during WriteBufferManager instance creation.
+  //
+  // Should only be called by RocksDB internally .
+  bool ShouldStall() const {
+    if (!allow_stall_ || !enabled()) {
+      return false;
     }
+
+    return IsStallActive() || IsStallThresholdExceeded();
   }
-  // We are in the process of freeing `mem` bytes, so it is not considered
-  // when checking the soft limit.
-  void ScheduleFreeMem(size_t mem) {
-    if (enabled()) {
-      memory_active_.fetch_sub(mem, std::memory_order_relaxed);
-    }
+
+  // Returns true if stall is active.
+  bool IsStallActive() const {
+    return stall_active_.load(std::memory_order_relaxed);
   }
-  void FreeMem(size_t mem) {
-    if (cache_rep_ != nullptr) {
-      FreeMemWithCache(mem);
-    } else if (enabled()) {
-      memory_used_.fetch_sub(mem, std::memory_order_relaxed);
-    }
+
+  // Returns true if stalling condition is met.
+  bool IsStallThresholdExceeded() const {
+    return memory_usage() >= buffer_size_;
   }
 
+  void ReserveMem(size_t mem);
+
+  // We are in the process of freeing `mem` bytes, so it is not considered
+  // when checking the soft limit.
+  void ScheduleFreeMem(size_t mem);
+
+  void FreeMem(size_t mem);
+
+  // Add the DB instance to the queue and block the DB.
+  // Should only be called by RocksDB internally.
+  void BeginWriteStall(StallInterface* wbm_stall);
+
+  // If stall conditions have resolved, remove DB instances from queue and
+  // signal them to continue.
+  void MaybeEndWriteStall();
+
+  void RemoveDBFromQueue(StallInterface* wbm_stall);
+
  private:
-  const size_t buffer_size_;
-  const size_t mutable_limit_;
+  std::atomic<size_t> buffer_size_;
+  std::atomic<size_t> mutable_limit_;
   std::atomic<size_t> memory_used_;
   // Memory that hasn't been scheduled to free.
   std::atomic<size_t> memory_active_;
-  struct CacheRep;
-  std::unique_ptr<CacheRep> cache_rep_;
+  std::shared_ptr<CacheReservationManager> cache_res_mgr_;
+  // Protects cache_res_mgr_
+  std::mutex cache_res_mgr_mu_;
+
+  std::list<StallInterface*> queue_;
+  // Protects the queue_ and stall_active_.
+  std::mutex mu_;
+  bool allow_stall_;
+  // Value should only be changed by BeginWriteStall() and MaybeEndWriteStall()
+  // while holding mu_, but it can be read without a lock.
+  std::atomic<bool> stall_active_;
 
   void ReserveMemWithCache(size_t mem);
   void FreeMemWithCache(size_t mem);