#pragma once
#include <atomic>
+#include <condition_variable>
#include <cstddef>
+#include <list>
+#include <memory>
+#include <mutex>
+
#include "rocksdb/cache.h"
namespace ROCKSDB_NAMESPACE {
+class CacheReservationManager;
+
+// Interface to block and signal DB instances, intended for RocksDB
+// internal use only. Each DB instance contains a pointer to a
+// StallInterface.
+class StallInterface {
+ public:
+ virtual ~StallInterface() {}
+
+ virtual void Block() = 0;
+ virtual void Signal() = 0;
+};
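+
+// A minimal sketch of a StallInterface implementation, assuming a
+// binary-semaphore style condition-variable scheme (the class name
+// `WBMStall` is hypothetical; RocksDB's internal implementation may
+// differ):
+//
+//   class WBMStall : public StallInterface {
+//    public:
+//     void Block() override {
+//       std::unique_lock<std::mutex> lk(mu_);
+//       // Sleep until Signal() has been called, then consume the signal.
+//       cv_.wait(lk, [this] { return signaled_; });
+//       signaled_ = false;
+//     }
+//     void Signal() override {
+//       {
+//         std::lock_guard<std::mutex> lk(mu_);
+//         signaled_ = true;
+//       }
+//       cv_.notify_one();
+//     }
+//
+//    private:
+//     std::mutex mu_;
+//     std::condition_variable cv_;
+//     bool signaled_ = false;
+//   };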
+
-class WriteBufferManager {
+class WriteBufferManager final {
public:
- // _buffer_size = 0 indicates no limit. Memory won't be capped.
+ // Parameters:
+ // _buffer_size: _buffer_size = 0 indicates no limit. Memory won't be capped.
// memory_usage() won't be valid and ShouldFlush() will always return false.
- // if `cache` is provided, we'll put dummy entries in the cache and cost
- // the memory allocated to the cache. It can be used even if _buffer_size = 0.
+ //
+ // cache: if `cache` is provided, we'll put dummy entries in the cache and
+ // cost the memory allocated to the cache. It can be used even if
+ // _buffer_size = 0.
+ //
+ // allow_stall: if set to true, stalls writes when memory_usage() exceeds
+ // buffer_size. The stall lasts until flushes complete and memory usage
+ // drops below buffer_size.
explicit WriteBufferManager(size_t _buffer_size,
- std::shared_ptr<Cache> cache = {});
+ std::shared_ptr<Cache> cache = {},
+ bool allow_stall = false);
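+
+ // Example usage (a sketch; the cache capacity and buffer size below are
+ // illustrative values, not recommendations):
+ //
+ //   std::shared_ptr<Cache> cache = NewLRUCache(1 << 30 /* 1GB */);
+ //   auto wbm = std::make_shared<WriteBufferManager>(
+ //       512 << 20 /* 512MB */, cache, true /* allow_stall */);
+ //   Options options;
+ //   options.write_buffer_manager = wbm;
+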
// No copying allowed
WriteBufferManager(const WriteBufferManager&) = delete;
WriteBufferManager& operator=(const WriteBufferManager&) = delete;
~WriteBufferManager();
- bool enabled() const { return buffer_size_ != 0; }
+ // Returns true if a non-zero buffer size was passed in to limit the total
+ // memory usage.
+ bool enabled() const { return buffer_size() > 0; }
- bool cost_to_cache() const { return cache_rep_ != nullptr; }
+ // Returns true if a pointer to the cache was passed in.
+ bool cost_to_cache() const { return cache_res_mgr_ != nullptr; }
+ // Returns the total memory used by memtables.
// Only valid if enabled()
size_t memory_usage() const {
return memory_used_.load(std::memory_order_relaxed);
}
+
+ // Returns the total memory used by active memtables.
size_t mutable_memtable_memory_usage() const {
return memory_active_.load(std::memory_order_relaxed);
}
- size_t buffer_size() const { return buffer_size_; }
+
+ // Returns the memory reserved in the cache by inserting dummy entries;
+ // only meaningful if a cache was passed to the constructor.
+ size_t dummy_entries_in_cache_usage() const;
+
+ // Returns the buffer_size.
+ size_t buffer_size() const {
+ return buffer_size_.load(std::memory_order_relaxed);
+ }
+
+ // Dynamically changes the total memory limit; the mutable memtable limit
+ // is kept at 7/8 of the new size.
+ void SetBufferSize(size_t new_size) {
+ buffer_size_.store(new_size, std::memory_order_relaxed);
+ mutable_limit_.store(new_size * 7 / 8, std::memory_order_relaxed);
+ // Check if stall is active and can be ended.
+ MaybeEndWriteStall();
+ }
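+
+ // Example (sketch; 256MB is an illustrative value):
+ //
+ //   wbm->SetBufferSize(256 << 20); // shrink the limit to 256MB
+ //
+ // Growing the limit can end an active stall via MaybeEndWriteStall();
+ // shrinking it may leave memory_usage() above buffer_size() until flushes
+ // catch up.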
+
+ // The functions below should only be called by RocksDB internally.
// Should only be called from write thread
bool ShouldFlush() const {
if (enabled()) {
- if (mutable_memtable_memory_usage() > mutable_limit_) {
+ if (mutable_memtable_memory_usage() >
+ mutable_limit_.load(std::memory_order_relaxed)) {
return true;
}
- if (memory_usage() >= buffer_size_ &&
- mutable_memtable_memory_usage() >= buffer_size_ / 2) {
+ size_t local_size = buffer_size();
+ if (memory_usage() >= local_size &&
+ mutable_memtable_memory_usage() >= local_size / 2) {
// If the memory exceeds the buffer size, we trigger a more aggressive
// flush. But if more than half of the memory is already being flushed,
// triggering more flushes may not help, so in that case we fall through
// and hold the data instead.
return true;
}
}
return false;
}
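+
+ // Worked example (illustrative numbers): with buffer_size = 100MB, the
+ // mutable limit is 87.5MB. If total usage is 105MB and active memtables
+ // hold 60MB, ShouldFlush() returns true (105 >= 100 and 60 >= 50). But if
+ // active memtables hold only 40MB of the 105MB, more than half is already
+ // immutable and being flushed, so ShouldFlush() returns false.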
- void ReserveMem(size_t mem) {
- if (cache_rep_ != nullptr) {
- ReserveMemWithCache(mem);
- } else if (enabled()) {
- memory_used_.fetch_add(mem, std::memory_order_relaxed);
- }
- if (enabled()) {
- memory_active_.fetch_add(mem, std::memory_order_relaxed);
+ // Returns true if total memory usage exceeds buffer_size.
+ // Writes are stalled until memory_usage drops below buffer_size. When this
+ // function returns true, all writer threads (including the one checking this
+ // condition) across all DBs will be stalled. Stalling is allowed only if the
+ // user passed allow_stall = true when creating the WriteBufferManager.
+ //
+ // Should only be called by RocksDB internally.
+ bool ShouldStall() const {
+ if (!allow_stall_ || !enabled()) {
+ return false;
}
+
+ return IsStallActive() || IsStallThresholdExceeded();
}
- // We are in the process of freeing `mem` bytes, so it is not considered
- // when checking the soft limit.
- void ScheduleFreeMem(size_t mem) {
- if (enabled()) {
- memory_active_.fetch_sub(mem, std::memory_order_relaxed);
- }
+
+ // Returns true if stall is active.
+ bool IsStallActive() const {
+ return stall_active_.load(std::memory_order_relaxed);
}
- void FreeMem(size_t mem) {
- if (cache_rep_ != nullptr) {
- FreeMemWithCache(mem);
- } else if (enabled()) {
- memory_used_.fetch_sub(mem, std::memory_order_relaxed);
- }
+
+ // Returns true if stalling condition is met.
+ bool IsStallThresholdExceeded() const {
+ return memory_usage() >= buffer_size();
}
+ // Called when memory is allocated for a memtable.
+ void ReserveMem(size_t mem);
+
+ // We are in the process of freeing `mem` bytes, so it is not considered
+ // when checking the soft limit.
+ void ScheduleFreeMem(size_t mem);
+
+ // Called when memtable memory is actually freed.
+ void FreeMem(size_t mem);
+
+ // Add the DB instance to the queue and block the DB.
+ // Should only be called by RocksDB internally.
+ void BeginWriteStall(StallInterface* wbm_stall);
+
+ // If stall conditions have resolved, remove DB instances from queue and
+ // signal them to continue.
+ void MaybeEndWriteStall();
+
+ // Remove the DB instance from the queue, e.g. when the DB is destroyed.
+ void RemoveDBFromQueue(StallInterface* wbm_stall);
+
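+ // A sketch of the intended stall flow (illustrative; the actual calls are
+ // made from RocksDB's internal write path):
+ //
+ //   StallInterface* stall = ...; // owned by the DB instance
+ //   if (wbm->ShouldStall()) {
+ //     wbm->BeginWriteStall(stall); // enqueue and activate the stall
+ //     stall->Block(); // returns once MaybeEndWriteStall() signals
+ //   }
+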
private:
- const size_t buffer_size_;
- const size_t mutable_limit_;
+ std::atomic<size_t> buffer_size_;
+ std::atomic<size_t> mutable_limit_;
std::atomic<size_t> memory_used_;
// Memory that hasn't been scheduled to free.
std::atomic<size_t> memory_active_;
- struct CacheRep;
- std::unique_ptr<CacheRep> cache_rep_;
+ std::shared_ptr<CacheReservationManager> cache_res_mgr_;
+ // Protects cache_res_mgr_
+ std::mutex cache_res_mgr_mu_;
+
+ std::list<StallInterface*> queue_;
+ // Protects the queue_ and stall_active_.
+ std::mutex mu_;
+ bool allow_stall_;
+ // Value should only be changed by BeginWriteStall() and MaybeEndWriteStall()
+ // while holding mu_, but it can be read without a lock.
+ std::atomic<bool> stall_active_;
void ReserveMemWithCache(size_t mem);
void FreeMemWithCache(size_t mem);
};

}  // namespace ROCKSDB_NAMESPACE