// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#ifndef ROCKSDB_LITE

#include "utilities/persistent_cache/block_cache_tier_file.h"

#ifndef OS_WIN
#include <unistd.h>
#endif
#include <functional>
#include <memory>
#include <vector>

#include "port/port.h"
#include "util/crc32c.h"
#include "util/logging.h"

namespace rocksdb {

//
// File creation factories
//
Status NewWritableCacheFile(Env* const env, const std::string& filepath,
                            std::unique_ptr<WritableFile>* file,
                            const bool use_direct_writes = false) {
  EnvOptions opt;
  opt.use_direct_writes = use_direct_writes;
  Status s = env->NewWritableFile(filepath, file, opt);
  return s;
}

Status NewRandomAccessCacheFile(Env* const env, const std::string& filepath,
                                std::unique_ptr<RandomAccessFile>* file,
                                const bool use_direct_reads = true) {
  EnvOptions opt;
  opt.use_direct_reads = use_direct_reads;
  Status s = env->NewRandomAccessFile(filepath, file, opt);
  return s;
}
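
// Note the asymmetric defaults: cache files are written through the OS
// buffer cache (use_direct_writes = false) but read with direct IO
// (use_direct_reads = true) unless the caller overrides them.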

//
// BlockCacheFile
//
Status BlockCacheFile::Delete(uint64_t* size) {
  Status status = env_->GetFileSize(Path(), size);
  if (!status.ok()) {
    return status;
  }
  return env_->DeleteFile(Path());
}

//
// CacheRecord
//
// Cache record represents the record on disk
//
// +--------+---------+----------+------------+---------------+-------------+
// | magic  | crc     | key size | value size | key data      | value data  |
// +--------+---------+----------+------------+---------------+-------------+
// <-- 4 --><-- 4 --><-- 4 --><-- 4 --><-- key size --><-- v-size -->
//
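// For example, a record with a 16 byte key and a 4096 byte value occupies
// sizeof(CacheRecordHeader) + 16 + 4096 = 16 + 16 + 4096 = 4128 bytes on
// disk (see CalcSize below), before any trailing zero padding added for
// direct IO alignment.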
struct CacheRecordHeader {
  CacheRecordHeader()
      : magic_(0), crc_(0), key_size_(0), val_size_(0) {}
  CacheRecordHeader(const uint32_t magic, const uint32_t key_size,
                    const uint32_t val_size)
      : magic_(magic), crc_(0), key_size_(key_size), val_size_(val_size) {}

  uint32_t magic_;
  uint32_t crc_;
  uint32_t key_size_;
  uint32_t val_size_;
};

struct CacheRecord {
  CacheRecord() {}
  CacheRecord(const Slice& key, const Slice& val)
      : hdr_(MAGIC, static_cast<uint32_t>(key.size()),
             static_cast<uint32_t>(val.size())),
        key_(key),
        val_(val) {
    hdr_.crc_ = ComputeCRC();
  }

  uint32_t ComputeCRC() const;
  bool Serialize(std::vector<CacheWriteBuffer*>* bufs, size_t* woff);
  bool Deserialize(const Slice& buf);

  static uint32_t CalcSize(const Slice& key, const Slice& val) {
    return static_cast<uint32_t>(sizeof(CacheRecordHeader) + key.size() +
                                 val.size());
  }

  static const uint32_t MAGIC = 0xfefa;

  bool Append(std::vector<CacheWriteBuffer*>* bufs, size_t* woff,
              const char* data, const size_t size);

  CacheRecordHeader hdr_;
  Slice key_;
  Slice val_;
};

static_assert(sizeof(CacheRecordHeader) == 16,
              "CacheRecordHeader is not aligned");
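
// The CRC covers the header (with its crc_ field zeroed out), followed by
// the key bytes and then the value bytes, so corruption of either metadata
// or payload is detected on read.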
uint32_t CacheRecord::ComputeCRC() const {
  uint32_t crc = 0;
  CacheRecordHeader tmp = hdr_;
  tmp.crc_ = 0;
  crc = crc32c::Extend(crc, reinterpret_cast<const char*>(&tmp), sizeof(tmp));
  crc = crc32c::Extend(crc, reinterpret_cast<const char*>(key_.data()),
                       key_.size());
  crc = crc32c::Extend(crc, reinterpret_cast<const char*>(val_.data()),
                       val_.size());
  return crc;
}

bool CacheRecord::Serialize(std::vector<CacheWriteBuffer*>* bufs,
                            size_t* woff) {
  assert(bufs->size());
  return Append(bufs, woff, reinterpret_cast<const char*>(&hdr_),
                sizeof(hdr_)) &&
         Append(bufs, woff, reinterpret_cast<const char*>(key_.data()),
                key_.size()) &&
         Append(bufs, woff, reinterpret_cast<const char*>(val_.data()),
                val_.size());
}
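
// Append copies `size` bytes into the write buffers starting at index
// *woff, spilling into the next buffer whenever the current one fills up,
// and advances *woff past every buffer it exhausts. A record may therefore
// straddle two or more buffers.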
bool CacheRecord::Append(std::vector<CacheWriteBuffer*>* bufs, size_t* woff,
                         const char* data, const size_t data_size) {
  assert(*woff < bufs->size());

  const char* p = data;
  size_t size = data_size;

  while (size && *woff < bufs->size()) {
    CacheWriteBuffer* buf = (*bufs)[*woff];
    const size_t free = buf->Free();
    if (size <= free) {
      buf->Append(p, size);
      size = 0;
    } else {
      buf->Append(p, free);
      p += free;
      size -= free;
      assert(!buf->Free());
      assert(buf->Used() == buf->Capacity());
    }

    if (!buf->Free()) {
      *woff += 1;
    }
  }

  assert(!size);

  return !size;
}
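
// Deserialize validates a raw record in three steps: the buffer must be at
// least one header long, the sizes recorded in the header must add up to
// the buffer size exactly, and the magic number and CRC must match. Any
// mismatch fails the read instead of returning corrupt data.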
bool CacheRecord::Deserialize(const Slice& data) {
  assert(data.size() >= sizeof(CacheRecordHeader));
  if (data.size() < sizeof(CacheRecordHeader)) {
    return false;
  }

  memcpy(&hdr_, data.data(), sizeof(hdr_));

  assert(hdr_.key_size_ + hdr_.val_size_ + sizeof(hdr_) == data.size());
  if (hdr_.key_size_ + hdr_.val_size_ + sizeof(hdr_) != data.size()) {
    return false;
  }

  key_ = Slice(data.data() + sizeof(hdr_), hdr_.key_size_);
  val_ = Slice(key_.data() + hdr_.key_size_, hdr_.val_size_);

  if (!(hdr_.magic_ == MAGIC && ComputeCRC() == hdr_.crc_)) {
    fprintf(stderr, "** magic %u **\n", hdr_.magic_);
    fprintf(stderr, "** key_size %u **\n", hdr_.key_size_);
    fprintf(stderr, "** val_size %u **\n", hdr_.val_size_);
    fprintf(stderr, "** key %s **\n", key_.ToString().c_str());
    fprintf(stderr, "** val %s **\n", val_.ToString().c_str());
    for (size_t i = 0; i < hdr_.val_size_; ++i) {
      fprintf(stderr, "%d.", (uint8_t)val_.data()[i]);
    }
    fprintf(stderr, "\n** cksum %u != %u **", hdr_.crc_, ComputeCRC());
  }

  assert(hdr_.magic_ == MAGIC && ComputeCRC() == hdr_.crc_);
  return hdr_.magic_ == MAGIC && ComputeCRC() == hdr_.crc_;
}

//
// RandomAccessFile
//
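
// Open() takes the write lock and delegates to OpenImpl(), which only
// asserts that the lock is held. This split lets
// WriteableCacheFile::CloseAndOpenForReading() re-open the file under a
// lock it already owns without self-deadlocking.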
bool RandomAccessCacheFile::Open(const bool enable_direct_reads) {
  WriteLock _(&rwlock_);
  return OpenImpl(enable_direct_reads);
}

bool RandomAccessCacheFile::OpenImpl(const bool enable_direct_reads) {
  rwlock_.AssertHeld();

  ROCKS_LOG_DEBUG(log_, "Opening cache file %s", Path().c_str());

  std::unique_ptr<RandomAccessFile> file;
  Status status =
      NewRandomAccessCacheFile(env_, Path(), &file, enable_direct_reads);
  if (!status.ok()) {
    Error(log_, "Error opening random access file %s. %s", Path().c_str(),
          status.ToString().c_str());
    return false;
  }
  freader_.reset(new RandomAccessFileReader(std::move(file), Path(), env_));

  return true;
}

bool RandomAccessCacheFile::Read(const LBA& lba, Slice* key, Slice* val,
                                 char* scratch) {
  ReadLock _(&rwlock_);

  assert(lba.cache_id_ == cache_id_);

  if (!freader_) {
    return false;
  }

  Slice result;
  Status s = freader_->Read(lba.off_, lba.size_, &result, scratch);
  if (!s.ok()) {
    Error(log_, "Error reading from file %s. %s", Path().c_str(),
          s.ToString().c_str());
    return false;
  }

  assert(result.data() == scratch);

  return ParseRec(lba, key, val, scratch);
}

bool RandomAccessCacheFile::ParseRec(const LBA& lba, Slice* key, Slice* val,
                                     char* scratch) {
  Slice data(scratch, lba.size_);

  CacheRecord rec;
  if (!rec.Deserialize(data)) {
    assert(!"Error deserializing data");
    Error(log_, "Error de-serializing record from file %s off %d",
          Path().c_str(), lba.off_);
    return false;
  }

  *key = Slice(rec.key_);
  *val = Slice(rec.val_);

  return true;
}

//
// WriteableCacheFile
//

WriteableCacheFile::~WriteableCacheFile() {
  WriteLock _(&rwlock_);
  if (!eof_) {
    // This file was never flushed. We give priority to shutdown since this
    // is a cache
    // TODO(krad): Figure a way to flush the pending data
    if (file_) {
      assert(refs_ == 1);
      --refs_;
    }
  }
  assert(!refs_);
  ClearBuffers();
}

bool WriteableCacheFile::Create(const bool /*enable_direct_writes*/,
                                const bool enable_direct_reads) {
  WriteLock _(&rwlock_);

  enable_direct_reads_ = enable_direct_reads;

  ROCKS_LOG_DEBUG(log_, "Creating new cache %s (max size is %d B)",
                  Path().c_str(), max_size_);

  Status s = env_->FileExists(Path());
  if (s.ok()) {
    ROCKS_LOG_WARN(log_, "File %s already exists. %s", Path().c_str(),
                   s.ToString().c_str());
  }

  s = NewWritableCacheFile(env_, Path(), &file_);
  if (!s.ok()) {
    ROCKS_LOG_WARN(log_, "Unable to create file %s. %s", Path().c_str(),
                   s.ToString().c_str());
    return false;
  }

  assert(!refs_);
  ++refs_;

  return true;
}

bool WriteableCacheFile::Append(const Slice& key, const Slice& val, LBA* lba) {
  WriteLock _(&rwlock_);

  if (eof_) {
    // We can't append since the file is full
    return false;
  }

  // estimate the space required to store the (key, val)
  uint32_t rec_size = CacheRecord::CalcSize(key, val);

  if (!ExpandBuffer(rec_size)) {
    // unable to expand the buffer
    ROCKS_LOG_DEBUG(log_, "Error expanding buffers. size=%d", rec_size);
    return false;
  }

  lba->cache_id_ = cache_id_;
  lba->off_ = disk_woff_;
  lba->size_ = rec_size;

  CacheRecord rec(key, val);
  if (!rec.Serialize(&bufs_, &buf_woff_)) {
    // unexpected error: unable to serialize the data
    assert(!"Error serializing record");
    return false;
  }

  disk_woff_ += rec_size;
  eof_ = disk_woff_ >= max_size_;

  // dispatch buffer for flush
  DispatchBuffer();

  return true;
}

bool WriteableCacheFile::ExpandBuffer(const size_t size) {
  rwlock_.AssertHeld();
  assert(!eof_);

  // determine if there is enough space
  size_t free = 0;  // compute the free space left in buffer
  for (size_t i = buf_woff_; i < bufs_.size(); ++i) {
    free += bufs_[i]->Free();
    if (size <= free) {
      // we have enough space in the buffer
      return true;
    }
  }

  // expand the buffer until there is enough space to write `size` bytes
  assert(free < size);
  while (free < size) {
    CacheWriteBuffer* const buf = alloc_->Allocate();
    if (!buf) {
      ROCKS_LOG_DEBUG(log_, "Unable to allocate buffers");
      return false;
    }

    size_ += static_cast<uint32_t>(buf->Free());
    free += buf->Free();
    bufs_.push_back(buf);
  }

  assert(free >= size);
  return true;
}
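
// DispatchBuffer issues at most one asynchronous write at a time: buffers
// in [buf_doff_, buf_woff_) are sealed and waiting for IO, while the buffer
// at buf_woff_ is still filling. The completion callback (BufferWriteDone)
// dispatches the next buffer, so the queue drains one write after another.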
void WriteableCacheFile::DispatchBuffer() {
  rwlock_.AssertHeld();

  assert(bufs_.size());
  assert(buf_doff_ <= buf_woff_);
  assert(buf_woff_ <= bufs_.size());

  if (pending_ios_) {
    return;
  }

  if (!eof_ && buf_doff_ == buf_woff_) {
    // dispatch buffer is pointing to the write buffer and we haven't hit eof
    return;
  }

  assert(eof_ || buf_doff_ < buf_woff_);
  assert(buf_doff_ < bufs_.size());
  assert(file_);

  auto* buf = bufs_[buf_doff_];
  const uint64_t file_off = buf_doff_ * alloc_->BufferSize();

  assert(!buf->Free() ||
         (eof_ && buf_doff_ == buf_woff_ && buf_woff_ < bufs_.size()));
  // If we have reached the end of file and there is space in the last
  // buffer, pad it with zeros for direct IO
  buf->FillTrailingZeros();

  assert(buf->Used() % kFileAlignmentSize == 0);

  writer_->Write(file_.get(), buf, file_off,
                 std::bind(&WriteableCacheFile::BufferWriteDone, this));
  pending_ios_++;
  buf_doff_++;
}

void WriteableCacheFile::BufferWriteDone() {
  WriteLock _(&rwlock_);

  assert(bufs_.size());

  pending_ios_--;

  if (buf_doff_ < bufs_.size()) {
    DispatchBuffer();
  }

  if (eof_ && buf_doff_ >= bufs_.size() && !pending_ios_) {
    // end-of-file reached, move to read mode
    CloseAndOpenForReading();
  }
}

void WriteableCacheFile::CloseAndOpenForReading() {
  // Our env abstraction does not allow reading from a file opened for
  // appending. We need to close the file and re-open it for reading
  Close();
  RandomAccessCacheFile::OpenImpl(enable_direct_reads_);
}

bool WriteableCacheFile::ReadBuffer(const LBA& lba, Slice* key, Slice* block,
                                    char* scratch) {
  rwlock_.AssertHeld();

  if (!ReadBuffer(lba, scratch)) {
    Error(log_, "Error reading from buffer. cache=%d off=%d", cache_id_,
          lba.off_);
    return false;
  }

  return ParseRec(lba, key, block, scratch);
}
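
// Reads are served straight from the in-memory buffers until the file is
// flushed: the buffer index and intra-buffer offset are derived from the
// LBA offset. For example, assuming a (hypothetical) 1 MB buffer size, an
// offset of 2.5 MB maps to start_idx = 2 and start_off = 512 KB.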
bool WriteableCacheFile::ReadBuffer(const LBA& lba, char* data) {
  rwlock_.AssertHeld();

  assert(lba.off_ < disk_woff_);

  // we read from the buffers as if reading from a flat file. The list of
  // buffers is treated as a contiguous stream of data

  char* tmp = data;
  size_t pending_nbytes = lba.size_;
  // start buffer
  size_t start_idx = lba.off_ / alloc_->BufferSize();
  // offset into the start buffer
  size_t start_off = lba.off_ % alloc_->BufferSize();

  assert(start_idx <= buf_woff_);

  for (size_t i = start_idx; pending_nbytes && i < bufs_.size(); ++i) {
    assert(i <= buf_woff_);
    auto* buf = bufs_[i];
    assert(i == buf_woff_ || !buf->Free());
    // bytes to read from this buffer
    size_t nbytes = pending_nbytes > (buf->Used() - start_off)
                        ? (buf->Used() - start_off)
                        : pending_nbytes;
    memcpy(tmp, buf->Data() + start_off, nbytes);

    // left over to be read
    pending_nbytes -= nbytes;
    start_off = 0;
    tmp += nbytes;
  }

  assert(!pending_nbytes);
  if (pending_nbytes) {
    return false;
  }

  assert(tmp == data + lba.size_);
  return true;
}

void WriteableCacheFile::Close() {
  rwlock_.AssertHeld();

  assert(size_ >= max_size_);
  assert(disk_woff_ >= max_size_);
  assert(buf_doff_ == bufs_.size());
  assert(bufs_.size() - buf_woff_ <= 1);
  assert(!pending_ios_);

  Info(log_, "Closing file %s. size=%d written=%d", Path().c_str(), size_,
       disk_woff_);

  ClearBuffers();
  file_.reset();

  assert(refs_);
  --refs_;
}

void WriteableCacheFile::ClearBuffers() {
  for (size_t i = 0; i < bufs_.size(); ++i) {
    alloc_->Deallocate(bufs_[i]);
  }

  bufs_.clear();
}

//
// ThreadedWriter implementation
//
ThreadedWriter::ThreadedWriter(PersistentCacheTier* const cache,
                               const size_t qdepth, const size_t io_size)
    : Writer(cache), io_size_(io_size) {
  for (size_t i = 0; i < qdepth; ++i) {
    port::Thread th(&ThreadedWriter::ThreadMain, this);
    threads_.push_back(std::move(th));
  }
}

void ThreadedWriter::Stop() {
  // notify all threads to exit
  for (size_t i = 0; i < threads_.size(); ++i) {
    q_.Push(IO(/*signal=*/true));
  }

  // wait for all threads to exit
  for (auto& th : threads_) {
    th.join();
    assert(!th.joinable());
  }
  threads_.clear();
}

void ThreadedWriter::Write(WritableFile* const file, CacheWriteBuffer* buf,
                           const uint64_t file_off,
                           const std::function<void()> callback) {
  q_.Push(IO(file, buf, file_off, callback));
}

void ThreadedWriter::ThreadMain() {
  while (true) {
    // Fetch the IO to process
    IO io(q_.Pop());
    if (io.signal_) {
      // that's the secret signal to exit
      break;
    }

    // Reserve space for writing the buffer
    while (!cache_->Reserve(io.buf_->Used())) {
      // We can fail to reserve space if every file in the system
      // is currently being accessed
      /* sleep override */
      Env::Default()->SleepForMicroseconds(1000000);
    }

    DispatchIO(io);

    io.callback_();
  }
}
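
// Note: DispatchIO writes the buffer in fixed io_size_ chunks. This relies
// on the assumption that buf_->Used() is a multiple of io_size_ (buffers
// are zero-padded to the file alignment before dispatch); otherwise the
// final Slice would read past the end of the buffer.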
void ThreadedWriter::DispatchIO(const IO& io) {
  size_t written = 0;
  while (written < io.buf_->Used()) {
    Slice data(io.buf_->Data() + written, io_size_);
    Status s = io.file_->Append(data);
    assert(s.ok());
    if (!s.ok()) {
      // That is a definite IO error to the device. There is not much we can
      // do but ignore the failure. This can lead to corruption of data on
      // disk, but the cache will skip the record while reading
      fprintf(stderr, "Error writing data to file. %s\n", s.ToString().c_str());
    }
    written += io_size_;
  }
}

}  // namespace rocksdb

#endif  // ifndef ROCKSDB_LITE