1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
10 #include "db/log_reader.h"
13 #include "file/sequence_file_reader.h"
14 #include "rocksdb/env.h"
15 #include "test_util/sync_point.h"
16 #include "util/coding.h"
17 #include "util/crc32c.h"
18 #include "util/util.h"
20 namespace ROCKSDB_NAMESPACE
{
// Out-of-line definition anchoring Reporter's vtable/destructor.
// NOTE(review): the closing "}" of this (empty) body is not visible here —
// extraction appears to have truncated the line; confirm against upstream.
23 Reader::Reporter::~Reporter() {
// Constructs a log Reader over a sequential WAL file.
// Visible initializers: stores the logger and takes ownership of the file
// (std::move), allocates a kBlockSize backing buffer, and zeroes the two
// offset counters.
// NOTE(review): the fused original line numbers jump (30->33, 33->38,
// 39->44), so several member initializers (presumably reporter_, checksum_,
// log_number_, etc. — TODO confirm) and the constructor's closing lines were
// dropped in extraction. The "delete[] backing_store_" statement below is the
// body of Reader::~Reader whose signature line is also missing.
26 Reader::Reader(std::shared_ptr
<Logger
> info_log
,
27 std::unique_ptr
<SequentialFileReader
>&& _file
,
28 Reporter
* reporter
, bool checksum
, uint64_t log_num
)
29 : info_log_(info_log
),
30 file_(std::move(_file
)),
33 backing_store_(new char[kBlockSize
]),
38 last_record_offset_(0),
39 end_of_buffer_offset_(0),
// Destructor body fragment: releases the heap block buffer allocated above.
44 delete[] backing_store_
;
// Reads the next complete logical record into *record, reassembling
// multi-fragment records (First/Middle/Last physical record types) through
// *scratch; Full-type records are returned directly. Corrupt or partial
// fragments are reported via ReportCorruption, with severity depending on
// wal_recovery_mode (kAbsoluteConsistency treats truncation as corruption).
// NOTE(review): extraction dropped many interior lines here (fused original
// numbering jumps e.g. 76->78, 104->109, 116->122, 181->188): break/return
// statements, several case labels (kEof/kOldRecord/kBadRecord appear only by
// their handler bodies), closing braces, and the fragment/drop_size local
// declarations are missing from this view. Restore from upstream before
// attempting to compile; comments below annotate only what is visible.
47 // For kAbsoluteConsistency, on clean shutdown we don't expect any error
48 // in the log files. For other modes, we can ignore only incomplete records
49 // in the last log file, which are presumably due to a write in progress
50 // during restart (or from log recycling).
52 // TODO krad: Evaluate if we need to move to a more strict mode where we
53 // restrict the inconsistency to only the last log
54 bool Reader::ReadRecord(Slice
* record
, std::string
* scratch
,
55 WALRecoveryMode wal_recovery_mode
) {
58 bool in_fragmented_record
= false;
59 // Record offset of the logical record that we're reading
60 // 0 is a dummy value to make compilers happy
61 uint64_t prospective_record_offset
= 0;
65 uint64_t physical_record_offset
= end_of_buffer_offset_
- buffer_
.size();
67 const unsigned int record_type
= ReadPhysicalRecord(&fragment
, &drop_size
);
68 switch (record_type
) {
70 case kRecyclableFullType
:
71 if (in_fragmented_record
&& !scratch
->empty()) {
72 // Handle bug in earlier versions of log::Writer where
73 // it could emit an empty kFirstType record at the tail end
74 // of a block followed by a kFullType or kFirstType record
75 // at the beginning of the next block.
76 ReportCorruption(scratch
->size(), "partial record without end(1)");
78 prospective_record_offset
= physical_record_offset
;
81 last_record_offset_
= prospective_record_offset
;
85 case kRecyclableFirstType
:
86 if (in_fragmented_record
&& !scratch
->empty()) {
87 // Handle bug in earlier versions of log::Writer where
88 // it could emit an empty kFirstType record at the tail end
89 // of a block followed by a kFullType or kFirstType record
90 // at the beginning of the next block.
91 ReportCorruption(scratch
->size(), "partial record without end(2)");
93 prospective_record_offset
= physical_record_offset
;
94 scratch
->assign(fragment
.data(), fragment
.size());
95 in_fragmented_record
= true;
99 case kRecyclableMiddleType
:
100 if (!in_fragmented_record
) {
101 ReportCorruption(fragment
.size(),
102 "missing start of fragmented record(1)");
104 scratch
->append(fragment
.data(), fragment
.size());
109 case kRecyclableLastType
:
110 if (!in_fragmented_record
) {
111 ReportCorruption(fragment
.size(),
112 "missing start of fragmented record(2)");
114 scratch
->append(fragment
.data(), fragment
.size());
115 *record
= Slice(*scratch
);
116 last_record_offset_
= prospective_record_offset
;
// Handler body for a truncated-header condition (its case label was lost
// in extraction — presumably kBadHeader; TODO confirm against upstream).
122 if (wal_recovery_mode
== WALRecoveryMode::kAbsoluteConsistency
) {
123 // in clean shutdown we don't expect any error in the log files
124 ReportCorruption(drop_size
, "truncated header");
126 FALLTHROUGH_INTENDED
;
// EOF-style handling: an in-progress fragmented record at end of log is
// only a corruption under kAbsoluteConsistency.
129 if (in_fragmented_record
) {
130 if (wal_recovery_mode
== WALRecoveryMode::kAbsoluteConsistency
) {
131 // in clean shutdown we don't expect any error in the log files
132 ReportCorruption(scratch
->size(), "error reading trailing data");
134 // This can be caused by the writer dying immediately after
135 // writing a physical record but before completing the next; don't
136 // treat it as a corruption, just ignore the entire logical record.
// Old-record handling (case label lost — presumably kOldRecord): records
// from a previous incarnation of a recycled log are treated as EOF unless
// the recovery mode skips any corrupted records.
142 if (wal_recovery_mode
!= WALRecoveryMode::kSkipAnyCorruptedRecords
) {
143 // Treat a record from a previous instance of the log as EOF.
144 if (in_fragmented_record
) {
145 if (wal_recovery_mode
== WALRecoveryMode::kAbsoluteConsistency
) {
146 // in clean shutdown we don't expect any error in the log files
147 ReportCorruption(scratch
->size(), "error reading trailing data");
149 // This can be caused by the writer dying immediately after
150 // writing a physical record but before completing the next; don't
151 // treat it as a corruption, just ignore the entire logical record.
156 FALLTHROUGH_INTENDED
;
159 if (in_fragmented_record
) {
160 ReportCorruption(scratch
->size(), "error in middle of record");
161 in_fragmented_record
= false;
167 case kBadRecordChecksum
:
// Under kTolerateCorruptedTailRecords a bad record truncates recovery
// here; otherwise the specific error is reported below.
170 WALRecoveryMode::kTolerateCorruptedTailRecords
) {
174 if (record_type
== kBadRecordLen
) {
175 ReportCorruption(drop_size
, "bad record length");
177 ReportCorruption(drop_size
, "checksum mismatch");
179 if (in_fragmented_record
) {
180 ReportCorruption(scratch
->size(), "error in middle of record");
181 in_fragmented_record
= false;
// Default case body: report an unknown record type, counting any partial
// logical record accumulated in scratch as dropped bytes.
188 snprintf(buf
, sizeof(buf
), "unknown record type %u", record_type
);
190 (fragment
.size() + (in_fragmented_record
? scratch
->size() : 0)),
192 in_fragmented_record
= false;
// Returns the physical offset (within the log file) of the last logical
// record returned by ReadRecord.
// NOTE(review): the function's closing "}" is missing from this view.
201 uint64_t Reader::LastRecordOffset() {
202 return last_record_offset_
;
// Clears the EOF state so reading can resume after more data is appended.
// NOTE(review): only the guard on eof_offset_ == 0 survives extraction here;
// the early-return body and the call into UnmarkEOFInternal (see below) were
// dropped — confirm against upstream.
205 void Reader::UnmarkEOF() {
210 if (eof_offset_
== 0) {
// Completes the partial block that was being read when EOF was hit:
// compacts the unconsumed tail of buffer_ to the front of backing_store_,
// reads the block's remaining bytes from the file, and rebuilds buffer_ so
// ReadPhysicalRecord again sees block-aligned data.
// NOTE(review): extraction dropped interior lines (fused numbering jumps
// 219->222, 233->237, 241->245, 245->252, 254->258, 259->261, 261->263):
// the read_buffer declaration, status checks, the memmove length argument,
// and several closing braces are missing from this view.
216 void Reader::UnmarkEOFInternal() {
217 // If the EOF was in the middle of a block (a partial block was read) we have
218 // to read the rest of the block as ReadPhysicalRecord can only read full
219 // blocks and expects the file position indicator to be aligned to the start
222 // consumed_bytes + buffer_size() + remaining == kBlockSize
224 size_t consumed_bytes
= eof_offset_
- buffer_
.size();
225 size_t remaining
= kBlockSize
- eof_offset_
;
227 // backing_store_ is used to concatenate what is left in buffer_ and
228 // the remainder of the block. If buffer_ already uses backing_store_,
229 // we just append the new data.
230 if (buffer_
.data() != backing_store_
+ consumed_bytes
) {
231 // Buffer_ does not use backing_store_ for storage.
232 // Copy what is left in buffer_ to backing_store.
233 memmove(backing_store_
+ consumed_bytes
, buffer_
.data(), buffer_
.size());
237 Status status
= file_
->Read(remaining
, &read_buffer
,
238 backing_store_
+ eof_offset_
);
240 size_t added
= read_buffer
.size();
241 end_of_buffer_offset_
+= added
;
// On a failed read, the bytes we hoped to add are reported as dropped.
245 ReportDrop(added
, status
);
252 if (read_buffer
.data() != backing_store_
+ eof_offset_
) {
253 // Read did not write to backing_store_
254 memmove(backing_store_
+ eof_offset_
, read_buffer
.data(),
// Rebuild buffer_ over the unconsumed tail plus the newly read bytes.
258 buffer_
= Slice(backing_store_
+ consumed_bytes
,
259 eof_offset_
+ added
- consumed_bytes
);
// A short read means we are still at (a new) EOF; advance eof_offset_.
261 if (added
< remaining
) {
263 eof_offset_
+= added
;
// Convenience wrapper: reports `bytes` dropped due to corruption, wrapping
// the reason string in a Status::Corruption.
// NOTE(review): closing "}" missing from this view.
269 void Reader::ReportCorruption(size_t bytes
, const char* reason
) {
270 ReportDrop(bytes
, Status::Corruption(reason
));
// Forwards a dropped-bytes notification to the registered Reporter, if any.
// NOTE(review): upstream additionally guards on the drop position relative
// to initial_offset; any such condition and the closing braces were dropped
// in extraction — confirm against the original source.
273 void Reader::ReportDrop(size_t bytes
, const Status
& reason
) {
274 if (reporter_
!= nullptr) {
275 reporter_
->Corruption(bytes
, reason
);
// Refills buffer_ with the next kBlockSize block from the file. Returns true
// when the caller should re-examine buffer_; on failure/EOF sets *error
// (kEof/kBadHeader — the assignments were dropped in extraction; TODO
// confirm) and returns false.
// NOTE(review): numbering jumps (284->287, 287->291, 293->297, 302->end)
// show the status check, return statements, and closing braces are missing
// from this view.
279 bool Reader::ReadMore(size_t* drop_size
, int *error
) {
280 if (!eof_
&& !read_error_
) {
281 // Last read was a full read, so this is a trailer to skip
283 Status status
= file_
->Read(kBlockSize
, &buffer_
, backing_store_
);
284 end_of_buffer_offset_
+= buffer_
.size();
// A failed read drops a whole block and latches the error state.
287 ReportDrop(kBlockSize
, status
);
// A short read marks EOF at the current buffer size.
291 } else if (buffer_
.size() < static_cast<size_t>(kBlockSize
)) {
293 eof_offset_
= buffer_
.size();
297 // Note that if buffer_ is non-empty, we have a truncated header at the
298 // end of the file, which can be caused by the writer crashing in the
299 // middle of writing the header. Unless explicitly requested we don't
300 // considering this an error, just report EOF.
301 if (buffer_
.size()) {
302 *drop_size
= buffer_
.size();
// Parses one physical record out of buffer_, refilling via ReadMore as
// needed. Decodes the 7-byte legacy header (crc32 | length lo/hi | type) or,
// for recyclable types, the 11-byte header that also carries a log number;
// validates length against the buffer and the crc32c over type+payload;
// strips the header and returns the payload in *result with the record type
// as return value (or kBadRecordLen / kBadRecordChecksum on failure).
// NOTE(review): extraction dropped interior lines (numbering jumps 321->328,
// 342->347, 348->352, 360->367, 372->379, 394->end): the retry loop
// structure, `int r` declaration, the kOldRecord/kEof returns, and the final
// `return type;` are missing from this view — restore from upstream.
313 unsigned int Reader::ReadPhysicalRecord(Slice
* result
, size_t* drop_size
) {
315 // We need at least the minimum header size
316 if (buffer_
.size() < static_cast<size_t>(kHeaderSize
)) {
317 // the default value of r is meaningless because ReadMore will overwrite
318 // it if it returns false; in case it returns true, the return value will
319 // not be used anyway
321 if (!ReadMore(drop_size
, &r
)) {
328 const char* header
= buffer_
.data();
329 const uint32_t a
= static_cast<uint32_t>(header
[4]) & 0xff;
330 const uint32_t b
= static_cast<uint32_t>(header
[5]) & 0xff;
331 const unsigned int type
= header
[6];
332 const uint32_t length
= a
| (b
<< 8);
333 int header_size
= kHeaderSize
;
334 if (type
>= kRecyclableFullType
&& type
<= kRecyclableLastType
) {
335 if (end_of_buffer_offset_
- buffer_
.size() == 0) {
338 header_size
= kRecyclableHeaderSize
;
339 // We need enough for the larger header
340 if (buffer_
.size() < static_cast<size_t>(kRecyclableHeaderSize
)) {
342 if (!ReadMore(drop_size
, &r
)) {
// Recyclable records embed the owning log number after the 7-byte header;
// a mismatch means the record belongs to a previous use of this file.
347 const uint32_t log_num
= DecodeFixed32(header
+ 7);
348 if (log_num
!= log_number_
) {
352 if (header_size
+ length
> buffer_
.size()) {
353 *drop_size
= buffer_
.size();
356 return kBadRecordLen
;
358 // If the end of the file has been reached without reading |length|
359 // bytes of payload, assume the writer died in the middle of writing the
360 // record. Don't report a corruption unless requested.
367 if (type
== kZeroType
&& length
== 0) {
368 // Skip zero length record without reporting any drops since
369 // such records are produced by the mmap based writing code in
370 // env_posix.cc that preallocates file regions.
371 // NOTE: this should never happen in DB written by new RocksDB versions,
372 // since we turn off mmap writes to manifest and log files
// CRC covers the type byte (offset 6) through the end of the payload.
379 uint32_t expected_crc
= crc32c::Unmask(DecodeFixed32(header
));
380 uint32_t actual_crc
= crc32c::Value(header
+ 6, length
+ header_size
- 6);
381 if (actual_crc
!= expected_crc
) {
382 // Drop the rest of the buffer since "length" itself may have
383 // been corrupted and if we trust it, we could find some
384 // fragment of a real log record that just happens to look
385 // like a valid log record.
386 *drop_size
= buffer_
.size();
388 return kBadRecordChecksum
;
392 buffer_
.remove_prefix(header_size
+ length
);
394 *result
= Slice(header
+ header_size
, length
);
// Streaming variant of ReadRecord: pulls fragments via TryReadFragment and
// accumulates them in the member buffer fragments_ (so partially read
// logical records survive across calls). The recovery-mode parameter is
// intentionally unused. Returns true once a complete logical record is
// placed in *record/*scratch.
// NOTE(review): extraction dropped interior lines (numbering jumps 402->406,
// 416->420, 456->465, 467->472, 480->487): the fragment declaration,
// break/return statements, several case labels (the kEof / kOldRecord /
// kBadRecordLen handlers appear only as orphaned bodies), and the function's
// closing lines are missing — restore from upstream before compiling.
399 bool FragmentBufferedReader::ReadRecord(Slice
* record
, std::string
* scratch
,
400 WALRecoveryMode
/*unused*/) {
401 assert(record
!= nullptr);
402 assert(scratch
!= nullptr);
406 uint64_t prospective_record_offset
= 0;
407 uint64_t physical_record_offset
= end_of_buffer_offset_
- buffer_
.size();
408 size_t drop_size
= 0;
409 unsigned int fragment_type_or_err
= 0; // Initialize to make compiler happy
411 while (TryReadFragment(&fragment
, &drop_size
, &fragment_type_or_err
)) {
412 switch (fragment_type_or_err
) {
414 case kRecyclableFullType
:
415 if (in_fragmented_record_
&& !fragments_
.empty()) {
416 ReportCorruption(fragments_
.size(), "partial record without end(1)");
420 prospective_record_offset
= physical_record_offset
;
421 last_record_offset_
= prospective_record_offset
;
422 in_fragmented_record_
= false;
426 case kRecyclableFirstType
:
427 if (in_fragmented_record_
|| !fragments_
.empty()) {
428 ReportCorruption(fragments_
.size(), "partial record without end(2)");
430 prospective_record_offset
= physical_record_offset
;
431 fragments_
.assign(fragment
.data(), fragment
.size());
432 in_fragmented_record_
= true;
436 case kRecyclableMiddleType
:
437 if (!in_fragmented_record_
) {
438 ReportCorruption(fragment
.size(),
439 "missing start of fragmented record(1)");
441 fragments_
.append(fragment
.data(), fragment
.size());
446 case kRecyclableLastType
:
447 if (!in_fragmented_record_
) {
448 ReportCorruption(fragment
.size(),
449 "missing start of fragmented record(2)");
451 fragments_
.append(fragment
.data(), fragment
.size());
452 scratch
->assign(fragments_
.data(), fragments_
.size());
454 *record
= Slice(*scratch
);
455 last_record_offset_
= prospective_record_offset
;
456 in_fragmented_record_
= false;
// Orphaned handler body (case label lost in extraction): an incomplete
// fragmented record at end-of-input is reported as mid-record corruption.
465 if (in_fragmented_record_
) {
466 ReportCorruption(fragments_
.size(), "error in middle of record");
467 in_fragmented_record_
= false;
472 case kBadRecordChecksum
:
477 ReportCorruption(drop_size
, "checksum mismatch");
478 if (in_fragmented_record_
) {
479 ReportCorruption(fragments_
.size(), "error in middle of record");
480 in_fragmented_record_
= false;
// Default case body: report an unknown fragment type, counting any partial
// logical record accumulated in fragments_ as dropped bytes.
487 snprintf(buf
, sizeof(buf
), "unknown record type %u",
488 fragment_type_or_err
);
490 fragment
.size() + (in_fragmented_record_
? fragments_
.size() : 0),
492 in_fragmented_record_
= false;
// Clears EOF so streaming reads can resume once more data is appended.
// NOTE(review): the entire body was dropped in extraction — only the
// signature survives; restore from upstream.
501 void FragmentBufferedReader::UnmarkEOF() {
// Non-blocking refill used by TryReadFragment: reads the next kBlockSize
// block into buffer_, advancing end_of_buffer_offset_. A short read sets
// eof_offset_ and fires a test sync point. Returns false when no further
// progress is possible, with *error set for the caller (the assignments
// were dropped in extraction — TODO confirm).
// NOTE(review): numbering jumps (514->517, 517->521, 525->528, 528->535,
// 536->end) show the status check, return statements, and closing braces
// are missing from this view.
509 bool FragmentBufferedReader::TryReadMore(size_t* drop_size
, int* error
) {
510 if (!eof_
&& !read_error_
) {
511 // Last read was a full read, so this is a trailer to skip
513 Status status
= file_
->Read(kBlockSize
, &buffer_
, backing_store_
);
514 end_of_buffer_offset_
+= buffer_
.size();
// A failed read drops a whole block and latches the error state.
517 ReportDrop(kBlockSize
, status
);
521 } else if (buffer_
.size() < static_cast<size_t>(kBlockSize
)) {
523 eof_offset_
= buffer_
.size();
524 TEST_SYNC_POINT_CALLBACK(
525 "FragmentBufferedLogReader::TryReadMore:FirstEOF", nullptr);
528 } else if (!read_error_
) {
// Leftover bytes at a hard EOF are surfaced to the caller as dropped.
535 *drop_size
= buffer_
.size();
536 if (buffer_
.size() > 0) {
// Incrementally decodes one physical fragment from buffer_, looping on
// TryReadMore until enough bytes are available for the header (legacy
// 7-byte or recyclable 11-byte) and then the payload. On success strips the
// header, sets *fragment to the payload and *fragment_type_or_err to the
// record type; on failure sets *fragment_type_or_err to an error code
// (kOldRecord / kBadRecord / kBadRecordChecksum, or the error from
// TryReadMore) instead. Returns true when the caller should process
// *fragment_type_or_err.
// NOTE(review): extraction dropped interior lines (numbering jumps 551->553,
// 556->560, 577->581, 594->599, 601->606, 619->end): the `int error`
// declarations, return statements inside the retry loops, and the final
// `return true;` are missing from this view — restore from upstream.
543 // return true if the caller should process the fragment_type_or_err.
544 bool FragmentBufferedReader::TryReadFragment(
545 Slice
* fragment
, size_t* drop_size
, unsigned int* fragment_type_or_err
) {
546 assert(fragment
!= nullptr);
547 assert(drop_size
!= nullptr);
548 assert(fragment_type_or_err
!= nullptr);
// Accumulate at least a minimal header before decoding.
550 while (buffer_
.size() < static_cast<size_t>(kHeaderSize
)) {
551 size_t old_size
= buffer_
.size();
553 if (!TryReadMore(drop_size
, &error
)) {
554 *fragment_type_or_err
= error
;
556 } else if (old_size
== buffer_
.size()) {
560 const char* header
= buffer_
.data();
561 const uint32_t a
= static_cast<uint32_t>(header
[4]) & 0xff;
562 const uint32_t b
= static_cast<uint32_t>(header
[5]) & 0xff;
563 const unsigned int type
= header
[6];
564 const uint32_t length
= a
| (b
<< 8);
565 int header_size
= kHeaderSize
;
566 if (type
>= kRecyclableFullType
&& type
<= kRecyclableLastType
) {
567 if (end_of_buffer_offset_
- buffer_
.size() == 0) {
570 header_size
= kRecyclableHeaderSize
;
// Recyclable headers are larger; keep reading until fully buffered.
571 while (buffer_
.size() < static_cast<size_t>(kRecyclableHeaderSize
)) {
572 size_t old_size
= buffer_
.size();
574 if (!TryReadMore(drop_size
, &error
)) {
575 *fragment_type_or_err
= error
;
577 } else if (old_size
== buffer_
.size()) {
// A log number from a previous use of this (recycled) file => old record.
581 const uint32_t log_num
= DecodeFixed32(header
+ 7);
582 if (log_num
!= log_number_
) {
583 *fragment_type_or_err
= kOldRecord
;
// Keep reading until the whole payload is buffered.
588 while (header_size
+ length
> buffer_
.size()) {
589 size_t old_size
= buffer_
.size();
591 if (!TryReadMore(drop_size
, &error
)) {
592 *fragment_type_or_err
= error
;
594 } else if (old_size
== buffer_
.size()) {
599 if (type
== kZeroType
&& length
== 0) {
601 *fragment_type_or_err
= kBadRecord
;
// CRC covers the type byte (offset 6) through the end of the payload.
606 uint32_t expected_crc
= crc32c::Unmask(DecodeFixed32(header
));
607 uint32_t actual_crc
= crc32c::Value(header
+ 6, length
+ header_size
- 6);
608 if (actual_crc
!= expected_crc
) {
609 *drop_size
= buffer_
.size();
611 *fragment_type_or_err
= kBadRecordChecksum
;
616 buffer_
.remove_prefix(header_size
+ length
);
618 *fragment
= Slice(header
+ header_size
, length
);
619 *fragment_type_or_err
= type
;
624 } // namespace ROCKSDB_NAMESPACE