// ceph/src/rocksdb/utilities/persistent_cache/block_cache_tier.cc
1 // Copyright (c) 2013, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
7 #include "utilities/persistent_cache/block_cache_tier.h"
13 #include "logging/logging.h"
14 #include "port/port.h"
15 #include "test_util/sync_point.h"
16 #include "util/stop_watch.h"
17 #include "utilities/persistent_cache/block_cache_tier_file.h"
19 namespace ROCKSDB_NAMESPACE
{
24 Status
BlockCacheTier::Open() {
31 // Check the validity of the options
32 status
= opt_
.ValidateSettings();
35 Error(opt_
.log
, "Invalid block cache options");
39 // Create base directory or cleanup existing directory
40 status
= opt_
.env
->CreateDirIfMissing(opt_
.path
);
42 Error(opt_
.log
, "Error creating directory %s. %s", opt_
.path
.c_str(),
43 status
.ToString().c_str());
47 // Create base/<cache dir> directory
48 status
= opt_
.env
->CreateDir(GetCachePath());
50 // directory already exists, clean it up
51 status
= CleanupCacheFolder(GetCachePath());
54 Error(opt_
.log
, "Error creating directory %s. %s", opt_
.path
.c_str(),
55 status
.ToString().c_str());
62 status
= NewCacheFile();
64 Error(opt_
.log
, "Error creating new file %s. %s", opt_
.path
.c_str(),
65 status
.ToString().c_str());
71 if (opt_
.pipeline_writes
) {
72 assert(!insert_th_
.joinable());
73 insert_th_
= port::Thread(&BlockCacheTier::InsertMain
, this);
79 bool IsCacheFile(const std::string
& file
) {
80 // check if the file has .rc suffix
81 // Unfortunately regex support across compilers is not even, so we use simple
83 size_t pos
= file
.find(".");
84 if (pos
== std::string::npos
) {
88 std::string suffix
= file
.substr(pos
);
89 return suffix
== ".rc";
92 Status
BlockCacheTier::CleanupCacheFolder(const std::string
& folder
) {
93 std::vector
<std::string
> files
;
94 Status status
= opt_
.env
->GetChildren(folder
, &files
);
96 Error(opt_
.log
, "Error getting files for %s. %s", folder
.c_str(),
97 status
.ToString().c_str());
101 // cleanup files with the patter :digi:.rc
102 for (auto file
: files
) {
103 if (IsCacheFile(file
)) {
105 Info(opt_
.log
, "Removing file %s.", file
.c_str());
106 status
= opt_
.env
->DeleteFile(folder
+ "/" + file
);
108 Error(opt_
.log
, "Error deleting file %s. %s", file
.c_str(),
109 status
.ToString().c_str());
113 ROCKS_LOG_DEBUG(opt_
.log
, "Skipping file %s", file
.c_str());
119 Status
BlockCacheTier::Close() {
120 // stop the insert thread
121 if (opt_
.pipeline_writes
&& insert_th_
.joinable()) {
122 InsertOp
op(/*quit=*/true);
123 insert_ops_
.Push(std::move(op
));
127 // stop the writer before
130 // clear all metadata
137 void Add(std::map
<std::string
, double>* stats
, const std::string
& key
,
139 stats
->insert({key
, static_cast<double>(t
)});
142 PersistentCache::StatsType
BlockCacheTier::Stats() {
143 std::map
<std::string
, double> stats
;
144 Add(&stats
, "persistentcache.blockcachetier.bytes_piplined",
145 stats_
.bytes_pipelined_
.Average());
146 Add(&stats
, "persistentcache.blockcachetier.bytes_written",
147 stats_
.bytes_written_
.Average());
148 Add(&stats
, "persistentcache.blockcachetier.bytes_read",
149 stats_
.bytes_read_
.Average());
150 Add(&stats
, "persistentcache.blockcachetier.insert_dropped",
151 stats_
.insert_dropped_
);
152 Add(&stats
, "persistentcache.blockcachetier.cache_hits",
154 Add(&stats
, "persistentcache.blockcachetier.cache_misses",
155 stats_
.cache_misses_
);
156 Add(&stats
, "persistentcache.blockcachetier.cache_errors",
157 stats_
.cache_errors_
);
158 Add(&stats
, "persistentcache.blockcachetier.cache_hits_pct",
159 stats_
.CacheHitPct());
160 Add(&stats
, "persistentcache.blockcachetier.cache_misses_pct",
161 stats_
.CacheMissPct());
162 Add(&stats
, "persistentcache.blockcachetier.read_hit_latency",
163 stats_
.read_hit_latency_
.Average());
164 Add(&stats
, "persistentcache.blockcachetier.read_miss_latency",
165 stats_
.read_miss_latency_
.Average());
166 Add(&stats
, "persistentcache.blockcachetier.write_latency",
167 stats_
.write_latency_
.Average());
169 auto out
= PersistentCacheTier::Stats();
170 out
.push_back(stats
);
174 Status
BlockCacheTier::Insert(const Slice
& key
, const char* data
,
177 stats_
.bytes_pipelined_
.Add(size
);
179 if (opt_
.pipeline_writes
) {
180 // off load the write to the write thread
182 InsertOp(key
.ToString(), std::move(std::string(data
, size
))));
186 assert(!opt_
.pipeline_writes
);
187 return InsertImpl(key
, Slice(data
, size
));
190 void BlockCacheTier::InsertMain() {
192 InsertOp
op(insert_ops_
.Pop());
195 // that is a secret signal to exit
201 while ((s
= InsertImpl(Slice(op
.key_
), Slice(op
.data_
))).IsTryAgain()) {
202 if (retry
> kMaxRetry
) {
206 // this can happen when the buffers are full, we wait till some buffers
207 // are free. Why don't we wait inside the code. This is because we want
208 // to support both pipelined and non-pipelined mode
209 buffer_allocator_
.WaitUntilUsable();
214 stats_
.insert_dropped_
++;
219 Status
BlockCacheTier::InsertImpl(const Slice
& key
, const Slice
& data
) {
225 StopWatchNano
timer(opt_
.env
, /*auto_start=*/ true);
230 if (metadata_
.Lookup(key
, &lba
)) {
231 // the key already exists, this is duplicate insert
235 while (!cache_file_
->Append(key
, data
, &lba
)) {
236 if (!cache_file_
->Eof()) {
237 ROCKS_LOG_DEBUG(opt_
.log
, "Error inserting to cache file %d",
238 cache_file_
->cacheid());
239 stats_
.write_latency_
.Add(timer
.ElapsedNanos() / 1000);
240 return Status::TryAgain();
243 assert(cache_file_
->Eof());
244 Status status
= NewCacheFile();
250 // Insert into lookup index
251 BlockInfo
* info
= metadata_
.Insert(key
, lba
);
254 return Status::IOError("Unexpected error inserting to index");
257 // insert to cache file reverse mapping
258 cache_file_
->Add(info
);
261 stats_
.bytes_written_
.Add(data
.size());
262 stats_
.write_latency_
.Add(timer
.ElapsedNanos() / 1000);
266 Status
BlockCacheTier::Lookup(const Slice
& key
, std::unique_ptr
<char[]>* val
,
268 StopWatchNano
timer(opt_
.env
, /*auto_start=*/ true);
272 status
= metadata_
.Lookup(key
, &lba
);
274 stats_
.cache_misses_
++;
275 stats_
.read_miss_latency_
.Add(timer
.ElapsedNanos() / 1000);
276 return Status::NotFound("blockcache: key not found");
279 BlockCacheFile
* const file
= metadata_
.Lookup(lba
.cache_id_
);
281 // this can happen because the block index and cache file index are
282 // different, and the cache file might be removed between the two lookups
283 stats_
.cache_misses_
++;
284 stats_
.read_miss_latency_
.Add(timer
.ElapsedNanos() / 1000);
285 return Status::NotFound("blockcache: cache file not found");
290 std::unique_ptr
<char[]> scratch(new char[lba
.size_
]);
294 status
= file
->Read(lba
, &blk_key
, &blk_val
, scratch
.get());
297 stats_
.cache_misses_
++;
298 stats_
.cache_errors_
++;
299 stats_
.read_miss_latency_
.Add(timer
.ElapsedNanos() / 1000);
300 return Status::NotFound("blockcache: error reading data");
303 assert(blk_key
== key
);
305 val
->reset(new char[blk_val
.size()]);
306 memcpy(val
->get(), blk_val
.data(), blk_val
.size());
307 *size
= blk_val
.size();
309 stats_
.bytes_read_
.Add(*size
);
310 stats_
.cache_hits_
++;
311 stats_
.read_hit_latency_
.Add(timer
.ElapsedNanos() / 1000);
316 bool BlockCacheTier::Erase(const Slice
& key
) {
318 BlockInfo
* info
= metadata_
.Remove(key
);
324 Status
BlockCacheTier::NewCacheFile() {
327 TEST_SYNC_POINT_CALLBACK("BlockCacheTier::NewCacheFile:DeleteDir",
328 (void*)(GetCachePath().c_str()));
330 std::unique_ptr
<WriteableCacheFile
> f(
331 new WriteableCacheFile(opt_
.env
, &buffer_allocator_
, &writer_
,
332 GetCachePath(), writer_cache_id_
,
333 opt_
.cache_file_size
, opt_
.log
));
335 bool status
= f
->Create(opt_
.enable_direct_writes
, opt_
.enable_direct_reads
);
337 return Status::IOError("Error creating file");
340 Info(opt_
.log
, "Created cache file %d", writer_cache_id_
);
343 cache_file_
= f
.release();
345 // insert to cache files tree
346 status
= metadata_
.Insert(cache_file_
);
349 Error(opt_
.log
, "Error inserting to metadata");
350 return Status::IOError("Error inserting to metadata");
356 bool BlockCacheTier::Reserve(const size_t size
) {
358 assert(size_
<= opt_
.cache_size
);
360 if (size
+ size_
<= opt_
.cache_size
) {
361 // there is enough space to write
366 assert(size
+ size_
>= opt_
.cache_size
);
367 // there is not enough space to fit the requested data
368 // we can clear some space by evicting cold data
370 const double retain_fac
= (100 - kEvictPct
) / static_cast<double>(100);
371 while (size
+ size_
> opt_
.cache_size
* retain_fac
) {
372 std::unique_ptr
<BlockCacheFile
> f(metadata_
.Evict());
374 // nothing is evictable
379 if (!f
->Delete(&file_size
).ok()) {
380 // unable to delete file
384 assert(file_size
<= size_
);
389 assert(size_
<= opt_
.cache_size
* 0.9);
393 Status
NewPersistentCache(Env
* const env
, const std::string
& path
,
395 const std::shared_ptr
<Logger
>& log
,
396 const bool optimized_for_nvm
,
397 std::shared_ptr
<PersistentCache
>* cache
) {
399 return Status::IOError("invalid argument cache");
402 auto opt
= PersistentCacheConfig(env
, path
, size
, log
);
403 if (optimized_for_nvm
) {
404 // the default settings are optimized for SSD
405 // NVM devices are better accessed with 4K direct IO and written with
407 opt
.enable_direct_writes
= true;
408 opt
.writer_qdepth
= 4;
409 opt
.writer_dispatch_size
= 4 * 1024;
412 auto pcache
= std::make_shared
<BlockCacheTier
>(opt
);
413 Status s
= pcache
->Open();
423 } // namespace ROCKSDB_NAMESPACE
425 #endif // ifndef ROCKSDB_LITE