// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "file/writable_file_writer.h"

#include <algorithm>
#include <chrono>

#include "db/version_edit.h"
#include "monitoring/histogram.h"
#include "monitoring/iostats_context_imp.h"
#include "port/port.h"
#include "test_util/sync_point.h"
#include "util/random.h"
#include "util/rate_limiter.h"
23 namespace ROCKSDB_NAMESPACE
{
24 IOStatus
WritableFileWriter::Append(const Slice
& data
) {
25 const char* src
= data
.data();
26 size_t left
= data
.size();
30 TEST_KILL_RANDOM("WritableFileWriter::Append:0",
31 rocksdb_kill_odds
* REDUCE_ODDS2
);
33 // Calculate the checksum of appended data
34 UpdateFileChecksum(data
);
37 IOSTATS_TIMER_GUARD(prepare_write_nanos
);
38 TEST_SYNC_POINT("WritableFileWriter::Append:BeforePrepareWrite");
39 writable_file_
->PrepareWrite(static_cast<size_t>(GetFileSize()), left
,
40 IOOptions(), nullptr);
43 // See whether we need to enlarge the buffer to avoid the flush
44 if (buf_
.Capacity() - buf_
.CurrentSize() < left
) {
45 for (size_t cap
= buf_
.Capacity();
46 cap
< max_buffer_size_
; // There is still room to increase
48 // See whether the next available size is large enough.
49 // Buffer will never be increased to more than max_buffer_size_.
50 size_t desired_capacity
= std::min(cap
* 2, max_buffer_size_
);
51 if (desired_capacity
- buf_
.CurrentSize() >= left
||
52 (use_direct_io() && desired_capacity
== max_buffer_size_
)) {
53 buf_
.AllocateNewBuffer(desired_capacity
, true);
59 // Flush only when buffered I/O
60 if (!use_direct_io() && (buf_
.Capacity() - buf_
.CurrentSize()) < left
) {
61 if (buf_
.CurrentSize() > 0) {
67 assert(buf_
.CurrentSize() == 0);
70 // We never write directly to disk with direct I/O on.
71 // or we simply use it for its original purpose to accumulate many small
73 if (use_direct_io() || (buf_
.Capacity() >= left
)) {
75 size_t appended
= buf_
.Append(src
, left
);
87 // Writing directly to file bypassing the buffer
88 assert(buf_
.CurrentSize() == 0);
89 s
= WriteBuffered(src
, left
);
92 TEST_KILL_RANDOM("WritableFileWriter::Append:1", rocksdb_kill_odds
);
94 filesize_
+= data
.size();
99 IOStatus
WritableFileWriter::Pad(const size_t pad_bytes
) {
100 assert(pad_bytes
< kDefaultPageSize
);
101 size_t left
= pad_bytes
;
102 size_t cap
= buf_
.Capacity() - buf_
.CurrentSize();
104 // Assume pad_bytes is small compared to buf_ capacity. So we always
105 // use buf_ rather than write directly to file in certain cases like
108 size_t append_bytes
= std::min(cap
, left
);
109 buf_
.PadWith(append_bytes
, 0);
110 left
-= append_bytes
;
112 IOStatus s
= Flush();
117 cap
= buf_
.Capacity() - buf_
.CurrentSize();
119 pending_sync_
= true;
120 filesize_
+= pad_bytes
;
121 return IOStatus::OK();
124 IOStatus
WritableFileWriter::Close() {
125 // Do not quit immediately on failure the file MUST be closed
128 // Possible to close it twice now as we MUST close
129 // in __dtor, simply flushing is not enough
130 // Windows when pre-allocating does not fill with zeros
131 // also with unbuffered access we also set the end of data.
132 if (writable_file_
.get() == nullptr) {
136 s
= Flush(); // flush cache to OS
139 // In direct I/O mode we write whole pages so
140 // we need to let the file know where data ends.
141 if (use_direct_io()) {
144 FileOperationInfo::StartTimePoint start_ts
;
145 if (ShouldNotifyListeners()) {
146 start_ts
= FileOperationInfo::StartNow();
149 interim
= writable_file_
->Truncate(filesize_
, IOOptions(), nullptr);
151 if (ShouldNotifyListeners()) {
152 auto finish_ts
= FileOperationInfo::FinishNow();
153 NotifyOnFileTruncateFinish(start_ts
, finish_ts
, s
);
160 FileOperationInfo::StartTimePoint start_ts
;
161 if (ShouldNotifyListeners()) {
162 start_ts
= FileOperationInfo::StartNow();
165 interim
= writable_file_
->Fsync(IOOptions(), nullptr);
167 if (ShouldNotifyListeners()) {
168 auto finish_ts
= FileOperationInfo::FinishNow();
169 NotifyOnFileSyncFinish(start_ts
, finish_ts
, s
,
170 FileOperationType::kFsync
);
175 if (!interim
.ok() && s
.ok()) {
180 TEST_KILL_RANDOM("WritableFileWriter::Close:0", rocksdb_kill_odds
);
183 FileOperationInfo::StartTimePoint start_ts
;
184 if (ShouldNotifyListeners()) {
185 start_ts
= FileOperationInfo::StartNow();
188 interim
= writable_file_
->Close(IOOptions(), nullptr);
190 if (ShouldNotifyListeners()) {
191 auto finish_ts
= FileOperationInfo::FinishNow();
192 NotifyOnFileCloseFinish(start_ts
, finish_ts
, s
);
196 if (!interim
.ok() && s
.ok()) {
200 writable_file_
.reset();
201 TEST_KILL_RANDOM("WritableFileWriter::Close:1", rocksdb_kill_odds
);
203 if (s
.ok() && checksum_generator_
!= nullptr && !checksum_finalized_
) {
204 checksum_generator_
->Finalize();
205 checksum_finalized_
= true;
211 // write out the cached data to the OS cache or storage if direct I/O
213 IOStatus
WritableFileWriter::Flush() {
215 TEST_KILL_RANDOM("WritableFileWriter::Flush:0",
216 rocksdb_kill_odds
* REDUCE_ODDS2
);
218 if (buf_
.CurrentSize() > 0) {
219 if (use_direct_io()) {
224 #endif // !ROCKSDB_LITE
226 s
= WriteBuffered(buf_
.BufferStart(), buf_
.CurrentSize());
235 FileOperationInfo::StartTimePoint start_ts
;
236 if (ShouldNotifyListeners()) {
237 start_ts
= FileOperationInfo::StartNow();
240 s
= writable_file_
->Flush(IOOptions(), nullptr);
242 if (ShouldNotifyListeners()) {
243 auto finish_ts
= std::chrono::steady_clock::now();
244 NotifyOnFileFlushFinish(start_ts
, finish_ts
, s
);
253 // sync OS cache to disk for every bytes_per_sync_
254 // TODO: give log file and sst file different options (log
255 // files could be potentially cached in OS for their whole
256 // life time, thus we might not want to flush at all).
258 // We try to avoid sync to the last 1MB of data. For two reasons:
259 // (1) avoid rewrite the same page that is modified later.
260 // (2) for older version of OS, write can block while writing out
262 // Xfs does neighbor page flushing outside of the specified ranges. We
263 // need to make sure sync range is far from the write offset.
264 if (!use_direct_io() && bytes_per_sync_
) {
265 const uint64_t kBytesNotSyncRange
=
266 1024 * 1024; // recent 1MB is not synced.
267 const uint64_t kBytesAlignWhenSync
= 4 * 1024; // Align 4KB.
268 if (filesize_
> kBytesNotSyncRange
) {
269 uint64_t offset_sync_to
= filesize_
- kBytesNotSyncRange
;
270 offset_sync_to
-= offset_sync_to
% kBytesAlignWhenSync
;
271 assert(offset_sync_to
>= last_sync_size_
);
272 if (offset_sync_to
> 0 &&
273 offset_sync_to
- last_sync_size_
>= bytes_per_sync_
) {
274 s
= RangeSync(last_sync_size_
, offset_sync_to
- last_sync_size_
);
275 last_sync_size_
= offset_sync_to
;
283 std::string
WritableFileWriter::GetFileChecksum() {
284 if (checksum_generator_
!= nullptr) {
285 assert(checksum_finalized_
);
286 return checksum_generator_
->GetChecksum();
288 return kUnknownFileChecksum
;
292 const char* WritableFileWriter::GetFileChecksumFuncName() const {
293 if (checksum_generator_
!= nullptr) {
294 return checksum_generator_
->Name();
296 return kUnknownFileChecksumFuncName
;
300 IOStatus
WritableFileWriter::Sync(bool use_fsync
) {
301 IOStatus s
= Flush();
305 TEST_KILL_RANDOM("WritableFileWriter::Sync:0", rocksdb_kill_odds
);
306 if (!use_direct_io() && pending_sync_
) {
307 s
= SyncInternal(use_fsync
);
312 TEST_KILL_RANDOM("WritableFileWriter::Sync:1", rocksdb_kill_odds
);
313 pending_sync_
= false;
314 return IOStatus::OK();
317 IOStatus
WritableFileWriter::SyncWithoutFlush(bool use_fsync
) {
318 if (!writable_file_
->IsSyncThreadSafe()) {
319 return IOStatus::NotSupported(
320 "Can't WritableFileWriter::SyncWithoutFlush() because "
321 "WritableFile::IsSyncThreadSafe() is false");
323 TEST_SYNC_POINT("WritableFileWriter::SyncWithoutFlush:1");
324 IOStatus s
= SyncInternal(use_fsync
);
325 TEST_SYNC_POINT("WritableFileWriter::SyncWithoutFlush:2");
329 IOStatus
WritableFileWriter::SyncInternal(bool use_fsync
) {
331 IOSTATS_TIMER_GUARD(fsync_nanos
);
332 TEST_SYNC_POINT("WritableFileWriter::SyncInternal:0");
333 auto prev_perf_level
= GetPerfLevel();
334 IOSTATS_CPU_TIMER_GUARD(cpu_write_nanos
, env_
);
336 FileOperationInfo::StartTimePoint start_ts
;
337 if (ShouldNotifyListeners()) {
338 start_ts
= FileOperationInfo::StartNow();
342 s
= writable_file_
->Fsync(IOOptions(), nullptr);
344 s
= writable_file_
->Sync(IOOptions(), nullptr);
347 if (ShouldNotifyListeners()) {
348 auto finish_ts
= std::chrono::steady_clock::now();
349 NotifyOnFileSyncFinish(
350 start_ts
, finish_ts
, s
,
351 use_fsync
? FileOperationType::kFsync
: FileOperationType::kSync
);
354 SetPerfLevel(prev_perf_level
);
358 IOStatus
WritableFileWriter::RangeSync(uint64_t offset
, uint64_t nbytes
) {
359 IOSTATS_TIMER_GUARD(range_sync_nanos
);
360 TEST_SYNC_POINT("WritableFileWriter::RangeSync:0");
362 FileOperationInfo::StartTimePoint start_ts
;
363 if (ShouldNotifyListeners()) {
364 start_ts
= FileOperationInfo::StartNow();
367 IOStatus s
= writable_file_
->RangeSync(offset
, nbytes
, IOOptions(), nullptr);
369 if (ShouldNotifyListeners()) {
370 auto finish_ts
= std::chrono::steady_clock::now();
371 NotifyOnFileRangeSyncFinish(offset
, nbytes
, start_ts
, finish_ts
, s
);
377 // This method writes to disk the specified data and makes use of the rate
378 // limiter if available
379 IOStatus
WritableFileWriter::WriteBuffered(const char* data
, size_t size
) {
381 assert(!use_direct_io());
382 const char* src
= data
;
387 if (rate_limiter_
!= nullptr) {
388 allowed
= rate_limiter_
->RequestToken(
389 left
, 0 /* alignment */, writable_file_
->GetIOPriority(), stats_
,
390 RateLimiter::OpType::kWrite
);
396 IOSTATS_TIMER_GUARD(write_nanos
);
397 TEST_SYNC_POINT("WritableFileWriter::Flush:BeforeAppend");
400 FileOperationInfo::StartTimePoint start_ts
;
401 uint64_t old_size
= writable_file_
->GetFileSize(IOOptions(), nullptr);
402 if (ShouldNotifyListeners()) {
403 start_ts
= FileOperationInfo::StartNow();
404 old_size
= next_write_offset_
;
408 auto prev_perf_level
= GetPerfLevel();
409 IOSTATS_CPU_TIMER_GUARD(cpu_write_nanos
, env_
);
410 s
= writable_file_
->Append(Slice(src
, allowed
), IOOptions(), nullptr);
411 SetPerfLevel(prev_perf_level
);
414 if (ShouldNotifyListeners()) {
415 auto finish_ts
= std::chrono::steady_clock::now();
416 NotifyOnFileWriteFinish(old_size
, allowed
, start_ts
, finish_ts
, s
);
424 IOSTATS_ADD(bytes_written
, allowed
);
425 TEST_KILL_RANDOM("WritableFileWriter::WriteBuffered:0", rocksdb_kill_odds
);
434 void WritableFileWriter::UpdateFileChecksum(const Slice
& data
) {
435 if (checksum_generator_
!= nullptr) {
436 checksum_generator_
->Update(data
.data(), data
.size());
440 // This flushes the accumulated data in the buffer. We pad data with zeros if
441 // necessary to the whole page.
442 // However, during automatic flushes padding would not be necessary.
443 // We always use RateLimiter if available. We move (Refit) any buffer bytes
444 // that are left over the
445 // whole number of pages to be written again on the next flush because we can
446 // only write on aligned
449 IOStatus
WritableFileWriter::WriteDirect() {
450 assert(use_direct_io());
452 const size_t alignment
= buf_
.Alignment();
453 assert((next_write_offset_
% alignment
) == 0);
455 // Calculate whole page final file advance if all writes succeed
456 size_t file_advance
= TruncateToPageBoundary(alignment
, buf_
.CurrentSize());
458 // Calculate the leftover tail, we write it here padded with zeros BUT we
460 // it again in the future either on Close() OR when the current whole page
462 size_t leftover_tail
= buf_
.CurrentSize() - file_advance
;
465 buf_
.PadToAlignmentWith(0);
467 const char* src
= buf_
.BufferStart();
468 uint64_t write_offset
= next_write_offset_
;
469 size_t left
= buf_
.CurrentSize();
472 // Check how much is allowed
474 if (rate_limiter_
!= nullptr) {
475 size
= rate_limiter_
->RequestToken(left
, buf_
.Alignment(),
476 writable_file_
->GetIOPriority(),
477 stats_
, RateLimiter::OpType::kWrite
);
483 IOSTATS_TIMER_GUARD(write_nanos
);
484 TEST_SYNC_POINT("WritableFileWriter::Flush:BeforeAppend");
485 FileOperationInfo::StartTimePoint start_ts
;
486 if (ShouldNotifyListeners()) {
487 start_ts
= FileOperationInfo::StartNow();
489 // direct writes must be positional
490 s
= writable_file_
->PositionedAppend(Slice(src
, size
), write_offset
,
491 IOOptions(), nullptr);
492 if (ShouldNotifyListeners()) {
493 auto finish_ts
= std::chrono::steady_clock::now();
494 NotifyOnFileWriteFinish(write_offset
, size
, start_ts
, finish_ts
, s
);
497 buf_
.Size(file_advance
+ leftover_tail
);
502 IOSTATS_ADD(bytes_written
, size
);
505 write_offset
+= size
;
506 assert((next_write_offset_
% alignment
) == 0);
510 // Move the tail to the beginning of the buffer
511 // This never happens during normal Append but rather during
512 // explicit call to Flush()/Sync() or Close()
513 buf_
.RefitTail(file_advance
, leftover_tail
);
514 // This is where we start writing next time which may or not be
515 // the actual file size on disk. They match if the buffer size
516 // is a multiple of whole pages otherwise filesize_ is leftover_tail
518 next_write_offset_
+= file_advance
;
522 #endif // !ROCKSDB_LITE
523 } // namespace ROCKSDB_NAMESPACE