]> git.proxmox.com Git - ceph.git/blame - ceph/src/rocksdb/utilities/persistent_cache/block_cache_tier.cc
import 14.2.4 nautilus point release
[ceph.git] / ceph / src / rocksdb / utilities / persistent_cache / block_cache_tier.cc
CommitLineData
7c673cae 1// Copyright (c) 2013, Facebook, Inc. All rights reserved.
11fdf7f2
TL
2// This source code is licensed under both the GPLv2 (found in the
3// COPYING file in the root directory) and Apache 2.0 License
4// (found in the LICENSE.Apache file in the root directory).
7c673cae
FG
5#ifndef ROCKSDB_LITE
6
7#include "utilities/persistent_cache/block_cache_tier.h"
8
9#include <regex>
10#include <utility>
11#include <vector>
12
13#include "port/port.h"
14#include "util/logging.h"
15#include "util/stop_watch.h"
16#include "util/sync_point.h"
17#include "utilities/persistent_cache/block_cache_tier_file.h"
18
19namespace rocksdb {
20
21//
22// BlockCacheImpl
23//
// Bring the cache tier online: validate options, prepare the cache
// directory (or purge stale files from a previous run), open the first
// writable cache file, and optionally start the pipelined-insert thread.
// Returns the first failing Status; on success the tier is ready for use.
Status BlockCacheTier::Open() {
  Status status;

  WriteLock _(&lock_);

  // A freshly opened tier must not have any accounted usage yet.
  assert(!size_);

  // Check the validity of the options
  status = opt_.ValidateSettings();
  assert(status.ok());
  if (!status.ok()) {
    Error(opt_.log, "Invalid block cache options");
    return status;
  }

  // Create base directory or cleanup existing directory
  status = opt_.env->CreateDirIfMissing(opt_.path);
  if (!status.ok()) {
    Error(opt_.log, "Error creating directory %s. %s", opt_.path.c_str(),
          status.ToString().c_str());
    return status;
  }

  // Create base/<cache dir> directory
  status = opt_.env->CreateDir(GetCachePath());
  if (!status.ok()) {
    // directory already exists, clean it up (remove leftover *.rc files)
    status = CleanupCacheFolder(GetCachePath());
    assert(status.ok());
    if (!status.ok()) {
      Error(opt_.log, "Error creating directory %s. %s", opt_.path.c_str(),
            status.ToString().c_str());
      return status;
    }
  }

  // create a new file
  assert(!cache_file_);
  status = NewCacheFile();
  if (!status.ok()) {
    Error(opt_.log, "Error creating new file %s. %s", opt_.path.c_str(),
          status.ToString().c_str());
    return status;
  }

  assert(cache_file_);

  if (opt_.pipeline_writes) {
    // Background writer thread; drains insert_ops_ (see InsertMain).
    assert(!insert_th_.joinable());
    insert_th_ = port::Thread(&BlockCacheTier::InsertMain, this);
  }

  return Status::OK();
}
78
// Returns true iff `file` carries the ".rc" cache-file suffix.
// Unfortunately regex support across compilers is not even, so we use simple
// string parsing. Note: the previous implementation split at the FIRST dot,
// which wrongly rejected names containing more than one dot (e.g.
// "a.b.rc"); a proper suffix comparison fixes that.
bool IsCacheFile(const std::string& file) {
  const std::string suffix = ".rc";
  if (file.size() < suffix.size()) {
    return false;
  }
  return file.compare(file.size() - suffix.size(), suffix.size(), suffix) == 0;
}
91
92Status BlockCacheTier::CleanupCacheFolder(const std::string& folder) {
93 std::vector<std::string> files;
94 Status status = opt_.env->GetChildren(folder, &files);
95 if (!status.ok()) {
96 Error(opt_.log, "Error getting files for %s. %s", folder.c_str(),
97 status.ToString().c_str());
98 return status;
99 }
100
101 // cleanup files with the patter :digi:.rc
102 for (auto file : files) {
103 if (IsCacheFile(file)) {
104 // cache file
105 Info(opt_.log, "Removing file %s.", file.c_str());
106 status = opt_.env->DeleteFile(folder + "/" + file);
107 if (!status.ok()) {
108 Error(opt_.log, "Error deleting file %s. %s", file.c_str(),
109 status.ToString().c_str());
110 return status;
111 }
112 } else {
113 ROCKS_LOG_DEBUG(opt_.log, "Skipping file %s", file.c_str());
114 }
115 }
116 return Status::OK();
117}
118
119Status BlockCacheTier::Close() {
120 // stop the insert thread
121 if (opt_.pipeline_writes && insert_th_.joinable()) {
122 InsertOp op(/*quit=*/true);
123 insert_ops_.Push(std::move(op));
124 insert_th_.join();
125 }
126
127 // stop the writer before
128 writer_.Stop();
129
130 // clear all metadata
131 WriteLock _(&lock_);
132 metadata_.Clear();
133 return Status::OK();
134}
135
// Record a single named metric in `stats`, converting the value to double.
// If `key` is already present the existing entry is kept (map semantics).
template <class T>
void Add(std::map<std::string, double>* stats, const std::string& key,
         const T& t) {
  stats->emplace(key, static_cast<double>(t));
}
141
142PersistentCache::StatsType BlockCacheTier::Stats() {
143 std::map<std::string, double> stats;
144 Add(&stats, "persistentcache.blockcachetier.bytes_piplined",
145 stats_.bytes_pipelined_.Average());
146 Add(&stats, "persistentcache.blockcachetier.bytes_written",
147 stats_.bytes_written_.Average());
148 Add(&stats, "persistentcache.blockcachetier.bytes_read",
149 stats_.bytes_read_.Average());
150 Add(&stats, "persistentcache.blockcachetier.insert_dropped",
151 stats_.insert_dropped_);
152 Add(&stats, "persistentcache.blockcachetier.cache_hits",
153 stats_.cache_hits_);
154 Add(&stats, "persistentcache.blockcachetier.cache_misses",
155 stats_.cache_misses_);
156 Add(&stats, "persistentcache.blockcachetier.cache_errors",
157 stats_.cache_errors_);
158 Add(&stats, "persistentcache.blockcachetier.cache_hits_pct",
159 stats_.CacheHitPct());
160 Add(&stats, "persistentcache.blockcachetier.cache_misses_pct",
161 stats_.CacheMissPct());
162 Add(&stats, "persistentcache.blockcachetier.read_hit_latency",
163 stats_.read_hit_latency_.Average());
164 Add(&stats, "persistentcache.blockcachetier.read_miss_latency",
165 stats_.read_miss_latency_.Average());
11fdf7f2 166 Add(&stats, "persistentcache.blockcachetier.write_latency",
7c673cae
FG
167 stats_.write_latency_.Average());
168
169 auto out = PersistentCacheTier::Stats();
170 out.push_back(stats);
171 return out;
172}
173
174Status BlockCacheTier::Insert(const Slice& key, const char* data,
175 const size_t size) {
176 // update stats
177 stats_.bytes_pipelined_.Add(size);
178
179 if (opt_.pipeline_writes) {
180 // off load the write to the write thread
181 insert_ops_.Push(
182 InsertOp(key.ToString(), std::move(std::string(data, size))));
183 return Status::OK();
184 }
185
186 assert(!opt_.pipeline_writes);
187 return InsertImpl(key, Slice(data, size));
188}
189
190void BlockCacheTier::InsertMain() {
191 while (true) {
192 InsertOp op(insert_ops_.Pop());
193
194 if (op.signal_) {
195 // that is a secret signal to exit
196 break;
197 }
198
199 size_t retry = 0;
200 Status s;
201 while ((s = InsertImpl(Slice(op.key_), Slice(op.data_))).IsTryAgain()) {
202 if (retry > kMaxRetry) {
203 break;
204 }
205
206 // this can happen when the buffers are full, we wait till some buffers
207 // are free. Why don't we wait inside the code. This is because we want
208 // to support both pipelined and non-pipelined mode
209 buffer_allocator_.WaitUntilUsable();
210 retry++;
211 }
212
213 if (!s.ok()) {
214 stats_.insert_dropped_++;
215 }
216 }
217}
218
// Write a single key/value record to the current cache file and index it.
// Duplicate keys are a silent no-op. Returns TryAgain when write buffers
// are exhausted (callers may retry after waiting), or the error from a
// failed cache-file rotation.
Status BlockCacheTier::InsertImpl(const Slice& key, const Slice& data) {
  // pre-condition
  assert(key.size());
  assert(data.size());
  assert(cache_file_);

  StopWatchNano timer(opt_.env, /*auto_start=*/ true);

  WriteLock _(&lock_);

  LBA lba;
  if (metadata_.Lookup(key, &lba)) {
    // the key already exists, this is duplicate insert
    return Status::OK();
  }

  while (!cache_file_->Append(key, data, &lba)) {
    if (!cache_file_->Eof()) {
      // Append failed for a reason other than a full file (e.g. no free
      // write buffers) -- surface TryAgain so the caller can back off.
      ROCKS_LOG_DEBUG(opt_.log, "Error inserting to cache file %d",
                      cache_file_->cacheid());
      stats_.write_latency_.Add(timer.ElapsedNanos() / 1000);
      return Status::TryAgain();
    }

    // current file is full: rotate to a fresh cache file and retry Append
    assert(cache_file_->Eof());
    Status status = NewCacheFile();
    if (!status.ok()) {
      return status;
    }
  }

  // Insert into lookup index
  BlockInfo* info = metadata_.Insert(key, lba);
  assert(info);
  if (!info) {
    return Status::IOError("Unexpected error inserting to index");
  }

  // insert to cache file reverse mapping
  cache_file_->Add(info);

  // update stats
  stats_.bytes_written_.Add(data.size());
  stats_.write_latency_.Add(timer.ElapsedNanos() / 1000);
  return Status::OK();
}
265
494da23a 266Status BlockCacheTier::Lookup(const Slice& key, std::unique_ptr<char[]>* val,
7c673cae
FG
267 size_t* size) {
268 StopWatchNano timer(opt_.env, /*auto_start=*/ true);
269
270 LBA lba;
271 bool status;
272 status = metadata_.Lookup(key, &lba);
273 if (!status) {
274 stats_.cache_misses_++;
275 stats_.read_miss_latency_.Add(timer.ElapsedNanos() / 1000);
276 return Status::NotFound("blockcache: key not found");
277 }
278
279 BlockCacheFile* const file = metadata_.Lookup(lba.cache_id_);
280 if (!file) {
281 // this can happen because the block index and cache file index are
282 // different, and the cache file might be removed between the two lookups
283 stats_.cache_misses_++;
284 stats_.read_miss_latency_.Add(timer.ElapsedNanos() / 1000);
285 return Status::NotFound("blockcache: cache file not found");
286 }
287
288 assert(file->refs_);
289
494da23a 290 std::unique_ptr<char[]> scratch(new char[lba.size_]);
7c673cae
FG
291 Slice blk_key;
292 Slice blk_val;
293
294 status = file->Read(lba, &blk_key, &blk_val, scratch.get());
295 --file->refs_;
296 if (!status) {
297 stats_.cache_misses_++;
298 stats_.cache_errors_++;
299 stats_.read_miss_latency_.Add(timer.ElapsedNanos() / 1000);
300 return Status::NotFound("blockcache: error reading data");
301 }
302
303 assert(blk_key == key);
304
305 val->reset(new char[blk_val.size()]);
306 memcpy(val->get(), blk_val.data(), blk_val.size());
307 *size = blk_val.size();
308
309 stats_.bytes_read_.Add(*size);
310 stats_.cache_hits_++;
311 stats_.read_hit_latency_.Add(timer.ElapsedNanos() / 1000);
312
313 return Status::OK();
314}
315
316bool BlockCacheTier::Erase(const Slice& key) {
317 WriteLock _(&lock_);
318 BlockInfo* info = metadata_.Remove(key);
319 assert(info);
320 delete info;
321 return true;
322}
323
// Rotate to a brand-new writable cache file and register it in the file
// index. Caller must hold lock_. On success cache_file_ points at the new
// file (ownership transferred out of the local unique_ptr).
Status BlockCacheTier::NewCacheFile() {
  lock_.AssertHeld();

  // test hook; the temporary from GetCachePath() lives for the duration of
  // the callback invocation
  TEST_SYNC_POINT_CALLBACK("BlockCacheTier::NewCacheFile:DeleteDir",
                           (void*)(GetCachePath().c_str()));

  std::unique_ptr<WriteableCacheFile> f(
    new WriteableCacheFile(opt_.env, &buffer_allocator_, &writer_,
                           GetCachePath(), writer_cache_id_,
                           opt_.cache_file_size, opt_.log));

  bool status = f->Create(opt_.enable_direct_writes, opt_.enable_direct_reads);
  if (!status) {
    return Status::IOError("Error creating file");
  }

  Info(opt_.log, "Created cache file %d", writer_cache_id_);

  // advance the id for the next rotation and hand ownership to cache_file_
  writer_cache_id_++;
  cache_file_ = f.release();

  // insert to cache files tree
  status = metadata_.Insert(cache_file_);
  assert(status);
  if (!status) {
    Error(opt_.log, "Error inserting to metadata");
    return Status::IOError("Error inserting to metadata");
  }

  return Status::OK();
}
355
356bool BlockCacheTier::Reserve(const size_t size) {
357 WriteLock _(&lock_);
358 assert(size_ <= opt_.cache_size);
359
360 if (size + size_ <= opt_.cache_size) {
361 // there is enough space to write
362 size_ += size;
363 return true;
364 }
365
366 assert(size + size_ >= opt_.cache_size);
367 // there is not enough space to fit the requested data
368 // we can clear some space by evicting cold data
369
370 const double retain_fac = (100 - kEvictPct) / static_cast<double>(100);
371 while (size + size_ > opt_.cache_size * retain_fac) {
494da23a 372 std::unique_ptr<BlockCacheFile> f(metadata_.Evict());
7c673cae
FG
373 if (!f) {
374 // nothing is evictable
375 return false;
376 }
377 assert(!f->refs_);
378 uint64_t file_size;
379 if (!f->Delete(&file_size).ok()) {
380 // unable to delete file
381 return false;
382 }
383
384 assert(file_size <= size_);
385 size_ -= file_size;
386 }
387
388 size_ += size;
389 assert(size_ <= opt_.cache_size * 0.9);
390 return true;
391}
392
393Status NewPersistentCache(Env* const env, const std::string& path,
394 const uint64_t size,
395 const std::shared_ptr<Logger>& log,
396 const bool optimized_for_nvm,
397 std::shared_ptr<PersistentCache>* cache) {
398 if (!cache) {
399 return Status::IOError("invalid argument cache");
400 }
401
402 auto opt = PersistentCacheConfig(env, path, size, log);
403 if (optimized_for_nvm) {
404 // the default settings are optimized for SSD
405 // NVM devices are better accessed with 4K direct IO and written with
406 // parallelism
407 opt.enable_direct_writes = true;
408 opt.writer_qdepth = 4;
409 opt.writer_dispatch_size = 4 * 1024;
410 }
411
412 auto pcache = std::make_shared<BlockCacheTier>(opt);
413 Status s = pcache->Open();
414
415 if (!s.ok()) {
416 return s;
417 }
418
419 *cache = pcache;
420 return s;
421}
422
423} // namespace rocksdb
424
425#endif // ifndef ROCKSDB_LITE