1 // Copyright (c) 2013, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
22 #include "rocksdb/cache.h"
23 #include "rocksdb/comparator.h"
24 #include "rocksdb/persistent_cache.h"
26 #include "utilities/persistent_cache/block_cache_tier_file.h"
27 #include "utilities/persistent_cache/block_cache_tier_metadata.h"
28 #include "utilities/persistent_cache/persistent_cache_util.h"
30 #include "memtable/skiplist.h"
31 #include "monitoring/histogram.h"
32 #include "port/port.h"
33 #include "util/arena.h"
34 #include "util/coding.h"
35 #include "util/crc32c.h"
36 #include "util/mutexlock.h"
41 // Block cache tier implementation
43 class BlockCacheTier
: public PersistentCacheTier
{
45 explicit BlockCacheTier(const PersistentCacheConfig
& opt
)
47 insert_ops_(static_cast<size_t>(opt_
.max_write_pipeline_backlog_size
)),
48 buffer_allocator_(opt
.write_buffer_size
, opt
.write_buffer_count()),
49 writer_(this, opt_
.writer_qdepth
, static_cast<size_t>(opt_
.writer_dispatch_size
)) {
50 Info(opt_
.log
, "Initializing allocator. size=%d B count=%d",
51 opt_
.write_buffer_size
, opt_
.write_buffer_count());
54 virtual ~BlockCacheTier() {
55 // Close is re-entrant so we can call close even if it is already closed
57 assert(!insert_th_
.joinable());
60 Status
Insert(const Slice
& key
, const char* data
, const size_t size
) override
;
61 Status
Lookup(const Slice
& key
, std::unique_ptr
<char[]>* data
,
62 size_t* size
) override
;
63 Status
Open() override
;
64 Status
Close() override
;
65 bool Erase(const Slice
& key
) override
;
66 bool Reserve(const size_t size
) override
;
68 bool IsCompressed() override
{ return opt_
.is_compressed
; }
70 std::string
GetPrintableOptions() const override
{ return opt_
.ToString(); }
72 PersistentCache::StatsType
Stats() override
;
74 void TEST_Flush() override
{
75 while (insert_ops_
.Size()) {
77 Env::Default()->SleepForMicroseconds(1000000);
82 // Percentage of cache to be evicted when the cache is full
83 static const size_t kEvictPct
= 10;
84 // Max attempts to insert key, value to cache in pipelined mode
85 static const size_t kMaxRetry
= 3;
87 // Pipelined operation
89 explicit InsertOp(const bool signal
) : signal_(signal
) {}
90 explicit InsertOp(std::string
&& key
, const std::string
& data
)
91 : key_(std::move(key
)), data_(data
) {}
95 InsertOp(InsertOp
&& /*rhs*/) = default;
96 InsertOp
& operator=(InsertOp
&& rhs
) = default;
98 // used for estimating size by bounded queue
99 size_t Size() { return data_
.size() + key_
.size(); }
103 const bool signal_
= false; // signal to request processing thread to exit
106 // entry point for insert thread
108 // insert implementation
109 Status
InsertImpl(const Slice
& key
, const Slice
& data
);
110 // Create a new cache file
111 Status
NewCacheFile();
112 // Get cache directory path
113 std::string
GetCachePath() const { return opt_
.path
+ "/cache"; }
115 Status
CleanupCacheFolder(const std::string
& folder
);
119 HistogramImpl bytes_pipelined_
;
120 HistogramImpl bytes_written_
;
121 HistogramImpl bytes_read_
;
122 HistogramImpl read_hit_latency_
;
123 HistogramImpl read_miss_latency_
;
124 HistogramImpl write_latency_
;
125 std::atomic
<uint64_t> cache_hits_
{0};
126 std::atomic
<uint64_t> cache_misses_
{0};
127 std::atomic
<uint64_t> cache_errors_
{0};
128 std::atomic
<uint64_t> insert_dropped_
{0};
130 double CacheHitPct() const {
131 const auto lookups
= cache_hits_
+ cache_misses_
;
132 return lookups
? 100 * cache_hits_
/ static_cast<double>(lookups
) : 0.0;
135 double CacheMissPct() const {
136 const auto lookups
= cache_hits_
+ cache_misses_
;
137 return lookups
? 100 * cache_misses_
/ static_cast<double>(lookups
) : 0.0;
141 port::RWMutex lock_
; // Synchronization
142 const PersistentCacheConfig opt_
; // BlockCache options
143 BoundedQueue
<InsertOp
> insert_ops_
; // Ops waiting for insert
144 rocksdb::port::Thread insert_th_
; // Insert thread
145 uint32_t writer_cache_id_
= 0; // Current cache file identifier
146 WriteableCacheFile
* cache_file_
= nullptr; // Current cache file reference
147 CacheWriteBufferAllocator buffer_allocator_
; // Buffer provider
148 ThreadedWriter writer_
; // Writer threads
149 BlockCacheTierMetadata metadata_
; // Cache meta data manager
150 std::atomic
<uint64_t> size_
{0}; // Size of the cache
151 Statistics stats_
; // Statistics
154 } // namespace rocksdb