1 // Copyright (c) 2013, Facebook, Inc. All rights reserved.
2 // This source code is licensed under the BSD-style license found in the
3 // LICENSE file in the root directory of this source tree. An additional grant
4 // of patent rights can be found in the PATENTS file in the same directory.
21 #include "rocksdb/cache.h"
22 #include "rocksdb/comparator.h"
23 #include "rocksdb/persistent_cache.h"
25 #include "utilities/persistent_cache/block_cache_tier_file.h"
26 #include "utilities/persistent_cache/block_cache_tier_metadata.h"
27 #include "utilities/persistent_cache/persistent_cache_util.h"
29 #include "memtable/skiplist.h"
30 #include "monitoring/histogram.h"
31 #include "port/port.h"
32 #include "util/arena.h"
33 #include "util/coding.h"
34 #include "util/crc32c.h"
35 #include "util/mutexlock.h"
40 // Block cache tier implementation
42 class BlockCacheTier
: public PersistentCacheTier
{
// Constructs the tier from the supplied configuration `opt`.
// The initializers visible here:
//   - insert_ops_ is sized from opt_.max_write_pipeline_backlog_size,
//   - buffer_allocator_ receives opt.write_buffer_size and
//     opt.write_buffer_count(),
//   - writer_ receives this tier, opt_.writer_qdepth and
//     opt_.writer_dispatch_size.
// NOTE(review): the initializer for opt_ and the closing brace of the body
// are not visible in this excerpt.
44 explicit BlockCacheTier(const PersistentCacheConfig
& opt
)
46 insert_ops_(opt_
.max_write_pipeline_backlog_size
),
47 buffer_allocator_(opt
.write_buffer_size
, opt
.write_buffer_count()),
48 writer_(this, opt_
.writer_qdepth
, opt_
.writer_dispatch_size
) {
// Report the allocator parameters through the logger held in opt_.log.
// NOTE(review): both values are formatted with %d — confirm that matches the
// types of write_buffer_size and write_buffer_count().
49 Info(opt_
.log
, "Initializing allocator. size=%d B count=%d",
50 opt_
.write_buffer_size
, opt_
.write_buffer_count());
// Destructor. Asserts that the insert thread (insert_th_) is not joinable.
// NOTE(review): the comment inside the body refers to a Close() call that is
// not visible in this excerpt — confirm it precedes the assert.
53 virtual ~BlockCacheTier() {
54 // Close is re-entrant so we can call close even if it is already closed
56 assert(!insert_th_
.joinable());
59 Status
Insert(const Slice
& key
, const char* data
, const size_t size
) override
;
60 Status
Lookup(const Slice
& key
, std::unique_ptr
<char[]>* data
,
61 size_t* size
) override
;
62 Status
Open() override
;
63 Status
Close() override
;
64 bool Erase(const Slice
& key
) override
;
65 bool Reserve(const size_t size
) override
;
67 bool IsCompressed() override
{ return opt_
.is_compressed
; }
69 std::string
GetPrintableOptions() const override
{ return opt_
.ToString(); }
71 PersistentCache::StatsType
Stats() override
;
// Overrides TEST_Flush: loops for as long as insert_ops_.Size() is non-zero,
// sleeping 1,000,000 microseconds (one second) on every pass.
// NOTE(review): the closing braces of the loop and the function are not
// visible in this excerpt.
73 void TEST_Flush() override
{
74 while (insert_ops_
.Size()) {
76 Env::Default()->SleepForMicroseconds(1000000);
// Share of the cache, in percent, to evict once the cache is full.
static const size_t kEvictPct = 10;

// Upper bound on attempts to insert a key/value pair in pipelined mode.
static const size_t kMaxRetry = 3;
86 // Pipelined operation
88 explicit InsertOp(const bool signal
) : signal_(signal
) {}
89 explicit InsertOp(std::string
&& key
, const std::string
& data
)
90 : key_(std::move(key
)), data_(data
) {}
94 InsertOp(InsertOp
&& rhs
) = default;
95 InsertOp
& operator=(InsertOp
&& rhs
) = default;
97 // used for estimating size by bounded queue
98 size_t Size() { return data_
.size() + key_
.size(); }
// Signal to request the processing thread to exit; false unless set by the
// signalling constructor. Deliberately not const: a const data member would
// make InsertOp's defaulted move assignment operator implicitly deleted.
bool signal_ = false;
105 // entry point for insert thread
107 // insert implementation
108 Status
InsertImpl(const Slice
& key
, const Slice
& data
);
109 // Create a new cache file
110 Status
NewCacheFile();
111 // Get cache directory path
112 std::string
GetCachePath() const { return opt_
.path
+ "/cache"; }
114 Status
CleanupCacheFolder(const std::string
& folder
);
118 HistogramImpl bytes_pipelined_
;
119 HistogramImpl bytes_written_
;
120 HistogramImpl bytes_read_
;
121 HistogramImpl read_hit_latency_
;
122 HistogramImpl read_miss_latency_
;
123 HistogramImpl write_latency_
;
124 uint64_t cache_hits_
= 0;
125 uint64_t cache_misses_
= 0;
126 uint64_t cache_errors_
= 0;
127 uint64_t insert_dropped_
= 0;
// Returns cache_hits_ as a percentage of (cache_hits_ + cache_misses_), or
// 0.0 when that total is zero, so no division by zero takes place.
// The multiplication by 100 happens in integer arithmetic before the division
// by the total converted to double.
// NOTE(review): the closing brace is not visible in this excerpt.
129 double CacheHitPct() const {
130 const auto lookups
= cache_hits_
+ cache_misses_
;
131 return lookups
? 100 * cache_hits_
/ static_cast<double>(lookups
) : 0.0;
// Returns cache_misses_ as a percentage of (cache_hits_ + cache_misses_), or
// 0.0 when that total is zero, so no division by zero takes place.
// The multiplication by 100 happens in integer arithmetic before the division
// by the total converted to double.
// NOTE(review): the closing brace is not visible in this excerpt.
134 double CacheMissPct() const {
135 const auto lookups
= cache_hits_
+ cache_misses_
;
136 return lookups
? 100 * cache_misses_
/ static_cast<double>(lookups
) : 0.0;
140 port::RWMutex lock_
; // Synchronization
141 const PersistentCacheConfig opt_
; // BlockCache options
142 BoundedQueue
<InsertOp
> insert_ops_
; // Ops waiting for insert
143 rocksdb::port::Thread insert_th_
; // Insert thread
144 uint32_t writer_cache_id_
= 0; // Current cache file identifier
145 WriteableCacheFile
* cache_file_
= nullptr; // Current cache file reference
146 CacheWriteBufferAllocator buffer_allocator_
; // Buffer provider
147 ThreadedWriter writer_
; // Writer threads
148 BlockCacheTierMetadata metadata_
; // Cache meta data manager
149 std::atomic
<uint64_t> size_
{0}; // Size of the cache
150 Statistics stats_
; // Statistics
153 } // namespace rocksdb