// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "file/writable_file_writer.h"

#include <algorithm>
#include <mutex>

#include "db/version_edit.h"
#include "monitoring/histogram.h"
#include "monitoring/iostats_context_imp.h"
#include "port/port.h"
#include "test_util/sync_point.h"
#include "util/random.h"
#include "util/rate_limiter.h"

namespace ROCKSDB_NAMESPACE {
IOStatus WritableFileWriter::Append(const Slice& data) {
  const char* src = data.data();
  size_t left = data.size();
  IOStatus s;
  pending_sync_ = true;

  TEST_KILL_RANDOM("WritableFileWriter::Append:0",
                   rocksdb_kill_odds * REDUCE_ODDS2);

  // Calculate the checksum of appended data
  UpdateFileChecksum(data);

  {
    IOSTATS_TIMER_GUARD(prepare_write_nanos);
    TEST_SYNC_POINT("WritableFileWriter::Append:BeforePrepareWrite");
    writable_file_->PrepareWrite(static_cast<size_t>(GetFileSize()), left,
                                 IOOptions(), nullptr);
  }

  // See whether we need to enlarge the buffer to avoid the flush
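  // For example (illustrative, assuming max_buffer_size_ >= 256 KB): with
  // Capacity() == 64 KB, CurrentSize() == 60 KB and left == 100 KB, the loop
  // below doubles the candidate size until min(cap * 2, max_buffer_size_)
  // can hold both, and reallocates the buffer to 256 KB.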
  if (buf_.Capacity() - buf_.CurrentSize() < left) {
    for (size_t cap = buf_.Capacity();
         cap < max_buffer_size_;  // There is still room to increase
         cap *= 2) {
      // See whether the next available size is large enough.
      // Buffer will never be increased to more than max_buffer_size_.
      size_t desired_capacity = std::min(cap * 2, max_buffer_size_);
      if (desired_capacity - buf_.CurrentSize() >= left ||
          (use_direct_io() && desired_capacity == max_buffer_size_)) {
        buf_.AllocateNewBuffer(desired_capacity, true);
        break;
      }
    }
  }

  // Flush only when buffered I/O
  if (!use_direct_io() && (buf_.Capacity() - buf_.CurrentSize()) < left) {
    if (buf_.CurrentSize() > 0) {
      s = Flush();
      if (!s.ok()) {
        return s;
      }
    }
    assert(buf_.CurrentSize() == 0);
  }

  // With direct I/O we never bypass the buffer to write to the file directly;
  // otherwise the buffer simply serves its original purpose of accumulating
  // many small chunks.
  if (use_direct_io() || (buf_.Capacity() >= left)) {
    while (left > 0) {
      size_t appended = buf_.Append(src, left);
      left -= appended;
      src += appended;

      if (left > 0) {
        s = Flush();
        if (!s.ok()) {
          break;
        }
      }
    }
  } else {
    // Writing directly to file bypassing the buffer
    assert(buf_.CurrentSize() == 0);
    s = WriteBuffered(src, left);
  }

  TEST_KILL_RANDOM("WritableFileWriter::Append:1", rocksdb_kill_odds);
  if (s.ok()) {
    filesize_ += data.size();
  }
  return s;
}

IOStatus WritableFileWriter::Pad(const size_t pad_bytes) {
  assert(pad_bytes < kDefaultPageSize);
  size_t left = pad_bytes;
  size_t cap = buf_.Capacity() - buf_.CurrentSize();

  // Assume pad_bytes is small compared to buf_ capacity, so we always go
  // through buf_ rather than writing directly to the file, as Append()
  // sometimes does.
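  // For example (illustrative): padding 3000 bytes with only 1000 bytes of
  // buffer space free pads 1000 bytes, flushes, then pads the remaining
  // 2000 bytes into the emptied buffer (assuming it has that much room).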
  while (left) {
    size_t append_bytes = std::min(cap, left);
    buf_.PadWith(append_bytes, 0);
    left -= append_bytes;
    if (left > 0) {
      IOStatus s = Flush();
      if (!s.ok()) {
        return s;
      }
    }
    cap = buf_.Capacity() - buf_.CurrentSize();
  }
  pending_sync_ = true;
  filesize_ += pad_bytes;
  return IOStatus::OK();
}

IOStatus WritableFileWriter::Close() {
  // Do not quit immediately on failure: the file MUST be closed.
  IOStatus s;

  // It is now possible to close the file twice, since we MUST close it in
  // the destructor and simply flushing is not enough: on Windows
  // pre-allocation does not fill with zeros, and with unbuffered access we
  // also need to set the end of data.
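  // The sequence below is: Flush() the buffered data, then (for direct I/O)
  // Truncate() to the logical size and Fsync(), and finally Close() the
  // underlying file. The first error encountered becomes the return status,
  // but the file is still closed.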
  if (writable_file_.get() == nullptr) {
    return s;
  }

  s = Flush();  // flush cache to OS

  IOStatus interim;
  // In direct I/O mode we write whole pages so
  // we need to let the file know where data ends.
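  // Because direct I/O writes whole aligned pages, the bytes on disk can
  // exceed filesize_; the Truncate(filesize_) call below trims the file back
  // to the logical end of the data.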
  if (use_direct_io()) {
    {
#ifndef ROCKSDB_LITE
      FileOperationInfo::StartTimePoint start_ts;
      if (ShouldNotifyListeners()) {
        start_ts = FileOperationInfo::StartNow();
      }
#endif
      interim = writable_file_->Truncate(filesize_, IOOptions(), nullptr);
#ifndef ROCKSDB_LITE
      if (ShouldNotifyListeners()) {
        auto finish_ts = FileOperationInfo::FinishNow();
        NotifyOnFileTruncateFinish(start_ts, finish_ts, s);
      }
#endif
    }
    if (interim.ok()) {
      {
#ifndef ROCKSDB_LITE
        FileOperationInfo::StartTimePoint start_ts;
        if (ShouldNotifyListeners()) {
          start_ts = FileOperationInfo::StartNow();
        }
#endif
        interim = writable_file_->Fsync(IOOptions(), nullptr);
#ifndef ROCKSDB_LITE
        if (ShouldNotifyListeners()) {
          auto finish_ts = FileOperationInfo::FinishNow();
          NotifyOnFileSyncFinish(start_ts, finish_ts, s,
                                 FileOperationType::kFsync);
        }
#endif
      }
    }
    if (!interim.ok() && s.ok()) {
      s = interim;
    }
  }

  TEST_KILL_RANDOM("WritableFileWriter::Close:0", rocksdb_kill_odds);
  {
#ifndef ROCKSDB_LITE
    FileOperationInfo::StartTimePoint start_ts;
    if (ShouldNotifyListeners()) {
      start_ts = FileOperationInfo::StartNow();
    }
#endif
    interim = writable_file_->Close(IOOptions(), nullptr);
#ifndef ROCKSDB_LITE
    if (ShouldNotifyListeners()) {
      auto finish_ts = FileOperationInfo::FinishNow();
      NotifyOnFileCloseFinish(start_ts, finish_ts, s);
    }
#endif
  }
  if (!interim.ok() && s.ok()) {
    s = interim;
  }

  writable_file_.reset();
  TEST_KILL_RANDOM("WritableFileWriter::Close:1", rocksdb_kill_odds);

  if (s.ok() && checksum_generator_ != nullptr && !checksum_finalized_) {
    checksum_generator_->Finalize();
    checksum_finalized_ = true;
  }

  return s;
}

// Write out the buffered data to the OS cache, or to storage if direct I/O
// is enabled.
IOStatus WritableFileWriter::Flush() {
  IOStatus s;
  TEST_KILL_RANDOM("WritableFileWriter::Flush:0",
                   rocksdb_kill_odds * REDUCE_ODDS2);

  if (buf_.CurrentSize() > 0) {
    if (use_direct_io()) {
#ifndef ROCKSDB_LITE
      if (pending_sync_) {
        s = WriteDirect();
      }
#endif  // !ROCKSDB_LITE
    } else {
      s = WriteBuffered(buf_.BufferStart(), buf_.CurrentSize());
    }
    if (!s.ok()) {
      return s;
    }
  }

  {
#ifndef ROCKSDB_LITE
    FileOperationInfo::StartTimePoint start_ts;
    if (ShouldNotifyListeners()) {
      start_ts = FileOperationInfo::StartNow();
    }
#endif
    s = writable_file_->Flush(IOOptions(), nullptr);
#ifndef ROCKSDB_LITE
    if (ShouldNotifyListeners()) {
      auto finish_ts = std::chrono::steady_clock::now();
      NotifyOnFileFlushFinish(start_ts, finish_ts, s);
    }
#endif
  }

  if (!s.ok()) {
    return s;
  }

  // Sync the OS cache to disk every bytes_per_sync_.
  // TODO: give log files and SST files different options (log files could
  // potentially be cached in the OS for their whole lifetime, so we might
  // not want to flush at all).

  // We try to avoid syncing the last 1MB of data, for two reasons:
  // (1) avoid rewriting the same page that is modified later;
  // (2) on older versions of the OS, a write can block while the page is
  //     being written out.
  // XFS flushes neighboring pages outside of the specified ranges, so we
  // need to make sure the sync range is far from the write offset.
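  // For example (illustrative): with bytes_per_sync_ == 1 MB,
  // last_sync_size_ == 1 MB and filesize_ == 3.5 MB, offset_sync_to becomes
  // 2.5 MB (already 4 KB aligned); the 1.5 MB gap exceeds bytes_per_sync_,
  // so RangeSync(1 MB, 1.5 MB) is issued and last_sync_size_ moves to 2.5 MB.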
  if (!use_direct_io() && bytes_per_sync_) {
    const uint64_t kBytesNotSyncRange =
        1024 * 1024;  // recent 1MB is not synced.
    const uint64_t kBytesAlignWhenSync = 4 * 1024;  // Align 4KB.
    if (filesize_ > kBytesNotSyncRange) {
      uint64_t offset_sync_to = filesize_ - kBytesNotSyncRange;
      offset_sync_to -= offset_sync_to % kBytesAlignWhenSync;
      assert(offset_sync_to >= last_sync_size_);
      if (offset_sync_to > 0 &&
          offset_sync_to - last_sync_size_ >= bytes_per_sync_) {
        s = RangeSync(last_sync_size_, offset_sync_to - last_sync_size_);
        last_sync_size_ = offset_sync_to;
      }
    }
  }

  return s;
}

std::string WritableFileWriter::GetFileChecksum() {
  if (checksum_generator_ != nullptr) {
    assert(checksum_finalized_);
    return checksum_generator_->GetChecksum();
  } else {
    return kUnknownFileChecksum;
  }
}

const char* WritableFileWriter::GetFileChecksumFuncName() const {
  if (checksum_generator_ != nullptr) {
    return checksum_generator_->Name();
  } else {
    return kUnknownFileChecksumFuncName;
  }
}

IOStatus WritableFileWriter::Sync(bool use_fsync) {
  IOStatus s = Flush();
  if (!s.ok()) {
    return s;
  }
  TEST_KILL_RANDOM("WritableFileWriter::Sync:0", rocksdb_kill_odds);
  if (!use_direct_io() && pending_sync_) {
    s = SyncInternal(use_fsync);
    if (!s.ok()) {
      return s;
    }
  }
  TEST_KILL_RANDOM("WritableFileWriter::Sync:1", rocksdb_kill_odds);
  pending_sync_ = false;
  return IOStatus::OK();
}

IOStatus WritableFileWriter::SyncWithoutFlush(bool use_fsync) {
  if (!writable_file_->IsSyncThreadSafe()) {
    return IOStatus::NotSupported(
        "Can't WritableFileWriter::SyncWithoutFlush() because "
        "WritableFile::IsSyncThreadSafe() is false");
  }
  TEST_SYNC_POINT("WritableFileWriter::SyncWithoutFlush:1");
  IOStatus s = SyncInternal(use_fsync);
  TEST_SYNC_POINT("WritableFileWriter::SyncWithoutFlush:2");
  return s;
}

IOStatus WritableFileWriter::SyncInternal(bool use_fsync) {
  IOStatus s;
  IOSTATS_TIMER_GUARD(fsync_nanos);
  TEST_SYNC_POINT("WritableFileWriter::SyncInternal:0");
  auto prev_perf_level = GetPerfLevel();
  IOSTATS_CPU_TIMER_GUARD(cpu_write_nanos, env_);
#ifndef ROCKSDB_LITE
  FileOperationInfo::StartTimePoint start_ts;
  if (ShouldNotifyListeners()) {
    start_ts = FileOperationInfo::StartNow();
  }
#endif
  if (use_fsync) {
    s = writable_file_->Fsync(IOOptions(), nullptr);
  } else {
    s = writable_file_->Sync(IOOptions(), nullptr);
  }
#ifndef ROCKSDB_LITE
  if (ShouldNotifyListeners()) {
    auto finish_ts = std::chrono::steady_clock::now();
    NotifyOnFileSyncFinish(
        start_ts, finish_ts, s,
        use_fsync ? FileOperationType::kFsync : FileOperationType::kSync);
  }
#endif
  SetPerfLevel(prev_perf_level);
  return s;
}

IOStatus WritableFileWriter::RangeSync(uint64_t offset, uint64_t nbytes) {
  IOSTATS_TIMER_GUARD(range_sync_nanos);
  TEST_SYNC_POINT("WritableFileWriter::RangeSync:0");
#ifndef ROCKSDB_LITE
  FileOperationInfo::StartTimePoint start_ts;
  if (ShouldNotifyListeners()) {
    start_ts = FileOperationInfo::StartNow();
  }
#endif
  IOStatus s = writable_file_->RangeSync(offset, nbytes, IOOptions(), nullptr);
#ifndef ROCKSDB_LITE
  if (ShouldNotifyListeners()) {
    auto finish_ts = std::chrono::steady_clock::now();
    NotifyOnFileRangeSyncFinish(offset, nbytes, start_ts, finish_ts, s);
  }
#endif
  return s;
}

// This method writes the specified data to the file, making use of the rate
// limiter if one is available.
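// For example, with a rate limiter configured, a 1 MB buffered write may be
// split into several smaller writable_file_->Append() calls, each sized by
// the tokens granted by RequestToken().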
IOStatus WritableFileWriter::WriteBuffered(const char* data, size_t size) {
  IOStatus s;
  assert(!use_direct_io());
  const char* src = data;
  size_t left = size;

  while (left > 0) {
    size_t allowed;
    if (rate_limiter_ != nullptr) {
      allowed = rate_limiter_->RequestToken(
          left, 0 /* alignment */, writable_file_->GetIOPriority(), stats_,
          RateLimiter::OpType::kWrite);
    } else {
      allowed = left;
    }

    {
      IOSTATS_TIMER_GUARD(write_nanos);
      TEST_SYNC_POINT("WritableFileWriter::Flush:BeforeAppend");

#ifndef ROCKSDB_LITE
      FileOperationInfo::StartTimePoint start_ts;
      uint64_t old_size = writable_file_->GetFileSize(IOOptions(), nullptr);
      if (ShouldNotifyListeners()) {
        start_ts = FileOperationInfo::StartNow();
        old_size = next_write_offset_;
      }
#endif
      {
        auto prev_perf_level = GetPerfLevel();
        IOSTATS_CPU_TIMER_GUARD(cpu_write_nanos, env_);
        s = writable_file_->Append(Slice(src, allowed), IOOptions(), nullptr);
        SetPerfLevel(prev_perf_level);
      }
#ifndef ROCKSDB_LITE
      if (ShouldNotifyListeners()) {
        auto finish_ts = std::chrono::steady_clock::now();
        NotifyOnFileWriteFinish(old_size, allowed, start_ts, finish_ts, s);
      }
#endif
      if (!s.ok()) {
        return s;
      }
    }

    IOSTATS_ADD(bytes_written, allowed);
    TEST_KILL_RANDOM("WritableFileWriter::WriteBuffered:0", rocksdb_kill_odds);

    left -= allowed;
    src += allowed;
  }
  buf_.Size(0);
  return s;
}

void WritableFileWriter::UpdateFileChecksum(const Slice& data) {
  if (checksum_generator_ != nullptr) {
    checksum_generator_->Update(data.data(), data.size());
  }
}

// This flushes the accumulated data in the buffer, padding it with zeros up
// to a whole page if necessary (during automatic flushes padding is normally
// not needed). We always use the RateLimiter if available. Any buffer bytes
// left over beyond a whole number of pages are moved (refit) to the front of
// the buffer so they are written again on the next flush, because we can
// only write at aligned offsets.
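// Illustrative example: with a 4096-byte alignment and 10000 bytes buffered,
// file_advance is 8192 and leftover_tail is 1808; the tail is zero-padded
// and written now, then refit to the start of the buffer and written again
// later.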
#ifndef ROCKSDB_LITE
IOStatus WritableFileWriter::WriteDirect() {
  assert(use_direct_io());
  IOStatus s;
  const size_t alignment = buf_.Alignment();
  assert((next_write_offset_ % alignment) == 0);

  // Calculate whole page final file advance if all writes succeed
  size_t file_advance = TruncateToPageBoundary(alignment, buf_.CurrentSize());

  // Calculate the leftover tail; we write it here padded with zeros, BUT we
  // will write it again in the future, either on Close() OR when the current
  // whole page fills out.
  size_t leftover_tail = buf_.CurrentSize() - file_advance;

  // Round up and pad
  buf_.PadToAlignmentWith(0);

  const char* src = buf_.BufferStart();
  uint64_t write_offset = next_write_offset_;
  size_t left = buf_.CurrentSize();

  while (left > 0) {
    // Check how much is allowed
    size_t size;
    if (rate_limiter_ != nullptr) {
      size = rate_limiter_->RequestToken(left, buf_.Alignment(),
                                         writable_file_->GetIOPriority(),
                                         stats_, RateLimiter::OpType::kWrite);
    } else {
      size = left;
    }

    {
      IOSTATS_TIMER_GUARD(write_nanos);
      TEST_SYNC_POINT("WritableFileWriter::Flush:BeforeAppend");
      FileOperationInfo::StartTimePoint start_ts;
      if (ShouldNotifyListeners()) {
        start_ts = FileOperationInfo::StartNow();
      }
      // direct writes must be positional
      s = writable_file_->PositionedAppend(Slice(src, size), write_offset,
                                           IOOptions(), nullptr);
      if (ShouldNotifyListeners()) {
        auto finish_ts = std::chrono::steady_clock::now();
        NotifyOnFileWriteFinish(write_offset, size, start_ts, finish_ts, s);
      }
      if (!s.ok()) {
        buf_.Size(file_advance + leftover_tail);
        return s;
      }
    }

    IOSTATS_ADD(bytes_written, size);
    left -= size;
    src += size;
    write_offset += size;
    assert((next_write_offset_ % alignment) == 0);
  }

  if (s.ok()) {
    // Move the tail to the beginning of the buffer
    // This never happens during normal Append but rather during
    // explicit call to Flush()/Sync() or Close()
    buf_.RefitTail(file_advance, leftover_tail);
    // This is where we start writing next time, which may or may not be the
    // actual file size on disk. They match if the buffer size is a multiple
    // of whole pages; otherwise filesize_ is leftover_tail behind.
    next_write_offset_ += file_advance;
  }
  return s;
}
#endif  // !ROCKSDB_LITE
}  // namespace ROCKSDB_NAMESPACE