// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#ifndef ROCKSDB_LITE

#include "utilities/persistent_cache/block_cache_tier_file.h"

#ifndef OS_WIN
#include <unistd.h>
#endif
#include <functional>
#include <memory>
#include <vector>

#include "port/port.h"
#include "util/crc32c.h"
#include "util/logging.h"

namespace rocksdb {

//
// File creation factories
//
Status NewWritableCacheFile(Env* const env, const std::string& filepath,
                            std::unique_ptr<WritableFile>* file,
                            const bool use_direct_writes = false) {
  EnvOptions opt;
  opt.use_direct_writes = use_direct_writes;
  Status s = env->NewWritableFile(filepath, file, opt);
  return s;
}

Status NewRandomAccessCacheFile(Env* const env, const std::string& filepath,
                                std::unique_ptr<RandomAccessFile>* file,
                                const bool use_direct_reads = true) {
  EnvOptions opt;
  opt.use_direct_reads = use_direct_reads;
  Status s = env->NewRandomAccessFile(filepath, file, opt);
  return s;
}
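
// Example (illustrative; the path and Env are hypothetical, not used in this
// file): opening a cache file for direct reads via the factory above.
//
//   std::unique_ptr<RandomAccessFile> file;
//   Status s = NewRandomAccessCacheFile(Env::Default(), "/tmp/cache.0", &file,
//                                       /*use_direct_reads=*/true);
//   if (!s.ok()) { /* fall back to buffered reads or fail the open */ }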

//
// BlockCacheFile
//
Status BlockCacheFile::Delete(uint64_t* size) {
  Status status = env_->GetFileSize(Path(), size);
  if (!status.ok()) {
    return status;
  }
  return env_->DeleteFile(Path());
}

//
// CacheRecord
//
// Cache record represents the record on disk
//
// +--------+---------+----------+------------+---------------+-------------+
// | magic  | crc     | key size | value size | key data      | value data  |
// +--------+---------+----------+------------+---------------+-------------+
// <-- 4 --><--- 4 --><--- 4 ---><---- 4 ----><-- key size --><-- v-size -->
//
struct CacheRecordHeader {
  CacheRecordHeader() : magic_(0), crc_(0), key_size_(0), val_size_(0) {}
  CacheRecordHeader(const uint32_t magic, const uint32_t key_size,
                    const uint32_t val_size)
      : magic_(magic), crc_(0), key_size_(key_size), val_size_(val_size) {}

  uint32_t magic_;
  uint32_t crc_;
  uint32_t key_size_;
  uint32_t val_size_;
};

struct CacheRecord {
  CacheRecord() {}
  CacheRecord(const Slice& key, const Slice& val)
      : hdr_(MAGIC, static_cast<uint32_t>(key.size()),
             static_cast<uint32_t>(val.size())),
        key_(key),
        val_(val) {
    hdr_.crc_ = ComputeCRC();
  }

  uint32_t ComputeCRC() const;
  bool Serialize(std::vector<CacheWriteBuffer*>* bufs, size_t* woff);
  bool Deserialize(const Slice& buf);

  static uint32_t CalcSize(const Slice& key, const Slice& val) {
    return static_cast<uint32_t>(sizeof(CacheRecordHeader) + key.size() +
                                 val.size());
  }

  static const uint32_t MAGIC = 0xfefa;

  bool Append(std::vector<CacheWriteBuffer*>* bufs, size_t* woff,
              const char* data, const size_t size);

  CacheRecordHeader hdr_;
  Slice key_;
  Slice val_;
};

static_assert(sizeof(CacheRecordHeader) == 16,
              "CacheRecordHeader is not aligned");
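
// Example (illustrative): for a 16-byte key and a 4 KB value, CalcSize()
// returns 16 (header) + 16 + 4096 = 4128 bytes. crc_ covers the header
// (with crc_ itself zeroed), then the key bytes, then the value bytes,
// in that order (see ComputeCRC() below).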

uint32_t CacheRecord::ComputeCRC() const {
  uint32_t crc = 0;
  CacheRecordHeader tmp = hdr_;
  tmp.crc_ = 0;
  crc = crc32c::Extend(crc, reinterpret_cast<const char*>(&tmp), sizeof(tmp));
  crc = crc32c::Extend(crc, reinterpret_cast<const char*>(key_.data()),
                       key_.size());
  crc = crc32c::Extend(crc, reinterpret_cast<const char*>(val_.data()),
                       val_.size());
  return crc;
}

bool CacheRecord::Serialize(std::vector<CacheWriteBuffer*>* bufs,
                            size_t* woff) {
  assert(bufs->size());
  return Append(bufs, woff, reinterpret_cast<const char*>(&hdr_),
                sizeof(hdr_)) &&
         Append(bufs, woff, reinterpret_cast<const char*>(key_.data()),
                key_.size()) &&
         Append(bufs, woff, reinterpret_cast<const char*>(val_.data()),
                val_.size());
}

bool CacheRecord::Append(std::vector<CacheWriteBuffer*>* bufs, size_t* woff,
                         const char* data, const size_t data_size) {
  assert(*woff < bufs->size());

  const char* p = data;
  size_t size = data_size;

  while (size && *woff < bufs->size()) {
    CacheWriteBuffer* buf = (*bufs)[*woff];
    const size_t free = buf->Free();
    if (size <= free) {
      buf->Append(p, size);
      size = 0;
    } else {
      buf->Append(p, free);
      p += free;
      size -= free;
      assert(!buf->Free());
      assert(buf->Used() == buf->Capacity());
    }

    if (!buf->Free()) {
      *woff += 1;
    }
  }

  assert(!size);

  return !size;
}
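
// Example (illustrative): with 8 KB write buffers, appending a 20 KB record
// starting at an empty buffer fills two buffers completely (advancing *woff
// past each as it becomes full), writes the remaining 4 KB into the third,
// and leaves *woff pointing at that partially filled buffer for the next
// record.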

bool CacheRecord::Deserialize(const Slice& data) {
  assert(data.size() >= sizeof(CacheRecordHeader));
  if (data.size() < sizeof(CacheRecordHeader)) {
    return false;
  }

  memcpy(&hdr_, data.data(), sizeof(hdr_));

  assert(hdr_.key_size_ + hdr_.val_size_ + sizeof(hdr_) == data.size());
  if (hdr_.key_size_ + hdr_.val_size_ + sizeof(hdr_) != data.size()) {
    return false;
  }

  key_ = Slice(data.data_ + sizeof(hdr_), hdr_.key_size_);
  val_ = Slice(key_.data_ + hdr_.key_size_, hdr_.val_size_);

  if (!(hdr_.magic_ == MAGIC && ComputeCRC() == hdr_.crc_)) {
    fprintf(stderr, "** magic %u ** \n", hdr_.magic_);
    fprintf(stderr, "** key_size %u ** \n", hdr_.key_size_);
    fprintf(stderr, "** val_size %u ** \n", hdr_.val_size_);
    fprintf(stderr, "** key %s ** \n", key_.ToString().c_str());
    fprintf(stderr, "** val %s ** \n", val_.ToString().c_str());
    for (size_t i = 0; i < hdr_.val_size_; ++i) {
      fprintf(stderr, "%d.", (uint8_t)val_.data()[i]);
    }
    fprintf(stderr, "\n** cksum %u != %u **", hdr_.crc_, ComputeCRC());
  }

  assert(hdr_.magic_ == MAGIC && ComputeCRC() == hdr_.crc_);
  return hdr_.magic_ == MAGIC && ComputeCRC() == hdr_.crc_;
}

//
// RandomAccessFile
//

bool RandomAccessCacheFile::Open(const bool enable_direct_reads) {
  WriteLock _(&rwlock_);
  return OpenImpl(enable_direct_reads);
}

bool RandomAccessCacheFile::OpenImpl(const bool enable_direct_reads) {
  rwlock_.AssertHeld();

  ROCKS_LOG_DEBUG(log_, "Opening cache file %s", Path().c_str());

  std::unique_ptr<RandomAccessFile> file;
  Status status =
      NewRandomAccessCacheFile(env_, Path(), &file, enable_direct_reads);
  if (!status.ok()) {
    Error(log_, "Error opening random access file %s. %s", Path().c_str(),
          status.ToString().c_str());
    return false;
  }
  freader_.reset(new RandomAccessFileReader(std::move(file), Path(), env_));

  return true;
}

bool RandomAccessCacheFile::Read(const LBA& lba, Slice* key, Slice* val,
                                 char* scratch) {
  ReadLock _(&rwlock_);

  assert(lba.cache_id_ == cache_id_);

  if (!freader_) {
    return false;
  }

  Slice result;
  Status s = freader_->Read(lba.off_, lba.size_, &result, scratch);
  if (!s.ok()) {
    Error(log_, "Error reading from file %s. %s", Path().c_str(),
          s.ToString().c_str());
    return false;
  }

  assert(result.data() == scratch);

  return ParseRec(lba, key, val, scratch);
}

bool RandomAccessCacheFile::ParseRec(const LBA& lba, Slice* key, Slice* val,
                                     char* scratch) {
  Slice data(scratch, lba.size_);

  CacheRecord rec;
  if (!rec.Deserialize(data)) {
    assert(!"Error deserializing data");
    Error(log_, "Error de-serializing record from file %s off %d",
          Path().c_str(), lba.off_);
    return false;
  }

  *key = Slice(rec.key_);
  *val = Slice(rec.val_);

  return true;
}

//
// WriteableCacheFile
//

WriteableCacheFile::~WriteableCacheFile() {
  WriteLock _(&rwlock_);
  if (!eof_) {
    // This file was never flushed. We give priority to shutdown since this
    // is a cache
    // TODO(krad): Figure out a way to flush the pending data
    if (file_) {
      assert(refs_ == 1);
      --refs_;
    }
  }
  assert(!refs_);
  ClearBuffers();
}

bool WriteableCacheFile::Create(const bool /*enable_direct_writes*/,
                                const bool enable_direct_reads) {
  WriteLock _(&rwlock_);

  enable_direct_reads_ = enable_direct_reads;

  ROCKS_LOG_DEBUG(log_, "Creating new cache %s (max size is %d B)",
                  Path().c_str(), max_size_);

  Status s = env_->FileExists(Path());
  if (s.ok()) {
    ROCKS_LOG_WARN(log_, "File %s already exists. %s", Path().c_str(),
                   s.ToString().c_str());
  }

  s = NewWritableCacheFile(env_, Path(), &file_);
  if (!s.ok()) {
    ROCKS_LOG_WARN(log_, "Unable to create file %s. %s", Path().c_str(),
                   s.ToString().c_str());
    return false;
  }

  assert(!refs_);
  ++refs_;

  return true;
}

bool WriteableCacheFile::Append(const Slice& key, const Slice& val, LBA* lba) {
  WriteLock _(&rwlock_);

  if (eof_) {
    // We can't append since the file is full
    return false;
  }

  // estimate the space required to store the (key, val)
  uint32_t rec_size = CacheRecord::CalcSize(key, val);

  if (!ExpandBuffer(rec_size)) {
    // unable to expand the buffer
    ROCKS_LOG_DEBUG(log_, "Error expanding buffers. size=%d", rec_size);
    return false;
  }

  lba->cache_id_ = cache_id_;
  lba->off_ = disk_woff_;
  lba->size_ = rec_size;

  CacheRecord rec(key, val);
  if (!rec.Serialize(&bufs_, &buf_woff_)) {
    // unexpected error: unable to serialize the data
    assert(!"Error serializing record");
    return false;
  }

  disk_woff_ += rec_size;
  eof_ = disk_woff_ >= max_size_;

  // dispatch buffer for flush
  DispatchBuffer();

  return true;
}

bool WriteableCacheFile::ExpandBuffer(const size_t size) {
  rwlock_.AssertHeld();
  assert(!eof_);

  // determine if there is enough space
  size_t free = 0;  // compute the free space left in buffer
  for (size_t i = buf_woff_; i < bufs_.size(); ++i) {
    free += bufs_[i]->Free();
    if (size <= free) {
      // we have enough space in the buffer
      return true;
    }
  }

  // expand the buffer until there is enough space to write `size` bytes
  assert(free < size);
  while (free < size) {
    CacheWriteBuffer* const buf = alloc_->Allocate();
    if (!buf) {
      ROCKS_LOG_DEBUG(log_, "Unable to allocate buffers");
      return false;
    }

    size_ += static_cast<uint32_t>(buf->Free());
    free += buf->Free();
    bufs_.push_back(buf);
  }

  assert(free >= size);
  return true;
}

void WriteableCacheFile::DispatchBuffer() {
  rwlock_.AssertHeld();

  assert(bufs_.size());
  assert(buf_doff_ <= buf_woff_);
  assert(buf_woff_ <= bufs_.size());

  if (pending_ios_) {
    return;
  }

  if (!eof_ && buf_doff_ == buf_woff_) {
    // dispatch buffer is pointing to write buffer and we haven't hit eof
    return;
  }

  assert(eof_ || buf_doff_ < buf_woff_);
  assert(buf_doff_ < bufs_.size());
  assert(file_);

  auto* buf = bufs_[buf_doff_];
  const uint64_t file_off = buf_doff_ * alloc_->BufferSize();

  assert(!buf->Free() ||
         (eof_ && buf_doff_ == buf_woff_ && buf_woff_ < bufs_.size()));
  // we have reached the end of the file, and there is space in the last
  // buffer; pad it with zeros for direct IO
  buf->FillTrailingZeros();

  assert(buf->Used() % kFileAlignmentSize == 0);

  writer_->Write(file_.get(), buf, file_off,
                 std::bind(&WriteableCacheFile::BufferWriteDone, this));
  pending_ios_++;
  buf_doff_++;
}
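
// Illustrative note: buffers map one-to-one onto aligned file extents, which
// is why file_off above is simply buf_doff_ * alloc_->BufferSize(). Assuming
// a 64 KB buffer size, buffer 0 lands at file offset 0, buffer 1 at 64 KB,
// buffer 2 at 128 KB, and so on.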

void WriteableCacheFile::BufferWriteDone() {
  WriteLock _(&rwlock_);

  assert(bufs_.size());

  pending_ios_--;

  if (buf_doff_ < bufs_.size()) {
    DispatchBuffer();
  }

  if (eof_ && buf_doff_ >= bufs_.size() && !pending_ios_) {
    // end-of-file reached, move to read mode
    CloseAndOpenForReading();
  }
}

void WriteableCacheFile::CloseAndOpenForReading() {
  // Our env abstraction does not allow reading from a file opened for
  // appending. We need to close the file and re-open it for reading.
  Close();
  RandomAccessCacheFile::OpenImpl(enable_direct_reads_);
}

bool WriteableCacheFile::ReadBuffer(const LBA& lba, Slice* key, Slice* block,
                                    char* scratch) {
  rwlock_.AssertHeld();

  if (!ReadBuffer(lba, scratch)) {
    Error(log_, "Error reading from buffer. cache=%d off=%d", cache_id_,
          lba.off_);
    return false;
  }

  return ParseRec(lba, key, block, scratch);
}

bool WriteableCacheFile::ReadBuffer(const LBA& lba, char* data) {
  rwlock_.AssertHeld();

  assert(lba.off_ < disk_woff_);

  // we read from the buffers as if reading from a flat file. The list of
  // buffers is treated as a contiguous stream of data

  char* tmp = data;
  size_t pending_nbytes = lba.size_;
  // start buffer
  size_t start_idx = lba.off_ / alloc_->BufferSize();
  // offset into the start buffer
  size_t start_off = lba.off_ % alloc_->BufferSize();

  assert(start_idx <= buf_woff_);

  for (size_t i = start_idx; pending_nbytes && i < bufs_.size(); ++i) {
    assert(i <= buf_woff_);
    auto* buf = bufs_[i];
    assert(i == buf_woff_ || !buf->Free());
    // bytes to read from this buffer
    size_t nbytes = pending_nbytes > (buf->Used() - start_off)
                        ? (buf->Used() - start_off)
                        : pending_nbytes;
    memcpy(tmp, buf->Data() + start_off, nbytes);

    // left over to be read
    pending_nbytes -= nbytes;
    start_off = 0;
    tmp += nbytes;
  }

  assert(!pending_nbytes);
  if (pending_nbytes) {
    return false;
  }

  assert(tmp == data + lba.size_);
  return true;
}
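
// Example (illustrative, assuming alloc_->BufferSize() == 65536): a record at
// lba.off_ == 100000 with lba.size_ == 40000 starts in buffer
// 100000 / 65536 == 1 at offset 100000 % 65536 == 34464, copies the
// 65536 - 34464 == 31072 bytes left in that buffer, then the remaining
// 8928 bytes from the start of buffer 2.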

void WriteableCacheFile::Close() {
  rwlock_.AssertHeld();

  assert(size_ >= max_size_);
  assert(disk_woff_ >= max_size_);
  assert(buf_doff_ == bufs_.size());
  assert(bufs_.size() - buf_woff_ <= 1);
  assert(!pending_ios_);

  Info(log_, "Closing file %s. size=%d written=%d", Path().c_str(), size_,
       disk_woff_);

  ClearBuffers();
  file_.reset();

  assert(refs_);
  --refs_;
}

void WriteableCacheFile::ClearBuffers() {
  for (size_t i = 0; i < bufs_.size(); ++i) {
    alloc_->Deallocate(bufs_[i]);
  }

  bufs_.clear();
}

//
// ThreadedWriter implementation
//
ThreadedWriter::ThreadedWriter(PersistentCacheTier* const cache,
                               const size_t qdepth, const size_t io_size)
    : Writer(cache), io_size_(io_size) {
  for (size_t i = 0; i < qdepth; ++i) {
    port::Thread th(&ThreadedWriter::ThreadMain, this);
    threads_.push_back(std::move(th));
  }
}

void ThreadedWriter::Stop() {
  // notify all threads to exit
  for (size_t i = 0; i < threads_.size(); ++i) {
    q_.Push(IO(/*signal=*/true));
  }

  // wait for all threads to exit
  for (auto& th : threads_) {
    th.join();
    assert(!th.joinable());
  }
  threads_.clear();
}
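
// Example (illustrative; the qdepth and io_size values are hypothetical):
//
//   ThreadedWriter writer(cache, /*qdepth=*/2, /*io_size=*/1024 * 1024);
//   writer.Write(file, buf, /*file_off=*/0, callback);
//   ...
//   writer.Stop();  // pushes one signal IO per thread, then joins them all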

void ThreadedWriter::Write(WritableFile* const file, CacheWriteBuffer* buf,
                           const uint64_t file_off,
                           const std::function<void()> callback) {
  q_.Push(IO(file, buf, file_off, callback));
}

void ThreadedWriter::ThreadMain() {
  while (true) {
    // Fetch the IO to process
    IO io(q_.Pop());
    if (io.signal_) {
      // that's the secret signal to exit
      break;
    }

    // Reserve space for writing the buffer
    while (!cache_->Reserve(io.buf_->Used())) {
      // We can fail to reserve space if every file in the system
      // is currently being accessed
      /* sleep override */
      Env::Default()->SleepForMicroseconds(1000000);
    }

    DispatchIO(io);

    io.callback_();
  }
}

void ThreadedWriter::DispatchIO(const IO& io) {
  size_t written = 0;
  while (written < io.buf_->Used()) {
    Slice data(io.buf_->Data() + written, io_size_);
    Status s = io.file_->Append(data);
    assert(s.ok());
    if (!s.ok()) {
      // That is a definite IO error to the device. There is not much we can
      // do but ignore the failure. This can lead to corruption of data on
      // disk, but the cache will skip it while reading
      fprintf(stderr, "Error writing data to file. %s\n", s.ToString().c_str());
    }
    written += io_size_;
  }
}
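
// Illustrative note: DispatchIO writes the buffer in fixed io_size_ chunks,
// so it assumes buf->Used() is a multiple of io_size_ (e.g. a 1 MB buffer
// written as four 256 KB appends); buffers are zero-padded before dispatch
// (see DispatchBuffer()).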

}  // namespace rocksdb

#endif