// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "file/writable_file_writer.h"

#include <algorithm>
#include <mutex>

#include "db/version_edit.h"
#include "monitoring/histogram.h"
#include "monitoring/iostats_context_imp.h"
#include "port/port.h"
#include "test_util/sync_point.h"
#include "util/random.h"
#include "util/rate_limiter.h"

namespace ROCKSDB_NAMESPACE {
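// Appends `data` to the file. The payload is staged in buf_ whenever it
// fits (always, under direct I/O); otherwise the buffer is flushed and the
// data is written out directly via WriteBuffered(). On success, the
// logical file size and the running file checksum are updated.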
Status WritableFileWriter::Append(const Slice& data) {
  const char* src = data.data();
  size_t left = data.size();
  Status s;
  pending_sync_ = true;

  TEST_KILL_RANDOM("WritableFileWriter::Append:0",
                   rocksdb_kill_odds * REDUCE_ODDS2);

  {
    IOSTATS_TIMER_GUARD(prepare_write_nanos);
    TEST_SYNC_POINT("WritableFileWriter::Append:BeforePrepareWrite");
    writable_file_->PrepareWrite(static_cast<size_t>(GetFileSize()), left,
                                 IOOptions(), nullptr);
  }

  // See whether we need to enlarge the buffer to avoid a flush
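  // Example (hypothetical numbers): with a 512KB buffer already holding
  // 100KB and a 1MB incoming append, the loop doubles the candidate
  // capacity until the doubled size (capped at max_buffer_size_) can
  // absorb the data, then reallocates once.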
  if (buf_.Capacity() - buf_.CurrentSize() < left) {
    for (size_t cap = buf_.Capacity();
         cap < max_buffer_size_;  // There is still room to increase
         cap *= 2) {
      // See whether the next available size is large enough.
      // Buffer will never be increased to more than max_buffer_size_.
      size_t desired_capacity = std::min(cap * 2, max_buffer_size_);
      if (desired_capacity - buf_.CurrentSize() >= left ||
          (use_direct_io() && desired_capacity == max_buffer_size_)) {
        buf_.AllocateNewBuffer(desired_capacity, true);
        break;
      }
    }
  }

  // Flush only when using buffered I/O
  if (!use_direct_io() && (buf_.Capacity() - buf_.CurrentSize()) < left) {
    if (buf_.CurrentSize() > 0) {
      s = Flush();
      if (!s.ok()) {
        return s;
      }
    }
    assert(buf_.CurrentSize() == 0);
  }

  // With direct I/O we never write directly to disk; otherwise we use the
  // buffer for its original purpose: to accumulate many small chunks.
  if (use_direct_io() || (buf_.Capacity() >= left)) {
    while (left > 0) {
      size_t appended = buf_.Append(src, left);
      left -= appended;
      src += appended;

      if (left > 0) {
        s = Flush();
        if (!s.ok()) {
          break;
        }
      }
    }
  } else {
    // Writing directly to the file, bypassing the buffer
    assert(buf_.CurrentSize() == 0);
    s = WriteBuffered(src, left);
  }

  TEST_KILL_RANDOM("WritableFileWriter::Append:1", rocksdb_kill_odds);
  if (s.ok()) {
    filesize_ += data.size();
    CalculateFileChecksum(data);
  }
  return s;
}

Status WritableFileWriter::Pad(const size_t pad_bytes) {
  assert(pad_bytes < kDefaultPageSize);
  size_t left = pad_bytes;
  size_t cap = buf_.Capacity() - buf_.CurrentSize();

  // Assume pad_bytes is small compared to buf_'s capacity, so unlike
  // Append(), we always go through buf_ rather than ever writing directly
  // to the file.
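  // Example (hypothetical numbers): padding 100 bytes with only 40 bytes
  // of buffer headroom appends 40 zeros, flushes, then appends the
  // remaining 60 zeros.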
  while (left) {
    size_t append_bytes = std::min(cap, left);
    buf_.PadWith(append_bytes, 0);
    left -= append_bytes;
    if (left > 0) {
      Status s = Flush();
      if (!s.ok()) {
        return s;
      }
    }
    cap = buf_.Capacity() - buf_.CurrentSize();
  }
  pending_sync_ = true;
  filesize_ += pad_bytes;
  return Status::OK();
}

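// Flushes any buffered data; under direct I/O additionally truncates the
// file to filesize_ and fsyncs so the on-disk length matches the logical
// one, then closes the file. The first error encountered is the one
// returned.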
Status WritableFileWriter::Close() {
  // Do not quit immediately on failure; the file MUST be closed.
  Status s;

  // It is possible to close the file twice now, since we MUST close it in
  // the destructor; simply flushing is not enough. On Windows,
  // pre-allocation does not fill with zeros, and with unbuffered access we
  // also need to set the end of data.
  if (!writable_file_) {
    return s;
  }

  s = Flush();  // flush cache to OS

  Status interim;
  // In direct I/O mode we write whole pages so
  // we need to let the file know where data ends.
  if (use_direct_io()) {
    interim = writable_file_->Truncate(filesize_, IOOptions(), nullptr);
    if (interim.ok()) {
      interim = writable_file_->Fsync(IOOptions(), nullptr);
    }
    if (!interim.ok() && s.ok()) {
      s = interim;
    }
  }

  TEST_KILL_RANDOM("WritableFileWriter::Close:0", rocksdb_kill_odds);
  interim = writable_file_->Close(IOOptions(), nullptr);
  if (!interim.ok() && s.ok()) {
    s = interim;
  }

  writable_file_.reset();
  TEST_KILL_RANDOM("WritableFileWriter::Close:1", rocksdb_kill_odds);

  return s;
}

// Write out the cached data to the OS cache, or to storage if direct I/O
// is enabled.
Status WritableFileWriter::Flush() {
  Status s;
  TEST_KILL_RANDOM("WritableFileWriter::Flush:0",
                   rocksdb_kill_odds * REDUCE_ODDS2);

  if (buf_.CurrentSize() > 0) {
    if (use_direct_io()) {
#ifndef ROCKSDB_LITE
      if (pending_sync_) {
        s = WriteDirect();
      }
#endif  // !ROCKSDB_LITE
    } else {
      s = WriteBuffered(buf_.BufferStart(), buf_.CurrentSize());
    }
    if (!s.ok()) {
      return s;
    }
  }

  s = writable_file_->Flush(IOOptions(), nullptr);

  if (!s.ok()) {
    return s;
  }

  // Sync the OS cache to disk every bytes_per_sync_ bytes.
  // TODO: give log files and SST files different options (log files could
  // potentially be cached in the OS for their whole lifetime, thus we
  // might not want to flush at all).

  // We try to avoid syncing the last 1MB of data, for two reasons:
  // (1) avoid rewriting the same page that will be modified later;
  // (2) on older versions of the OS, the write can block while writing
  // out the page.
  // XFS does neighbor-page flushing outside of the specified ranges, so
  // we need to make sure the sync range is far from the write offset.
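  // Example (hypothetical numbers): with bytes_per_sync_ = 1MB,
  // last_sync_size_ = 3MB and filesize_ = 5.5MB, offset_sync_to is 4.5MB
  // (already 4KB-aligned), so RangeSync() covers [3MB, 4.5MB).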
  if (!use_direct_io() && bytes_per_sync_) {
    const uint64_t kBytesNotSyncRange =
        1024 * 1024;  // recent 1MB is not synced.
    const uint64_t kBytesAlignWhenSync = 4 * 1024;  // Align 4KB.
    if (filesize_ > kBytesNotSyncRange) {
      uint64_t offset_sync_to = filesize_ - kBytesNotSyncRange;
      offset_sync_to -= offset_sync_to % kBytesAlignWhenSync;
      assert(offset_sync_to >= last_sync_size_);
      if (offset_sync_to > 0 &&
          offset_sync_to - last_sync_size_ >= bytes_per_sync_) {
        s = RangeSync(last_sync_size_, offset_sync_to - last_sync_size_);
        last_sync_size_ = offset_sync_to;
      }
    }
  }

  return s;
}

const char* WritableFileWriter::GetFileChecksumFuncName() const {
  if (checksum_func_ != nullptr) {
    return checksum_func_->Name();
  } else {
    return kUnknownFileChecksumFuncName.c_str();
  }
}

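// Flushes buffered data, then, for buffered I/O with a sync still pending,
// syncs the file via SyncInternal(); clears pending_sync_ on success.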
Status WritableFileWriter::Sync(bool use_fsync) {
  Status s = Flush();
  if (!s.ok()) {
    return s;
  }
  TEST_KILL_RANDOM("WritableFileWriter::Sync:0", rocksdb_kill_odds);
  if (!use_direct_io() && pending_sync_) {
    s = SyncInternal(use_fsync);
    if (!s.ok()) {
      return s;
    }
  }
  TEST_KILL_RANDOM("WritableFileWriter::Sync:1", rocksdb_kill_odds);
  pending_sync_ = false;
  return Status::OK();
}

Status WritableFileWriter::SyncWithoutFlush(bool use_fsync) {
  if (!writable_file_->IsSyncThreadSafe()) {
    return Status::NotSupported(
        "Can't WritableFileWriter::SyncWithoutFlush() because "
        "WritableFile::IsSyncThreadSafe() is false");
  }
  TEST_SYNC_POINT("WritableFileWriter::SyncWithoutFlush:1");
  Status s = SyncInternal(use_fsync);
  TEST_SYNC_POINT("WritableFileWriter::SyncWithoutFlush:2");
  return s;
}

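// Issues the underlying Fsync()/Sync() call, timed under fsync_nanos with
// CPU time charged to cpu_write_nanos; the perf level is restored
// afterwards.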
Status WritableFileWriter::SyncInternal(bool use_fsync) {
  Status s;
  IOSTATS_TIMER_GUARD(fsync_nanos);
  TEST_SYNC_POINT("WritableFileWriter::SyncInternal:0");
  auto prev_perf_level = GetPerfLevel();
  IOSTATS_CPU_TIMER_GUARD(cpu_write_nanos, env_);
  if (use_fsync) {
    s = writable_file_->Fsync(IOOptions(), nullptr);
  } else {
    s = writable_file_->Sync(IOOptions(), nullptr);
  }
  SetPerfLevel(prev_perf_level);
  return s;
}

Status WritableFileWriter::RangeSync(uint64_t offset, uint64_t nbytes) {
  IOSTATS_TIMER_GUARD(range_sync_nanos);
  TEST_SYNC_POINT("WritableFileWriter::RangeSync:0");
  return writable_file_->RangeSync(offset, nbytes, IOOptions(), nullptr);
}

// Writes the specified data to disk, going through the rate limiter if one
// is available.
Status WritableFileWriter::WriteBuffered(const char* data, size_t size) {
  Status s;
  assert(!use_direct_io());
  const char* src = data;
  size_t left = size;

  while (left > 0) {
    size_t allowed;
    if (rate_limiter_ != nullptr) {
      allowed = rate_limiter_->RequestToken(
          left, 0 /* alignment */, writable_file_->GetIOPriority(), stats_,
          RateLimiter::OpType::kWrite);
    } else {
      allowed = left;
    }
294 | ||
295 | { | |
296 | IOSTATS_TIMER_GUARD(write_nanos); | |
297 | TEST_SYNC_POINT("WritableFileWriter::Flush:BeforeAppend"); | |
298 | ||
299 | #ifndef ROCKSDB_LITE | |
300 | FileOperationInfo::TimePoint start_ts; | |
301 | uint64_t old_size = writable_file_->GetFileSize(IOOptions(), nullptr); | |
302 | if (ShouldNotifyListeners()) { | |
303 | start_ts = std::chrono::system_clock::now(); | |
304 | old_size = next_write_offset_; | |
305 | } | |
306 | #endif | |
307 | { | |
308 | auto prev_perf_level = GetPerfLevel(); | |
309 | IOSTATS_CPU_TIMER_GUARD(cpu_write_nanos, env_); | |
310 | s = writable_file_->Append(Slice(src, allowed), IOOptions(), nullptr); | |
311 | SetPerfLevel(prev_perf_level); | |
312 | } | |
313 | #ifndef ROCKSDB_LITE | |
314 | if (ShouldNotifyListeners()) { | |
315 | auto finish_ts = std::chrono::system_clock::now(); | |
316 | NotifyOnFileWriteFinish(old_size, allowed, start_ts, finish_ts, s); | |
317 | } | |
318 | #endif | |
319 | if (!s.ok()) { | |
320 | return s; | |
321 | } | |
322 | } | |
323 | ||
324 | IOSTATS_ADD(bytes_written, allowed); | |
325 | TEST_KILL_RANDOM("WritableFileWriter::WriteBuffered:0", rocksdb_kill_odds); | |
326 | ||
327 | left -= allowed; | |
328 | src += allowed; | |
329 | } | |
330 | buf_.Size(0); | |
331 | return s; | |
332 | } | |
333 | ||
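// Folds `data` into the running file checksum: the first chunk seeds the
// value with Value(); subsequent chunks extend it with Extend().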
void WritableFileWriter::CalculateFileChecksum(const Slice& data) {
  if (checksum_func_ != nullptr) {
    if (is_first_checksum_) {
      file_checksum_ = checksum_func_->Value(data.data(), data.size());
      is_first_checksum_ = false;
    } else {
      file_checksum_ =
          checksum_func_->Extend(file_checksum_, data.data(), data.size());
    }
  }
}

// This flushes the accumulated data in the buffer. We pad data with zeros
// if necessary to the whole page. However, during automatic flushes
// padding would not be necessary. We always use RateLimiter if available.
// We move (Refit) any buffer bytes that are left over the whole number of
// pages to be written again on the next flush, because we can only write
// at aligned offsets.
#ifndef ROCKSDB_LITE
Status WritableFileWriter::WriteDirect() {
  assert(use_direct_io());
  Status s;
  const size_t alignment = buf_.Alignment();
  assert((next_write_offset_ % alignment) == 0);

  // Calculate the whole-page final file advance if all writes succeed
  size_t file_advance = TruncateToPageBoundary(alignment, buf_.CurrentSize());

  // Calculate the leftover tail; we write it here padded with zeros BUT we
  // will write it again in the future, either on Close() or when the
  // current whole page fills out.
  size_t leftover_tail = buf_.CurrentSize() - file_advance;
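  // Example (hypothetical numbers): with a 4KB alignment and 10KB in the
  // buffer, file_advance is 8KB and leftover_tail is 2KB; the tail is
  // written now, zero-padded to a full page, and rewritten on a later
  // flush once more data arrives.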

  // Round up and pad
  buf_.PadToAlignmentWith(0);

  const char* src = buf_.BufferStart();
  uint64_t write_offset = next_write_offset_;
  size_t left = buf_.CurrentSize();

  while (left > 0) {
    // Check how much is allowed
    size_t size;
    if (rate_limiter_ != nullptr) {
      size = rate_limiter_->RequestToken(left, buf_.Alignment(),
                                         writable_file_->GetIOPriority(),
                                         stats_, RateLimiter::OpType::kWrite);
    } else {
      size = left;
    }

    {
      IOSTATS_TIMER_GUARD(write_nanos);
      TEST_SYNC_POINT("WritableFileWriter::Flush:BeforeAppend");
      FileOperationInfo::TimePoint start_ts;
      if (ShouldNotifyListeners()) {
        start_ts = std::chrono::system_clock::now();
      }
      // direct writes must be positional
      s = writable_file_->PositionedAppend(Slice(src, size), write_offset,
                                           IOOptions(), nullptr);
      if (ShouldNotifyListeners()) {
        auto finish_ts = std::chrono::system_clock::now();
        NotifyOnFileWriteFinish(write_offset, size, start_ts, finish_ts, s);
      }
      if (!s.ok()) {
        buf_.Size(file_advance + leftover_tail);
        return s;
      }
    }

    IOSTATS_ADD(bytes_written, size);
    left -= size;
    src += size;
    write_offset += size;
    assert((next_write_offset_ % alignment) == 0);
  }

  if (s.ok()) {
    // Move the tail to the beginning of the buffer.
    // This never happens during normal Append but rather during an
    // explicit call to Flush()/Sync() or Close().
    buf_.RefitTail(file_advance, leftover_tail);
    // This is where we start writing next time, which may or may not be
    // the actual file size on disk. They match if the buffer size is a
    // multiple of whole pages; otherwise filesize_ is leftover_tail
    // behind.
    next_write_offset_ += file_advance;
  }
  return s;
}
#endif  // !ROCKSDB_LITE
}  // namespace ROCKSDB_NAMESPACE