]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rocksdb/memtable/write_buffer_manager.cc
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / rocksdb / memtable / write_buffer_manager.cc
index 9b74708708175d627b64d27193630f99c3c8b335..8db9816bef71e5020360a2c2bd696a964fbef9c1 100644 (file)
@@ -8,57 +8,34 @@
 // found in the LICENSE file. See the AUTHORS file for names of contributors.
 
 #include "rocksdb/write_buffer_manager.h"
-#include <mutex>
-#include "util/coding.h"
 
-namespace ROCKSDB_NAMESPACE {
-#ifndef ROCKSDB_LITE
-namespace {
-const size_t kSizeDummyEntry = 256 * 1024;
-// The key will be longer than keys for blocks in SST files so they won't
-// conflict.
-const size_t kCacheKeyPrefix = kMaxVarint64Length * 4 + 1;
-}  // namespace
-
-struct WriteBufferManager::CacheRep {
-  std::shared_ptr<Cache> cache_;
-  std::mutex cache_mutex_;
-  std::atomic<size_t> cache_allocated_size_;
-  // The non-prefix part will be updated according to the ID to use.
-  char cache_key_[kCacheKeyPrefix + kMaxVarint64Length];
-  uint64_t next_cache_key_id_ = 0;
-  std::vector<Cache::Handle*> dummy_handles_;
-
-  explicit CacheRep(std::shared_ptr<Cache> cache)
-      : cache_(cache), cache_allocated_size_(0) {
-    memset(cache_key_, 0, kCacheKeyPrefix);
-    size_t pointer_size = sizeof(const void*);
-    assert(pointer_size <= kCacheKeyPrefix);
-    memcpy(cache_key_, static_cast<const void*>(this), pointer_size);
-  }
+#include <memory>
 
-  Slice GetNextCacheKey() {
-    memset(cache_key_ + kCacheKeyPrefix, 0, kMaxVarint64Length);
-    char* end =
-        EncodeVarint64(cache_key_ + kCacheKeyPrefix, next_cache_key_id_++);
-    return Slice(cache_key_, static_cast<size_t>(end - cache_key_));
-  }
-};
-#else
-struct WriteBufferManager::CacheRep {};
-#endif  // ROCKSDB_LITE
+#include "cache/cache_entry_roles.h"
+#include "cache/cache_reservation_manager.h"
+#include "db/db_impl/db_impl.h"
+#include "rocksdb/status.h"
+#include "util/coding.h"
 
+namespace ROCKSDB_NAMESPACE {
 WriteBufferManager::WriteBufferManager(size_t _buffer_size,
-                                       std::shared_ptr<Cache> cache)
+                                       std::shared_ptr<Cache> cache,
+                                       bool allow_stall)
     : buffer_size_(_buffer_size),
       mutable_limit_(buffer_size_ * 7 / 8),
       memory_used_(0),
       memory_active_(0),
-      cache_rep_(nullptr) {
+      cache_res_mgr_(nullptr),
+      allow_stall_(allow_stall),
+      stall_active_(false) {
 #ifndef ROCKSDB_LITE
   if (cache) {
-    // Construct the cache key using the pointer to this.
-    cache_rep_.reset(new CacheRep(cache));
+    // Memtable's memory usage tends to fluctuate frequently
+    // therefore we set delayed_decrease = true to save some dummy entry
+    // insertion on memory increase right after memory decrease
+    cache_res_mgr_ = std::make_shared<
+        CacheReservationManagerImpl<CacheEntryRole::kWriteBuffer>>(
+        cache, true /* delayed_decrease */);
   }
 #else
   (void)cache;
@@ -66,80 +43,160 @@ WriteBufferManager::WriteBufferManager(size_t _buffer_size,
 }
 
 WriteBufferManager::~WriteBufferManager() {
-#ifndef ROCKSDB_LITE
-  if (cache_rep_) {
-    for (auto* handle : cache_rep_->dummy_handles_) {
-      if (handle != nullptr) {
-        cache_rep_->cache_->Release(handle, true);
-      }
-    }
+#ifndef NDEBUG
+  std::unique_lock<std::mutex> lock(mu_);
+  assert(queue_.empty());
+#endif
+}
+
+std::size_t WriteBufferManager::dummy_entries_in_cache_usage() const {
+  if (cache_res_mgr_ != nullptr) {
+    return cache_res_mgr_->GetTotalReservedCacheSize();
+  } else {
+    return 0;
+  }
+}
+
+void WriteBufferManager::ReserveMem(size_t mem) {
+  if (cache_res_mgr_ != nullptr) {
+    ReserveMemWithCache(mem);
+  } else if (enabled()) {
+    memory_used_.fetch_add(mem, std::memory_order_relaxed);
+  }
+  if (enabled()) {
+    memory_active_.fetch_add(mem, std::memory_order_relaxed);
   }
-#endif  // ROCKSDB_LITE
 }
 
 // Should only be called from write thread
 void WriteBufferManager::ReserveMemWithCache(size_t mem) {
 #ifndef ROCKSDB_LITE
-  assert(cache_rep_ != nullptr);
+  assert(cache_res_mgr_ != nullptr);
   // Use a mutex to protect various data structures. Can be optimized to a
   // lock-free solution if it ends up with a performance bottleneck.
-  std::lock_guard<std::mutex> lock(cache_rep_->cache_mutex_);
+  std::lock_guard<std::mutex> lock(cache_res_mgr_mu_);
 
   size_t new_mem_used = memory_used_.load(std::memory_order_relaxed) + mem;
   memory_used_.store(new_mem_used, std::memory_order_relaxed);
-  while (new_mem_used > cache_rep_->cache_allocated_size_) {
-    // Expand size by at least 256KB.
-    // Add a dummy record to the cache
-    Cache::Handle* handle = nullptr;
-    Status s =
-        cache_rep_->cache_->Insert(cache_rep_->GetNextCacheKey(), nullptr,
-                                   kSizeDummyEntry, nullptr, &handle);
-    s.PermitUncheckedError();  // TODO: What to do on error?
-    // We keep the handle even if insertion fails and a null handle is
-    // returned, so that when memory shrinks, we don't release extra
-    // entries from cache.
-    // Ideallly we should prevent this allocation from happening if
-    // this insertion fails. However, the callers to this code path
-    // are not able to handle failures properly. We'll need to improve
-    // it in the future.
-    cache_rep_->dummy_handles_.push_back(handle);
-    cache_rep_->cache_allocated_size_ += kSizeDummyEntry;
-  }
+  Status s = cache_res_mgr_->UpdateCacheReservation(new_mem_used);
+
+  // We absorb the error since WriteBufferManager is not able to handle
+  // this failure properly. Ideallly we should prevent this allocation
+  // from happening if this cache charging fails.
+  // [TODO] We'll need to improve it in the future and figure out what to do on
+  // error
+  s.PermitUncheckedError();
 #else
   (void)mem;
 #endif  // ROCKSDB_LITE
 }
 
+void WriteBufferManager::ScheduleFreeMem(size_t mem) {
+  if (enabled()) {
+    memory_active_.fetch_sub(mem, std::memory_order_relaxed);
+  }
+}
+
+void WriteBufferManager::FreeMem(size_t mem) {
+  if (cache_res_mgr_ != nullptr) {
+    FreeMemWithCache(mem);
+  } else if (enabled()) {
+    memory_used_.fetch_sub(mem, std::memory_order_relaxed);
+  }
+  // Check if stall is active and can be ended.
+  MaybeEndWriteStall();
+}
+
 void WriteBufferManager::FreeMemWithCache(size_t mem) {
 #ifndef ROCKSDB_LITE
-  assert(cache_rep_ != nullptr);
+  assert(cache_res_mgr_ != nullptr);
   // Use a mutex to protect various data structures. Can be optimized to a
   // lock-free solution if it ends up with a performance bottleneck.
-  std::lock_guard<std::mutex> lock(cache_rep_->cache_mutex_);
+  std::lock_guard<std::mutex> lock(cache_res_mgr_mu_);
   size_t new_mem_used = memory_used_.load(std::memory_order_relaxed) - mem;
   memory_used_.store(new_mem_used, std::memory_order_relaxed);
-  // Gradually shrink memory costed in the block cache if the actual
-  // usage is less than 3/4 of what we reserve from the block cache.
-  // We do this because:
-  // 1. we don't pay the cost of the block cache immediately a memtable is
-  //    freed, as block cache insert is expensive;
-  // 2. eventually, if we walk away from a temporary memtable size increase,
-  //    we make sure shrink the memory costed in block cache over time.
-  // In this way, we only shrink costed memory showly even there is enough
-  // margin.
-  if (new_mem_used < cache_rep_->cache_allocated_size_ / 4 * 3 &&
-      cache_rep_->cache_allocated_size_ - kSizeDummyEntry > new_mem_used) {
-    assert(!cache_rep_->dummy_handles_.empty());
-    auto* handle = cache_rep_->dummy_handles_.back();
-    // If insert failed, handle is null so we should not release.
-    if (handle != nullptr) {
-      cache_rep_->cache_->Release(handle, true);
-    }
-    cache_rep_->dummy_handles_.pop_back();
-    cache_rep_->cache_allocated_size_ -= kSizeDummyEntry;
-  }
+  Status s = cache_res_mgr_->UpdateCacheReservation(new_mem_used);
+
+  // We absorb the error since WriteBufferManager is not able to handle
+  // this failure properly.
+  // [TODO] We'll need to improve it in the future and figure out what to do on
+  // error
+  s.PermitUncheckedError();
 #else
   (void)mem;
 #endif  // ROCKSDB_LITE
 }
+
+void WriteBufferManager::BeginWriteStall(StallInterface* wbm_stall) {
+  assert(wbm_stall != nullptr);
+  assert(allow_stall_);
+
+  // Allocate outside of the lock.
+  std::list<StallInterface*> new_node = {wbm_stall};
+
+  {
+    std::unique_lock<std::mutex> lock(mu_);
+    // Verify if the stall conditions are stil active.
+    if (ShouldStall()) {
+      stall_active_.store(true, std::memory_order_relaxed);
+      queue_.splice(queue_.end(), std::move(new_node));
+    }
+  }
+
+  // If the node was not consumed, the stall has ended already and we can signal
+  // the caller.
+  if (!new_node.empty()) {
+    new_node.front()->Signal();
+  }
+}
+
+// Called when memory is freed in FreeMem or the buffer size has changed.
+void WriteBufferManager::MaybeEndWriteStall() {
+  // Cannot early-exit on !enabled() because SetBufferSize(0) needs to unblock
+  // the writers.
+  if (!allow_stall_) {
+    return;
+  }
+
+  if (IsStallThresholdExceeded()) {
+    return;  // Stall conditions have not resolved.
+  }
+
+  // Perform all deallocations outside of the lock.
+  std::list<StallInterface*> cleanup;
+
+  std::unique_lock<std::mutex> lock(mu_);
+  if (!stall_active_.load(std::memory_order_relaxed)) {
+    return;  // Nothing to do.
+  }
+
+  // Unblock new writers.
+  stall_active_.store(false, std::memory_order_relaxed);
+
+  // Unblock the writers in the queue.
+  for (StallInterface* wbm_stall : queue_) {
+    wbm_stall->Signal();
+  }
+  cleanup = std::move(queue_);
+}
+
+void WriteBufferManager::RemoveDBFromQueue(StallInterface* wbm_stall) {
+  assert(wbm_stall != nullptr);
+
+  // Deallocate the removed nodes outside of the lock.
+  std::list<StallInterface*> cleanup;
+
+  if (enabled() && allow_stall_) {
+    std::unique_lock<std::mutex> lock(mu_);
+    for (auto it = queue_.begin(); it != queue_.end();) {
+      auto next = std::next(it);
+      if (*it == wbm_stall) {
+        cleanup.splice(cleanup.end(), queue_, std::move(it));
+      }
+      it = next;
+    }
+  }
+  wbm_stall->Signal();
+}
+
 }  // namespace ROCKSDB_NAMESPACE