// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "file/writable_file_writer.h"

#include <algorithm>
#include <mutex>

#include "db/version_edit.h"
#include "monitoring/histogram.h"
#include "monitoring/iostats_context_imp.h"
#include "port/port.h"
#include "test_util/sync_point.h"
#include "util/random.h"
#include "util/rate_limiter.h"

namespace ROCKSDB_NAMESPACE {
Status WritableFileWriter::Append(const Slice& data) {
  const char* src = data.data();
  size_t left = data.size();
  Status s;
  pending_sync_ = true;

  TEST_KILL_RANDOM("WritableFileWriter::Append:0",
                   rocksdb_kill_odds * REDUCE_ODDS2);

  {
    IOSTATS_TIMER_GUARD(prepare_write_nanos);
    TEST_SYNC_POINT("WritableFileWriter::Append:BeforePrepareWrite");
    writable_file_->PrepareWrite(static_cast<size_t>(GetFileSize()), left,
                                 IOOptions(), nullptr);
  }

  // See whether we need to enlarge the buffer to avoid the flush
  if (buf_.Capacity() - buf_.CurrentSize() < left) {
    for (size_t cap = buf_.Capacity();
         cap < max_buffer_size_;  // There is still room to increase
         cap *= 2) {
      // See whether the next available size is large enough.
      // Buffer will never be increased to more than max_buffer_size_.
      size_t desired_capacity = std::min(cap * 2, max_buffer_size_);
      if (desired_capacity - buf_.CurrentSize() >= left ||
          (use_direct_io() && desired_capacity == max_buffer_size_)) {
        buf_.AllocateNewBuffer(desired_capacity, true);
        break;
      }
    }
  }
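  // Illustration of the growth above (hypothetical numbers, not from the
  // source): with buf_.Capacity() == 64 KB, buf_.CurrentSize() == 60 KB,
  // left == 100 KB and max_buffer_size_ == 1 MB, the loop doubles the
  // candidate capacity (128 KB, 256 KB, ...) until
  // desired_capacity - CurrentSize() >= left, settling on 256 KB so the
  // incoming chunk fits without a flush.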

  // Flush only when buffered I/O
  if (!use_direct_io() && (buf_.Capacity() - buf_.CurrentSize()) < left) {
    if (buf_.CurrentSize() > 0) {
      s = Flush();
      if (!s.ok()) {
        return s;
      }
    }
    assert(buf_.CurrentSize() == 0);
  }

  // With direct I/O we never bypass the buffer; otherwise we use the buffer
  // for its original purpose: to accumulate many small chunks.
  if (use_direct_io() || (buf_.Capacity() >= left)) {
    while (left > 0) {
      size_t appended = buf_.Append(src, left);
      left -= appended;
      src += appended;

      if (left > 0) {
        s = Flush();
        if (!s.ok()) {
          break;
        }
      }
    }
  } else {
    // Writing directly to the file, bypassing the buffer
    assert(buf_.CurrentSize() == 0);
    s = WriteBuffered(src, left);
  }

  TEST_KILL_RANDOM("WritableFileWriter::Append:1", rocksdb_kill_odds);
  if (s.ok()) {
    filesize_ += data.size();
    CalculateFileChecksum(data);
  }
  return s;
}
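
// Hedged usage sketch (illustrative only, not part of the original file):
// callers typically wrap an FSWritableFile and drive the writer roughly as
//
//   std::unique_ptr<WritableFileWriter> writer(...);
//   writer->Append(Slice("payload"));    // accumulated in buf_
//   writer->Flush();                     // push to the OS cache / storage
//   writer->Sync(false /* use_fsync */);
//   writer->Close();
//
// The Status returned by each call should be checked, and Close() should
// still be called after a failure so the underlying file is released.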

Status WritableFileWriter::Pad(const size_t pad_bytes) {
  assert(pad_bytes < kDefaultPageSize);
  size_t left = pad_bytes;
  size_t cap = buf_.Capacity() - buf_.CurrentSize();

  // Assume pad_bytes is small compared to buf_ capacity, so we always go
  // through buf_ rather than writing directly to the file as Append() does in
  // certain cases.
  while (left) {
    size_t append_bytes = std::min(cap, left);
    buf_.PadWith(append_bytes, 0);
    left -= append_bytes;
    if (left > 0) {
      Status s = Flush();
      if (!s.ok()) {
        return s;
      }
    }
    cap = buf_.Capacity() - buf_.CurrentSize();
  }
  pending_sync_ = true;
  filesize_ += pad_bytes;
  return Status::OK();
}
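
// Illustration of Pad() (hypothetical numbers): if 100 bytes of padding are
// requested but only 40 bytes of buffer capacity remain, the loop pads 40
// bytes, calls Flush() to empty the buffer, and then pads the remaining 60.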

Status WritableFileWriter::Close() {
  // Do not quit immediately on failure; the file MUST be closed.
  Status s;

  // It is possible to close the file twice now, as we MUST close it in the
  // destructor; simply flushing is not enough. On Windows, pre-allocation
  // does not fill with zeros, and with unbuffered access we also have to set
  // the end of the data.
  if (!writable_file_) {
    return s;
  }

  s = Flush();  // flush cache to OS

  Status interim;
  // In direct I/O mode we write whole pages, so we need to let the file know
  // where the data ends.
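  // Illustration (hypothetical numbers): if filesize_ == 10,000 bytes but
  // direct I/O has written whole 4 KB pages, the physical file is 12,288
  // bytes; the Truncate() below trims it back to the logical 10,000 bytes.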
  if (use_direct_io()) {
    interim = writable_file_->Truncate(filesize_, IOOptions(), nullptr);
    if (interim.ok()) {
      interim = writable_file_->Fsync(IOOptions(), nullptr);
    }
    if (!interim.ok() && s.ok()) {
      s = interim;
    }
  }

  TEST_KILL_RANDOM("WritableFileWriter::Close:0", rocksdb_kill_odds);
  interim = writable_file_->Close(IOOptions(), nullptr);
  if (!interim.ok() && s.ok()) {
    s = interim;
  }

  writable_file_.reset();
  TEST_KILL_RANDOM("WritableFileWriter::Close:1", rocksdb_kill_odds);

  return s;
}

// Write out the cached data to the OS cache, or to storage if direct I/O is
// enabled.
Status WritableFileWriter::Flush() {
  Status s;
  TEST_KILL_RANDOM("WritableFileWriter::Flush:0",
                   rocksdb_kill_odds * REDUCE_ODDS2);

  if (buf_.CurrentSize() > 0) {
    if (use_direct_io()) {
#ifndef ROCKSDB_LITE
      if (pending_sync_) {
        s = WriteDirect();
      }
#endif  // !ROCKSDB_LITE
    } else {
      s = WriteBuffered(buf_.BufferStart(), buf_.CurrentSize());
    }
    if (!s.ok()) {
      return s;
    }
  }

  s = writable_file_->Flush(IOOptions(), nullptr);

  if (!s.ok()) {
    return s;
  }

  // Sync the OS cache to disk every bytes_per_sync_ bytes.
  // TODO: give log files and SST files different options (log files could
  // potentially be cached in the OS for their whole lifetime, so we might not
  // want to flush them at all).

  // We try to avoid syncing the last 1MB of data, for two reasons:
  // (1) to avoid rewriting the same page that is modified later;
  // (2) on older versions of the OS, the write can block while writing out
  //     the page.
  // XFS does neighbor-page flushing outside of the specified ranges, so we
  // need to make sure the sync range is far from the write offset.
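  // Worked example (hypothetical numbers): with bytes_per_sync_ == 1 MB,
  // filesize_ == 5 MB and last_sync_size_ == 2 MB, offset_sync_to becomes
  // 4 MB (already 4 KB aligned); since 4 MB - 2 MB >= 1 MB, RangeSync() is
  // issued for the 2 MB..4 MB range and last_sync_size_ advances to 4 MB.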
  if (!use_direct_io() && bytes_per_sync_) {
    const uint64_t kBytesNotSyncRange =
        1024 * 1024;  // recent 1MB is not synced.
    const uint64_t kBytesAlignWhenSync = 4 * 1024;  // Align 4KB.
    if (filesize_ > kBytesNotSyncRange) {
      uint64_t offset_sync_to = filesize_ - kBytesNotSyncRange;
      offset_sync_to -= offset_sync_to % kBytesAlignWhenSync;
      assert(offset_sync_to >= last_sync_size_);
      if (offset_sync_to > 0 &&
          offset_sync_to - last_sync_size_ >= bytes_per_sync_) {
        s = RangeSync(last_sync_size_, offset_sync_to - last_sync_size_);
        last_sync_size_ = offset_sync_to;
      }
    }
  }

  return s;
}

const char* WritableFileWriter::GetFileChecksumFuncName() const {
  if (checksum_func_ != nullptr) {
    return checksum_func_->Name();
  } else {
    return kUnknownFileChecksumFuncName.c_str();
  }
}

Status WritableFileWriter::Sync(bool use_fsync) {
  Status s = Flush();
  if (!s.ok()) {
    return s;
  }
  TEST_KILL_RANDOM("WritableFileWriter::Sync:0", rocksdb_kill_odds);
  if (!use_direct_io() && pending_sync_) {
    s = SyncInternal(use_fsync);
    if (!s.ok()) {
      return s;
    }
  }
  TEST_KILL_RANDOM("WritableFileWriter::Sync:1", rocksdb_kill_odds);
  pending_sync_ = false;
  return Status::OK();
}

Status WritableFileWriter::SyncWithoutFlush(bool use_fsync) {
  if (!writable_file_->IsSyncThreadSafe()) {
    return Status::NotSupported(
        "Can't WritableFileWriter::SyncWithoutFlush() because "
        "WritableFile::IsSyncThreadSafe() is false");
  }
  TEST_SYNC_POINT("WritableFileWriter::SyncWithoutFlush:1");
  Status s = SyncInternal(use_fsync);
  TEST_SYNC_POINT("WritableFileWriter::SyncWithoutFlush:2");
  return s;
}

Status WritableFileWriter::SyncInternal(bool use_fsync) {
  Status s;
  IOSTATS_TIMER_GUARD(fsync_nanos);
  TEST_SYNC_POINT("WritableFileWriter::SyncInternal:0");
  auto prev_perf_level = GetPerfLevel();
  IOSTATS_CPU_TIMER_GUARD(cpu_write_nanos, env_);
  if (use_fsync) {
    s = writable_file_->Fsync(IOOptions(), nullptr);
  } else {
    s = writable_file_->Sync(IOOptions(), nullptr);
  }
  SetPerfLevel(prev_perf_level);
  return s;
}

Status WritableFileWriter::RangeSync(uint64_t offset, uint64_t nbytes) {
  IOSTATS_TIMER_GUARD(range_sync_nanos);
  TEST_SYNC_POINT("WritableFileWriter::RangeSync:0");
  return writable_file_->RangeSync(offset, nbytes, IOOptions(), nullptr);
}

// This method writes the specified data to disk, making use of the rate
// limiter if one is available.
Status WritableFileWriter::WriteBuffered(const char* data, size_t size) {
  Status s;
  assert(!use_direct_io());
  const char* src = data;
  size_t left = size;

  while (left > 0) {
    size_t allowed;
    if (rate_limiter_ != nullptr) {
      allowed = rate_limiter_->RequestToken(
          left, 0 /* alignment */, writable_file_->GetIOPriority(), stats_,
          RateLimiter::OpType::kWrite);
    } else {
      allowed = left;
    }
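    // Illustration (hypothetical numbers): with a rate limiter refilling
    // roughly 1 MB at a time and a 4 MB request, RequestToken() may grant
    // only part of `left`, so the outer loop issues several smaller
    // Append() calls until the whole payload has been written.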

    {
      IOSTATS_TIMER_GUARD(write_nanos);
      TEST_SYNC_POINT("WritableFileWriter::Flush:BeforeAppend");

#ifndef ROCKSDB_LITE
      FileOperationInfo::TimePoint start_ts;
      uint64_t old_size = writable_file_->GetFileSize(IOOptions(), nullptr);
      if (ShouldNotifyListeners()) {
        start_ts = std::chrono::system_clock::now();
        old_size = next_write_offset_;
      }
#endif
      {
        auto prev_perf_level = GetPerfLevel();
        IOSTATS_CPU_TIMER_GUARD(cpu_write_nanos, env_);
        s = writable_file_->Append(Slice(src, allowed), IOOptions(), nullptr);
        SetPerfLevel(prev_perf_level);
      }
#ifndef ROCKSDB_LITE
      if (ShouldNotifyListeners()) {
        auto finish_ts = std::chrono::system_clock::now();
        NotifyOnFileWriteFinish(old_size, allowed, start_ts, finish_ts, s);
      }
#endif
      if (!s.ok()) {
        return s;
      }
    }

    IOSTATS_ADD(bytes_written, allowed);
    TEST_KILL_RANDOM("WritableFileWriter::WriteBuffered:0", rocksdb_kill_odds);

    left -= allowed;
    src += allowed;
  }
  buf_.Size(0);
  return s;
}

void WritableFileWriter::CalculateFileChecksum(const Slice& data) {
  if (checksum_func_ != nullptr) {
    if (is_first_checksum_) {
      file_checksum_ = checksum_func_->Value(data.data(), data.size());
      is_first_checksum_ = false;
    } else {
      file_checksum_ =
          checksum_func_->Extend(file_checksum_, data.data(), data.size());
    }
  }
}
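
// Sketch of the intent above (hedged reading of the code, not a contract from
// the original file): for a file appended as chunks c1, c2, c3 the running
// checksum is
//   Value(c1), then Extend(Value(c1), c2), then Extend(..., c3),
// so file_checksum_ incrementally covers every appended chunk without
// buffering the whole file.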

// This flushes the accumulated data in the buffer. We pad the data with zeros
// to a whole page if necessary; during automatic flushes the padding is not
// needed. We always use the RateLimiter if one is available. Any buffer bytes
// left over beyond the whole number of pages are moved (refit) to the front of
// the buffer and written again on the next flush, because we can only write at
// aligned offsets.
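//
// Worked example (hypothetical numbers): with a 4096-byte alignment and
// 10,000 bytes in the buffer, file_advance = TruncateToPageBoundary(4096,
// 10000) = 8192 and leftover_tail = 1808; the buffer is padded to 12,288
// bytes for the positioned write, and afterwards the 1808-byte tail is refit
// to the front of the buffer to be rewritten by a later flush.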
#ifndef ROCKSDB_LITE
Status WritableFileWriter::WriteDirect() {
  assert(use_direct_io());
  Status s;
  const size_t alignment = buf_.Alignment();
  assert((next_write_offset_ % alignment) == 0);

  // Calculate whole page final file advance if all writes succeed
  size_t file_advance = TruncateToPageBoundary(alignment, buf_.CurrentSize());

  // Calculate the leftover tail. We write it here padded with zeros, BUT we
  // will write it again in the future, either on Close() or when the current
  // whole page fills out.
  size_t leftover_tail = buf_.CurrentSize() - file_advance;

  // Round up and pad
  buf_.PadToAlignmentWith(0);

  const char* src = buf_.BufferStart();
  uint64_t write_offset = next_write_offset_;
  size_t left = buf_.CurrentSize();

  while (left > 0) {
    // Check how much is allowed
    size_t size;
    if (rate_limiter_ != nullptr) {
      size = rate_limiter_->RequestToken(left, buf_.Alignment(),
                                         writable_file_->GetIOPriority(),
                                         stats_, RateLimiter::OpType::kWrite);
    } else {
      size = left;
    }

    {
      IOSTATS_TIMER_GUARD(write_nanos);
      TEST_SYNC_POINT("WritableFileWriter::Flush:BeforeAppend");
      FileOperationInfo::TimePoint start_ts;
      if (ShouldNotifyListeners()) {
        start_ts = std::chrono::system_clock::now();
      }
      // direct writes must be positional
      s = writable_file_->PositionedAppend(Slice(src, size), write_offset,
                                           IOOptions(), nullptr);
      if (ShouldNotifyListeners()) {
        auto finish_ts = std::chrono::system_clock::now();
        NotifyOnFileWriteFinish(write_offset, size, start_ts, finish_ts, s);
      }
      if (!s.ok()) {
        buf_.Size(file_advance + leftover_tail);
        return s;
      }
    }

    IOSTATS_ADD(bytes_written, size);
    left -= size;
    src += size;
    write_offset += size;
    assert((next_write_offset_ % alignment) == 0);
  }

  if (s.ok()) {
    // Move the tail to the beginning of the buffer. This never happens during
    // a normal Append, only during an explicit call to Flush()/Sync() or
    // Close().
    buf_.RefitTail(file_advance, leftover_tail);
    // This is where we start writing next time, which may or may not be the
    // actual file size on disk. They match if the buffer size is a multiple of
    // whole pages; otherwise filesize_ is leftover_tail bytes behind.
    next_write_offset_ += file_advance;
  }
  return s;
}
#endif  // !ROCKSDB_LITE
}  // namespace ROCKSDB_NAMESPACE