]>
Commit | Line | Data |
---|---|---|
7c673cae | 1 | // Copyright (c) 2013, Facebook, Inc. All rights reserved. |
11fdf7f2 TL |
2 | // This source code is licensed under both the GPLv2 (found in the |
3 | // COPYING file in the root directory) and Apache 2.0 License | |
4 | // (found in the LICENSE.Apache file in the root directory). | |
7c673cae FG |
5 | #ifndef ROCKSDB_LITE |
6 | ||
7 | #include "utilities/persistent_cache/block_cache_tier_file.h" | |
8 | ||
9 | #ifndef OS_WIN | |
10 | #include <unistd.h> | |
11 | #endif | |
12 | #include <functional> | |
13 | #include <memory> | |
14 | #include <vector> | |
15 | ||
f67539c2 TL |
16 | #include "env/composite_env_wrapper.h" |
17 | #include "logging/logging.h" | |
7c673cae FG |
18 | #include "port/port.h" |
19 | #include "util/crc32c.h" | |
7c673cae | 20 | |
f67539c2 | 21 | namespace ROCKSDB_NAMESPACE { |
7c673cae FG |
22 | |
23 | // | |
24 | // File creation factories | |
25 | // | |
26 | Status NewWritableCacheFile(Env* const env, const std::string& filepath, | |
27 | std::unique_ptr<WritableFile>* file, | |
28 | const bool use_direct_writes = false) { | |
29 | EnvOptions opt; | |
30 | opt.use_direct_writes = use_direct_writes; | |
31 | Status s = env->NewWritableFile(filepath, file, opt); | |
32 | return s; | |
33 | } | |
34 | ||
35 | Status NewRandomAccessCacheFile(Env* const env, const std::string& filepath, | |
36 | std::unique_ptr<RandomAccessFile>* file, | |
37 | const bool use_direct_reads = true) { | |
f67539c2 TL |
38 | assert(env); |
39 | ||
7c673cae FG |
40 | EnvOptions opt; |
41 | opt.use_direct_reads = use_direct_reads; | |
42 | Status s = env->NewRandomAccessFile(filepath, file, opt); | |
43 | return s; | |
44 | } | |
45 | ||
46 | // | |
47 | // BlockCacheFile | |
48 | // | |
49 | Status BlockCacheFile::Delete(uint64_t* size) { | |
f67539c2 TL |
50 | assert(env_); |
51 | ||
7c673cae FG |
52 | Status status = env_->GetFileSize(Path(), size); |
53 | if (!status.ok()) { | |
54 | return status; | |
55 | } | |
56 | return env_->DeleteFile(Path()); | |
57 | } | |
58 | ||
59 | // | |
60 | // CacheRecord | |
61 | // | |
62 | // Cache record represents the record on disk | |
63 | // | |
64 | // +--------+---------+----------+------------+---------------+-------------+ | |
65 | // | magic | crc | key size | value size | key data | value data | | |
66 | // +--------+---------+----------+------------+---------------+-------------+ | |
67 | // <-- 4 --><-- 4 --><-- 4 --><-- 4 --><-- key size --><-- v-size --> | |
68 | // | |
// Fixed-size header stored in front of every cache record: four 32-bit
// fields (magic, crc, key size, value size), in that order.
struct CacheRecordHeader {
  // Builds a header with every field set to zero.
  CacheRecordHeader() : CacheRecordHeader(0, 0, 0) {}

  // Builds a header for a record with the given magic and payload sizes.
  // The crc field starts at zero; callers fill it in afterwards.
  CacheRecordHeader(const uint32_t magic, const uint32_t key_size,
                    const uint32_t val_size)
      : magic_(magic), crc_(0), key_size_(key_size), val_size_(val_size) {}

  uint32_t magic_;     // record marker
  uint32_t crc_;       // checksum of the record
  uint32_t key_size_;  // number of key bytes following the header
  uint32_t val_size_;  // number of value bytes following the key
};
81 | ||
82 | struct CacheRecord { | |
83 | CacheRecord() {} | |
84 | CacheRecord(const Slice& key, const Slice& val) | |
85 | : hdr_(MAGIC, static_cast<uint32_t>(key.size()), | |
86 | static_cast<uint32_t>(val.size())), | |
87 | key_(key), | |
88 | val_(val) { | |
89 | hdr_.crc_ = ComputeCRC(); | |
90 | } | |
91 | ||
92 | uint32_t ComputeCRC() const; | |
93 | bool Serialize(std::vector<CacheWriteBuffer*>* bufs, size_t* woff); | |
94 | bool Deserialize(const Slice& buf); | |
95 | ||
96 | static uint32_t CalcSize(const Slice& key, const Slice& val) { | |
97 | return static_cast<uint32_t>(sizeof(CacheRecordHeader) + key.size() + | |
98 | val.size()); | |
99 | } | |
100 | ||
101 | static const uint32_t MAGIC = 0xfefa; | |
102 | ||
103 | bool Append(std::vector<CacheWriteBuffer*>* bufs, size_t* woff, | |
104 | const char* data, const size_t size); | |
105 | ||
106 | CacheRecordHeader hdr_; | |
107 | Slice key_; | |
108 | Slice val_; | |
109 | }; | |
110 | ||
111 | static_assert(sizeof(CacheRecordHeader) == 16, "DataHeader is not aligned"); | |
112 | ||
113 | uint32_t CacheRecord::ComputeCRC() const { | |
114 | uint32_t crc = 0; | |
115 | CacheRecordHeader tmp = hdr_; | |
116 | tmp.crc_ = 0; | |
117 | crc = crc32c::Extend(crc, reinterpret_cast<const char*>(&tmp), sizeof(tmp)); | |
118 | crc = crc32c::Extend(crc, reinterpret_cast<const char*>(key_.data()), | |
119 | key_.size()); | |
120 | crc = crc32c::Extend(crc, reinterpret_cast<const char*>(val_.data()), | |
121 | val_.size()); | |
122 | return crc; | |
123 | } | |
124 | ||
125 | bool CacheRecord::Serialize(std::vector<CacheWriteBuffer*>* bufs, | |
126 | size_t* woff) { | |
127 | assert(bufs->size()); | |
128 | return Append(bufs, woff, reinterpret_cast<const char*>(&hdr_), | |
129 | sizeof(hdr_)) && | |
130 | Append(bufs, woff, reinterpret_cast<const char*>(key_.data()), | |
131 | key_.size()) && | |
132 | Append(bufs, woff, reinterpret_cast<const char*>(val_.data()), | |
133 | val_.size()); | |
134 | } | |
135 | ||
136 | bool CacheRecord::Append(std::vector<CacheWriteBuffer*>* bufs, size_t* woff, | |
137 | const char* data, const size_t data_size) { | |
138 | assert(*woff < bufs->size()); | |
139 | ||
140 | const char* p = data; | |
141 | size_t size = data_size; | |
142 | ||
143 | while (size && *woff < bufs->size()) { | |
144 | CacheWriteBuffer* buf = (*bufs)[*woff]; | |
145 | const size_t free = buf->Free(); | |
146 | if (size <= free) { | |
147 | buf->Append(p, size); | |
148 | size = 0; | |
149 | } else { | |
150 | buf->Append(p, free); | |
151 | p += free; | |
152 | size -= free; | |
153 | assert(!buf->Free()); | |
154 | assert(buf->Used() == buf->Capacity()); | |
155 | } | |
156 | ||
157 | if (!buf->Free()) { | |
158 | *woff += 1; | |
159 | } | |
160 | } | |
161 | ||
162 | assert(!size); | |
163 | ||
164 | return !size; | |
165 | } | |
166 | ||
167 | bool CacheRecord::Deserialize(const Slice& data) { | |
168 | assert(data.size() >= sizeof(CacheRecordHeader)); | |
169 | if (data.size() < sizeof(CacheRecordHeader)) { | |
170 | return false; | |
171 | } | |
172 | ||
173 | memcpy(&hdr_, data.data(), sizeof(hdr_)); | |
174 | ||
175 | assert(hdr_.key_size_ + hdr_.val_size_ + sizeof(hdr_) == data.size()); | |
176 | if (hdr_.key_size_ + hdr_.val_size_ + sizeof(hdr_) != data.size()) { | |
177 | return false; | |
178 | } | |
179 | ||
180 | key_ = Slice(data.data_ + sizeof(hdr_), hdr_.key_size_); | |
181 | val_ = Slice(key_.data_ + hdr_.key_size_, hdr_.val_size_); | |
182 | ||
183 | if (!(hdr_.magic_ == MAGIC && ComputeCRC() == hdr_.crc_)) { | |
184 | fprintf(stderr, "** magic %d ** \n", hdr_.magic_); | |
185 | fprintf(stderr, "** key_size %d ** \n", hdr_.key_size_); | |
186 | fprintf(stderr, "** val_size %d ** \n", hdr_.val_size_); | |
187 | fprintf(stderr, "** key %s ** \n", key_.ToString().c_str()); | |
188 | fprintf(stderr, "** val %s ** \n", val_.ToString().c_str()); | |
189 | for (size_t i = 0; i < hdr_.val_size_; ++i) { | |
190 | fprintf(stderr, "%d.", (uint8_t)val_.data()[i]); | |
191 | } | |
192 | fprintf(stderr, "\n** cksum %d != %d **", hdr_.crc_, ComputeCRC()); | |
193 | } | |
194 | ||
195 | assert(hdr_.magic_ == MAGIC && ComputeCRC() == hdr_.crc_); | |
196 | return hdr_.magic_ == MAGIC && ComputeCRC() == hdr_.crc_; | |
197 | } | |
198 | ||
199 | // | |
200 | // RandomAccessFile | |
201 | // | |
202 | ||
203 | bool RandomAccessCacheFile::Open(const bool enable_direct_reads) { | |
204 | WriteLock _(&rwlock_); | |
205 | return OpenImpl(enable_direct_reads); | |
206 | } | |
207 | ||
208 | bool RandomAccessCacheFile::OpenImpl(const bool enable_direct_reads) { | |
209 | rwlock_.AssertHeld(); | |
210 | ||
211 | ROCKS_LOG_DEBUG(log_, "Opening cache file %s", Path().c_str()); | |
212 | ||
213 | std::unique_ptr<RandomAccessFile> file; | |
214 | Status status = | |
215 | NewRandomAccessCacheFile(env_, Path(), &file, enable_direct_reads); | |
216 | if (!status.ok()) { | |
217 | Error(log_, "Error opening random access file %s. %s", Path().c_str(), | |
218 | status.ToString().c_str()); | |
219 | return false; | |
220 | } | |
f67539c2 TL |
221 | freader_.reset(new RandomAccessFileReader( |
222 | NewLegacyRandomAccessFileWrapper(file), Path(), env_)); | |
7c673cae FG |
223 | |
224 | return true; | |
225 | } | |
226 | ||
227 | bool RandomAccessCacheFile::Read(const LBA& lba, Slice* key, Slice* val, | |
228 | char* scratch) { | |
229 | ReadLock _(&rwlock_); | |
230 | ||
231 | assert(lba.cache_id_ == cache_id_); | |
232 | ||
233 | if (!freader_) { | |
234 | return false; | |
235 | } | |
236 | ||
237 | Slice result; | |
20effc67 TL |
238 | Status s = freader_->Read(IOOptions(), lba.off_, lba.size_, &result, scratch, |
239 | nullptr); | |
7c673cae FG |
240 | if (!s.ok()) { |
241 | Error(log_, "Error reading from file %s. %s", Path().c_str(), | |
242 | s.ToString().c_str()); | |
243 | return false; | |
244 | } | |
245 | ||
246 | assert(result.data() == scratch); | |
247 | ||
248 | return ParseRec(lba, key, val, scratch); | |
249 | } | |
250 | ||
251 | bool RandomAccessCacheFile::ParseRec(const LBA& lba, Slice* key, Slice* val, | |
252 | char* scratch) { | |
253 | Slice data(scratch, lba.size_); | |
254 | ||
255 | CacheRecord rec; | |
256 | if (!rec.Deserialize(data)) { | |
257 | assert(!"Error deserializing data"); | |
258 | Error(log_, "Error de-serializing record from file %s off %d", | |
259 | Path().c_str(), lba.off_); | |
260 | return false; | |
261 | } | |
262 | ||
263 | *key = Slice(rec.key_); | |
264 | *val = Slice(rec.val_); | |
265 | ||
266 | return true; | |
267 | } | |
268 | ||
269 | // | |
270 | // WriteableCacheFile | |
271 | // | |
272 | ||
273 | WriteableCacheFile::~WriteableCacheFile() { | |
274 | WriteLock _(&rwlock_); | |
275 | if (!eof_) { | |
276 | // This file never flushed. We give priority to shutdown since this is a | |
277 | // cache | |
278 | // TODO(krad): Figure a way to flush the pending data | |
279 | if (file_) { | |
280 | assert(refs_ == 1); | |
281 | --refs_; | |
282 | } | |
283 | } | |
284 | assert(!refs_); | |
285 | ClearBuffers(); | |
286 | } | |
287 | ||
11fdf7f2 | 288 | bool WriteableCacheFile::Create(const bool /*enable_direct_writes*/, |
7c673cae FG |
289 | const bool enable_direct_reads) { |
290 | WriteLock _(&rwlock_); | |
291 | ||
292 | enable_direct_reads_ = enable_direct_reads; | |
293 | ||
294 | ROCKS_LOG_DEBUG(log_, "Creating new cache %s (max size is %d B)", | |
295 | Path().c_str(), max_size_); | |
296 | ||
f67539c2 TL |
297 | assert(env_); |
298 | ||
7c673cae FG |
299 | Status s = env_->FileExists(Path()); |
300 | if (s.ok()) { | |
301 | ROCKS_LOG_WARN(log_, "File %s already exists. %s", Path().c_str(), | |
302 | s.ToString().c_str()); | |
303 | } | |
304 | ||
305 | s = NewWritableCacheFile(env_, Path(), &file_); | |
306 | if (!s.ok()) { | |
307 | ROCKS_LOG_WARN(log_, "Unable to create file %s. %s", Path().c_str(), | |
308 | s.ToString().c_str()); | |
309 | return false; | |
310 | } | |
311 | ||
312 | assert(!refs_); | |
313 | ++refs_; | |
314 | ||
315 | return true; | |
316 | } | |
317 | ||
318 | bool WriteableCacheFile::Append(const Slice& key, const Slice& val, LBA* lba) { | |
319 | WriteLock _(&rwlock_); | |
320 | ||
321 | if (eof_) { | |
322 | // We can't append since the file is full | |
323 | return false; | |
324 | } | |
325 | ||
326 | // estimate the space required to store the (key, val) | |
327 | uint32_t rec_size = CacheRecord::CalcSize(key, val); | |
328 | ||
329 | if (!ExpandBuffer(rec_size)) { | |
330 | // unable to expand the buffer | |
331 | ROCKS_LOG_DEBUG(log_, "Error expanding buffers. size=%d", rec_size); | |
332 | return false; | |
333 | } | |
334 | ||
335 | lba->cache_id_ = cache_id_; | |
336 | lba->off_ = disk_woff_; | |
337 | lba->size_ = rec_size; | |
338 | ||
339 | CacheRecord rec(key, val); | |
340 | if (!rec.Serialize(&bufs_, &buf_woff_)) { | |
341 | // unexpected error: unable to serialize the data | |
342 | assert(!"Error serializing record"); | |
343 | return false; | |
344 | } | |
345 | ||
346 | disk_woff_ += rec_size; | |
347 | eof_ = disk_woff_ >= max_size_; | |
348 | ||
349 | // dispatch buffer for flush | |
350 | DispatchBuffer(); | |
351 | ||
352 | return true; | |
353 | } | |
354 | ||
355 | bool WriteableCacheFile::ExpandBuffer(const size_t size) { | |
356 | rwlock_.AssertHeld(); | |
357 | assert(!eof_); | |
358 | ||
359 | // determine if there is enough space | |
360 | size_t free = 0; // compute the free space left in buffer | |
361 | for (size_t i = buf_woff_; i < bufs_.size(); ++i) { | |
362 | free += bufs_[i]->Free(); | |
363 | if (size <= free) { | |
364 | // we have enough space in the buffer | |
365 | return true; | |
366 | } | |
367 | } | |
368 | ||
369 | // expand the buffer until there is enough space to write `size` bytes | |
370 | assert(free < size); | |
f67539c2 TL |
371 | assert(alloc_); |
372 | ||
7c673cae FG |
373 | while (free < size) { |
374 | CacheWriteBuffer* const buf = alloc_->Allocate(); | |
375 | if (!buf) { | |
376 | ROCKS_LOG_DEBUG(log_, "Unable to allocate buffers"); | |
377 | return false; | |
378 | } | |
379 | ||
380 | size_ += static_cast<uint32_t>(buf->Free()); | |
381 | free += buf->Free(); | |
382 | bufs_.push_back(buf); | |
383 | } | |
384 | ||
385 | assert(free >= size); | |
386 | return true; | |
387 | } | |
388 | ||
389 | void WriteableCacheFile::DispatchBuffer() { | |
390 | rwlock_.AssertHeld(); | |
391 | ||
392 | assert(bufs_.size()); | |
393 | assert(buf_doff_ <= buf_woff_); | |
394 | assert(buf_woff_ <= bufs_.size()); | |
395 | ||
396 | if (pending_ios_) { | |
397 | return; | |
398 | } | |
399 | ||
400 | if (!eof_ && buf_doff_ == buf_woff_) { | |
401 | // dispatch buffer is pointing to write buffer and we haven't hit eof | |
402 | return; | |
403 | } | |
404 | ||
405 | assert(eof_ || buf_doff_ < buf_woff_); | |
406 | assert(buf_doff_ < bufs_.size()); | |
407 | assert(file_); | |
f67539c2 | 408 | assert(alloc_); |
7c673cae FG |
409 | |
410 | auto* buf = bufs_[buf_doff_]; | |
411 | const uint64_t file_off = buf_doff_ * alloc_->BufferSize(); | |
412 | ||
413 | assert(!buf->Free() || | |
414 | (eof_ && buf_doff_ == buf_woff_ && buf_woff_ < bufs_.size())); | |
415 | // we have reached end of file, and there is space in the last buffer | |
416 | // pad it with zero for direct IO | |
417 | buf->FillTrailingZeros(); | |
418 | ||
419 | assert(buf->Used() % kFileAlignmentSize == 0); | |
420 | ||
421 | writer_->Write(file_.get(), buf, file_off, | |
422 | std::bind(&WriteableCacheFile::BufferWriteDone, this)); | |
423 | pending_ios_++; | |
424 | buf_doff_++; | |
425 | } | |
426 | ||
427 | void WriteableCacheFile::BufferWriteDone() { | |
428 | WriteLock _(&rwlock_); | |
429 | ||
430 | assert(bufs_.size()); | |
431 | ||
432 | pending_ios_--; | |
433 | ||
434 | if (buf_doff_ < bufs_.size()) { | |
435 | DispatchBuffer(); | |
436 | } | |
437 | ||
438 | if (eof_ && buf_doff_ >= bufs_.size() && !pending_ios_) { | |
439 | // end-of-file reached, move to read mode | |
440 | CloseAndOpenForReading(); | |
441 | } | |
442 | } | |
443 | ||
444 | void WriteableCacheFile::CloseAndOpenForReading() { | |
445 | // Our env abstraction do not allow reading from a file opened for appending | |
446 | // We need close the file and re-open it for reading | |
447 | Close(); | |
448 | RandomAccessCacheFile::OpenImpl(enable_direct_reads_); | |
449 | } | |
450 | ||
451 | bool WriteableCacheFile::ReadBuffer(const LBA& lba, Slice* key, Slice* block, | |
452 | char* scratch) { | |
453 | rwlock_.AssertHeld(); | |
454 | ||
455 | if (!ReadBuffer(lba, scratch)) { | |
456 | Error(log_, "Error reading from buffer. cache=%d off=%d", cache_id_, | |
457 | lba.off_); | |
458 | return false; | |
459 | } | |
460 | ||
461 | return ParseRec(lba, key, block, scratch); | |
462 | } | |
463 | ||
464 | bool WriteableCacheFile::ReadBuffer(const LBA& lba, char* data) { | |
465 | rwlock_.AssertHeld(); | |
466 | ||
467 | assert(lba.off_ < disk_woff_); | |
f67539c2 | 468 | assert(alloc_); |
7c673cae FG |
469 | |
470 | // we read from the buffers like reading from a flat file. The list of buffers | |
471 | // are treated as contiguous stream of data | |
472 | ||
473 | char* tmp = data; | |
474 | size_t pending_nbytes = lba.size_; | |
475 | // start buffer | |
476 | size_t start_idx = lba.off_ / alloc_->BufferSize(); | |
477 | // offset into the start buffer | |
478 | size_t start_off = lba.off_ % alloc_->BufferSize(); | |
479 | ||
480 | assert(start_idx <= buf_woff_); | |
481 | ||
482 | for (size_t i = start_idx; pending_nbytes && i < bufs_.size(); ++i) { | |
483 | assert(i <= buf_woff_); | |
484 | auto* buf = bufs_[i]; | |
485 | assert(i == buf_woff_ || !buf->Free()); | |
486 | // bytes to write to the buffer | |
487 | size_t nbytes = pending_nbytes > (buf->Used() - start_off) | |
488 | ? (buf->Used() - start_off) | |
489 | : pending_nbytes; | |
490 | memcpy(tmp, buf->Data() + start_off, nbytes); | |
491 | ||
492 | // left over to be written | |
493 | pending_nbytes -= nbytes; | |
494 | start_off = 0; | |
495 | tmp += nbytes; | |
496 | } | |
497 | ||
498 | assert(!pending_nbytes); | |
499 | if (pending_nbytes) { | |
500 | return false; | |
501 | } | |
502 | ||
503 | assert(tmp == data + lba.size_); | |
504 | return true; | |
505 | } | |
506 | ||
507 | void WriteableCacheFile::Close() { | |
508 | rwlock_.AssertHeld(); | |
509 | ||
510 | assert(size_ >= max_size_); | |
511 | assert(disk_woff_ >= max_size_); | |
512 | assert(buf_doff_ == bufs_.size()); | |
513 | assert(bufs_.size() - buf_woff_ <= 1); | |
514 | assert(!pending_ios_); | |
515 | ||
516 | Info(log_, "Closing file %s. size=%d written=%d", Path().c_str(), size_, | |
517 | disk_woff_); | |
518 | ||
519 | ClearBuffers(); | |
520 | file_.reset(); | |
521 | ||
522 | assert(refs_); | |
523 | --refs_; | |
524 | } | |
525 | ||
526 | void WriteableCacheFile::ClearBuffers() { | |
f67539c2 TL |
527 | assert(alloc_); |
528 | ||
7c673cae FG |
529 | for (size_t i = 0; i < bufs_.size(); ++i) { |
530 | alloc_->Deallocate(bufs_[i]); | |
531 | } | |
532 | ||
533 | bufs_.clear(); | |
534 | } | |
535 | ||
536 | // | |
537 | // ThreadedWriter implementation | |
538 | // | |
539 | ThreadedWriter::ThreadedWriter(PersistentCacheTier* const cache, | |
540 | const size_t qdepth, const size_t io_size) | |
541 | : Writer(cache), io_size_(io_size) { | |
542 | for (size_t i = 0; i < qdepth; ++i) { | |
543 | port::Thread th(&ThreadedWriter::ThreadMain, this); | |
544 | threads_.push_back(std::move(th)); | |
545 | } | |
546 | } | |
547 | ||
548 | void ThreadedWriter::Stop() { | |
549 | // notify all threads to exit | |
550 | for (size_t i = 0; i < threads_.size(); ++i) { | |
551 | q_.Push(IO(/*signal=*/true)); | |
552 | } | |
553 | ||
554 | // wait for all threads to exit | |
555 | for (auto& th : threads_) { | |
556 | th.join(); | |
557 | assert(!th.joinable()); | |
558 | } | |
559 | threads_.clear(); | |
560 | } | |
561 | ||
562 | void ThreadedWriter::Write(WritableFile* const file, CacheWriteBuffer* buf, | |
563 | const uint64_t file_off, | |
564 | const std::function<void()> callback) { | |
565 | q_.Push(IO(file, buf, file_off, callback)); | |
566 | } | |
567 | ||
568 | void ThreadedWriter::ThreadMain() { | |
569 | while (true) { | |
570 | // Fetch the IO to process | |
571 | IO io(q_.Pop()); | |
572 | if (io.signal_) { | |
573 | // that's secret signal to exit | |
574 | break; | |
575 | } | |
576 | ||
577 | // Reserve space for writing the buffer | |
578 | while (!cache_->Reserve(io.buf_->Used())) { | |
579 | // We can fail to reserve space if every file in the system | |
580 | // is being currently accessed | |
581 | /* sleep override */ | |
582 | Env::Default()->SleepForMicroseconds(1000000); | |
583 | } | |
584 | ||
585 | DispatchIO(io); | |
586 | ||
587 | io.callback_(); | |
588 | } | |
589 | } | |
590 | ||
591 | void ThreadedWriter::DispatchIO(const IO& io) { | |
592 | size_t written = 0; | |
593 | while (written < io.buf_->Used()) { | |
594 | Slice data(io.buf_->Data() + written, io_size_); | |
595 | Status s = io.file_->Append(data); | |
596 | assert(s.ok()); | |
597 | if (!s.ok()) { | |
598 | // That is definite IO error to device. There is not much we can | |
599 | // do but ignore the failure. This can lead to corruption of data on | |
600 | // disk, but the cache will skip while reading | |
601 | fprintf(stderr, "Error writing data to file. %s\n", s.ToString().c_str()); | |
602 | } | |
603 | written += io_size_; | |
604 | } | |
605 | } | |
606 | ||
f67539c2 | 607 | } // namespace ROCKSDB_NAMESPACE |
7c673cae FG |
608 | |
609 | #endif |