// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
10 #include "db/log_reader.h"
13 #include "file/sequence_file_reader.h"
14 #include "port/lang.h"
15 #include "rocksdb/env.h"
16 #include "test_util/sync_point.h"
17 #include "util/coding.h"
18 #include "util/crc32c.h"
20 namespace ROCKSDB_NAMESPACE
{
23 Reader::Reporter::~Reporter() {
// Constructs a WAL reader over `_file` (ownership transferred). Allocates a
// kBlockSize staging buffer (backing_store_) which the destructor releases.
// `log_num` identifies this log for recycled-log detection; `reporter` (may
// be null) receives corruption notifications; `checksum` enables CRC checks.
// NOTE(review): this extraction is truncated -- several member initializers
// (e.g. reporter_/checksum_/eof_ state) and the constructor body are not
// visible here; confirm against the full file before editing.
26 Reader::Reader(std::shared_ptr
<Logger
> info_log
,
27 std::unique_ptr
<SequentialFileReader
>&& _file
,
28 Reporter
* reporter
, bool checksum
, uint64_t log_num
)
29 : info_log_(info_log
),
30 file_(std::move(_file
)),
33 backing_store_(new char[kBlockSize
]),
38 last_record_offset_(0),
39 end_of_buffer_offset_(0),
// Destructor body fragment: releases the block staging buffer allocated in
// the constructor.
// NOTE(review): the destructor's signature line is not visible in this
// truncated view.
44 delete[] backing_store_
;
47 // For kAbsoluteConsistency, on clean shutdown we don't expect any error
48 // in the log files. For other modes, we can ignore only incomplete records
49 // in the last log file, which are presumably due to a write in progress
50 // during restart (or from log recycling).
52 // TODO krad: Evaluate if we need to move to a more strict mode where we
53 // restrict the inconsistency to only the last log
// Reads the next complete logical record into *record, using *scratch as
// backing storage when the record was written as multiple fragments.
// Corrupt or incomplete data is funneled through ReportCorruption(); how
// truncation near EOF is treated depends on wal_recovery_mode
// (kAbsoluteConsistency / kPointInTimeRecovery report it, others may not).
// NOTE(review): this extraction is truncated -- several original lines
// (non-recyclable case labels, break/return statements, closing braces,
// and the declarations of `fragment`/`drop_size`) are missing, so the
// control flow below cannot be fully verified from this chunk alone.
54 bool Reader::ReadRecord(Slice
* record
, std::string
* scratch
,
55 WALRecoveryMode wal_recovery_mode
) {
58 bool in_fragmented_record
= false;
59 // Record offset of the logical record that we're reading
60 // 0 is a dummy value to make compilers happy
61 uint64_t prospective_record_offset
= 0;
65 uint64_t physical_record_offset
= end_of_buffer_offset_
- buffer_
.size();
67 const unsigned int record_type
= ReadPhysicalRecord(&fragment
, &drop_size
);
68 switch (record_type
) {
// Case: a record fully contained in one physical fragment.
70 case kRecyclableFullType
:
71 if (in_fragmented_record
&& !scratch
->empty()) {
72 // Handle bug in earlier versions of log::Writer where
73 // it could emit an empty kFirstType record at the tail end
74 // of a block followed by a kFullType or kFirstType record
75 // at the beginning of the next block.
76 ReportCorruption(scratch
->size(), "partial record without end(1)");
78 prospective_record_offset
= physical_record_offset
;
81 last_record_offset_
= prospective_record_offset
;
// Case: first fragment of a multi-fragment record -- start assembly.
85 case kRecyclableFirstType
:
86 if (in_fragmented_record
&& !scratch
->empty()) {
87 // Handle bug in earlier versions of log::Writer where
88 // it could emit an empty kFirstType record at the tail end
89 // of a block followed by a kFullType or kFirstType record
90 // at the beginning of the next block.
91 ReportCorruption(scratch
->size(), "partial record without end(2)");
93 prospective_record_offset
= physical_record_offset
;
94 scratch
->assign(fragment
.data(), fragment
.size());
95 in_fragmented_record
= true;
// Case: interior fragment -- append to the partial record in *scratch.
99 case kRecyclableMiddleType
:
100 if (!in_fragmented_record
) {
101 ReportCorruption(fragment
.size(),
102 "missing start of fragmented record(1)");
104 scratch
->append(fragment
.data(), fragment
.size());
// Case: final fragment -- append and hand the assembled record back.
109 case kRecyclableLastType
:
110 if (!in_fragmented_record
) {
111 ReportCorruption(fragment
.size(),
112 "missing start of fragmented record(2)");
114 scratch
->append(fragment
.data(), fragment
.size());
115 *record
= Slice(*scratch
);
116 last_record_offset_
= prospective_record_offset
;
// A header was truncated near end of file: strict modes report corruption.
122 if (wal_recovery_mode
== WALRecoveryMode::kAbsoluteConsistency
||
123 wal_recovery_mode
== WALRecoveryMode::kPointInTimeRecovery
) {
124 // In clean shutdown we don't expect any error in the log files.
125 // In point-in-time recovery an incomplete record at the end could
126 // produce a hole in the recovered data. Report an error here, which
127 // higher layers can choose to ignore when it's provable there is no
129 ReportCorruption(drop_size
, "truncated header");
131 FALLTHROUGH_INTENDED
;
// EOF with a partially assembled record: strict modes flag trailing data.
134 if (in_fragmented_record
) {
135 if (wal_recovery_mode
== WALRecoveryMode::kAbsoluteConsistency
||
136 wal_recovery_mode
== WALRecoveryMode::kPointInTimeRecovery
) {
137 // In clean shutdown we don't expect any error in the log files.
138 // In point-in-time recovery an incomplete record at the end could
139 // produce a hole in the recovered data. Report an error here, which
140 // higher layers can choose to ignore when it's provable there is no
142 ReportCorruption(scratch
->size(), "error reading trailing data");
144 // This can be caused by the writer dying immediately after
145 // writing a physical record but before completing the next; don't
146 // treat it as a corruption, just ignore the entire logical record.
// Record from a previous instance of a recycled log: treated as EOF unless
// corrupted records are being skipped.
152 if (wal_recovery_mode
!= WALRecoveryMode::kSkipAnyCorruptedRecords
) {
153 // Treat a record from a previous instance of the log as EOF.
154 if (in_fragmented_record
) {
155 if (wal_recovery_mode
== WALRecoveryMode::kAbsoluteConsistency
||
156 wal_recovery_mode
== WALRecoveryMode::kPointInTimeRecovery
) {
157 // In clean shutdown we don't expect any error in the log files.
158 // In point-in-time recovery an incomplete record at the end could
159 // produce a hole in the recovered data. Report an error here,
160 // which higher layers can choose to ignore when it's provable
162 ReportCorruption(scratch
->size(), "error reading trailing data");
164 // This can be caused by the writer dying immediately after
165 // writing a physical record but before completing the next; don't
166 // treat it as a corruption, just ignore the entire logical record.
171 FALLTHROUGH_INTENDED
;
174 if (in_fragmented_record
) {
175 ReportCorruption(scratch
->size(), "error in middle of record");
176 in_fragmented_record
= false;
// Truncated record body: strict modes report the dropped bytes.
183 if (wal_recovery_mode
== WALRecoveryMode::kAbsoluteConsistency
||
184 wal_recovery_mode
== WALRecoveryMode::kPointInTimeRecovery
) {
185 // In clean shutdown we don't expect any error in the log files.
186 // In point-in-time recovery an incomplete record at the end could
187 // produce a hole in the recovered data. Report an error here, which
188 // higher layers can choose to ignore when it's provable there is no
190 ReportCorruption(drop_size
, "truncated record body");
194 FALLTHROUGH_INTENDED
;
// Bad physical record (length or checksum): report and reset assembly.
196 case kBadRecordChecksum
:
199 WALRecoveryMode::kTolerateCorruptedTailRecords
) {
203 if (record_type
== kBadRecordLen
) {
204 ReportCorruption(drop_size
, "bad record length");
206 ReportCorruption(drop_size
, "checksum mismatch");
208 if (in_fragmented_record
) {
209 ReportCorruption(scratch
->size(), "error in middle of record");
210 in_fragmented_record
= false;
// Unknown record type: report the dropped bytes and reset state.
217 snprintf(buf
, sizeof(buf
), "unknown record type %u", record_type
);
219 (fragment
.size() + (in_fragmented_record
? scratch
->size() : 0)),
221 in_fragmented_record
= false;
230 uint64_t Reader::LastRecordOffset() {
231 return last_record_offset_
;
234 uint64_t Reader::LastRecordEnd() {
235 return end_of_buffer_offset_
- buffer_
.size();
// Clears the reader's EOF state so reading can resume after more data has
// been appended to the log. The visible check treats eof_offset_ == 0 (EOF
// hit exactly on a block boundary, nothing buffered mid-block) specially.
// NOTE(review): this extraction is truncated -- only the signature and this
// one condition are visible; the rest of the body (presumably including the
// call into UnmarkEOFInternal(), defined below) is missing from this view.
238 void Reader::UnmarkEOF() {
243 if (eof_offset_
== 0) {
// Refills the remainder of a partially read block after EOF was observed
// mid-block, so ReadPhysicalRecord keeps seeing block-aligned data.
// Invariant (per the original comment below):
//   consumed_bytes + buffer_.size() + remaining == kBlockSize
// NOTE(review): truncated extraction -- some lines (closing braces,
// error-path returns around the Read() call) are missing from this view.
249 void Reader::UnmarkEOFInternal() {
250 // If the EOF was in the middle of a block (a partial block was read) we have
251 // to read the rest of the block as ReadPhysicalRecord can only read full
252 // blocks and expects the file position indicator to be aligned to the start
255 // consumed_bytes + buffer_size() + remaining == kBlockSize
257 size_t consumed_bytes
= eof_offset_
- buffer_
.size();
258 size_t remaining
= kBlockSize
- eof_offset_
;
260 // backing_store_ is used to concatenate what is left in buffer_ and
261 // the remainder of the block. If buffer_ already uses backing_store_,
262 // we just append the new data.
263 if (buffer_
.data() != backing_store_
+ consumed_bytes
) {
264 // Buffer_ does not use backing_store_ for storage.
265 // Copy what is left in buffer_ to backing_store.
266 memmove(backing_store_
+ consumed_bytes
, buffer_
.data(), buffer_
.size());
// Read the rest of the block directly after the bytes already buffered.
270 Status status
= file_
->Read(remaining
, &read_buffer
,
271 backing_store_
+ eof_offset_
);
273 size_t added
= read_buffer
.size();
274 end_of_buffer_offset_
+= added
;
278 ReportDrop(added
, status
);
// If Read() returned data from a buffer of its own rather than writing into
// backing_store_, relocate it so the block stays contiguous.
285 if (read_buffer
.data() != backing_store_
+ eof_offset_
) {
286 // Read did not write to backing_store_
287 memmove(backing_store_
+ eof_offset_
, read_buffer
.data(),
291 buffer_
= Slice(backing_store_
+ consumed_bytes
,
292 eof_offset_
+ added
- consumed_bytes
);
// A short read means we are still at EOF; advance eof_offset_ accordingly.
294 if (added
< remaining
) {
296 eof_offset_
+= added
;
302 void Reader::ReportCorruption(size_t bytes
, const char* reason
) {
303 ReportDrop(bytes
, Status::Corruption(reason
));
306 void Reader::ReportDrop(size_t bytes
, const Status
& reason
) {
307 if (reporter_
!= nullptr) {
308 reporter_
->Corruption(bytes
, reason
);
// Refills buffer_ with the next kBlockSize block from the file (staged in
// backing_store_) and advances end_of_buffer_offset_. On a short read it
// records eof_offset_; any leftover bytes then constitute a truncated
// header reported to the caller via *drop_size.
// NOTE(review): truncated extraction -- the return statements, the read-
// error branch, and several closing braces are missing from this view, so
// the meaning of *error on the false path cannot be confirmed here.
312 bool Reader::ReadMore(size_t* drop_size
, int *error
) {
313 if (!eof_
&& !read_error_
) {
314 // Last read was a full read, so this is a trailer to skip
316 Status status
= file_
->Read(kBlockSize
, &buffer_
, backing_store_
);
317 TEST_SYNC_POINT_CALLBACK("LogReader::ReadMore:AfterReadFile", &status
);
318 end_of_buffer_offset_
+= buffer_
.size();
321 ReportDrop(kBlockSize
, status
);
325 } else if (buffer_
.size() < static_cast<size_t>(kBlockSize
)) {
327 eof_offset_
= buffer_
.size();
331 // Note that if buffer_ is non-empty, we have a truncated header at the
332 // end of the file, which can be caused by the writer crashing in the
333 // middle of writing the header. Unless explicitly requested we don't
334 // consider this an error, just report EOF.
335 if (buffer_
.size()) {
336 *drop_size
= buffer_
.size();
// Decodes one physical record from buffer_, refilling via ReadMore() when
// fewer than a header's worth of bytes remain. Validates the recyclable
// header's embedded log number and the record CRC before handing the
// payload back through *result; on failure *drop_size tells the caller how
// many bytes to account as dropped.
// NOTE(review): truncated extraction -- the declaration of `r`, several
// return paths (EOF / old-record / the final `return type;`), and closing
// braces are missing from this view; confirm against the full file.
347 unsigned int Reader::ReadPhysicalRecord(Slice
* result
, size_t* drop_size
) {
349 // We need at least the minimum header size
350 if (buffer_
.size() < static_cast<size_t>(kHeaderSize
)) {
351 // the default value of r is meaningless because ReadMore will overwrite
352 // it if it returns false; in case it returns true, the return value will
353 // not be used anyway
355 if (!ReadMore(drop_size
, &r
)) {
// Parse the legacy header: CRC in bytes 0-3, little-endian length in bytes
// 4-5, record type in byte 6.
362 const char* header
= buffer_
.data();
363 const uint32_t a
= static_cast<uint32_t>(header
[4]) & 0xff;
364 const uint32_t b
= static_cast<uint32_t>(header
[5]) & 0xff;
365 const unsigned int type
= header
[6];
366 const uint32_t length
= a
| (b
<< 8);
367 int header_size
= kHeaderSize
;
368 if (type
>= kRecyclableFullType
&& type
<= kRecyclableLastType
) {
369 if (end_of_buffer_offset_
- buffer_
.size() == 0) {
372 header_size
= kRecyclableHeaderSize
;
373 // We need enough for the larger header
374 if (buffer_
.size() < static_cast<size_t>(kRecyclableHeaderSize
)) {
376 if (!ReadMore(drop_size
, &r
)) {
// Recyclable headers carry the owning log number at offset 7; a mismatch
// means the bytes belong to an earlier use of this recycled file.
381 const uint32_t log_num
= DecodeFixed32(header
+ 7);
382 if (log_num
!= log_number_
) {
386 if (header_size
+ length
> buffer_
.size()) {
387 assert(buffer_
.size() >= static_cast<size_t>(header_size
));
388 *drop_size
= buffer_
.size();
390 // If the end of the read has been reached without seeing
391 // `header_size + length` bytes of payload, report a corruption. The
392 // higher layers can decide how to handle it based on the recovery mode,
393 // whether this occurred at EOF, whether this is the final WAL, etc.
394 return kBadRecordLen
;
397 if (type
== kZeroType
&& length
== 0) {
398 // Skip zero length record without reporting any drops since
399 // such records are produced by the mmap based writing code in
400 // env_posix.cc that preallocates file regions.
401 // NOTE: this should never happen in DB written by new RocksDB versions,
402 // since we turn off mmap writes to manifest and log files
// Verify the CRC, which covers the type byte plus the payload.
409 uint32_t expected_crc
= crc32c::Unmask(DecodeFixed32(header
));
410 uint32_t actual_crc
= crc32c::Value(header
+ 6, length
+ header_size
- 6);
411 if (actual_crc
!= expected_crc
) {
412 // Drop the rest of the buffer since "length" itself may have
413 // been corrupted and if we trust it, we could find some
414 // fragment of a real log record that just happens to look
415 // like a valid log record.
416 *drop_size
= buffer_
.size();
418 return kBadRecordChecksum
;
// Success: consume the record from buffer_ and expose the payload.
422 buffer_
.remove_prefix(header_size
+ length
);
424 *result
= Slice(header
+ header_size
, length
);
// FragmentBufferedReader variant of ReadRecord: fragments are accumulated
// in the member string fragments_ (instead of caller-owned scratch), so
// assembly state survives across calls; the recovery-mode parameter is
// unused. Returns true once TryReadFragment has yielded a complete logical
// record, which is then published through *scratch / *record.
// NOTE(review): truncated extraction -- case labels for the non-recyclable
// types, break/return statements, the declaration of `fragment`/`buf`, and
// closing braces are missing from this view.
429 bool FragmentBufferedReader::ReadRecord(Slice
* record
, std::string
* scratch
,
430 WALRecoveryMode
/*unused*/) {
431 assert(record
!= nullptr);
432 assert(scratch
!= nullptr);
436 uint64_t prospective_record_offset
= 0;
437 uint64_t physical_record_offset
= end_of_buffer_offset_
- buffer_
.size();
438 size_t drop_size
= 0;
439 unsigned int fragment_type_or_err
= 0; // Initialize to make compiler happy
441 while (TryReadFragment(&fragment
, &drop_size
, &fragment_type_or_err
)) {
442 switch (fragment_type_or_err
) {
// Complete record in a single fragment.
444 case kRecyclableFullType
:
445 if (in_fragmented_record_
&& !fragments_
.empty()) {
446 ReportCorruption(fragments_
.size(), "partial record without end(1)");
450 prospective_record_offset
= physical_record_offset
;
451 last_record_offset_
= prospective_record_offset
;
452 in_fragmented_record_
= false;
// First fragment: begin accumulating in fragments_.
456 case kRecyclableFirstType
:
457 if (in_fragmented_record_
|| !fragments_
.empty()) {
458 ReportCorruption(fragments_
.size(), "partial record without end(2)");
460 prospective_record_offset
= physical_record_offset
;
461 fragments_
.assign(fragment
.data(), fragment
.size());
462 in_fragmented_record_
= true;
// Interior fragment: append to the accumulated record.
466 case kRecyclableMiddleType
:
467 if (!in_fragmented_record_
) {
468 ReportCorruption(fragment
.size(),
469 "missing start of fragmented record(1)");
471 fragments_
.append(fragment
.data(), fragment
.size());
// Final fragment: publish the assembled record via *scratch / *record.
476 case kRecyclableLastType
:
477 if (!in_fragmented_record_
) {
478 ReportCorruption(fragment
.size(),
479 "missing start of fragmented record(2)");
481 fragments_
.append(fragment
.data(), fragment
.size());
482 scratch
->assign(fragments_
.data(), fragments_
.size());
484 *record
= Slice(*scratch
);
485 last_record_offset_
= prospective_record_offset
;
486 in_fragmented_record_
= false;
495 if (in_fragmented_record_
) {
496 ReportCorruption(fragments_
.size(), "error in middle of record");
497 in_fragmented_record_
= false;
// Bad fragment (checksum/length): report and reset assembly state.
502 case kBadRecordChecksum
:
507 ReportCorruption(drop_size
, "checksum mismatch");
508 if (in_fragmented_record_
) {
509 ReportCorruption(fragments_
.size(), "error in middle of record");
510 in_fragmented_record_
= false;
// Unknown fragment type: report the dropped bytes and reset state.
517 snprintf(buf
, sizeof(buf
), "unknown record type %u",
518 fragment_type_or_err
);
520 fragment
.size() + (in_fragmented_record_
? fragments_
.size() : 0),
522 in_fragmented_record_
= false;
// NOTE(review): truncated extraction -- only the signature of this
// override is visible; the body presumably clears EOF state like
// Reader::UnmarkEOF does, but that must be confirmed against the full file.
531 void FragmentBufferedReader::UnmarkEOF() {
// Block-refill helper for FragmentBufferedReader (cf. Reader::ReadMore):
// reads up to kBlockSize bytes into buffer_ via backing_store_, advances
// end_of_buffer_offset_, and records eof_offset_ on a short read. On the
// failure paths *drop_size receives the number of buffered bytes lost.
// NOTE(review): truncated extraction -- return statements, the status
// check guarding ReportDrop, and parts of the final else-branch are
// missing from this view.
539 bool FragmentBufferedReader::TryReadMore(size_t* drop_size
, int* error
) {
540 if (!eof_
&& !read_error_
) {
541 // Last read was a full read, so this is a trailer to skip
543 Status status
= file_
->Read(kBlockSize
, &buffer_
, backing_store_
);
544 end_of_buffer_offset_
+= buffer_
.size();
547 ReportDrop(kBlockSize
, status
);
551 } else if (buffer_
.size() < static_cast<size_t>(kBlockSize
)) {
553 eof_offset_
= buffer_
.size();
554 TEST_SYNC_POINT_CALLBACK(
555 "FragmentBufferedLogReader::TryReadMore:FirstEOF", nullptr);
558 } else if (!read_error_
) {
565 *drop_size
= buffer_
.size();
566 if (buffer_
.size() > 0) {
573 // return true if the caller should process the fragment_type_or_err.
// Incrementally decodes one physical fragment from buffer_. Unlike
// Reader::ReadPhysicalRecord, the refill loops tolerate partial reads by
// retrying TryReadMore() until enough bytes for the header (and then the
// payload) are buffered; when no more data can be obtained, the error/EOF
// code from TryReadMore is stored into *fragment_type_or_err instead.
// NOTE(review): truncated extraction -- the declarations of `error`,
// several return statements, and closing braces are missing from this
// view; the no-progress (`old_size == buffer_.size()`) branch bodies are
// not visible.
574 bool FragmentBufferedReader::TryReadFragment(
575 Slice
* fragment
, size_t* drop_size
, unsigned int* fragment_type_or_err
) {
576 assert(fragment
!= nullptr);
577 assert(drop_size
!= nullptr);
578 assert(fragment_type_or_err
!= nullptr);
// Buffer at least a minimal header before parsing.
580 while (buffer_
.size() < static_cast<size_t>(kHeaderSize
)) {
581 size_t old_size
= buffer_
.size();
583 if (!TryReadMore(drop_size
, &error
)) {
584 *fragment_type_or_err
= error
;
586 } else if (old_size
== buffer_
.size()) {
// Parse the legacy header: CRC in bytes 0-3, little-endian length in bytes
// 4-5, record type in byte 6.
590 const char* header
= buffer_
.data();
591 const uint32_t a
= static_cast<uint32_t>(header
[4]) & 0xff;
592 const uint32_t b
= static_cast<uint32_t>(header
[5]) & 0xff;
593 const unsigned int type
= header
[6];
594 const uint32_t length
= a
| (b
<< 8);
595 int header_size
= kHeaderSize
;
596 if (type
>= kRecyclableFullType
&& type
<= kRecyclableLastType
) {
597 if (end_of_buffer_offset_
- buffer_
.size() == 0) {
600 header_size
= kRecyclableHeaderSize
;
601 while (buffer_
.size() < static_cast<size_t>(kRecyclableHeaderSize
)) {
602 size_t old_size
= buffer_
.size();
604 if (!TryReadMore(drop_size
, &error
)) {
605 *fragment_type_or_err
= error
;
607 } else if (old_size
== buffer_
.size()) {
// Recyclable header: the embedded log number at offset 7 must match ours,
// otherwise this data belongs to an earlier use of a recycled log file.
611 const uint32_t log_num
= DecodeFixed32(header
+ 7);
612 if (log_num
!= log_number_
) {
613 *fragment_type_or_err
= kOldRecord
;
// Buffer the full payload before validating/consuming it.
618 while (header_size
+ length
> buffer_
.size()) {
619 size_t old_size
= buffer_
.size();
621 if (!TryReadMore(drop_size
, &error
)) {
622 *fragment_type_or_err
= error
;
624 } else if (old_size
== buffer_
.size()) {
629 if (type
== kZeroType
&& length
== 0) {
631 *fragment_type_or_err
= kBadRecord
;
// Verify the CRC, which covers the type byte plus the payload.
636 uint32_t expected_crc
= crc32c::Unmask(DecodeFixed32(header
));
637 uint32_t actual_crc
= crc32c::Value(header
+ 6, length
+ header_size
- 6);
638 if (actual_crc
!= expected_crc
) {
639 *drop_size
= buffer_
.size();
641 *fragment_type_or_err
= kBadRecordChecksum
;
// Success: consume the fragment and report its type to the caller.
646 buffer_
.remove_prefix(header_size
+ length
);
648 *fragment
= Slice(header
+ header_size
, length
);
649 *fragment_type_or_err
= type
;
654 } // namespace ROCKSDB_NAMESPACE