// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "rocksdb/write_buffer_manager.h"
-#include <mutex>
-#include "util/coding.h"
-namespace ROCKSDB_NAMESPACE {
-#ifndef ROCKSDB_LITE
-namespace {
-const size_t kSizeDummyEntry = 256 * 1024;
-// The key will be longer than keys for blocks in SST files so they won't
-// conflict.
-const size_t kCacheKeyPrefix = kMaxVarint64Length * 4 + 1;
-} // namespace
-
-struct WriteBufferManager::CacheRep {
- std::shared_ptr<Cache> cache_;
- std::mutex cache_mutex_;
- std::atomic<size_t> cache_allocated_size_;
- // The non-prefix part will be updated according to the ID to use.
- char cache_key_[kCacheKeyPrefix + kMaxVarint64Length];
- uint64_t next_cache_key_id_ = 0;
- std::vector<Cache::Handle*> dummy_handles_;
-
- explicit CacheRep(std::shared_ptr<Cache> cache)
- : cache_(cache), cache_allocated_size_(0) {
- memset(cache_key_, 0, kCacheKeyPrefix);
- size_t pointer_size = sizeof(const void*);
- assert(pointer_size <= kCacheKeyPrefix);
- memcpy(cache_key_, static_cast<const void*>(this), pointer_size);
- }
+#include <memory>
- Slice GetNextCacheKey() {
- memset(cache_key_ + kCacheKeyPrefix, 0, kMaxVarint64Length);
- char* end =
- EncodeVarint64(cache_key_ + kCacheKeyPrefix, next_cache_key_id_++);
- return Slice(cache_key_, static_cast<size_t>(end - cache_key_));
- }
-};
-#else
-struct WriteBufferManager::CacheRep {};
-#endif // ROCKSDB_LITE
+#include "cache/cache_entry_roles.h"
+#include "cache/cache_reservation_manager.h"
+#include "db/db_impl/db_impl.h"
+#include "rocksdb/status.h"
+#include "util/coding.h"
+namespace ROCKSDB_NAMESPACE {
WriteBufferManager::WriteBufferManager(size_t _buffer_size,
- std::shared_ptr<Cache> cache)
+ std::shared_ptr<Cache> cache,
+ bool allow_stall)
: buffer_size_(_buffer_size),
mutable_limit_(buffer_size_ * 7 / 8),
memory_used_(0),
memory_active_(0),
- cache_rep_(nullptr) {
+ cache_res_mgr_(nullptr),
+ allow_stall_(allow_stall),
+ stall_active_(false) {
#ifndef ROCKSDB_LITE
if (cache) {
- // Construct the cache key using the pointer to this.
- cache_rep_.reset(new CacheRep(cache));
+    // A memtable's memory usage tends to fluctuate frequently, so we set
+    // delayed_decrease = true to avoid re-inserting dummy entries when
+    // memory usage rises again right after a decrease.
+ cache_res_mgr_ = std::make_shared<
+ CacheReservationManagerImpl<CacheEntryRole::kWriteBuffer>>(
+ cache, true /* delayed_decrease */);
}
#else
(void)cache;
#endif  // ROCKSDB_LITE
}
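+
+// Usage sketch (illustrative only, not part of the original sources): one
+// WriteBufferManager can be shared by several DBs and charged against a
+// block cache, with write stalling enabled:
+//
+//   std::shared_ptr<Cache> cache = NewLRUCache(1024 * 1024 * 1024);
+//   auto wbm = std::make_shared<WriteBufferManager>(
+//       512 * 1024 * 1024 /* _buffer_size */, cache, true /* allow_stall */);
+//   db_options.write_buffer_manager = wbm;  // db_options is a DBOptions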
WriteBufferManager::~WriteBufferManager() {
-#ifndef ROCKSDB_LITE
- if (cache_rep_) {
- for (auto* handle : cache_rep_->dummy_handles_) {
- if (handle != nullptr) {
- cache_rep_->cache_->Release(handle, true);
- }
- }
+#ifndef NDEBUG
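+  // In debug builds, verify that no DB is still registered in the stall
+  // queue: every DB is expected to remove itself via RemoveDBFromQueue()
+  // before the WriteBufferManager is destroyed.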
+ std::unique_lock<std::mutex> lock(mu_);
+ assert(queue_.empty());
+#endif
+}
+
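+// Returns the total memory currently reserved in the block cache (via dummy
+// entries) on behalf of this WriteBufferManager, or 0 when no cache is
+// attached.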
+std::size_t WriteBufferManager::dummy_entries_in_cache_usage() const {
+ if (cache_res_mgr_ != nullptr) {
+ return cache_res_mgr_->GetTotalReservedCacheSize();
+ } else {
+ return 0;
+ }
+}
+
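+// Accounts for `mem` bytes of newly allocated memtable memory. When a cache
+// is attached, the usage is also charged against the block cache through the
+// cache reservation manager.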
+void WriteBufferManager::ReserveMem(size_t mem) {
+ if (cache_res_mgr_ != nullptr) {
+ ReserveMemWithCache(mem);
+ } else if (enabled()) {
+ memory_used_.fetch_add(mem, std::memory_order_relaxed);
+ }
+ if (enabled()) {
+ memory_active_.fetch_add(mem, std::memory_order_relaxed);
}
-#endif // ROCKSDB_LITE
}
// Should only be called from write thread
void WriteBufferManager::ReserveMemWithCache(size_t mem) {
#ifndef ROCKSDB_LITE
- assert(cache_rep_ != nullptr);
+ assert(cache_res_mgr_ != nullptr);
// Use a mutex to protect various data structures. Can be optimized to a
// lock-free solution if it ends up with a performance bottleneck.
- std::lock_guard<std::mutex> lock(cache_rep_->cache_mutex_);
+ std::lock_guard<std::mutex> lock(cache_res_mgr_mu_);
size_t new_mem_used = memory_used_.load(std::memory_order_relaxed) + mem;
memory_used_.store(new_mem_used, std::memory_order_relaxed);
- while (new_mem_used > cache_rep_->cache_allocated_size_) {
- // Expand size by at least 256KB.
- // Add a dummy record to the cache
- Cache::Handle* handle = nullptr;
- Status s =
- cache_rep_->cache_->Insert(cache_rep_->GetNextCacheKey(), nullptr,
- kSizeDummyEntry, nullptr, &handle);
- s.PermitUncheckedError(); // TODO: What to do on error?
- // We keep the handle even if insertion fails and a null handle is
- // returned, so that when memory shrinks, we don't release extra
- // entries from cache.
- // Ideallly we should prevent this allocation from happening if
- // this insertion fails. However, the callers to this code path
- // are not able to handle failures properly. We'll need to improve
- // it in the future.
- cache_rep_->dummy_handles_.push_back(handle);
- cache_rep_->cache_allocated_size_ += kSizeDummyEntry;
- }
+ Status s = cache_res_mgr_->UpdateCacheReservation(new_mem_used);
+
+ // We absorb the error since WriteBufferManager is not able to handle
+  // this failure properly. Ideally we should prevent this allocation
+  // from happening if this cache charging fails.
+  // [TODO] We'll need to improve this in the future and figure out what to do
+  // on error.
+ s.PermitUncheckedError();
#else
(void)mem;
#endif // ROCKSDB_LITE
}
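+// Marks `mem` bytes as no longer active (e.g. when a memtable is scheduled
+// for flush). The memory stays accounted as used until FreeMem() is called.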
+void WriteBufferManager::ScheduleFreeMem(size_t mem) {
+ if (enabled()) {
+ memory_active_.fetch_sub(mem, std::memory_order_relaxed);
+ }
+}
+
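+// Releases `mem` bytes from the usage accounting (and from the block cache
+// reservation when a cache is attached), then ends an active write stall if
+// the stall conditions no longer hold.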
+void WriteBufferManager::FreeMem(size_t mem) {
+ if (cache_res_mgr_ != nullptr) {
+ FreeMemWithCache(mem);
+ } else if (enabled()) {
+ memory_used_.fetch_sub(mem, std::memory_order_relaxed);
+ }
+ // Check if stall is active and can be ended.
+ MaybeEndWriteStall();
+}
+
void WriteBufferManager::FreeMemWithCache(size_t mem) {
#ifndef ROCKSDB_LITE
- assert(cache_rep_ != nullptr);
+ assert(cache_res_mgr_ != nullptr);
// Use a mutex to protect various data structures. Can be optimized to a
// lock-free solution if it ends up with a performance bottleneck.
- std::lock_guard<std::mutex> lock(cache_rep_->cache_mutex_);
+ std::lock_guard<std::mutex> lock(cache_res_mgr_mu_);
size_t new_mem_used = memory_used_.load(std::memory_order_relaxed) - mem;
memory_used_.store(new_mem_used, std::memory_order_relaxed);
- // Gradually shrink memory costed in the block cache if the actual
- // usage is less than 3/4 of what we reserve from the block cache.
- // We do this because:
- // 1. we don't pay the cost of the block cache immediately a memtable is
- // freed, as block cache insert is expensive;
- // 2. eventually, if we walk away from a temporary memtable size increase,
- // we make sure shrink the memory costed in block cache over time.
- // In this way, we only shrink costed memory showly even there is enough
- // margin.
- if (new_mem_used < cache_rep_->cache_allocated_size_ / 4 * 3 &&
- cache_rep_->cache_allocated_size_ - kSizeDummyEntry > new_mem_used) {
- assert(!cache_rep_->dummy_handles_.empty());
- auto* handle = cache_rep_->dummy_handles_.back();
- // If insert failed, handle is null so we should not release.
- if (handle != nullptr) {
- cache_rep_->cache_->Release(handle, true);
- }
- cache_rep_->dummy_handles_.pop_back();
- cache_rep_->cache_allocated_size_ -= kSizeDummyEntry;
- }
+ Status s = cache_res_mgr_->UpdateCacheReservation(new_mem_used);
+
+ // We absorb the error since WriteBufferManager is not able to handle
+ // this failure properly.
+  // [TODO] We'll need to improve this in the future and figure out what to do
+  // on error.
+ s.PermitUncheckedError();
#else
(void)mem;
#endif // ROCKSDB_LITE
}
+
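+// Adds the DB's StallInterface to the stall queue if the stall conditions
+// still hold; otherwise signals it immediately so the write thread is not
+// blocked.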
+void WriteBufferManager::BeginWriteStall(StallInterface* wbm_stall) {
+ assert(wbm_stall != nullptr);
+ assert(allow_stall_);
+
+ // Allocate outside of the lock.
+ std::list<StallInterface*> new_node = {wbm_stall};
+
+ {
+ std::unique_lock<std::mutex> lock(mu_);
+    // Verify that the stall conditions are still active.
+ if (ShouldStall()) {
+ stall_active_.store(true, std::memory_order_relaxed);
+ queue_.splice(queue_.end(), std::move(new_node));
+ }
+ }
+
+ // If the node was not consumed, the stall has ended already and we can signal
+ // the caller.
+ if (!new_node.empty()) {
+ new_node.front()->Signal();
+ }
+}
+
+// Called when memory is freed in FreeMem or the buffer size has changed.
+void WriteBufferManager::MaybeEndWriteStall() {
+ // Cannot early-exit on !enabled() because SetBufferSize(0) needs to unblock
+ // the writers.
+ if (!allow_stall_) {
+ return;
+ }
+
+ if (IsStallThresholdExceeded()) {
+ return; // Stall conditions have not resolved.
+ }
+
+ // Perform all deallocations outside of the lock.
+ std::list<StallInterface*> cleanup;
+
+ std::unique_lock<std::mutex> lock(mu_);
+ if (!stall_active_.load(std::memory_order_relaxed)) {
+ return; // Nothing to do.
+ }
+
+ // Unblock new writers.
+ stall_active_.store(false, std::memory_order_relaxed);
+
+ // Unblock the writers in the queue.
+ for (StallInterface* wbm_stall : queue_) {
+ wbm_stall->Signal();
+ }
+ cleanup = std::move(queue_);
+}
+
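+// Removes the given StallInterface from the stall queue (e.g. when its DB is
+// being closed) and signals it so that any pending wait is released.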
+void WriteBufferManager::RemoveDBFromQueue(StallInterface* wbm_stall) {
+ assert(wbm_stall != nullptr);
+
+ // Deallocate the removed nodes outside of the lock.
+ std::list<StallInterface*> cleanup;
+
+ if (enabled() && allow_stall_) {
+ std::unique_lock<std::mutex> lock(mu_);
+ for (auto it = queue_.begin(); it != queue_.end();) {
+ auto next = std::next(it);
+ if (*it == wbm_stall) {
+ cleanup.splice(cleanup.end(), queue_, std::move(it));
+ }
+ it = next;
+ }
+ }
+ wbm_stall->Signal();
+}
+
} // namespace ROCKSDB_NAMESPACE