// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "db/log_reader.h"

#include "rocksdb/env.h"
#include "util/coding.h"
#include "util/crc32c.h"
#include "util/file_reader_writer.h"
#include "util/util.h"
Reader::Reporter::~Reporter() {

Reader::Reader(std::shared_ptr<Logger> info_log,
               std::unique_ptr<SequentialFileReader>&& _file,
               Reporter* reporter, bool checksum, uint64_t log_num)
    : info_log_(info_log),
      file_(std::move(_file)),
      backing_store_(new char[kBlockSize]),
      last_record_offset_(0),
      end_of_buffer_offset_(0),

  delete[] backing_store_;
// For kAbsoluteConsistency, on clean shutdown we don't expect any error
// in the log files. For other modes, we can ignore only incomplete records
// in the last log file, which are presumably due to a write in progress
// during restart (or from log recycling).
//
// TODO krad: Evaluate if we need to move to a more strict mode where we
// restrict the inconsistency to only the last log
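//
// A typical caller drives ReadRecord() in a loop until it returns false;
// an illustrative sketch (not part of this file, surrounding setup assumed):
//
//   std::string scratch;
//   Slice record;
//   while (reader.ReadRecord(&record, &scratch, wal_recovery_mode)) {
//     // apply `record` to the recovery state ...
//   }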
bool Reader::ReadRecord(Slice* record, std::string* scratch,
                        WALRecoveryMode wal_recovery_mode) {

  bool in_fragmented_record = false;
  // Record offset of the logical record that we're reading
  // 0 is a dummy value to make compilers happy
  uint64_t prospective_record_offset = 0;
    uint64_t physical_record_offset = end_of_buffer_offset_ - buffer_.size();

    const unsigned int record_type = ReadPhysicalRecord(&fragment, &drop_size);
    switch (record_type) {
      case kRecyclableFullType:
        if (in_fragmented_record && !scratch->empty()) {
          // Handle bug in earlier versions of log::Writer where
          // it could emit an empty kFirstType record at the tail end
          // of a block followed by a kFullType or kFirstType record
          // at the beginning of the next block.
          ReportCorruption(scratch->size(), "partial record without end(1)");
        prospective_record_offset = physical_record_offset;
        last_record_offset_ = prospective_record_offset;

      case kRecyclableFirstType:
        if (in_fragmented_record && !scratch->empty()) {
          // Handle bug in earlier versions of log::Writer where
          // it could emit an empty kFirstType record at the tail end
          // of a block followed by a kFullType or kFirstType record
          // at the beginning of the next block.
          ReportCorruption(scratch->size(), "partial record without end(2)");
        prospective_record_offset = physical_record_offset;
        scratch->assign(fragment.data(), fragment.size());
        in_fragmented_record = true;
      case kRecyclableMiddleType:
        if (!in_fragmented_record) {
          ReportCorruption(fragment.size(),
                           "missing start of fragmented record(1)");
          scratch->append(fragment.data(), fragment.size());

      case kRecyclableLastType:
        if (!in_fragmented_record) {
          ReportCorruption(fragment.size(),
                           "missing start of fragmented record(2)");
          scratch->append(fragment.data(), fragment.size());
          *record = Slice(*scratch);
          last_record_offset_ = prospective_record_offset;
        if (wal_recovery_mode == WALRecoveryMode::kAbsoluteConsistency) {
          // in clean shutdown we don't expect any error in the log files
          ReportCorruption(drop_size, "truncated header");
        FALLTHROUGH_INTENDED;

        if (in_fragmented_record) {
          if (wal_recovery_mode == WALRecoveryMode::kAbsoluteConsistency) {
            // in clean shutdown we don't expect any error in the log files
            ReportCorruption(scratch->size(), "error reading trailing data");
          // This can be caused by the writer dying immediately after
          // writing a physical record but before completing the next; don't
          // treat it as a corruption, just ignore the entire logical record.

        if (wal_recovery_mode != WALRecoveryMode::kSkipAnyCorruptedRecords) {
          // Treat a record from a previous instance of the log as EOF.
          if (in_fragmented_record) {
            if (wal_recovery_mode == WALRecoveryMode::kAbsoluteConsistency) {
              // in clean shutdown we don't expect any error in the log files
              ReportCorruption(scratch->size(), "error reading trailing data");
            // This can be caused by the writer dying immediately after
            // writing a physical record but before completing the next; don't
            // treat it as a corruption, just ignore the entire logical record.
        FALLTHROUGH_INTENDED;
        if (in_fragmented_record) {
          ReportCorruption(scratch->size(), "error in middle of record");
          in_fragmented_record = false;

      case kBadRecordChecksum:
            WALRecoveryMode::kTolerateCorruptedTailRecords) {
        if (record_type == kBadRecordLen) {
          ReportCorruption(drop_size, "bad record length");
          ReportCorruption(drop_size, "checksum mismatch");
        if (in_fragmented_record) {
          ReportCorruption(scratch->size(), "error in middle of record");
          in_fragmented_record = false;

        snprintf(buf, sizeof(buf), "unknown record type %u", record_type);
            (fragment.size() + (in_fragmented_record ? scratch->size() : 0)),
        in_fragmented_record = false;
uint64_t Reader::LastRecordOffset() {
  return last_record_offset_;

void Reader::UnmarkEOF() {
  if (eof_offset_ == 0) {
void Reader::UnmarkEOFInternal() {
  // If the EOF was in the middle of a block (a partial block was read) we have
  // to read the rest of the block as ReadPhysicalRecord can only read full
  // blocks and expects the file position indicator to be aligned to the start
  // of a block.
  //
  //      consumed_bytes + buffer_size() + remaining == kBlockSize
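  //
  // Pictorially, within the current block (boundaries inferred from the
  // arithmetic below):
  //
  //   0 ........ consumed_bytes ........ eof_offset_ ........ kBlockSize
  //   | already consumed | still in buffer_ | remaining (not yet read) |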
  size_t consumed_bytes = eof_offset_ - buffer_.size();
  size_t remaining = kBlockSize - eof_offset_;

  // backing_store_ is used to concatenate what is left in buffer_ and
  // the remainder of the block. If buffer_ already uses backing_store_,
  // we just append the new data.
  if (buffer_.data() != backing_store_ + consumed_bytes) {
    // Buffer_ does not use backing_store_ for storage.
    // Copy what is left in buffer_ to backing_store.
    memmove(backing_store_ + consumed_bytes, buffer_.data(), buffer_.size());
  Status status = file_->Read(remaining, &read_buffer,
                              backing_store_ + eof_offset_);

  size_t added = read_buffer.size();
  end_of_buffer_offset_ += added;

    ReportDrop(added, status);

  if (read_buffer.data() != backing_store_ + eof_offset_) {
    // Read did not write to backing_store_
    memmove(backing_store_ + eof_offset_, read_buffer.data(),

  buffer_ = Slice(backing_store_ + consumed_bytes,
                  eof_offset_ + added - consumed_bytes);

  if (added < remaining) {
    eof_offset_ += added;
void Reader::ReportCorruption(size_t bytes, const char* reason) {
  ReportDrop(bytes, Status::Corruption(reason));

void Reader::ReportDrop(size_t bytes, const Status& reason) {
  if (reporter_ != nullptr) {
    reporter_->Corruption(bytes, reason);
bool Reader::ReadMore(size_t* drop_size, int *error) {
  if (!eof_ && !read_error_) {
    // Last read was a full read, so this is a trailer to skip
    Status status = file_->Read(kBlockSize, &buffer_, backing_store_);
    end_of_buffer_offset_ += buffer_.size();
      ReportDrop(kBlockSize, status);
    } else if (buffer_.size() < static_cast<size_t>(kBlockSize)) {
      eof_offset_ = buffer_.size();
    // Note that if buffer_ is non-empty, we have a truncated header at the
    // end of the file, which can be caused by the writer crashing in the
    // middle of writing the header. Unless explicitly requested, we don't
    // consider this an error, just report EOF.
    if (buffer_.size()) {
      *drop_size = buffer_.size();
unsigned int Reader::ReadPhysicalRecord(Slice* result, size_t* drop_size) {
    // We need at least the minimum header size
    if (buffer_.size() < static_cast<size_t>(kHeaderSize)) {
      // the default value of r is meaningless because ReadMore will overwrite
      // it if it returns false; in case it returns true, the return value will
      // not be used anyway
      if (!ReadMore(drop_size, &r)) {
    const char* header = buffer_.data();
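    // Layout of the physical record header parsed below (inferred from the
    // decoding logic): bytes 0..3 hold a masked CRC32C of the type byte and
    // payload, bytes 4..5 the little-endian payload length, byte 6 the record
    // type, and, for recyclable record types only, bytes 7..10 the log number.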
    const uint32_t a = static_cast<uint32_t>(header[4]) & 0xff;
    const uint32_t b = static_cast<uint32_t>(header[5]) & 0xff;
    const unsigned int type = header[6];
    const uint32_t length = a | (b << 8);
    int header_size = kHeaderSize;
    if (type >= kRecyclableFullType && type <= kRecyclableLastType) {
      if (end_of_buffer_offset_ - buffer_.size() == 0) {
      header_size = kRecyclableHeaderSize;
      // We need enough for the larger header
      if (buffer_.size() < static_cast<size_t>(kRecyclableHeaderSize)) {
        if (!ReadMore(drop_size, &r)) {
      const uint32_t log_num = DecodeFixed32(header + 7);
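      // Recyclable log files can still contain records written by an earlier
      // incarnation of the file; a log number that does not match the one this
      // Reader was created with marks such a leftover record, which is handled
      // as an old record rather than as data of the current log.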
      if (log_num != log_number_) {

    if (header_size + length > buffer_.size()) {
      *drop_size = buffer_.size();
        return kBadRecordLen;
      // If the end of the file has been reached without reading |length|
      // bytes of payload, assume the writer died in the middle of writing the
      // record. Don't report a corruption unless requested.

    if (type == kZeroType && length == 0) {
      // Skip zero length record without reporting any drops since
      // such records are produced by the mmap based writing code in
      // env_posix.cc that preallocates file regions.
      // NOTE: this should never happen in DB written by new RocksDB versions,
      // since we turn off mmap writes to manifest and log files
      uint32_t expected_crc = crc32c::Unmask(DecodeFixed32(header));
      uint32_t actual_crc = crc32c::Value(header + 6, length + header_size - 6);
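      // The stored checksum is masked (crc32c::Mask) before being written, so
      // it is unmasked before comparison. The computed CRC starts at the type
      // byte (header + 6) and covers length + header_size - 6 bytes: type +
      // payload for legacy records, and type + log number + payload for
      // recyclable records.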
      if (actual_crc != expected_crc) {
        // Drop the rest of the buffer since "length" itself may have
        // been corrupted and if we trust it, we could find some
        // fragment of a real log record that just happens to look
        // like a valid log record.
        *drop_size = buffer_.size();
        return kBadRecordChecksum;

    buffer_.remove_prefix(header_size + length);

    *result = Slice(header + header_size, length);
bool FragmentBufferedReader::ReadRecord(Slice* record, std::string* scratch,
                                        WALRecoveryMode /*unused*/) {
  assert(record != nullptr);
  assert(scratch != nullptr);

  uint64_t prospective_record_offset = 0;
  uint64_t physical_record_offset = end_of_buffer_offset_ - buffer_.size();
  size_t drop_size = 0;
  unsigned int fragment_type_or_err = 0;  // Initialize to make compiler happy
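  // Unlike Reader::ReadRecord(), which keeps its fragment state in local
  // variables, this reader accumulates partial fragments in the member fields
  // fragments_ / in_fragmented_record_, so an incomplete logical record can be
  // resumed on a later call once more of the log becomes readable.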
  while (TryReadFragment(&fragment, &drop_size, &fragment_type_or_err)) {
    switch (fragment_type_or_err) {
      case kRecyclableFullType:
        if (in_fragmented_record_ && !fragments_.empty()) {
          ReportCorruption(fragments_.size(), "partial record without end(1)");
        prospective_record_offset = physical_record_offset;
        last_record_offset_ = prospective_record_offset;
        in_fragmented_record_ = false;

      case kRecyclableFirstType:
        if (in_fragmented_record_ || !fragments_.empty()) {
          ReportCorruption(fragments_.size(), "partial record without end(2)");
        prospective_record_offset = physical_record_offset;
        fragments_.assign(fragment.data(), fragment.size());
        in_fragmented_record_ = true;
      case kRecyclableMiddleType:
        if (!in_fragmented_record_) {
          ReportCorruption(fragment.size(),
                           "missing start of fragmented record(1)");
          fragments_.append(fragment.data(), fragment.size());

      case kRecyclableLastType:
        if (!in_fragmented_record_) {
          ReportCorruption(fragment.size(),
                           "missing start of fragmented record(2)");
          fragments_.append(fragment.data(), fragment.size());
          scratch->assign(fragments_.data(), fragments_.size());
          *record = Slice(*scratch);
          last_record_offset_ = prospective_record_offset;
          in_fragmented_record_ = false;
        if (in_fragmented_record_) {
          ReportCorruption(fragments_.size(), "error in middle of record");
          in_fragmented_record_ = false;

      case kBadRecordChecksum:
        ReportCorruption(drop_size, "checksum mismatch");
        if (in_fragmented_record_) {
          ReportCorruption(fragments_.size(), "error in middle of record");
          in_fragmented_record_ = false;

        snprintf(buf, sizeof(buf), "unknown record type %u",
                 fragment_type_or_err);
            fragment.size() + (in_fragmented_record_ ? fragments_.size() : 0),
        in_fragmented_record_ = false;
void FragmentBufferedReader::UnmarkEOF() {

bool FragmentBufferedReader::TryReadMore(size_t* drop_size, int* error) {
  if (!eof_ && !read_error_) {
    // Last read was a full read, so this is a trailer to skip
    Status status = file_->Read(kBlockSize, &buffer_, backing_store_);
    end_of_buffer_offset_ += buffer_.size();
      ReportDrop(kBlockSize, status);
    } else if (buffer_.size() < static_cast<size_t>(kBlockSize)) {
      eof_offset_ = buffer_.size();
      TEST_SYNC_POINT_CALLBACK(
          "FragmentBufferedLogReader::TryReadMore:FirstEOF", nullptr);
  } else if (!read_error_) {
    *drop_size = buffer_.size();
    if (buffer_.size() > 0) {
// return true if the caller should process the fragment_type_or_err.
bool FragmentBufferedReader::TryReadFragment(
    Slice* fragment, size_t* drop_size, unsigned int* fragment_type_or_err) {
  assert(fragment != nullptr);
  assert(drop_size != nullptr);
  assert(fragment_type_or_err != nullptr);
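  // Each loop below keeps calling TryReadMore() until enough bytes are
  // buffered for the next step (minimum header, recyclable header, full
  // payload). When no further progress can be made, the function returns
  // false so the caller can retry once more of the log is available instead
  // of treating the partial record as corruption.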
  while (buffer_.size() < static_cast<size_t>(kHeaderSize)) {
    size_t old_size = buffer_.size();
    if (!TryReadMore(drop_size, &error)) {
      *fragment_type_or_err = error;
    } else if (old_size == buffer_.size()) {

  const char* header = buffer_.data();
  const uint32_t a = static_cast<uint32_t>(header[4]) & 0xff;
  const uint32_t b = static_cast<uint32_t>(header[5]) & 0xff;
  const unsigned int type = header[6];
  const uint32_t length = a | (b << 8);
  int header_size = kHeaderSize;
  if (type >= kRecyclableFullType && type <= kRecyclableLastType) {
    if (end_of_buffer_offset_ - buffer_.size() == 0) {
    header_size = kRecyclableHeaderSize;
    while (buffer_.size() < static_cast<size_t>(kRecyclableHeaderSize)) {
      size_t old_size = buffer_.size();
      if (!TryReadMore(drop_size, &error)) {
        *fragment_type_or_err = error;
      } else if (old_size == buffer_.size()) {

    const uint32_t log_num = DecodeFixed32(header + 7);
    if (log_num != log_number_) {
      *fragment_type_or_err = kOldRecord;
  while (header_size + length > buffer_.size()) {
    size_t old_size = buffer_.size();
    if (!TryReadMore(drop_size, &error)) {
      *fragment_type_or_err = error;
    } else if (old_size == buffer_.size()) {

  if (type == kZeroType && length == 0) {
    *fragment_type_or_err = kBadRecord;
    uint32_t expected_crc = crc32c::Unmask(DecodeFixed32(header));
    uint32_t actual_crc = crc32c::Value(header + 6, length + header_size - 6);
    if (actual_crc != expected_crc) {
      *drop_size = buffer_.size();
      *fragment_type_or_err = kBadRecordChecksum;

  buffer_.remove_prefix(header_size + length);

  *fragment = Slice(header + header_size, length);
  *fragment_type_or_err = type;
}  // namespace rocksdb