]> git.proxmox.com Git - ceph.git/blame - ceph/src/rocksdb/db/log_reader.cc
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / rocksdb / db / log_reader.cc
CommitLineData
7c673cae 1// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
11fdf7f2
TL
2// This source code is licensed under both the GPLv2 (found in the
3// COPYING file in the root directory) and Apache 2.0 License
4// (found in the LICENSE.Apache file in the root directory).
7c673cae
FG
5//
6// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7// Use of this source code is governed by a BSD-style license that can be
8// found in the LICENSE file. See the AUTHORS file for names of contributors.
9
10#include "db/log_reader.h"
11
12#include <stdio.h>
f67539c2 13#include "file/sequence_file_reader.h"
7c673cae 14#include "rocksdb/env.h"
f67539c2 15#include "test_util/sync_point.h"
7c673cae
FG
16#include "util/coding.h"
17#include "util/crc32c.h"
11fdf7f2 18#include "util/util.h"
7c673cae 19
f67539c2 20namespace ROCKSDB_NAMESPACE {
7c673cae
FG
21namespace log {
22
23Reader::Reporter::~Reporter() {
24}
25
// Construct a Reader over a sequential log/WAL file.
//
// info_log:  logger handle stored for message output.
// _file:     the sequential file to read from; ownership is transferred.
// reporter:  destination for corruption/drop reports; may be nullptr
//            (ReportDrop checks before use).
// checksum:  when true, each record's CRC32C is verified during reads.
// log_num:   expected log number; recyclable records carrying a different
//            number are treated as leftovers from a prior use of the file.
Reader::Reader(std::shared_ptr<Logger> info_log,
               std::unique_ptr<SequentialFileReader>&& _file,
               Reporter* reporter, bool checksum, uint64_t log_num)
    : info_log_(info_log),
      file_(std::move(_file)),
      reporter_(reporter),
      checksum_(checksum),
      backing_store_(new char[kBlockSize]),  // one block of scratch space
      buffer_(),
      eof_(false),
      read_error_(false),
      eof_offset_(0),
      last_record_offset_(0),
      end_of_buffer_offset_(0),
      log_number_(log_num),
      recycled_(false) {}
42
Reader::~Reader() {
  // backing_store_ was allocated with new[] in the constructor.
  delete[] backing_store_;
}
46
7c673cae
FG
// Reads the next complete logical record.  Returns true and points *record at
// the record contents (which may be backed by *scratch when the record was
// written as multiple fragments); returns false at end of log or on an
// unrecoverable condition.  *record remains valid only until the next
// mutating call on this Reader.
//
// For kAbsoluteConsistency, on clean shutdown we don't expect any error
// in the log files. For other modes, we can ignore only incomplete records
// in the last log file, which are presumably due to a write in progress
// during restart (or from log recycling).
//
// TODO krad: Evaluate if we need to move to a more strict mode where we
// restrict the inconsistency to only the last log
bool Reader::ReadRecord(Slice* record, std::string* scratch,
                        WALRecoveryMode wal_recovery_mode) {
  scratch->clear();
  record->clear();
  bool in_fragmented_record = false;
  // Record offset of the logical record that we're reading
  // 0 is a dummy value to make compilers happy
  uint64_t prospective_record_offset = 0;

  Slice fragment;
  while (true) {
    // Offset of the physical record about to be returned, i.e. where the
    // unconsumed tail of buffer_ begins in the file.
    uint64_t physical_record_offset = end_of_buffer_offset_ - buffer_.size();
    size_t drop_size = 0;
    const unsigned int record_type = ReadPhysicalRecord(&fragment, &drop_size);
    switch (record_type) {
      case kFullType:
      case kRecyclableFullType:
        if (in_fragmented_record && !scratch->empty()) {
          // Handle bug in earlier versions of log::Writer where
          // it could emit an empty kFirstType record at the tail end
          // of a block followed by a kFullType or kFirstType record
          // at the beginning of the next block.
          ReportCorruption(scratch->size(), "partial record without end(1)");
        }
        prospective_record_offset = physical_record_offset;
        scratch->clear();
        *record = fragment;
        last_record_offset_ = prospective_record_offset;
        return true;

      case kFirstType:
      case kRecyclableFirstType:
        if (in_fragmented_record && !scratch->empty()) {
          // Handle bug in earlier versions of log::Writer where
          // it could emit an empty kFirstType record at the tail end
          // of a block followed by a kFullType or kFirstType record
          // at the beginning of the next block.
          ReportCorruption(scratch->size(), "partial record without end(2)");
        }
        prospective_record_offset = physical_record_offset;
        scratch->assign(fragment.data(), fragment.size());
        in_fragmented_record = true;
        break;

      case kMiddleType:
      case kRecyclableMiddleType:
        if (!in_fragmented_record) {
          ReportCorruption(fragment.size(),
                           "missing start of fragmented record(1)");
        } else {
          scratch->append(fragment.data(), fragment.size());
        }
        break;

      case kLastType:
      case kRecyclableLastType:
        if (!in_fragmented_record) {
          ReportCorruption(fragment.size(),
                           "missing start of fragmented record(2)");
        } else {
          scratch->append(fragment.data(), fragment.size());
          *record = Slice(*scratch);
          last_record_offset_ = prospective_record_offset;
          return true;
        }
        break;

      case kBadHeader:
        if (wal_recovery_mode == WALRecoveryMode::kAbsoluteConsistency) {
          // in clean shutdown we don't expect any error in the log files
          ReportCorruption(drop_size, "truncated header");
        }
        // A truncated header is otherwise treated exactly like EOF.
        FALLTHROUGH_INTENDED;

      case kEof:
        if (in_fragmented_record) {
          if (wal_recovery_mode == WALRecoveryMode::kAbsoluteConsistency) {
            // in clean shutdown we don't expect any error in the log files
            ReportCorruption(scratch->size(), "error reading trailing data");
          }
          // This can be caused by the writer dying immediately after
          // writing a physical record but before completing the next; don't
          // treat it as a corruption, just ignore the entire logical record.
          scratch->clear();
        }
        return false;

      case kOldRecord:
        if (wal_recovery_mode != WALRecoveryMode::kSkipAnyCorruptedRecords) {
          // Treat a record from a previous instance of the log as EOF.
          if (in_fragmented_record) {
            if (wal_recovery_mode == WALRecoveryMode::kAbsoluteConsistency) {
              // in clean shutdown we don't expect any error in the log files
              ReportCorruption(scratch->size(), "error reading trailing data");
            }
            // This can be caused by the writer dying immediately after
            // writing a physical record but before completing the next; don't
            // treat it as a corruption, just ignore the entire logical record.
            scratch->clear();
          }
          return false;
        }
        // kSkipAnyCorruptedRecords: skip the old record like a bad one.
        FALLTHROUGH_INTENDED;

      case kBadRecord:
        if (in_fragmented_record) {
          ReportCorruption(scratch->size(), "error in middle of record");
          in_fragmented_record = false;
          scratch->clear();
        }
        break;

      case kBadRecordLen:
      case kBadRecordChecksum:
        // In a recycled log, a bad length/checksum at the tail is the
        // expected way to hit the logical end of the log, so under
        // kTolerateCorruptedTailRecords it is not reported as corruption.
        if (recycled_ &&
            wal_recovery_mode ==
                WALRecoveryMode::kTolerateCorruptedTailRecords) {
          scratch->clear();
          return false;
        }
        if (record_type == kBadRecordLen) {
          ReportCorruption(drop_size, "bad record length");
        } else {
          ReportCorruption(drop_size, "checksum mismatch");
        }
        if (in_fragmented_record) {
          ReportCorruption(scratch->size(), "error in middle of record");
          in_fragmented_record = false;
          scratch->clear();
        }
        break;

      default: {
        char buf[40];
        snprintf(buf, sizeof(buf), "unknown record type %u", record_type);
        ReportCorruption(
            (fragment.size() + (in_fragmented_record ? scratch->size() : 0)),
            buf);
        in_fragmented_record = false;
        scratch->clear();
        break;
      }
    }
  }
  return false;
}
200
201uint64_t Reader::LastRecordOffset() {
202 return last_record_offset_;
203}
204
205void Reader::UnmarkEOF() {
206 if (read_error_) {
207 return;
208 }
7c673cae 209 eof_ = false;
7c673cae
FG
210 if (eof_offset_ == 0) {
211 return;
212 }
494da23a
TL
213 UnmarkEOFInternal();
214}
7c673cae 215
494da23a 216void Reader::UnmarkEOFInternal() {
7c673cae
FG
217 // If the EOF was in the middle of a block (a partial block was read) we have
218 // to read the rest of the block as ReadPhysicalRecord can only read full
219 // blocks and expects the file position indicator to be aligned to the start
220 // of a block.
221 //
222 // consumed_bytes + buffer_size() + remaining == kBlockSize
223
224 size_t consumed_bytes = eof_offset_ - buffer_.size();
225 size_t remaining = kBlockSize - eof_offset_;
226
227 // backing_store_ is used to concatenate what is left in buffer_ and
228 // the remainder of the block. If buffer_ already uses backing_store_,
229 // we just append the new data.
230 if (buffer_.data() != backing_store_ + consumed_bytes) {
231 // Buffer_ does not use backing_store_ for storage.
232 // Copy what is left in buffer_ to backing_store.
233 memmove(backing_store_ + consumed_bytes, buffer_.data(), buffer_.size());
234 }
235
236 Slice read_buffer;
237 Status status = file_->Read(remaining, &read_buffer,
238 backing_store_ + eof_offset_);
239
240 size_t added = read_buffer.size();
241 end_of_buffer_offset_ += added;
242
243 if (!status.ok()) {
244 if (added > 0) {
245 ReportDrop(added, status);
246 }
247
248 read_error_ = true;
249 return;
250 }
251
252 if (read_buffer.data() != backing_store_ + eof_offset_) {
253 // Read did not write to backing_store_
254 memmove(backing_store_ + eof_offset_, read_buffer.data(),
255 read_buffer.size());
256 }
257
258 buffer_ = Slice(backing_store_ + consumed_bytes,
259 eof_offset_ + added - consumed_bytes);
260
261 if (added < remaining) {
262 eof_ = true;
263 eof_offset_ += added;
264 } else {
265 eof_offset_ = 0;
266 }
267}
268
// Convenience wrapper: report `bytes` dropped with a Corruption status built
// from `reason`.
void Reader::ReportCorruption(size_t bytes, const char* reason) {
  ReportDrop(bytes, Status::Corruption(reason));
}
272
273void Reader::ReportDrop(size_t bytes, const Status& reason) {
11fdf7f2 274 if (reporter_ != nullptr) {
7c673cae
FG
275 reporter_->Corruption(bytes, reason);
276 }
277}
278
279bool Reader::ReadMore(size_t* drop_size, int *error) {
280 if (!eof_ && !read_error_) {
281 // Last read was a full read, so this is a trailer to skip
282 buffer_.clear();
283 Status status = file_->Read(kBlockSize, &buffer_, backing_store_);
284 end_of_buffer_offset_ += buffer_.size();
285 if (!status.ok()) {
286 buffer_.clear();
287 ReportDrop(kBlockSize, status);
288 read_error_ = true;
289 *error = kEof;
290 return false;
11fdf7f2 291 } else if (buffer_.size() < static_cast<size_t>(kBlockSize)) {
7c673cae
FG
292 eof_ = true;
293 eof_offset_ = buffer_.size();
294 }
295 return true;
296 } else {
297 // Note that if buffer_ is non-empty, we have a truncated header at the
298 // end of the file, which can be caused by the writer crashing in the
299 // middle of writing the header. Unless explicitly requested we don't
300 // considering this an error, just report EOF.
301 if (buffer_.size()) {
302 *drop_size = buffer_.size();
303 buffer_.clear();
304 *error = kBadHeader;
305 return false;
306 }
307 buffer_.clear();
308 *error = kEof;
309 return false;
310 }
311}
312
// Reads one physical record out of buffer_, refilling from the file as
// needed.  On success, *result points at the payload (valid only until the
// next read) and the record's type byte is returned.  Otherwise one of the
// extended error codes is returned — kEof, kBadHeader, kBadRecordLen,
// kBadRecordChecksum, kOldRecord or kBadRecord — with *drop_size set to the
// number of bytes discarded where applicable.
//
// Header layout: crc (4 bytes) | length (2 bytes, little-endian) | type
// (1 byte) = kHeaderSize; recyclable record types append log_number
// (4 bytes) for kRecyclableHeaderSize total.
unsigned int Reader::ReadPhysicalRecord(Slice* result, size_t* drop_size) {
  while (true) {
    // We need at least the minimum header size
    if (buffer_.size() < static_cast<size_t>(kHeaderSize)) {
      // the default value of r is meaningless because ReadMore will overwrite
      // it if it returns false; in case it returns true, the return value will
      // not be used anyway
      int r = kEof;
      if (!ReadMore(drop_size, &r)) {
        return r;
      }
      continue;
    }

    // Parse the header
    const char* header = buffer_.data();
    const uint32_t a = static_cast<uint32_t>(header[4]) & 0xff;
    const uint32_t b = static_cast<uint32_t>(header[5]) & 0xff;
    const unsigned int type = header[6];
    const uint32_t length = a | (b << 8);
    int header_size = kHeaderSize;
    if (type >= kRecyclableFullType && type <= kRecyclableLastType) {
      // A recyclable-type record at file offset 0 marks the log as recycled.
      if (end_of_buffer_offset_ - buffer_.size() == 0) {
        recycled_ = true;
      }
      header_size = kRecyclableHeaderSize;
      // We need enough for the larger header
      if (buffer_.size() < static_cast<size_t>(kRecyclableHeaderSize)) {
        int r = kEof;
        if (!ReadMore(drop_size, &r)) {
          return r;
        }
        continue;
      }
      // A mismatched log number means this data is left over from a
      // previous use of the (recycled) file.
      const uint32_t log_num = DecodeFixed32(header + 7);
      if (log_num != log_number_) {
        return kOldRecord;
      }
    }
    if (header_size + length > buffer_.size()) {
      *drop_size = buffer_.size();
      buffer_.clear();
      if (!eof_) {
        return kBadRecordLen;
      }
      // If the end of the file has been reached without reading |length|
      // bytes of payload, assume the writer died in the middle of writing the
      // record. Don't report a corruption unless requested.
      if (*drop_size) {
        return kBadHeader;
      }
      return kEof;
    }

    if (type == kZeroType && length == 0) {
      // Skip zero length record without reporting any drops since
      // such records are produced by the mmap based writing code in
      // env_posix.cc that preallocates file regions.
      // NOTE: this should never happen in DB written by new RocksDB versions,
      // since we turn off mmap writes to manifest and log files
      buffer_.clear();
      return kBadRecord;
    }

    // Check crc
    if (checksum_) {
      uint32_t expected_crc = crc32c::Unmask(DecodeFixed32(header));
      // CRC covers the type byte onward (type + payload, and the log number
      // for recyclable records).
      uint32_t actual_crc = crc32c::Value(header + 6, length + header_size - 6);
      if (actual_crc != expected_crc) {
        // Drop the rest of the buffer since "length" itself may have
        // been corrupted and if we trust it, we could find some
        // fragment of a real log record that just happens to look
        // like a valid log record.
        *drop_size = buffer_.size();
        buffer_.clear();
        return kBadRecordChecksum;
      }
    }

    buffer_.remove_prefix(header_size + length);

    *result = Slice(header + header_size, length);
    return type;
  }
}
398
494da23a
TL
// Like Reader::ReadRecord, but resumable: partially assembled logical
// records are kept in the fragments_ member (together with the
// in_fragmented_record_ flag) across calls, so a later call can continue
// where this one left off once more data becomes available.  The recovery
// mode argument is intentionally ignored.
bool FragmentBufferedReader::ReadRecord(Slice* record, std::string* scratch,
                                        WALRecoveryMode /*unused*/) {
  assert(record != nullptr);
  assert(scratch != nullptr);
  record->clear();
  scratch->clear();

  uint64_t prospective_record_offset = 0;
  uint64_t physical_record_offset = end_of_buffer_offset_ - buffer_.size();
  size_t drop_size = 0;
  unsigned int fragment_type_or_err = 0;  // Initialize to make compiler happy
  Slice fragment;
  while (TryReadFragment(&fragment, &drop_size, &fragment_type_or_err)) {
    switch (fragment_type_or_err) {
      case kFullType:
      case kRecyclableFullType:
        if (in_fragmented_record_ && !fragments_.empty()) {
          ReportCorruption(fragments_.size(), "partial record without end(1)");
        }
        fragments_.clear();
        *record = fragment;
        prospective_record_offset = physical_record_offset;
        last_record_offset_ = prospective_record_offset;
        in_fragmented_record_ = false;
        return true;

      case kFirstType:
      case kRecyclableFirstType:
        if (in_fragmented_record_ || !fragments_.empty()) {
          ReportCorruption(fragments_.size(), "partial record without end(2)");
        }
        prospective_record_offset = physical_record_offset;
        fragments_.assign(fragment.data(), fragment.size());
        in_fragmented_record_ = true;
        break;

      case kMiddleType:
      case kRecyclableMiddleType:
        if (!in_fragmented_record_) {
          ReportCorruption(fragment.size(),
                           "missing start of fragmented record(1)");
        } else {
          fragments_.append(fragment.data(), fragment.size());
        }
        break;

      case kLastType:
      case kRecyclableLastType:
        if (!in_fragmented_record_) {
          ReportCorruption(fragment.size(),
                           "missing start of fragmented record(2)");
        } else {
          // Completed logical record: hand it back via *scratch so the Slice
          // stays valid after fragments_ is cleared.
          fragments_.append(fragment.data(), fragment.size());
          scratch->assign(fragments_.data(), fragments_.size());
          fragments_.clear();
          *record = Slice(*scratch);
          last_record_offset_ = prospective_record_offset;
          in_fragmented_record_ = false;
          return true;
        }
        break;

      case kBadHeader:
      case kBadRecord:
      case kEof:
      case kOldRecord:
        if (in_fragmented_record_) {
          ReportCorruption(fragments_.size(), "error in middle of record");
          in_fragmented_record_ = false;
          fragments_.clear();
        }
        break;

      case kBadRecordChecksum:
        // In a recycled log a checksum mismatch at the tail is the normal
        // logical end of the log, not corruption.
        if (recycled_) {
          fragments_.clear();
          return false;
        }
        ReportCorruption(drop_size, "checksum mismatch");
        if (in_fragmented_record_) {
          ReportCorruption(fragments_.size(), "error in middle of record");
          in_fragmented_record_ = false;
          fragments_.clear();
        }
        break;

      default: {
        char buf[40];
        snprintf(buf, sizeof(buf), "unknown record type %u",
                 fragment_type_or_err);
        ReportCorruption(
            fragment.size() + (in_fragmented_record_ ? fragments_.size() : 0),
            buf);
        in_fragmented_record_ = false;
        fragments_.clear();
        break;
      }
    }
  }
  return false;
}
500
501void FragmentBufferedReader::UnmarkEOF() {
502 if (read_error_) {
503 return;
504 }
505 eof_ = false;
506 UnmarkEOFInternal();
507}
508
// Attempts to make more data available in buffer_.  Unlike Reader::ReadMore,
// EOF is not final here: the file may still be growing, so on a previously
// seen EOF we call UnmarkEOF() to pick up any bytes appended since the last
// read.  Returns true when the caller may retry parsing; on false, *error is
// kEof, or kBadHeader when leftover bytes remain in buffer_.
bool FragmentBufferedReader::TryReadMore(size_t* drop_size, int* error) {
  if (!eof_ && !read_error_) {
    // Last read was a full read, so this is a trailer to skip
    buffer_.clear();
    Status status = file_->Read(kBlockSize, &buffer_, backing_store_);
    end_of_buffer_offset_ += buffer_.size();
    if (!status.ok()) {
      buffer_.clear();
      ReportDrop(kBlockSize, status);
      read_error_ = true;
      *error = kEof;
      return false;
    } else if (buffer_.size() < static_cast<size_t>(kBlockSize)) {
      // Short read: end of file reached mid-block (for now).
      eof_ = true;
      eof_offset_ = buffer_.size();
      TEST_SYNC_POINT_CALLBACK(
          "FragmentBufferedLogReader::TryReadMore:FirstEOF", nullptr);
    }
    return true;
  } else if (!read_error_) {
    // Previously at EOF: try to pick up newly appended data.
    UnmarkEOF();
  }
  if (!read_error_) {
    return true;
  }
  // Read error: report what (if anything) is being dropped from buffer_.
  *error = kEof;
  *drop_size = buffer_.size();
  if (buffer_.size() > 0) {
    *error = kBadHeader;
  }
  buffer_.clear();
  return false;
}
542
543// return true if the caller should process the fragment_type_or_err.
// return true if the caller should process the fragment_type_or_err.
//
// On true, *fragment_type_or_err holds either a real record type (with
// *fragment set to the payload) or one of the error codes (kOldRecord,
// kBadRecord, kBadRecordChecksum).  On false, either more data is needed
// (a TryReadMore call made no progress) or *fragment_type_or_err carries
// the error reported by TryReadMore.
bool FragmentBufferedReader::TryReadFragment(
    Slice* fragment, size_t* drop_size, unsigned int* fragment_type_or_err) {
  assert(fragment != nullptr);
  assert(drop_size != nullptr);
  assert(fragment_type_or_err != nullptr);

  while (buffer_.size() < static_cast<size_t>(kHeaderSize)) {
    size_t old_size = buffer_.size();
    int error = kEof;
    if (!TryReadMore(drop_size, &error)) {
      *fragment_type_or_err = error;
      return false;
    } else if (old_size == buffer_.size()) {
      // No progress: wait for more data before retrying.
      return false;
    }
  }
  // Header layout: crc (4) | length (2, little-endian) | type (1), plus
  // log_number (4) for recyclable record types.
  const char* header = buffer_.data();
  const uint32_t a = static_cast<uint32_t>(header[4]) & 0xff;
  const uint32_t b = static_cast<uint32_t>(header[5]) & 0xff;
  const unsigned int type = header[6];
  const uint32_t length = a | (b << 8);
  int header_size = kHeaderSize;
  if (type >= kRecyclableFullType && type <= kRecyclableLastType) {
    // A recyclable-type record at file offset 0 marks the log as recycled.
    if (end_of_buffer_offset_ - buffer_.size() == 0) {
      recycled_ = true;
    }
    header_size = kRecyclableHeaderSize;
    while (buffer_.size() < static_cast<size_t>(kRecyclableHeaderSize)) {
      size_t old_size = buffer_.size();
      int error = kEof;
      if (!TryReadMore(drop_size, &error)) {
        *fragment_type_or_err = error;
        return false;
      } else if (old_size == buffer_.size()) {
        return false;
      }
    }
    // A mismatched log number means leftover data from a previous use of
    // this (recycled) file.
    const uint32_t log_num = DecodeFixed32(header + 7);
    if (log_num != log_number_) {
      *fragment_type_or_err = kOldRecord;
      return true;
    }
  }

  while (header_size + length > buffer_.size()) {
    size_t old_size = buffer_.size();
    int error = kEof;
    if (!TryReadMore(drop_size, &error)) {
      *fragment_type_or_err = error;
      return false;
    } else if (old_size == buffer_.size()) {
      return false;
    }
  }

  if (type == kZeroType && length == 0) {
    // Zero-filled preallocated region; skip without reporting (see
    // ReadPhysicalRecord for the rationale).
    buffer_.clear();
    *fragment_type_or_err = kBadRecord;
    return true;
  }

  if (checksum_) {
    uint32_t expected_crc = crc32c::Unmask(DecodeFixed32(header));
    // CRC covers the type byte onward.
    uint32_t actual_crc = crc32c::Value(header + 6, length + header_size - 6);
    if (actual_crc != expected_crc) {
      *drop_size = buffer_.size();
      buffer_.clear();
      *fragment_type_or_err = kBadRecordChecksum;
      return true;
    }
  }

  buffer_.remove_prefix(header_size + length);

  *fragment = Slice(header + header_size, length);
  *fragment_type_or_err = type;
  return true;
}
622
7c673cae 623} // namespace log
f67539c2 624} // namespace ROCKSDB_NAMESPACE