// ceph/src/rocksdb/db/log_reader.cc
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "db/log_reader.h"

#include <stdio.h>
#include "file/sequence_file_reader.h"
#include "port/lang.h"
#include "rocksdb/env.h"
#include "test_util/sync_point.h"
#include "util/coding.h"
#include "util/crc32c.h"

namespace ROCKSDB_NAMESPACE {
namespace log {

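// The log file is a sequence of kBlockSize blocks, and physical records
// never straddle a block boundary; larger payloads are split into
// kFirstType/kMiddleType/kLastType fragments. A legacy physical record
// starts with a kHeaderSize (7 byte) header:
//
//   +--------------+------------+------------+--- ... ---+
//   | CRC32C (4 B) | Size (2 B) | Type (1 B) | Payload   |
//   +--------------+------------+------------+--- ... ---+
//
// Recyclable records (kRecyclable*Type) carry a kRecyclableHeaderSize
// (11 byte) header with an extra 4-byte log number after the type byte,
// which lets a reader distinguish records of a recycled log file from
// stale data left behind by its previous incarnation (see db/log_format.h).
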
Reader::Reporter::~Reporter() {
}

Reader::Reader(std::shared_ptr<Logger> info_log,
               std::unique_ptr<SequentialFileReader>&& _file,
               Reporter* reporter, bool checksum, uint64_t log_num)
    : info_log_(info_log),
      file_(std::move(_file)),
      reporter_(reporter),
      checksum_(checksum),
      backing_store_(new char[kBlockSize]),
      buffer_(),
      eof_(false),
      read_error_(false),
      eof_offset_(0),
      last_record_offset_(0),
      end_of_buffer_offset_(0),
      log_number_(log_num),
      recycled_(false) {}

Reader::~Reader() {
  delete[] backing_store_;
}

// For kAbsoluteConsistency, on clean shutdown we don't expect any error
// in the log files. For other modes, we can ignore only incomplete records
// in the last log file, which are presumably due to a write in progress
// during restart (or from log recycling).
//
// TODO krad: Evaluate if we need to move to a more strict mode where we
// restrict the inconsistency to only the last log
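//
// A logical record is either a single kFullType fragment, or a kFirstType
// fragment followed by zero or more kMiddleType fragments and one kLastType
// fragment; the loop below reassembles the fragmented form in *scratch
// before handing it back in *record.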
bool Reader::ReadRecord(Slice* record, std::string* scratch,
                        WALRecoveryMode wal_recovery_mode) {
  scratch->clear();
  record->clear();
  bool in_fragmented_record = false;
  // Record offset of the logical record that we're reading
  // 0 is a dummy value to make compilers happy
  uint64_t prospective_record_offset = 0;

  Slice fragment;
  while (true) {
    uint64_t physical_record_offset = end_of_buffer_offset_ - buffer_.size();
    size_t drop_size = 0;
    const unsigned int record_type = ReadPhysicalRecord(&fragment, &drop_size);
    switch (record_type) {
      case kFullType:
      case kRecyclableFullType:
        if (in_fragmented_record && !scratch->empty()) {
          // Handle bug in earlier versions of log::Writer where
          // it could emit an empty kFirstType record at the tail end
          // of a block followed by a kFullType or kFirstType record
          // at the beginning of the next block.
          ReportCorruption(scratch->size(), "partial record without end(1)");
        }
        prospective_record_offset = physical_record_offset;
        scratch->clear();
        *record = fragment;
        last_record_offset_ = prospective_record_offset;
        return true;

      case kFirstType:
      case kRecyclableFirstType:
        if (in_fragmented_record && !scratch->empty()) {
          // Handle bug in earlier versions of log::Writer where
          // it could emit an empty kFirstType record at the tail end
          // of a block followed by a kFullType or kFirstType record
          // at the beginning of the next block.
          ReportCorruption(scratch->size(), "partial record without end(2)");
        }
        prospective_record_offset = physical_record_offset;
        scratch->assign(fragment.data(), fragment.size());
        in_fragmented_record = true;
        break;

      case kMiddleType:
      case kRecyclableMiddleType:
        if (!in_fragmented_record) {
          ReportCorruption(fragment.size(),
                           "missing start of fragmented record(1)");
        } else {
          scratch->append(fragment.data(), fragment.size());
        }
        break;

      case kLastType:
      case kRecyclableLastType:
        if (!in_fragmented_record) {
          ReportCorruption(fragment.size(),
                           "missing start of fragmented record(2)");
        } else {
          scratch->append(fragment.data(), fragment.size());
          *record = Slice(*scratch);
          last_record_offset_ = prospective_record_offset;
          return true;
        }
        break;

      case kBadHeader:
        if (wal_recovery_mode == WALRecoveryMode::kAbsoluteConsistency ||
            wal_recovery_mode == WALRecoveryMode::kPointInTimeRecovery) {
          // In clean shutdown we don't expect any error in the log files.
          // In point-in-time recovery an incomplete record at the end could
          // produce a hole in the recovered data. Report an error here, which
          // higher layers can choose to ignore when it's provable there is no
          // hole.
          ReportCorruption(drop_size, "truncated header");
        }
        FALLTHROUGH_INTENDED;

      case kEof:
        if (in_fragmented_record) {
          if (wal_recovery_mode == WALRecoveryMode::kAbsoluteConsistency ||
              wal_recovery_mode == WALRecoveryMode::kPointInTimeRecovery) {
            // In clean shutdown we don't expect any error in the log files.
            // In point-in-time recovery an incomplete record at the end could
            // produce a hole in the recovered data. Report an error here, which
            // higher layers can choose to ignore when it's provable there is no
            // hole.
            ReportCorruption(scratch->size(), "error reading trailing data");
          }
          // This can be caused by the writer dying immediately after
          // writing a physical record but before completing the next; don't
          // treat it as a corruption, just ignore the entire logical record.
          scratch->clear();
        }
        return false;

      case kOldRecord:
        if (wal_recovery_mode != WALRecoveryMode::kSkipAnyCorruptedRecords) {
          // Treat a record from a previous instance of the log as EOF.
          if (in_fragmented_record) {
            if (wal_recovery_mode == WALRecoveryMode::kAbsoluteConsistency ||
                wal_recovery_mode == WALRecoveryMode::kPointInTimeRecovery) {
              // In clean shutdown we don't expect any error in the log files.
              // In point-in-time recovery an incomplete record at the end could
              // produce a hole in the recovered data. Report an error here,
              // which higher layers can choose to ignore when it's provable
              // there is no hole.
              ReportCorruption(scratch->size(), "error reading trailing data");
            }
            // This can be caused by the writer dying immediately after
            // writing a physical record but before completing the next; don't
            // treat it as a corruption, just ignore the entire logical record.
            scratch->clear();
          }
          return false;
        }
        FALLTHROUGH_INTENDED;

      case kBadRecord:
        if (in_fragmented_record) {
          ReportCorruption(scratch->size(), "error in middle of record");
          in_fragmented_record = false;
          scratch->clear();
        }
        break;

      case kBadRecordLen:
        if (eof_) {
          if (wal_recovery_mode == WALRecoveryMode::kAbsoluteConsistency ||
              wal_recovery_mode == WALRecoveryMode::kPointInTimeRecovery) {
            // In clean shutdown we don't expect any error in the log files.
            // In point-in-time recovery an incomplete record at the end could
            // produce a hole in the recovered data. Report an error here, which
            // higher layers can choose to ignore when it's provable there is no
            // hole.
            ReportCorruption(drop_size, "truncated record body");
          }
          return false;
        }
        FALLTHROUGH_INTENDED;

      case kBadRecordChecksum:
        if (recycled_ &&
            wal_recovery_mode ==
                WALRecoveryMode::kTolerateCorruptedTailRecords) {
          scratch->clear();
          return false;
        }
        if (record_type == kBadRecordLen) {
          ReportCorruption(drop_size, "bad record length");
        } else {
          ReportCorruption(drop_size, "checksum mismatch");
        }
        if (in_fragmented_record) {
          ReportCorruption(scratch->size(), "error in middle of record");
          in_fragmented_record = false;
          scratch->clear();
        }
        break;

      default: {
        char buf[40];
        snprintf(buf, sizeof(buf), "unknown record type %u", record_type);
        ReportCorruption(
            (fragment.size() + (in_fragmented_record ? scratch->size() : 0)),
            buf);
        in_fragmented_record = false;
        scratch->clear();
        break;
      }
    }
  }
  return false;
}

uint64_t Reader::LastRecordOffset() {
  return last_record_offset_;
}

uint64_t Reader::LastRecordEnd() {
  return end_of_buffer_offset_ - buffer_.size();
}

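// Clears the EOF flag so reading can resume once the underlying file has
// grown, e.g. when tailing a live WAL. If the previous read ended in the
// middle of a block, UnmarkEOFInternal() re-reads the remainder of that
// block so the reader stays block-aligned.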
void Reader::UnmarkEOF() {
  if (read_error_) {
    return;
  }
  eof_ = false;
  if (eof_offset_ == 0) {
    return;
  }
  UnmarkEOFInternal();
}

void Reader::UnmarkEOFInternal() {
  // If the EOF was in the middle of a block (a partial block was read) we have
  // to read the rest of the block as ReadPhysicalRecord can only read full
  // blocks and expects the file position indicator to be aligned to the start
  // of a block.
  //
  //     consumed_bytes + buffer_.size() + remaining == kBlockSize

  size_t consumed_bytes = eof_offset_ - buffer_.size();
  size_t remaining = kBlockSize - eof_offset_;

  // backing_store_ is used to concatenate what is left in buffer_ and
  // the remainder of the block. If buffer_ already uses backing_store_,
  // we just append the new data.
  if (buffer_.data() != backing_store_ + consumed_bytes) {
    // buffer_ does not use backing_store_ for storage.
    // Copy what is left in buffer_ to backing_store_.
    memmove(backing_store_ + consumed_bytes, buffer_.data(), buffer_.size());
  }

  Slice read_buffer;
  Status status =
      file_->Read(remaining, &read_buffer, backing_store_ + eof_offset_);

  size_t added = read_buffer.size();
  end_of_buffer_offset_ += added;

  if (!status.ok()) {
    if (added > 0) {
      ReportDrop(added, status);
    }

    read_error_ = true;
    return;
  }

  if (read_buffer.data() != backing_store_ + eof_offset_) {
    // Read did not write to backing_store_
    memmove(backing_store_ + eof_offset_, read_buffer.data(),
            read_buffer.size());
  }

  buffer_ = Slice(backing_store_ + consumed_bytes,
                  eof_offset_ + added - consumed_bytes);

  if (added < remaining) {
    eof_ = true;
    eof_offset_ += added;
  } else {
    eof_offset_ = 0;
  }
}

void Reader::ReportCorruption(size_t bytes, const char* reason) {
  ReportDrop(bytes, Status::Corruption(reason));
}

void Reader::ReportDrop(size_t bytes, const Status& reason) {
  if (reporter_ != nullptr) {
    reporter_->Corruption(bytes, reason);
  }
}

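// Reads the next kBlockSize block into buffer_. Returns true if the read
// made progress (possibly hitting EOF mid-block); otherwise returns false
// and stores the status to surface (kEof or kBadHeader) in *error.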
bool Reader::ReadMore(size_t* drop_size, int* error) {
  if (!eof_ && !read_error_) {
    // Last read was a full read, so this is a trailer to skip
    buffer_.clear();
    Status status = file_->Read(kBlockSize, &buffer_, backing_store_);
    TEST_SYNC_POINT_CALLBACK("LogReader::ReadMore:AfterReadFile", &status);
    end_of_buffer_offset_ += buffer_.size();
    if (!status.ok()) {
      buffer_.clear();
      ReportDrop(kBlockSize, status);
      read_error_ = true;
      *error = kEof;
      return false;
    } else if (buffer_.size() < static_cast<size_t>(kBlockSize)) {
      eof_ = true;
      eof_offset_ = buffer_.size();
    }
    return true;
  } else {
    // Note that if buffer_ is non-empty, we have a truncated header at the
    // end of the file, which can be caused by the writer crashing in the
    // middle of writing the header. Unless explicitly requested, we don't
    // consider this an error, just report EOF.
    if (buffer_.size()) {
      *drop_size = buffer_.size();
      buffer_.clear();
      *error = kBadHeader;
      return false;
    }
    buffer_.clear();
    *error = kEof;
    return false;
  }
}

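// Returns the type of the next physical record, or one of the extended
// status codes (kEof, kOldRecord, kBadHeader, kBadRecord, kBadRecordLen,
// kBadRecordChecksum). On success, *result points into backing_store_, so
// it is only valid until the next read refills the buffer.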
unsigned int Reader::ReadPhysicalRecord(Slice* result, size_t* drop_size) {
  while (true) {
    // We need at least the minimum header size
    if (buffer_.size() < static_cast<size_t>(kHeaderSize)) {
      // the default value of r is meaningless because ReadMore will overwrite
      // it if it returns false; in case it returns true, the return value will
      // not be used anyway
      int r = kEof;
      if (!ReadMore(drop_size, &r)) {
        return r;
      }
      continue;
    }

    // Parse the header
    const char* header = buffer_.data();
    const uint32_t a = static_cast<uint32_t>(header[4]) & 0xff;
    const uint32_t b = static_cast<uint32_t>(header[5]) & 0xff;
    const unsigned int type = header[6];
    const uint32_t length = a | (b << 8);
    int header_size = kHeaderSize;
    if (type >= kRecyclableFullType && type <= kRecyclableLastType) {
      if (end_of_buffer_offset_ - buffer_.size() == 0) {
        recycled_ = true;
      }
      header_size = kRecyclableHeaderSize;
      // We need enough for the larger header
      if (buffer_.size() < static_cast<size_t>(kRecyclableHeaderSize)) {
        int r = kEof;
        if (!ReadMore(drop_size, &r)) {
          return r;
        }
        continue;
      }
      const uint32_t log_num = DecodeFixed32(header + 7);
      if (log_num != log_number_) {
        return kOldRecord;
      }
    }
    if (header_size + length > buffer_.size()) {
      assert(buffer_.size() >= static_cast<size_t>(header_size));
      *drop_size = buffer_.size();
      buffer_.clear();
      // If the end of the read has been reached without seeing
      // `header_size + length` bytes of payload, report a corruption. The
      // higher layers can decide how to handle it based on the recovery mode,
      // whether this occurred at EOF, whether this is the final WAL, etc.
      return kBadRecordLen;
    }

    if (type == kZeroType && length == 0) {
      // Skip zero length record without reporting any drops since
      // such records are produced by the mmap based writing code in
      // env_posix.cc that preallocates file regions.
      // NOTE: this should never happen in a DB written by newer RocksDB
      // versions, since we turn off mmap writes to manifest and log files
      buffer_.clear();
      return kBadRecord;
    }

    // Check crc
    if (checksum_) {
      uint32_t expected_crc = crc32c::Unmask(DecodeFixed32(header));
      uint32_t actual_crc = crc32c::Value(header + 6, length + header_size - 6);
      if (actual_crc != expected_crc) {
        // Drop the rest of the buffer since "length" itself may have
        // been corrupted and if we trust it, we could find some
        // fragment of a real log record that just happens to look
        // like a valid log record.
        *drop_size = buffer_.size();
        buffer_.clear();
        return kBadRecordChecksum;
      }
    }

    buffer_.remove_prefix(header_size + length);

    *result = Slice(header + header_size, length);
    return type;
  }
}

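// FragmentBufferedReader keeps its reassembly state (fragments_,
// in_fragmented_record_) across calls, so ReadRecord can return false when
// the tail of the log is incomplete and be retried later, after UnmarkEOF(),
// without losing the fragments already consumed. The recovery mode argument
// is ignored.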
bool FragmentBufferedReader::ReadRecord(Slice* record, std::string* scratch,
                                        WALRecoveryMode /*unused*/) {
  assert(record != nullptr);
  assert(scratch != nullptr);
  record->clear();
  scratch->clear();

  uint64_t prospective_record_offset = 0;
  uint64_t physical_record_offset = end_of_buffer_offset_ - buffer_.size();
  size_t drop_size = 0;
  unsigned int fragment_type_or_err = 0;  // Initialize to make compiler happy
  Slice fragment;
  while (TryReadFragment(&fragment, &drop_size, &fragment_type_or_err)) {
    switch (fragment_type_or_err) {
      case kFullType:
      case kRecyclableFullType:
        if (in_fragmented_record_ && !fragments_.empty()) {
          ReportCorruption(fragments_.size(), "partial record without end(1)");
        }
        fragments_.clear();
        *record = fragment;
        prospective_record_offset = physical_record_offset;
        last_record_offset_ = prospective_record_offset;
        in_fragmented_record_ = false;
        return true;

      case kFirstType:
      case kRecyclableFirstType:
        if (in_fragmented_record_ || !fragments_.empty()) {
          ReportCorruption(fragments_.size(), "partial record without end(2)");
        }
        prospective_record_offset = physical_record_offset;
        fragments_.assign(fragment.data(), fragment.size());
        in_fragmented_record_ = true;
        break;

      case kMiddleType:
      case kRecyclableMiddleType:
        if (!in_fragmented_record_) {
          ReportCorruption(fragment.size(),
                           "missing start of fragmented record(1)");
        } else {
          fragments_.append(fragment.data(), fragment.size());
        }
        break;

      case kLastType:
      case kRecyclableLastType:
        if (!in_fragmented_record_) {
          ReportCorruption(fragment.size(),
                           "missing start of fragmented record(2)");
        } else {
          fragments_.append(fragment.data(), fragment.size());
          scratch->assign(fragments_.data(), fragments_.size());
          fragments_.clear();
          *record = Slice(*scratch);
          last_record_offset_ = prospective_record_offset;
          in_fragmented_record_ = false;
          return true;
        }
        break;

      case kBadHeader:
      case kBadRecord:
      case kEof:
      case kOldRecord:
        if (in_fragmented_record_) {
          ReportCorruption(fragments_.size(), "error in middle of record");
          in_fragmented_record_ = false;
          fragments_.clear();
        }
        break;

      case kBadRecordChecksum:
        if (recycled_) {
          fragments_.clear();
          return false;
        }
        ReportCorruption(drop_size, "checksum mismatch");
        if (in_fragmented_record_) {
          ReportCorruption(fragments_.size(), "error in middle of record");
          in_fragmented_record_ = false;
          fragments_.clear();
        }
        break;

      default: {
        char buf[40];
        snprintf(buf, sizeof(buf), "unknown record type %u",
                 fragment_type_or_err);
        ReportCorruption(
            fragment.size() + (in_fragmented_record_ ? fragments_.size() : 0),
            buf);
        in_fragmented_record_ = false;
        fragments_.clear();
        break;
      }
    }
  }
  return false;
}

void FragmentBufferedReader::UnmarkEOF() {
  if (read_error_) {
    return;
  }
  eof_ = false;
  UnmarkEOFInternal();
}

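// Like Reader::ReadMore, but on EOF it first calls UnmarkEOF() to pick up
// data appended since the last read, which lets a tailing reader make
// progress without reopening the file.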
bool FragmentBufferedReader::TryReadMore(size_t* drop_size, int* error) {
  if (!eof_ && !read_error_) {
    // Last read was a full read, so this is a trailer to skip
    buffer_.clear();
    Status status = file_->Read(kBlockSize, &buffer_, backing_store_);
    end_of_buffer_offset_ += buffer_.size();
    if (!status.ok()) {
      buffer_.clear();
      ReportDrop(kBlockSize, status);
      read_error_ = true;
      *error = kEof;
      return false;
    } else if (buffer_.size() < static_cast<size_t>(kBlockSize)) {
      eof_ = true;
      eof_offset_ = buffer_.size();
      TEST_SYNC_POINT_CALLBACK(
          "FragmentBufferedLogReader::TryReadMore:FirstEOF", nullptr);
    }
    return true;
  } else if (!read_error_) {
    UnmarkEOF();
  }
  if (!read_error_) {
    return true;
  }
  *error = kEof;
  *drop_size = buffer_.size();
  if (buffer_.size() > 0) {
    *error = kBadHeader;
  }
  buffer_.clear();
  return false;
}

// Returns true if the caller should process *fragment_type_or_err, which is
// either a valid fragment type or one of the extended status codes (e.g.
// kEof, kBadRecordChecksum). Returns false if more data is needed or a read
// error occurred.
bool FragmentBufferedReader::TryReadFragment(
    Slice* fragment, size_t* drop_size, unsigned int* fragment_type_or_err) {
  assert(fragment != nullptr);
  assert(drop_size != nullptr);
  assert(fragment_type_or_err != nullptr);

  while (buffer_.size() < static_cast<size_t>(kHeaderSize)) {
    size_t old_size = buffer_.size();
    int error = kEof;
    if (!TryReadMore(drop_size, &error)) {
      *fragment_type_or_err = error;
      return false;
    } else if (old_size == buffer_.size()) {
      return false;
    }
  }
  const char* header = buffer_.data();
  const uint32_t a = static_cast<uint32_t>(header[4]) & 0xff;
  const uint32_t b = static_cast<uint32_t>(header[5]) & 0xff;
  const unsigned int type = header[6];
  const uint32_t length = a | (b << 8);
  int header_size = kHeaderSize;
  if (type >= kRecyclableFullType && type <= kRecyclableLastType) {
    if (end_of_buffer_offset_ - buffer_.size() == 0) {
      recycled_ = true;
    }
    header_size = kRecyclableHeaderSize;
    while (buffer_.size() < static_cast<size_t>(kRecyclableHeaderSize)) {
      size_t old_size = buffer_.size();
      int error = kEof;
      if (!TryReadMore(drop_size, &error)) {
        *fragment_type_or_err = error;
        return false;
      } else if (old_size == buffer_.size()) {
        return false;
      }
    }
    const uint32_t log_num = DecodeFixed32(header + 7);
    if (log_num != log_number_) {
      *fragment_type_or_err = kOldRecord;
      return true;
    }
  }

  while (header_size + length > buffer_.size()) {
    size_t old_size = buffer_.size();
    int error = kEof;
    if (!TryReadMore(drop_size, &error)) {
      *fragment_type_or_err = error;
      return false;
    } else if (old_size == buffer_.size()) {
      return false;
    }
  }

  if (type == kZeroType && length == 0) {
    buffer_.clear();
    *fragment_type_or_err = kBadRecord;
    return true;
  }

  if (checksum_) {
    uint32_t expected_crc = crc32c::Unmask(DecodeFixed32(header));
    uint32_t actual_crc = crc32c::Value(header + 6, length + header_size - 6);
    if (actual_crc != expected_crc) {
      *drop_size = buffer_.size();
      buffer_.clear();
      *fragment_type_or_err = kBadRecordChecksum;
      return true;
    }
  }

  buffer_.remove_prefix(header_size + length);

  *fragment = Slice(header + header_size, length);
  *fragment_type_or_err = type;
  return true;
}

}  // namespace log
}  // namespace ROCKSDB_NAMESPACE