// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "db/log_reader.h"

#include <stdio.h>
#include "rocksdb/env.h"
#include "util/coding.h"
#include "util/crc32c.h"
#include "util/file_reader_writer.h"
#include "util/util.h"

namespace rocksdb {
namespace log {

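// On-disk layout of a physical record (constants in db/log_format.h):
//   legacy header:     checksum (4B) | length (2B) | type (1B)
//   recyclable header: checksum (4B) | length (2B) | type (1B) | log number (4B)
// A logical record that does not fit in the remainder of a block is split
// into kFirstType/kMiddleType/kLastType fragments; physical records never
// span block boundaries.
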
Reader::Reporter::~Reporter() {
}

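// backing_store_ holds one block's worth of data read from the file;
// buffer_ is the Slice of not-yet-consumed bytes from the most recent read.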
Reader::Reader(std::shared_ptr<Logger> info_log,
               std::unique_ptr<SequentialFileReader>&& _file,
               Reporter* reporter, bool checksum, uint64_t log_num)
    : info_log_(info_log),
      file_(std::move(_file)),
      reporter_(reporter),
      checksum_(checksum),
      backing_store_(new char[kBlockSize]),
      buffer_(),
      eof_(false),
      read_error_(false),
      eof_offset_(0),
      last_record_offset_(0),
      end_of_buffer_offset_(0),
      log_number_(log_num),
      recycled_(false) {}

Reader::~Reader() {
  delete[] backing_store_;
}

// For kAbsoluteConsistency, on clean shutdown we don't expect any error
// in the log files. For other modes, we can ignore only incomplete records
// in the last log file, which are presumably due to a write in progress
// during restart (or from log recycling).
//
// TODO krad: Evaluate if we need to move to a more strict mode where we
// restrict the inconsistency to only the last log
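//
// Reads the next complete logical record into *record, reassembling
// fragmented records through *scratch. Returns false once the end of the
// log (or an unrecoverable point, per wal_recovery_mode) is reached.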
bool Reader::ReadRecord(Slice* record, std::string* scratch,
                        WALRecoveryMode wal_recovery_mode) {
  scratch->clear();
  record->clear();
  bool in_fragmented_record = false;
  // Record offset of the logical record that we're reading
  // 0 is a dummy value to make compilers happy
  uint64_t prospective_record_offset = 0;

  Slice fragment;
  while (true) {
    uint64_t physical_record_offset = end_of_buffer_offset_ - buffer_.size();
    size_t drop_size = 0;
    const unsigned int record_type = ReadPhysicalRecord(&fragment, &drop_size);
    switch (record_type) {
      case kFullType:
      case kRecyclableFullType:
        if (in_fragmented_record && !scratch->empty()) {
          // Handle bug in earlier versions of log::Writer where
          // it could emit an empty kFirstType record at the tail end
          // of a block followed by a kFullType or kFirstType record
          // at the beginning of the next block.
          ReportCorruption(scratch->size(), "partial record without end(1)");
        }
        prospective_record_offset = physical_record_offset;
        scratch->clear();
        *record = fragment;
        last_record_offset_ = prospective_record_offset;
        return true;

      case kFirstType:
      case kRecyclableFirstType:
        if (in_fragmented_record && !scratch->empty()) {
          // Handle bug in earlier versions of log::Writer where
          // it could emit an empty kFirstType record at the tail end
          // of a block followed by a kFullType or kFirstType record
          // at the beginning of the next block.
          ReportCorruption(scratch->size(), "partial record without end(2)");
        }
        prospective_record_offset = physical_record_offset;
        scratch->assign(fragment.data(), fragment.size());
        in_fragmented_record = true;
        break;

      case kMiddleType:
      case kRecyclableMiddleType:
        if (!in_fragmented_record) {
          ReportCorruption(fragment.size(),
                           "missing start of fragmented record(1)");
        } else {
          scratch->append(fragment.data(), fragment.size());
        }
        break;

      case kLastType:
      case kRecyclableLastType:
        if (!in_fragmented_record) {
          ReportCorruption(fragment.size(),
                           "missing start of fragmented record(2)");
        } else {
          scratch->append(fragment.data(), fragment.size());
          *record = Slice(*scratch);
          last_record_offset_ = prospective_record_offset;
          return true;
        }
        break;

      case kBadHeader:
        if (wal_recovery_mode == WALRecoveryMode::kAbsoluteConsistency) {
          // in clean shutdown we don't expect any error in the log files
          ReportCorruption(drop_size, "truncated header");
        }
        FALLTHROUGH_INTENDED;

      case kEof:
        if (in_fragmented_record) {
          if (wal_recovery_mode == WALRecoveryMode::kAbsoluteConsistency) {
            // in clean shutdown we don't expect any error in the log files
            ReportCorruption(scratch->size(), "error reading trailing data");
          }
          // This can be caused by the writer dying immediately after
          // writing a physical record but before completing the next; don't
          // treat it as a corruption, just ignore the entire logical record.
          scratch->clear();
        }
        return false;

      case kOldRecord:
        if (wal_recovery_mode != WALRecoveryMode::kSkipAnyCorruptedRecords) {
          // Treat a record from a previous instance of the log as EOF.
          if (in_fragmented_record) {
            if (wal_recovery_mode == WALRecoveryMode::kAbsoluteConsistency) {
              // in clean shutdown we don't expect any error in the log files
              ReportCorruption(scratch->size(), "error reading trailing data");
            }
            // This can be caused by the writer dying immediately after
            // writing a physical record but before completing the next; don't
            // treat it as a corruption, just ignore the entire logical record.
            scratch->clear();
          }
          return false;
        }
        FALLTHROUGH_INTENDED;

      case kBadRecord:
        if (in_fragmented_record) {
          ReportCorruption(scratch->size(), "error in middle of record");
          in_fragmented_record = false;
          scratch->clear();
        }
        break;

      case kBadRecordLen:
      case kBadRecordChecksum:
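        // In a recycled log file, a corrupt length or checksum at the tail
        // is expected leftover data from the file's previous life, so under
        // kTolerateCorruptedTailRecords we stop reading without reporting.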
        if (recycled_ &&
            wal_recovery_mode ==
                WALRecoveryMode::kTolerateCorruptedTailRecords) {
          scratch->clear();
          return false;
        }
        if (record_type == kBadRecordLen) {
          ReportCorruption(drop_size, "bad record length");
        } else {
          ReportCorruption(drop_size, "checksum mismatch");
        }
        if (in_fragmented_record) {
          ReportCorruption(scratch->size(), "error in middle of record");
          in_fragmented_record = false;
          scratch->clear();
        }
        break;

      default: {
        char buf[40];
        snprintf(buf, sizeof(buf), "unknown record type %u", record_type);
        ReportCorruption(
            (fragment.size() + (in_fragmented_record ? scratch->size() : 0)),
            buf);
        in_fragmented_record = false;
        scratch->clear();
        break;
      }
    }
  }
  return false;
}

uint64_t Reader::LastRecordOffset() {
  return last_record_offset_;
}

void Reader::UnmarkEOF() {
  if (read_error_) {
    return;
  }
  eof_ = false;
  if (eof_offset_ == 0) {
    return;
  }
  UnmarkEOFInternal();
}

void Reader::UnmarkEOFInternal() {
  // If the EOF was in the middle of a block (a partial block was read) we have
  // to read the rest of the block as ReadPhysicalRecord can only read full
  // blocks and expects the file position indicator to be aligned to the start
  // of a block.
  //
  // consumed_bytes + buffer_.size() + remaining == kBlockSize

  size_t consumed_bytes = eof_offset_ - buffer_.size();
  size_t remaining = kBlockSize - eof_offset_;

  // backing_store_ is used to concatenate what is left in buffer_ and
  // the remainder of the block. If buffer_ already uses backing_store_,
  // we just append the new data.
  if (buffer_.data() != backing_store_ + consumed_bytes) {
    // Buffer_ does not use backing_store_ for storage.
    // Copy what is left in buffer_ to backing_store_.
    memmove(backing_store_ + consumed_bytes, buffer_.data(), buffer_.size());
  }

  Slice read_buffer;
  Status status = file_->Read(remaining, &read_buffer,
                              backing_store_ + eof_offset_);

  size_t added = read_buffer.size();
  end_of_buffer_offset_ += added;

  if (!status.ok()) {
    if (added > 0) {
      ReportDrop(added, status);
    }

    read_error_ = true;
    return;
  }

  if (read_buffer.data() != backing_store_ + eof_offset_) {
    // Read did not write to backing_store_
    memmove(backing_store_ + eof_offset_, read_buffer.data(),
            read_buffer.size());
  }

  buffer_ = Slice(backing_store_ + consumed_bytes,
                  eof_offset_ + added - consumed_bytes);

  if (added < remaining) {
    eof_ = true;
    eof_offset_ += added;
  } else {
    eof_offset_ = 0;
  }
}

void Reader::ReportCorruption(size_t bytes, const char* reason) {
  ReportDrop(bytes, Status::Corruption(reason));
}

void Reader::ReportDrop(size_t bytes, const Status& reason) {
  if (reporter_ != nullptr) {
    reporter_->Corruption(bytes, reason);
  }
}

bool Reader::ReadMore(size_t* drop_size, int* error) {
  if (!eof_ && !read_error_) {
    // Last read was a full read, so this is a trailer to skip
    buffer_.clear();
    Status status = file_->Read(kBlockSize, &buffer_, backing_store_);
    end_of_buffer_offset_ += buffer_.size();
    if (!status.ok()) {
      buffer_.clear();
      ReportDrop(kBlockSize, status);
      read_error_ = true;
      *error = kEof;
      return false;
    } else if (buffer_.size() < static_cast<size_t>(kBlockSize)) {
      eof_ = true;
      eof_offset_ = buffer_.size();
    }
    return true;
  } else {
    // Note that if buffer_ is non-empty, we have a truncated header at the
    // end of the file, which can be caused by the writer crashing in the
    // middle of writing the header. Unless explicitly requested, we don't
    // consider this an error, just report EOF.
    if (buffer_.size()) {
      *drop_size = buffer_.size();
      buffer_.clear();
      *error = kBadHeader;
      return false;
    }
    buffer_.clear();
    *error = kEof;
    return false;
  }
}

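// Reads the next physical record from buffer_, refilling it from the file a
// block at a time via ReadMore. Returns the record type byte on success, or
// one of the extended status values (kEof, kOldRecord, kBadHeader,
// kBadRecord, kBadRecordLen, kBadRecordChecksum) on failure.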
unsigned int Reader::ReadPhysicalRecord(Slice* result, size_t* drop_size) {
  while (true) {
    // We need at least the minimum header size
    if (buffer_.size() < static_cast<size_t>(kHeaderSize)) {
      // the default value of r is meaningless because ReadMore will overwrite
      // it if it returns false; in case it returns true, the return value will
      // not be used anyway
      int r = kEof;
      if (!ReadMore(drop_size, &r)) {
        return r;
      }
      continue;
    }

    // Parse the header
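    // header[0..3] = masked crc32c, header[4..5] = little-endian payload
    // length, header[6] = record type; recyclable records additionally
    // carry the log number in header[7..10].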
    const char* header = buffer_.data();
    const uint32_t a = static_cast<uint32_t>(header[4]) & 0xff;
    const uint32_t b = static_cast<uint32_t>(header[5]) & 0xff;
    const unsigned int type = header[6];
    const uint32_t length = a | (b << 8);
    int header_size = kHeaderSize;
    if (type >= kRecyclableFullType && type <= kRecyclableLastType) {
      if (end_of_buffer_offset_ - buffer_.size() == 0) {
        recycled_ = true;
      }
      header_size = kRecyclableHeaderSize;
      // We need enough for the larger header
      if (buffer_.size() < static_cast<size_t>(kRecyclableHeaderSize)) {
        int r = kEof;
        if (!ReadMore(drop_size, &r)) {
          return r;
        }
        continue;
      }
      const uint32_t log_num = DecodeFixed32(header + 7);
      if (log_num != log_number_) {
        return kOldRecord;
      }
    }
    if (header_size + length > buffer_.size()) {
      *drop_size = buffer_.size();
      buffer_.clear();
      if (!eof_) {
        return kBadRecordLen;
      }
      // If the end of the file has been reached without reading |length|
      // bytes of payload, assume the writer died in the middle of writing the
      // record. Don't report a corruption unless requested.
      if (*drop_size) {
        return kBadHeader;
      }
      return kEof;
    }

    if (type == kZeroType && length == 0) {
      // Skip zero length record without reporting any drops since
      // such records are produced by the mmap based writing code in
      // env_posix.cc that preallocates file regions.
      // NOTE: this should never happen in DB written by new RocksDB versions,
      // since we turn off mmap writes to manifest and log files
      buffer_.clear();
      return kBadRecord;
    }

    // Check crc
    if (checksum_) {
      uint32_t expected_crc = crc32c::Unmask(DecodeFixed32(header));
      uint32_t actual_crc =
          crc32c::Value(header + 6, length + header_size - 6);
      if (actual_crc != expected_crc) {
        // Drop the rest of the buffer since "length" itself may have
        // been corrupted and if we trust it, we could find some
        // fragment of a real log record that just happens to look
        // like a valid log record.
        *drop_size = buffer_.size();
        buffer_.clear();
        return kBadRecordChecksum;
      }
    }

    buffer_.remove_prefix(header_size + length);

    *result = Slice(header + header_size, length);
    return type;
  }
}

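// FragmentBufferedReader keeps partially-assembled records in the fragments_
// member across calls, so a read that stops mid-record (e.g. while tailing a
// log that is still being written) can resume once more data is available.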
bool FragmentBufferedReader::ReadRecord(Slice* record, std::string* scratch,
                                        WALRecoveryMode /*unused*/) {
  assert(record != nullptr);
  assert(scratch != nullptr);
  record->clear();
  scratch->clear();

  uint64_t prospective_record_offset = 0;
  uint64_t physical_record_offset = end_of_buffer_offset_ - buffer_.size();
  size_t drop_size = 0;
  unsigned int fragment_type_or_err = 0;  // Initialize to make compiler happy
  Slice fragment;
  while (TryReadFragment(&fragment, &drop_size, &fragment_type_or_err)) {
    switch (fragment_type_or_err) {
      case kFullType:
      case kRecyclableFullType:
        if (in_fragmented_record_ && !fragments_.empty()) {
          ReportCorruption(fragments_.size(), "partial record without end(1)");
        }
        fragments_.clear();
        *record = fragment;
        prospective_record_offset = physical_record_offset;
        last_record_offset_ = prospective_record_offset;
        in_fragmented_record_ = false;
        return true;

      case kFirstType:
      case kRecyclableFirstType:
        if (in_fragmented_record_ || !fragments_.empty()) {
          ReportCorruption(fragments_.size(), "partial record without end(2)");
        }
        prospective_record_offset = physical_record_offset;
        fragments_.assign(fragment.data(), fragment.size());
        in_fragmented_record_ = true;
        break;

      case kMiddleType:
      case kRecyclableMiddleType:
        if (!in_fragmented_record_) {
          ReportCorruption(fragment.size(),
                           "missing start of fragmented record(1)");
        } else {
          fragments_.append(fragment.data(), fragment.size());
        }
        break;

      case kLastType:
      case kRecyclableLastType:
        if (!in_fragmented_record_) {
          ReportCorruption(fragment.size(),
                           "missing start of fragmented record(2)");
        } else {
          fragments_.append(fragment.data(), fragment.size());
          scratch->assign(fragments_.data(), fragments_.size());
          fragments_.clear();
          *record = Slice(*scratch);
          last_record_offset_ = prospective_record_offset;
          in_fragmented_record_ = false;
          return true;
        }
        break;

      case kBadHeader:
      case kBadRecord:
      case kEof:
      case kOldRecord:
        if (in_fragmented_record_) {
          ReportCorruption(fragments_.size(), "error in middle of record");
          in_fragmented_record_ = false;
          fragments_.clear();
        }
        break;

      case kBadRecordChecksum:
        if (recycled_) {
          fragments_.clear();
          return false;
        }
        ReportCorruption(drop_size, "checksum mismatch");
        if (in_fragmented_record_) {
          ReportCorruption(fragments_.size(), "error in middle of record");
          in_fragmented_record_ = false;
          fragments_.clear();
        }
        break;

      default: {
        char buf[40];
        snprintf(buf, sizeof(buf), "unknown record type %u",
                 fragment_type_or_err);
        ReportCorruption(
            fragment.size() + (in_fragmented_record_ ? fragments_.size() : 0),
            buf);
        in_fragmented_record_ = false;
        fragments_.clear();
        break;
      }
    }
  }
  return false;
}

void FragmentBufferedReader::UnmarkEOF() {
  if (read_error_) {
    return;
  }
  eof_ = false;
  UnmarkEOFInternal();
}

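// Like Reader::ReadMore, except that on EOF it calls UnmarkEOF to pick up
// any data appended to the file since the last read, instead of giving up.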
bool FragmentBufferedReader::TryReadMore(size_t* drop_size, int* error) {
  if (!eof_ && !read_error_) {
    // Last read was a full read, so this is a trailer to skip
    buffer_.clear();
    Status status = file_->Read(kBlockSize, &buffer_, backing_store_);
    end_of_buffer_offset_ += buffer_.size();
    if (!status.ok()) {
      buffer_.clear();
      ReportDrop(kBlockSize, status);
      read_error_ = true;
      *error = kEof;
      return false;
    } else if (buffer_.size() < static_cast<size_t>(kBlockSize)) {
      eof_ = true;
      eof_offset_ = buffer_.size();
      TEST_SYNC_POINT_CALLBACK(
          "FragmentBufferedLogReader::TryReadMore:FirstEOF", nullptr);
    }
    return true;
  } else if (!read_error_) {
    UnmarkEOF();
  }
  if (!read_error_) {
    return true;
  }
  *error = kEof;
  *drop_size = buffer_.size();
  if (buffer_.size() > 0) {
    *error = kBadHeader;
  }
  buffer_.clear();
  return false;
}

// Returns true if the caller should process *fragment_type_or_err.
bool FragmentBufferedReader::TryReadFragment(
    Slice* fragment, size_t* drop_size, unsigned int* fragment_type_or_err) {
  assert(fragment != nullptr);
  assert(drop_size != nullptr);
  assert(fragment_type_or_err != nullptr);

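  // Loop until a full header is buffered; if TryReadMore succeeds but makes
  // no progress (EOF reached with no new data yet), return false without
  // setting an error so the caller can retry once the log grows.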
  while (buffer_.size() < static_cast<size_t>(kHeaderSize)) {
    size_t old_size = buffer_.size();
    int error = kEof;
    if (!TryReadMore(drop_size, &error)) {
      *fragment_type_or_err = error;
      return false;
    } else if (old_size == buffer_.size()) {
      return false;
    }
  }
  const char* header = buffer_.data();
  const uint32_t a = static_cast<uint32_t>(header[4]) & 0xff;
  const uint32_t b = static_cast<uint32_t>(header[5]) & 0xff;
  const unsigned int type = header[6];
  const uint32_t length = a | (b << 8);
  int header_size = kHeaderSize;
  if (type >= kRecyclableFullType && type <= kRecyclableLastType) {
    if (end_of_buffer_offset_ - buffer_.size() == 0) {
      recycled_ = true;
    }
    header_size = kRecyclableHeaderSize;
    while (buffer_.size() < static_cast<size_t>(kRecyclableHeaderSize)) {
      size_t old_size = buffer_.size();
      int error = kEof;
      if (!TryReadMore(drop_size, &error)) {
        *fragment_type_or_err = error;
        return false;
      } else if (old_size == buffer_.size()) {
        return false;
      }
    }
    const uint32_t log_num = DecodeFixed32(header + 7);
    if (log_num != log_number_) {
      *fragment_type_or_err = kOldRecord;
      return true;
    }
  }

  while (header_size + length > buffer_.size()) {
    size_t old_size = buffer_.size();
    int error = kEof;
    if (!TryReadMore(drop_size, &error)) {
      *fragment_type_or_err = error;
      return false;
    } else if (old_size == buffer_.size()) {
      return false;
    }
  }

  if (type == kZeroType && length == 0) {
    buffer_.clear();
    *fragment_type_or_err = kBadRecord;
    return true;
  }

  if (checksum_) {
    uint32_t expected_crc = crc32c::Unmask(DecodeFixed32(header));
    uint32_t actual_crc = crc32c::Value(header + 6, length + header_size - 6);
    if (actual_crc != expected_crc) {
      *drop_size = buffer_.size();
      buffer_.clear();
      *fragment_type_or_err = kBadRecordChecksum;
      return true;
    }
  }

  buffer_.remove_prefix(header_size + length);

  *fragment = Slice(header + header_size, length);
  *fragment_type_or_err = type;
  return true;
}

}  // namespace log
}  // namespace rocksdb