//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "db/log_reader.h"

#include <stdio.h>
#include "file/sequence_file_reader.h"
#include "rocksdb/env.h"
#include "test_util/sync_point.h"
#include "util/coding.h"
#include "util/crc32c.h"
#include "util/util.h"

namespace ROCKSDB_NAMESPACE {
namespace log {

// Out-of-line definition of the Reporter destructor. Presumably virtual in
// the header (anchors the vtable in this translation unit) — TODO confirm
// against db/log_reader.h.
Reader::Reporter::~Reporter() {
}
25 | ||
// Constructs a Reader over a sequential WAL/log file.
//
// info_log:  logger for informational messages (may be null).
// _file:     the file to read; ownership is transferred to the Reader.
// reporter:  receives corruption/drop notifications (may be null).
// checksum:  if true, verify CRC32C of each physical record.
// log_num:   the log file number; used to detect records left over from a
//            previous (recycled) instance of this file.
Reader::Reader(std::shared_ptr<Logger> info_log,
               std::unique_ptr<SequentialFileReader>&& _file,
               Reporter* reporter, bool checksum, uint64_t log_num)
    : info_log_(info_log),
      file_(std::move(_file)),
      reporter_(reporter),
      checksum_(checksum),
      // One block's worth of scratch space; freed in the destructor.
      backing_store_(new char[kBlockSize]),
      buffer_(),
      eof_(false),
      read_error_(false),
      eof_offset_(0),
      last_record_offset_(0),
      end_of_buffer_offset_(0),
      log_number_(log_num),
      recycled_(false) {}
42 | ||
// Releases the block-sized scratch buffer allocated in the constructor.
Reader::~Reader() {
  delete[] backing_store_;
}
46 | ||
// For kAbsoluteConsistency, on clean shutdown we don't expect any error
// in the log files. For other modes, we can ignore only incomplete records
// in the last log file, which are presumably due to a write in progress
// during restart (or from log recycling).
//
// TODO krad: Evaluate if we need to move to a more strict mode where we
// restrict the inconsistency to only the last log
//
// Reads the next complete logical record by stitching together one or more
// physical fragments returned by ReadPhysicalRecord(). Returns true and sets
// *record on success; returns false on EOF or unrecoverable tail corruption.
// *record may point into *scratch (when the record was fragmented) or into
// the internal read buffer (full records), so it is only valid until the
// next mutation of scratch or the next call on this Reader.
bool Reader::ReadRecord(Slice* record, std::string* scratch,
                        WALRecoveryMode wal_recovery_mode) {
  scratch->clear();
  record->clear();
  // True while we are between a kFirstType fragment and its kLastType.
  bool in_fragmented_record = false;
  // Record offset of the logical record that we're reading
  // 0 is a dummy value to make compilers happy
  uint64_t prospective_record_offset = 0;

  Slice fragment;
  while (true) {
    // File offset of the fragment we are about to consume.
    uint64_t physical_record_offset = end_of_buffer_offset_ - buffer_.size();
    size_t drop_size = 0;
    const unsigned int record_type = ReadPhysicalRecord(&fragment, &drop_size);
    switch (record_type) {
      case kFullType:
      case kRecyclableFullType:
        if (in_fragmented_record && !scratch->empty()) {
          // Handle bug in earlier versions of log::Writer where
          // it could emit an empty kFirstType record at the tail end
          // of a block followed by a kFullType or kFirstType record
          // at the beginning of the next block.
          ReportCorruption(scratch->size(), "partial record without end(1)");
        }
        prospective_record_offset = physical_record_offset;
        scratch->clear();
        *record = fragment;
        last_record_offset_ = prospective_record_offset;
        return true;

      case kFirstType:
      case kRecyclableFirstType:
        if (in_fragmented_record && !scratch->empty()) {
          // Handle bug in earlier versions of log::Writer where
          // it could emit an empty kFirstType record at the tail end
          // of a block followed by a kFullType or kFirstType record
          // at the beginning of the next block.
          ReportCorruption(scratch->size(), "partial record without end(2)");
        }
        prospective_record_offset = physical_record_offset;
        scratch->assign(fragment.data(), fragment.size());
        in_fragmented_record = true;
        break;

      case kMiddleType:
      case kRecyclableMiddleType:
        if (!in_fragmented_record) {
          ReportCorruption(fragment.size(),
                           "missing start of fragmented record(1)");
        } else {
          scratch->append(fragment.data(), fragment.size());
        }
        break;

      case kLastType:
      case kRecyclableLastType:
        if (!in_fragmented_record) {
          ReportCorruption(fragment.size(),
                           "missing start of fragmented record(2)");
        } else {
          scratch->append(fragment.data(), fragment.size());
          *record = Slice(*scratch);
          last_record_offset_ = prospective_record_offset;
          return true;
        }
        break;

      case kBadHeader:
        if (wal_recovery_mode == WALRecoveryMode::kAbsoluteConsistency) {
          // in clean shutdown we don't expect any error in the log files
          ReportCorruption(drop_size, "truncated header");
        }
        // A truncated header at EOF is otherwise treated exactly like EOF.
        FALLTHROUGH_INTENDED;

      case kEof:
        if (in_fragmented_record) {
          if (wal_recovery_mode == WALRecoveryMode::kAbsoluteConsistency) {
            // in clean shutdown we don't expect any error in the log files
            ReportCorruption(scratch->size(), "error reading trailing data");
          }
          // This can be caused by the writer dying immediately after
          // writing a physical record but before completing the next; don't
          // treat it as a corruption, just ignore the entire logical record.
          scratch->clear();
        }
        return false;

      case kOldRecord:
        if (wal_recovery_mode != WALRecoveryMode::kSkipAnyCorruptedRecords) {
          // Treat a record from a previous instance of the log as EOF.
          if (in_fragmented_record) {
            if (wal_recovery_mode == WALRecoveryMode::kAbsoluteConsistency) {
              // in clean shutdown we don't expect any error in the log files
              ReportCorruption(scratch->size(), "error reading trailing data");
            }
            // This can be caused by the writer dying immediately after
            // writing a physical record but before completing the next; don't
            // treat it as a corruption, just ignore the entire logical record.
            scratch->clear();
          }
          return false;
        }
        // kSkipAnyCorruptedRecords: treat the stale record as a bad record
        // and keep scanning.
        FALLTHROUGH_INTENDED;

      case kBadRecord:
        if (in_fragmented_record) {
          ReportCorruption(scratch->size(), "error in middle of record");
          in_fragmented_record = false;
          scratch->clear();
        }
        break;

      case kBadRecordLen:
      case kBadRecordChecksum:
        // A recycled log is expected to end with garbage from its previous
        // life; under kTolerateCorruptedTailRecords just stop reading.
        if (recycled_ &&
            wal_recovery_mode ==
                WALRecoveryMode::kTolerateCorruptedTailRecords) {
          scratch->clear();
          return false;
        }
        if (record_type == kBadRecordLen) {
          ReportCorruption(drop_size, "bad record length");
        } else {
          ReportCorruption(drop_size, "checksum mismatch");
        }
        if (in_fragmented_record) {
          ReportCorruption(scratch->size(), "error in middle of record");
          in_fragmented_record = false;
          scratch->clear();
        }
        break;

      default: {
        char buf[40];
        snprintf(buf, sizeof(buf), "unknown record type %u", record_type);
        ReportCorruption(
            (fragment.size() + (in_fragmented_record ? scratch->size() : 0)),
            buf);
        in_fragmented_record = false;
        scratch->clear();
        break;
      }
    }
  }
  return false;
}
200 | ||
// Returns the physical file offset of the last record returned by
// ReadRecord(); undefined before the first successful ReadRecord().
uint64_t Reader::LastRecordOffset() {
  return last_record_offset_;
}
204 | ||
// Clears the EOF flag so reading can resume after the file has grown (e.g.
// tailing a live WAL). No-op after a hard read error. If EOF was hit in the
// middle of a block (eof_offset_ != 0), the partial block must be re-filled
// before normal block-aligned reads can continue.
void Reader::UnmarkEOF() {
  if (read_error_) {
    return;
  }
  eof_ = false;
  if (eof_offset_ == 0) {
    // EOF coincided with a block boundary; nothing to patch up.
    return;
  }
  UnmarkEOFInternal();
}
7c673cae | 215 | |
// Completes a partially-read block after EOF has been unmarked: reads the
// remainder of the current block and splices it onto whatever is still
// unconsumed in buffer_, restoring the block-aligned invariant that
// ReadPhysicalRecord relies on. Sets read_error_ on failure, and re-sets
// eof_ (advancing eof_offset_) if the file still ends mid-block.
void Reader::UnmarkEOFInternal() {
  // If the EOF was in the middle of a block (a partial block was read) we have
  // to read the rest of the block as ReadPhysicalRecord can only read full
  // blocks and expects the file position indicator to be aligned to the start
  // of a block.
  //
  // consumed_bytes + buffer_size() + remaining == kBlockSize

  size_t consumed_bytes = eof_offset_ - buffer_.size();
  size_t remaining = kBlockSize - eof_offset_;

  // backing_store_ is used to concatenate what is left in buffer_ and
  // the remainder of the block. If buffer_ already uses backing_store_,
  // we just append the new data.
  if (buffer_.data() != backing_store_ + consumed_bytes) {
    // Buffer_ does not use backing_store_ for storage.
    // Copy what is left in buffer_ to backing_store.
    memmove(backing_store_ + consumed_bytes, buffer_.data(), buffer_.size());
  }

  Slice read_buffer;
  Status status = file_->Read(remaining, &read_buffer,
                              backing_store_ + eof_offset_);

  size_t added = read_buffer.size();
  end_of_buffer_offset_ += added;

  if (!status.ok()) {
    if (added > 0) {
      ReportDrop(added, status);
    }

    read_error_ = true;
    return;
  }

  if (read_buffer.data() != backing_store_ + eof_offset_) {
    // Read did not write to backing_store_
    memmove(backing_store_ + eof_offset_, read_buffer.data(),
            read_buffer.size());
  }

  // buffer_ now spans [consumed_bytes, eof_offset_ + added) of the block.
  buffer_ = Slice(backing_store_ + consumed_bytes,
                  eof_offset_ + added - consumed_bytes);

  if (added < remaining) {
    // Still short of a full block: remember the new partial-block boundary.
    eof_ = true;
    eof_offset_ += added;
  } else {
    eof_offset_ = 0;
  }
}
268 | ||
// Reports `bytes` dropped due to corruption, wrapping `reason` in a
// Status::Corruption for the reporter.
void Reader::ReportCorruption(size_t bytes, const char* reason) {
  ReportDrop(bytes, Status::Corruption(reason));
}
272 | ||
273 | void Reader::ReportDrop(size_t bytes, const Status& reason) { | |
11fdf7f2 | 274 | if (reporter_ != nullptr) { |
7c673cae FG |
275 | reporter_->Corruption(bytes, reason); |
276 | } | |
277 | } | |
278 | ||
// Refills buffer_ with the next full block from the file. Returns true if
// the caller should retry parsing; returns false with *error set to the
// record-type code (kEof or kBadHeader) the caller should return. On a read
// failure, reports the drop and latches read_error_ so subsequent calls
// return kEof.
bool Reader::ReadMore(size_t* drop_size, int *error) {
  if (!eof_ && !read_error_) {
    // Last read was a full read, so this is a trailer to skip
    buffer_.clear();
    Status status = file_->Read(kBlockSize, &buffer_, backing_store_);
    end_of_buffer_offset_ += buffer_.size();
    if (!status.ok()) {
      buffer_.clear();
      ReportDrop(kBlockSize, status);
      read_error_ = true;
      *error = kEof;
      return false;
    } else if (buffer_.size() < static_cast<size_t>(kBlockSize)) {
      // Short read: the file ends inside this block.
      eof_ = true;
      eof_offset_ = buffer_.size();
    }
    return true;
  } else {
    // Note that if buffer_ is non-empty, we have a truncated header at the
    // end of the file, which can be caused by the writer crashing in the
    // middle of writing the header. Unless explicitly requested we don't
    // considering this an error, just report EOF.
    if (buffer_.size()) {
      *drop_size = buffer_.size();
      buffer_.clear();
      *error = kBadHeader;
      return false;
    }
    buffer_.clear();
    *error = kEof;
    return false;
  }
}
312 | ||
// Reads and parses the next physical record from buffer_, refilling from the
// file as needed. On success, sets *result to the record payload (pointing
// into backing_store_) and returns the record type byte. On failure returns
// one of the extended status codes (kEof, kBadHeader, kOldRecord,
// kBadRecord, kBadRecordLen, kBadRecordChecksum) with *drop_size set to the
// number of bytes being discarded where applicable.
//
// Legacy header layout: checksum (4B) | length (2B, LE) | type (1B).
// Recyclable header additionally carries the log number (4B) at offset 7.
unsigned int Reader::ReadPhysicalRecord(Slice* result, size_t* drop_size) {
  while (true) {
    // We need at least the minimum header size
    if (buffer_.size() < static_cast<size_t>(kHeaderSize)) {
      // the default value of r is meaningless because ReadMore will overwrite
      // it if it returns false; in case it returns true, the return value will
      // not be used anyway
      int r = kEof;
      if (!ReadMore(drop_size, &r)) {
        return r;
      }
      continue;
    }

    // Parse the header
    const char* header = buffer_.data();
    const uint32_t a = static_cast<uint32_t>(header[4]) & 0xff;
    const uint32_t b = static_cast<uint32_t>(header[5]) & 0xff;
    const unsigned int type = header[6];
    const uint32_t length = a | (b << 8);
    int header_size = kHeaderSize;
    if (type >= kRecyclableFullType && type <= kRecyclableLastType) {
      // A recyclable record at file offset 0 marks the whole log as recycled.
      if (end_of_buffer_offset_ - buffer_.size() == 0) {
        recycled_ = true;
      }
      header_size = kRecyclableHeaderSize;
      // We need enough for the larger header
      if (buffer_.size() < static_cast<size_t>(kRecyclableHeaderSize)) {
        int r = kEof;
        if (!ReadMore(drop_size, &r)) {
          return r;
        }
        continue;
      }
      const uint32_t log_num = DecodeFixed32(header + 7);
      if (log_num != log_number_) {
        // Leftover record from the file's previous incarnation.
        return kOldRecord;
      }
    }
    if (header_size + length > buffer_.size()) {
      *drop_size = buffer_.size();
      buffer_.clear();
      if (!eof_) {
        // Mid-file, the declared length overruns the block: corruption.
        return kBadRecordLen;
      }
      // If the end of the file has been reached without reading |length|
      // bytes of payload, assume the writer died in the middle of writing the
      // record. Don't report a corruption unless requested.
      if (*drop_size) {
        return kBadHeader;
      }
      return kEof;
    }

    if (type == kZeroType && length == 0) {
      // Skip zero length record without reporting any drops since
      // such records are produced by the mmap based writing code in
      // env_posix.cc that preallocates file regions.
      // NOTE: this should never happen in DB written by new RocksDB versions,
      // since we turn off mmap writes to manifest and log files
      buffer_.clear();
      return kBadRecord;
    }

    // Check crc
    if (checksum_) {
      uint32_t expected_crc = crc32c::Unmask(DecodeFixed32(header));
      // CRC covers the type byte onward (type + payload, plus the log number
      // for recyclable records).
      uint32_t actual_crc = crc32c::Value(header + 6, length + header_size - 6);
      if (actual_crc != expected_crc) {
        // Drop the rest of the buffer since "length" itself may have
        // been corrupted and if we trust it, we could find some
        // fragment of a real log record that just happens to look
        // like a valid log record.
        *drop_size = buffer_.size();
        buffer_.clear();
        return kBadRecordChecksum;
      }
    }

    buffer_.remove_prefix(header_size + length);

    *result = Slice(header + header_size, length);
    return type;
  }
}
398 | ||
// Non-blocking variant of Reader::ReadRecord: accumulates fragments across
// calls in member state (fragments_, in_fragmented_record_) so a record
// split across reads can be resumed when more data arrives. Returns true
// with *record set once a complete logical record is available; returns
// false when more data is needed or on EOF/corruption. The recovery-mode
// argument is ignored by this reader.
bool FragmentBufferedReader::ReadRecord(Slice* record, std::string* scratch,
                                        WALRecoveryMode /*unused*/) {
  assert(record != nullptr);
  assert(scratch != nullptr);
  record->clear();
  scratch->clear();

  uint64_t prospective_record_offset = 0;
  uint64_t physical_record_offset = end_of_buffer_offset_ - buffer_.size();
  size_t drop_size = 0;
  unsigned int fragment_type_or_err = 0;  // Initialize to make compiler happy
  Slice fragment;
  while (TryReadFragment(&fragment, &drop_size, &fragment_type_or_err)) {
    switch (fragment_type_or_err) {
      case kFullType:
      case kRecyclableFullType:
        if (in_fragmented_record_ && !fragments_.empty()) {
          ReportCorruption(fragments_.size(), "partial record without end(1)");
        }
        fragments_.clear();
        *record = fragment;
        prospective_record_offset = physical_record_offset;
        last_record_offset_ = prospective_record_offset;
        in_fragmented_record_ = false;
        return true;

      case kFirstType:
      case kRecyclableFirstType:
        if (in_fragmented_record_ || !fragments_.empty()) {
          ReportCorruption(fragments_.size(), "partial record without end(2)");
        }
        prospective_record_offset = physical_record_offset;
        fragments_.assign(fragment.data(), fragment.size());
        in_fragmented_record_ = true;
        break;

      case kMiddleType:
      case kRecyclableMiddleType:
        if (!in_fragmented_record_) {
          ReportCorruption(fragment.size(),
                           "missing start of fragmented record(1)");
        } else {
          fragments_.append(fragment.data(), fragment.size());
        }
        break;

      case kLastType:
      case kRecyclableLastType:
        if (!in_fragmented_record_) {
          ReportCorruption(fragment.size(),
                           "missing start of fragmented record(2)");
        } else {
          // Complete record: hand the accumulated bytes to the caller via
          // scratch so fragments_ can be reused for the next record.
          fragments_.append(fragment.data(), fragment.size());
          scratch->assign(fragments_.data(), fragments_.size());
          fragments_.clear();
          *record = Slice(*scratch);
          last_record_offset_ = prospective_record_offset;
          in_fragmented_record_ = false;
          return true;
        }
        break;

      case kBadHeader:
      case kBadRecord:
      case kEof:
      case kOldRecord:
        if (in_fragmented_record_) {
          ReportCorruption(fragments_.size(), "error in middle of record");
          in_fragmented_record_ = false;
          fragments_.clear();
        }
        break;

      case kBadRecordChecksum:
        if (recycled_) {
          // Expected garbage at the tail of a recycled log; stop quietly.
          fragments_.clear();
          return false;
        }
        ReportCorruption(drop_size, "checksum mismatch");
        if (in_fragmented_record_) {
          ReportCorruption(fragments_.size(), "error in middle of record");
          in_fragmented_record_ = false;
          fragments_.clear();
        }
        break;

      default: {
        char buf[40];
        snprintf(buf, sizeof(buf), "unknown record type %u",
                 fragment_type_or_err);
        ReportCorruption(
            fragment.size() + (in_fragmented_record_ ? fragments_.size() : 0),
            buf);
        in_fragmented_record_ = false;
        fragments_.clear();
        break;
      }
    }
  }
  return false;
}
500 | ||
501 | void FragmentBufferedReader::UnmarkEOF() { | |
502 | if (read_error_) { | |
503 | return; | |
504 | } | |
505 | eof_ = false; | |
506 | UnmarkEOFInternal(); | |
507 | } | |
508 | ||
// Attempts to pull more data into buffer_ without treating a short read as
// final: if EOF was previously hit, first retries via UnmarkEOF() in case
// the file has grown. Returns true when the caller may re-examine buffer_;
// returns false with *error/*drop_size set (kEof, or kBadHeader when a
// truncated header remains in the buffer).
bool FragmentBufferedReader::TryReadMore(size_t* drop_size, int* error) {
  if (!eof_ && !read_error_) {
    // Last read was a full read, so this is a trailer to skip
    buffer_.clear();
    Status status = file_->Read(kBlockSize, &buffer_, backing_store_);
    end_of_buffer_offset_ += buffer_.size();
    if (!status.ok()) {
      buffer_.clear();
      ReportDrop(kBlockSize, status);
      read_error_ = true;
      *error = kEof;
      return false;
    } else if (buffer_.size() < static_cast<size_t>(kBlockSize)) {
      eof_ = true;
      eof_offset_ = buffer_.size();
      TEST_SYNC_POINT_CALLBACK(
          "FragmentBufferedLogReader::TryReadMore:FirstEOF", nullptr);
    }
    return true;
  } else if (!read_error_) {
    // Already at EOF but no error: the file may have grown since; retry.
    UnmarkEOF();
  }
  if (!read_error_) {
    return true;
  }
  // Hard read error: report EOF, or a bad header if partial bytes remain.
  *error = kEof;
  *drop_size = buffer_.size();
  if (buffer_.size() > 0) {
    *error = kBadHeader;
  }
  buffer_.clear();
  return false;
}
542 | ||
// return true if the caller should process the fragment_type_or_err.
//
// Incrementally parses one physical record from buffer_, calling
// TryReadMore() whenever more bytes are required. Unlike
// Reader::ReadPhysicalRecord, a short buffer is not an error: when
// TryReadMore makes no progress (old_size unchanged) this returns false so
// the caller can come back later with the partial state intact.
bool FragmentBufferedReader::TryReadFragment(
    Slice* fragment, size_t* drop_size, unsigned int* fragment_type_or_err) {
  assert(fragment != nullptr);
  assert(drop_size != nullptr);
  assert(fragment_type_or_err != nullptr);

  // Accumulate at least the minimum (legacy) header.
  while (buffer_.size() < static_cast<size_t>(kHeaderSize)) {
    size_t old_size = buffer_.size();
    int error = kEof;
    if (!TryReadMore(drop_size, &error)) {
      *fragment_type_or_err = error;
      return false;
    } else if (old_size == buffer_.size()) {
      // No new data yet; try again on a later call.
      return false;
    }
  }
  const char* header = buffer_.data();
  const uint32_t a = static_cast<uint32_t>(header[4]) & 0xff;
  const uint32_t b = static_cast<uint32_t>(header[5]) & 0xff;
  const unsigned int type = header[6];
  const uint32_t length = a | (b << 8);
  int header_size = kHeaderSize;
  if (type >= kRecyclableFullType && type <= kRecyclableLastType) {
    // A recyclable record at file offset 0 marks the whole log as recycled.
    if (end_of_buffer_offset_ - buffer_.size() == 0) {
      recycled_ = true;
    }
    header_size = kRecyclableHeaderSize;
    // The recyclable header is larger; keep accumulating.
    while (buffer_.size() < static_cast<size_t>(kRecyclableHeaderSize)) {
      size_t old_size = buffer_.size();
      int error = kEof;
      if (!TryReadMore(drop_size, &error)) {
        *fragment_type_or_err = error;
        return false;
      } else if (old_size == buffer_.size()) {
        return false;
      }
    }
    const uint32_t log_num = DecodeFixed32(header + 7);
    if (log_num != log_number_) {
      // Leftover record from the file's previous incarnation.
      *fragment_type_or_err = kOldRecord;
      return true;
    }
  }

  // Accumulate the full payload.
  while (header_size + length > buffer_.size()) {
    size_t old_size = buffer_.size();
    int error = kEof;
    if (!TryReadMore(drop_size, &error)) {
      *fragment_type_or_err = error;
      return false;
    } else if (old_size == buffer_.size()) {
      return false;
    }
  }

  if (type == kZeroType && length == 0) {
    // Zero-filled preallocated region; skip without reporting a drop.
    buffer_.clear();
    *fragment_type_or_err = kBadRecord;
    return true;
  }

  if (checksum_) {
    uint32_t expected_crc = crc32c::Unmask(DecodeFixed32(header));
    // CRC covers the type byte onward (type + payload, plus the log number
    // for recyclable records).
    uint32_t actual_crc = crc32c::Value(header + 6, length + header_size - 6);
    if (actual_crc != expected_crc) {
      *drop_size = buffer_.size();
      buffer_.clear();
      *fragment_type_or_err = kBadRecordChecksum;
      return true;
    }
  }

  buffer_.remove_prefix(header_size + length);

  *fragment = Slice(header + header_size, length);
  *fragment_type_or_err = type;
  return true;
}
622 | ||
7c673cae | 623 | } // namespace log |
f67539c2 | 624 | } // namespace ROCKSDB_NAMESPACE |