]>
Commit | Line | Data |
---|---|---|
7c673cae | 1 | // Copyright (c) 2013, Facebook, Inc. All rights reserved. |
11fdf7f2 TL |
2 | // This source code is licensed under both the GPLv2 (found in the |
3 | // COPYING file in the root directory) and Apache 2.0 License | |
4 | // (found in the LICENSE.Apache file in the root directory). | |
7c673cae FG |
5 | #ifndef ROCKSDB_LITE |
6 | ||
7 | #include "utilities/persistent_cache/block_cache_tier.h" | |
8 | ||
9 | #include <regex> | |
10 | #include <utility> | |
11 | #include <vector> | |
12 | ||
13 | #include "port/port.h" | |
14 | #include "util/logging.h" | |
15 | #include "util/stop_watch.h" | |
16 | #include "util/sync_point.h" | |
17 | #include "utilities/persistent_cache/block_cache_tier_file.h" | |
18 | ||
19 | namespace rocksdb { | |
20 | ||
//
// BlockCacheImpl
//
// Open the cache tier for use: validate options, prepare the on-disk
// cache directory (creating it, or purging stale *.rc files from a
// previous run), create the first writable cache file, and — when
// pipelined writes are enabled — start the background insert thread.
// Returns the first failing Status; on success the tier is ready for
// Insert()/Lookup().
Status BlockCacheTier::Open() {
  Status status;

  // Serializes setup against concurrent users of size_/cache_file_.
  WriteLock _(&lock_);

  // Open() must not be called on an already-open tier.
  assert(!size_);

  // Check the validity of the options
  status = opt_.ValidateSettings();
  assert(status.ok());
  if (!status.ok()) {
    Error(opt_.log, "Invalid block cache options");
    return status;
  }

  // Create base directory or cleanup existing directory
  status = opt_.env->CreateDirIfMissing(opt_.path);
  if (!status.ok()) {
    Error(opt_.log, "Error creating directory %s. %s", opt_.path.c_str(),
          status.ToString().c_str());
    return status;
  }

  // Create base/<cache dir> directory
  status = opt_.env->CreateDir(GetCachePath());
  if (!status.ok()) {
    // directory already exists, clean it up (remove leftover cache files
    // from an earlier instance rather than failing the open)
    status = CleanupCacheFolder(GetCachePath());
    assert(status.ok());
    if (!status.ok()) {
      Error(opt_.log, "Error creating directory %s. %s", opt_.path.c_str(),
            status.ToString().c_str());
      return status;
    }
  }

  // create a new file
  assert(!cache_file_);
  status = NewCacheFile();
  if (!status.ok()) {
    Error(opt_.log, "Error creating new file %s. %s", opt_.path.c_str(),
          status.ToString().c_str());
    return status;
  }

  assert(cache_file_);

  if (opt_.pipeline_writes) {
    // Background thread that drains insert_ops_; joined in Close().
    assert(!insert_th_.joinable());
    insert_th_ = port::Thread(&BlockCacheTier::InsertMain, this);
  }

  return Status::OK();
}
78 | ||
// Returns true iff `file` names a cache file, i.e. it carries the ".rc"
// suffix that WriteableCacheFile uses.
bool IsCacheFile(const std::string& file) {
  // Unfortunately regex support across compilers is not even, so we use
  // simple string parsing. Compare the trailing characters directly: the
  // previous first-dot logic (file.find('.')) misclassified names with an
  // extra dot, e.g. "1.tmp.rc", because the substring from the FIRST dot
  // (".tmp.rc") never equals ".rc".
  static const char kSuffix[] = ".rc";
  const size_t suffix_len = sizeof(kSuffix) - 1;
  if (file.size() < suffix_len) {
    return false;
  }
  return file.compare(file.size() - suffix_len, suffix_len, kSuffix) == 0;
}
91 | ||
92 | Status BlockCacheTier::CleanupCacheFolder(const std::string& folder) { | |
93 | std::vector<std::string> files; | |
94 | Status status = opt_.env->GetChildren(folder, &files); | |
95 | if (!status.ok()) { | |
96 | Error(opt_.log, "Error getting files for %s. %s", folder.c_str(), | |
97 | status.ToString().c_str()); | |
98 | return status; | |
99 | } | |
100 | ||
101 | // cleanup files with the patter :digi:.rc | |
102 | for (auto file : files) { | |
103 | if (IsCacheFile(file)) { | |
104 | // cache file | |
105 | Info(opt_.log, "Removing file %s.", file.c_str()); | |
106 | status = opt_.env->DeleteFile(folder + "/" + file); | |
107 | if (!status.ok()) { | |
108 | Error(opt_.log, "Error deleting file %s. %s", file.c_str(), | |
109 | status.ToString().c_str()); | |
110 | return status; | |
111 | } | |
112 | } else { | |
113 | ROCKS_LOG_DEBUG(opt_.log, "Skipping file %s", file.c_str()); | |
114 | } | |
115 | } | |
116 | return Status::OK(); | |
117 | } | |
118 | ||
// Shuts the tier down in dependency order: first drain/stop the insert
// thread (so nothing else enqueues writes), then stop the file writer,
// then drop the in-memory index. Always returns Status::OK().
Status BlockCacheTier::Close() {
  // stop the insert thread
  if (opt_.pipeline_writes && insert_th_.joinable()) {
    // A quit-flagged op is the sentinel that makes InsertMain() exit.
    InsertOp op(/*quit=*/true);
    insert_ops_.Push(std::move(op));
    insert_th_.join();
  }

  // stop the writer before clearing metadata, since in-flight writes
  // reference index state
  writer_.Stop();

  // clear all metadata
  WriteLock _(&lock_);
  metadata_.Clear();
  return Status::OK();
}
135 | ||
// Record one named statistic in *stats, widening the counter value to
// double so heterogeneous counter types can share a single map.
template <class T>
void Add(std::map<std::string, double>* stats, const std::string& key,
         const T& t) {
  stats->emplace(key, static_cast<double>(t));
}
141 | ||
// Snapshot of this tier's counters, appended to the stats of any tiers
// below it. NOTE: the key strings below are consumed by monitoring; the
// "bytes_piplined" spelling is historical and must not be "fixed" here.
PersistentCache::StatsType BlockCacheTier::Stats() {
  std::map<std::string, double> stats;
  Add(&stats, "persistentcache.blockcachetier.bytes_piplined",
      stats_.bytes_pipelined_.Average());
  Add(&stats, "persistentcache.blockcachetier.bytes_written",
      stats_.bytes_written_.Average());
  Add(&stats, "persistentcache.blockcachetier.bytes_read",
      stats_.bytes_read_.Average());
  Add(&stats, "persistentcache.blockcachetier.insert_dropped",
      stats_.insert_dropped_);
  Add(&stats, "persistentcache.blockcachetier.cache_hits",
      stats_.cache_hits_);
  Add(&stats, "persistentcache.blockcachetier.cache_misses",
      stats_.cache_misses_);
  Add(&stats, "persistentcache.blockcachetier.cache_errors",
      stats_.cache_errors_);
  Add(&stats, "persistentcache.blockcachetier.cache_hits_pct",
      stats_.CacheHitPct());
  Add(&stats, "persistentcache.blockcachetier.cache_misses_pct",
      stats_.CacheMissPct());
  Add(&stats, "persistentcache.blockcachetier.read_hit_latency",
      stats_.read_hit_latency_.Average());
  Add(&stats, "persistentcache.blockcachetier.read_miss_latency",
      stats_.read_miss_latency_.Average());
  Add(&stats, "persistentcache.blockcachetier.write_latency",
      stats_.write_latency_.Average());

  // Chain this tier's stats after those of the lower tiers.
  auto out = PersistentCacheTier::Stats();
  out.push_back(stats);
  return out;
}
173 | ||
174 | Status BlockCacheTier::Insert(const Slice& key, const char* data, | |
175 | const size_t size) { | |
176 | // update stats | |
177 | stats_.bytes_pipelined_.Add(size); | |
178 | ||
179 | if (opt_.pipeline_writes) { | |
180 | // off load the write to the write thread | |
181 | insert_ops_.Push( | |
182 | InsertOp(key.ToString(), std::move(std::string(data, size)))); | |
183 | return Status::OK(); | |
184 | } | |
185 | ||
186 | assert(!opt_.pipeline_writes); | |
187 | return InsertImpl(key, Slice(data, size)); | |
188 | } | |
189 | ||
// Body of the background insert thread (pipelined mode only). Pops queued
// InsertOps and applies them via InsertImpl, retrying TryAgain results up
// to kMaxRetry times; exits when the quit-flagged sentinel op (pushed by
// Close()) is popped.
void BlockCacheTier::InsertMain() {
  while (true) {
    InsertOp op(insert_ops_.Pop());

    if (op.signal_) {
      // that is a secret signal to exit
      break;
    }

    size_t retry = 0;
    Status s;
    while ((s = InsertImpl(Slice(op.key_), Slice(op.data_))).IsTryAgain()) {
      if (retry > kMaxRetry) {
        break;
      }

      // this can happen when the buffers are full, we wait till some buffers
      // are free. Why don't we wait inside the code. This is because we want
      // to support both pipelined and non-pipelined mode
      buffer_allocator_.WaitUntilUsable();
      retry++;
    }

    if (!s.ok()) {
      // Gave up (retries exhausted or hard error): the block is silently
      // dropped — acceptable for a cache — but counted for monitoring.
      stats_.insert_dropped_++;
    }
  }
}
218 | ||
// Synchronous insert path: append the block to the current cache file
// (rotating to a new file at EOF) and record it in the lookup index.
// Returns OK on success or duplicate key, TryAgain when write buffers are
// exhausted (caller may retry), or an error from file creation.
Status BlockCacheTier::InsertImpl(const Slice& key, const Slice& data) {
  // pre-condition
  assert(key.size());
  assert(data.size());
  assert(cache_file_);

  StopWatchNano timer(opt_.env, /*auto_start=*/ true);

  // Single writer lock covers the index lookup, the append, and the index
  // insert so the key cannot be inserted twice concurrently.
  WriteLock _(&lock_);

  LBA lba;
  if (metadata_.Lookup(key, &lba)) {
    // the key already exists, this is duplicate insert
    return Status::OK();
  }

  // Append may fail for two reasons: buffers exhausted (not EOF → report
  // TryAgain) or the current file is full (EOF → rotate and retry).
  while (!cache_file_->Append(key, data, &lba)) {
    if (!cache_file_->Eof()) {
      ROCKS_LOG_DEBUG(opt_.log, "Error inserting to cache file %d",
                      cache_file_->cacheid());
      stats_.write_latency_.Add(timer.ElapsedNanos() / 1000);
      return Status::TryAgain();
    }

    assert(cache_file_->Eof());
    Status status = NewCacheFile();
    if (!status.ok()) {
      return status;
    }
  }

  // Insert into lookup index
  BlockInfo* info = metadata_.Insert(key, lba);
  assert(info);
  if (!info) {
    return Status::IOError("Unexpected error inserting to index");
  }

  // insert to cache file reverse mapping (used to purge index entries when
  // the file is evicted)
  cache_file_->Add(info);

  // update stats (latency recorded in microseconds)
  stats_.bytes_written_.Add(data.size());
  stats_.write_latency_.Add(timer.ElapsedNanos() / 1000);
  return Status::OK();
}
265 | ||
// Look up `key`: resolve it to an LBA via the block index, then to the
// owning cache file, read the block, and copy its value into *val/*size.
// Returns NotFound on any miss or read failure; hit/miss/error counters
// and latencies are updated on every path.
Status BlockCacheTier::Lookup(const Slice& key, std::unique_ptr<char[]>* val,
                              size_t* size) {
  StopWatchNano timer(opt_.env, /*auto_start=*/ true);

  LBA lba;
  bool status;
  status = metadata_.Lookup(key, &lba);
  if (!status) {
    stats_.cache_misses_++;
    stats_.read_miss_latency_.Add(timer.ElapsedNanos() / 1000);
    return Status::NotFound("blockcache: key not found");
  }

  BlockCacheFile* const file = metadata_.Lookup(lba.cache_id_);
  if (!file) {
    // this can happen because the block index and cache file index are
    // different, and the cache file might be removed between the two lookups
    stats_.cache_misses_++;
    stats_.read_miss_latency_.Add(timer.ElapsedNanos() / 1000);
    return Status::NotFound("blockcache: cache file not found");
  }

  assert(file->refs_);

  // Scratch buffer sized from the index entry; Read() points blk_key/blk_val
  // into it, so it must outlive the copies below.
  std::unique_ptr<char[]> scratch(new char[lba.size_]);
  Slice blk_key;
  Slice blk_val;

  status = file->Read(lba, &blk_key, &blk_val, scratch.get());
  // NOTE(review): this decrement presumably pairs with a ref taken inside
  // metadata_.Lookup(cache_id_), pinning the file against eviction during
  // the read — confirm against BlockCacheTierMetadata.
  --file->refs_;
  if (!status) {
    stats_.cache_misses_++;
    stats_.cache_errors_++;
    stats_.read_miss_latency_.Add(timer.ElapsedNanos() / 1000);
    return Status::NotFound("blockcache: error reading data");
  }

  assert(blk_key == key);

  val->reset(new char[blk_val.size()]);
  memcpy(val->get(), blk_val.data(), blk_val.size());
  *size = blk_val.size();

  stats_.bytes_read_.Add(*size);
  stats_.cache_hits_++;
  stats_.read_hit_latency_.Add(timer.ElapsedNanos() / 1000);

  return Status::OK();
}
315 | ||
// Remove `key` from the lookup index and free its BlockInfo. The block's
// bytes stay in the cache file until the file itself is evicted. Always
// reports success (deleting a null info is a no-op in release builds).
bool BlockCacheTier::Erase(const Slice& key) {
  WriteLock _(&lock_);
  BlockInfo* info = metadata_.Remove(key);
  assert(info);
  delete info;
  return true;
}
323 | ||
// Rotate to a fresh writable cache file: create file <writer_cache_id_>
// under the cache path, make it the active cache_file_, and register it in
// the file index. Caller must hold lock_ (asserted). Note: the previous
// cache_file_ pointer is simply replaced here; its lifetime is tracked by
// the metadata file index.
Status BlockCacheTier::NewCacheFile() {
  lock_.AssertHeld();

  // Test hook: lets tests delete the cache dir right before file creation.
  TEST_SYNC_POINT_CALLBACK("BlockCacheTier::NewCacheFile:DeleteDir",
                           (void*)(GetCachePath().c_str()));

  std::unique_ptr<WriteableCacheFile> f(
    new WriteableCacheFile(opt_.env, &buffer_allocator_, &writer_,
                           GetCachePath(), writer_cache_id_,
                           opt_.cache_file_size, opt_.log));

  bool status = f->Create(opt_.enable_direct_writes, opt_.enable_direct_reads);
  if (!status) {
    return Status::IOError("Error creating file");
  }

  Info(opt_.log, "Created cache file %d", writer_cache_id_);

  // Advance the id for the next rotation and transfer ownership of the
  // file to cache_file_ (and, below, to the metadata index).
  writer_cache_id_++;
  cache_file_ = f.release();

  // insert to cache files tree
  status = metadata_.Insert(cache_file_);
  assert(status);
  if (!status) {
    Error(opt_.log, "Error inserting to metadata");
    return Status::IOError("Error inserting to metadata");
  }

  return Status::OK();
}
355 | ||
// Reserve `size` bytes of cache capacity, evicting cold cache files if
// needed to free space. Returns true and bumps size_ on success; false if
// eviction cannot free enough room (nothing evictable, or delete failed).
bool BlockCacheTier::Reserve(const size_t size) {
  WriteLock _(&lock_);
  assert(size_ <= opt_.cache_size);

  if (size + size_ <= opt_.cache_size) {
    // there is enough space to write
    size_ += size;
    return true;
  }

  assert(size + size_ >= opt_.cache_size);
  // there is not enough space to fit the requested data
  // we can clear some space by evicting cold data

  // Evict down to (100 - kEvictPct)% of capacity, not just barely enough,
  // so we don't re-enter eviction on every subsequent reservation.
  const double retain_fac = (100 - kEvictPct) / static_cast<double>(100);
  while (size + size_ > opt_.cache_size * retain_fac) {
    std::unique_ptr<BlockCacheFile> f(metadata_.Evict());
    if (!f) {
      // nothing is evictable
      return false;
    }
    assert(!f->refs_);
    uint64_t file_size;
    if (!f->Delete(&file_size).ok()) {
      // unable to delete file
      return false;
    }

    assert(file_size <= size_);
    size_ -= file_size;
  }

  size_ += size;
  // NOTE(review): the 0.9 bound assumes kEvictPct >= 10 and that `size`
  // itself is small relative to capacity — confirm against the constant's
  // declaration.
  assert(size_ <= opt_.cache_size * 0.9);
  return true;
}
392 | ||
393 | Status NewPersistentCache(Env* const env, const std::string& path, | |
394 | const uint64_t size, | |
395 | const std::shared_ptr<Logger>& log, | |
396 | const bool optimized_for_nvm, | |
397 | std::shared_ptr<PersistentCache>* cache) { | |
398 | if (!cache) { | |
399 | return Status::IOError("invalid argument cache"); | |
400 | } | |
401 | ||
402 | auto opt = PersistentCacheConfig(env, path, size, log); | |
403 | if (optimized_for_nvm) { | |
404 | // the default settings are optimized for SSD | |
405 | // NVM devices are better accessed with 4K direct IO and written with | |
406 | // parallelism | |
407 | opt.enable_direct_writes = true; | |
408 | opt.writer_qdepth = 4; | |
409 | opt.writer_dispatch_size = 4 * 1024; | |
410 | } | |
411 | ||
412 | auto pcache = std::make_shared<BlockCacheTier>(opt); | |
413 | Status s = pcache->Open(); | |
414 | ||
415 | if (!s.ok()) { | |
416 | return s; | |
417 | } | |
418 | ||
419 | *cache = pcache; | |
420 | return s; | |
421 | } | |
422 | ||
423 | } // namespace rocksdb | |
424 | ||
425 | #endif // ifndef ROCKSDB_LITE |