#include "db/log_reader.h"
#include <stdio.h>
+
#include "file/sequence_file_reader.h"
#include "port/lang.h"
#include "rocksdb/env.h"
namespace ROCKSDB_NAMESPACE {
namespace log {
-Reader::Reporter::~Reporter() {
-}
+// Defaulted out-of-line definition: equivalent to the empty body it replaces.
+Reader::Reporter::~Reporter() = default;
Reader::Reader(std::shared_ptr<Logger> info_log,
std::unique_ptr<SequentialFileReader>&& _file,
last_record_offset_(0),
end_of_buffer_offset_(0),
log_number_(log_num),
- recycled_(false) {}
+ recycled_(false),
+ first_record_read_(false),
+ // WAL compression stays disabled until a SetCompressionType record is seen.
+ compression_type_(kNoCompression),
+ compression_type_record_read_(false),
+ uncompress_(nullptr),
+ // Checksum states are allocated lazily on first use in ReadRecord /
+ // ReadPhysicalRecord.
+ hash_state_(nullptr),
+ uncompress_hash_state_(nullptr) {}
Reader::~Reader() {
delete[] backing_store_;
+ // delete on a null pointer is a no-op, so no guard is needed here.
+ delete uncompress_;
+ // XXH3_freeState is only documented for states obtained from
+ // XXH3_createState(), so keep the null guards for the lazily created states.
+ if (hash_state_) {
+ XXH3_freeState(hash_state_);
+ }
+ if (uncompress_hash_state_) {
+ XXH3_freeState(uncompress_hash_state_);
+ }
}
// For kAbsoluteConsistency, on clean shutdown we don't expect any error
// TODO krad: Evaluate if we need to move to a more strict mode where we
// restrict the inconsistency to only the last log
bool Reader::ReadRecord(Slice* record, std::string* scratch,
- WALRecoveryMode wal_recovery_mode) {
+ WALRecoveryMode wal_recovery_mode,
+ uint64_t* record_checksum) {
scratch->clear();
record->clear();
+ if (record_checksum != nullptr) {
+ if (hash_state_ == nullptr) {
+ hash_state_ = XXH3_createState();
+ }
+ XXH3_64bits_reset(hash_state_);
+ }
+ if (uncompress_) {
+ uncompress_->Reset();
+ }
bool in_fragmented_record = false;
// Record offset of the logical record that we're reading
// 0 is a dummy value to make compilers happy
while (true) {
uint64_t physical_record_offset = end_of_buffer_offset_ - buffer_.size();
size_t drop_size = 0;
- const unsigned int record_type = ReadPhysicalRecord(&fragment, &drop_size);
+ const unsigned int record_type =
+ ReadPhysicalRecord(&fragment, &drop_size, record_checksum);
switch (record_type) {
case kFullType:
case kRecyclableFullType:
// at the beginning of the next block.
ReportCorruption(scratch->size(), "partial record without end(1)");
}
+ // No need to compute record_checksum since the record
+ // consists of a single fragment and the checksum is computed
+ // in ReadPhysicalRecord() if WAL compression is enabled
+ if (record_checksum != nullptr && uncompress_ == nullptr) {
+ // No need to stream since the record is a single fragment
+ *record_checksum = XXH3_64bits(fragment.data(), fragment.size());
+ }
prospective_record_offset = physical_record_offset;
scratch->clear();
*record = fragment;
last_record_offset_ = prospective_record_offset;
+ first_record_read_ = true;
return true;
case kFirstType:
// of a block followed by a kFullType or kFirstType record
// at the beginning of the next block.
ReportCorruption(scratch->size(), "partial record without end(2)");
+ // NOTE(review): hash_state_ may still be nullptr here if no caller has
+ // passed a non-null record_checksum yet; this relies on
+ // XXH3_64bits_reset(nullptr) being a safe no-op -- confirm against the
+ // bundled xxhash version.
+ XXH3_64bits_reset(hash_state_);
+ }
+ if (record_checksum != nullptr) {
+ XXH3_64bits_update(hash_state_, fragment.data(), fragment.size());
}
prospective_record_offset = physical_record_offset;
scratch->assign(fragment.data(), fragment.size());
ReportCorruption(fragment.size(),
"missing start of fragmented record(1)");
} else {
+ if (record_checksum != nullptr) {
+ XXH3_64bits_update(hash_state_, fragment.data(), fragment.size());
+ }
scratch->append(fragment.data(), fragment.size());
}
break;
ReportCorruption(fragment.size(),
"missing start of fragmented record(2)");
} else {
+ if (record_checksum != nullptr) {
+ XXH3_64bits_update(hash_state_, fragment.data(), fragment.size());
+ *record_checksum = XXH3_64bits_digest(hash_state_);
+ }
scratch->append(fragment.data(), fragment.size());
*record = Slice(*scratch);
last_record_offset_ = prospective_record_offset;
+ first_record_read_ = true;
return true;
}
break;
FALLTHROUGH_INTENDED;
case kBadRecordChecksum:
- if (recycled_ &&
- wal_recovery_mode ==
- WALRecoveryMode::kTolerateCorruptedTailRecords) {
+ if (recycled_ && wal_recovery_mode ==
+ WALRecoveryMode::kTolerateCorruptedTailRecords) {
scratch->clear();
return false;
}
}
break;
+ case kSetCompressionType: {
+ if (compression_type_record_read_) {
+ ReportCorruption(fragment.size(),
+ "read multiple SetCompressionType records");
+ }
+ if (first_record_read_) {
+ ReportCorruption(fragment.size(),
+ "SetCompressionType not the first record");
+ }
+ prospective_record_offset = physical_record_offset;
+ scratch->clear();
+ last_record_offset_ = prospective_record_offset;
+ CompressionTypeRecord compression_record(kNoCompression);
+ Status s = compression_record.DecodeFrom(&fragment);
+ if (!s.ok()) {
+ ReportCorruption(fragment.size(),
+ "could not decode SetCompressionType record");
+ } else {
+ InitCompression(compression_record);
+ }
+ break;
+ }
+
default: {
char buf[40];
snprintf(buf, sizeof(buf), "unknown record type %u", record_type);
return false;
}
-uint64_t Reader::LastRecordOffset() {
- return last_record_offset_;
-}
+// Physical offset of the start of the last record returned by ReadRecord().
+uint64_t Reader::LastRecordOffset() { return last_record_offset_; }
+// Physical offset one past the bytes consumed from the file so far.
uint64_t Reader::LastRecordEnd() {
return end_of_buffer_offset_ - buffer_.size();
}
Slice read_buffer;
- Status status = file_->Read(remaining, &read_buffer,
- backing_store_ + eof_offset_);
+ // TODO: rate limit log reader with appropriate priority.
+ // TODO: avoid overcharging rate limiter:
+ // Note that the Read here might overcharge SequentialFileReader's internal
+ // rate limiter if priority is not IO_TOTAL, e.g., when there is not enough
+ // content left until EOF to read.
+ Status status =
+ file_->Read(remaining, &read_buffer, backing_store_ + eof_offset_,
+ Env::IO_TOTAL /* rate_limiter_priority */);
size_t added = read_buffer.size();
end_of_buffer_offset_ += added;
if (read_buffer.data() != backing_store_ + eof_offset_) {
// Read did not write to backing_store_
memmove(backing_store_ + eof_offset_, read_buffer.data(),
- read_buffer.size());
+ read_buffer.size());
}
buffer_ = Slice(backing_store_ + consumed_bytes,
- eof_offset_ + added - consumed_bytes);
+ eof_offset_ + added - consumed_bytes);
if (added < remaining) {
eof_ = true;
}
}
-bool Reader::ReadMore(size_t* drop_size, int *error) {
+bool Reader::ReadMore(size_t* drop_size, int* error) {
if (!eof_ && !read_error_) {
// Last read was a full read, so this is a trailer to skip
buffer_.clear();
- Status status = file_->Read(kBlockSize, &buffer_, backing_store_);
+ // TODO: rate limit log reader with appropriate priority.
+ // TODO: avoid overcharging rate limiter:
+ // Note that the Read here might overcharge SequentialFileReader's internal
+ // rate limiter if priority is not IO_TOTAL, e.g., when there is not enough
+ // content left until EOF to read.
+ Status status = file_->Read(kBlockSize, &buffer_, backing_store_,
+ Env::IO_TOTAL /* rate_limiter_priority */);
TEST_SYNC_POINT_CALLBACK("LogReader::ReadMore:AfterReadFile", &status);
end_of_buffer_offset_ += buffer_.size();
if (!status.ok()) {
}
}
-unsigned int Reader::ReadPhysicalRecord(Slice* result, size_t* drop_size) {
+unsigned int Reader::ReadPhysicalRecord(Slice* result, size_t* drop_size,
+ uint64_t* fragment_checksum) {
while (true) {
// We need at least the minimum header size
if (buffer_.size() < static_cast<size_t>(kHeaderSize)) {
buffer_.remove_prefix(header_size + length);
- *result = Slice(header + header_size, length);
- return type;
+ if (!uncompress_ || type == kSetCompressionType) {
+ *result = Slice(header + header_size, length);
+ return type;
+ } else {
+ // Uncompress compressed records
+ uncompressed_record_.clear();
+ if (fragment_checksum != nullptr) {
+ if (uncompress_hash_state_ == nullptr) {
+ uncompress_hash_state_ = XXH3_createState();
+ }
+ XXH3_64bits_reset(uncompress_hash_state_);
+ }
+
+ size_t uncompressed_size = 0;
+ int remaining = 0;
+ do {
+ remaining = uncompress_->Uncompress(header + header_size, length,
+ uncompressed_buffer_.get(),
+ &uncompressed_size);
+ if (remaining < 0) {
+ buffer_.clear();
+ return kBadRecord;
+ }
+ if (uncompressed_size > 0) {
+ if (fragment_checksum != nullptr) {
+ XXH3_64bits_update(uncompress_hash_state_,
+ uncompressed_buffer_.get(), uncompressed_size);
+ }
+ uncompressed_record_.append(uncompressed_buffer_.get(),
+ uncompressed_size);
+ }
+ } while (remaining > 0 || uncompressed_size == kBlockSize);
+
+ if (fragment_checksum != nullptr) {
+ // We can remove this check by updating hash_state_ directly,
+ // but that requires resetting hash_state_ for full and first types
+ // for edge cases like consecutive first-type records.
+ // Leaving the check as is since it is cleaner and can revert to the
+ // above approach if it causes performance impact.
+ *fragment_checksum = XXH3_64bits_digest(uncompress_hash_state_);
+ uint64_t actual_checksum = XXH3_64bits(uncompressed_record_.data(),
+ uncompressed_record_.size());
+ if (*fragment_checksum != actual_checksum) {
+ // uncompressed_record_ contains bad content that does not match
+ // actual decompressed content
+ return kBadRecord;
+ }
+ }
+ *result = Slice(uncompressed_record_);
+ return type;
+ }
}
}
+// Prepare streaming decompression state after a SetCompressionType record
+// has been decoded from the WAL.
+void Reader::InitCompression(const CompressionTypeRecord& compression_record) {
+  compression_type_record_read_ = true;
+  compression_type_ = compression_record.GetCompressionType();
+  // Format version passed to the streaming uncompressor.
+  constexpr uint32_t kCompressionFormatVersion = 2;
+  uncompress_ = StreamingUncompress::Create(
+      compression_type_, kCompressionFormatVersion, kBlockSize);
+  assert(uncompress_ != nullptr);
+  // Scratch block reused by ReadPhysicalRecord for each uncompressed chunk.
+  uncompressed_buffer_.reset(new char[kBlockSize]);
+  assert(uncompressed_buffer_);
+}
+
bool FragmentBufferedReader::ReadRecord(Slice* record, std::string* scratch,
- WALRecoveryMode /*unused*/) {
+ WALRecoveryMode /*unused*/,
+ uint64_t* /* checksum */) {
+ // The checksum out-parameter is accepted for interface parity with
+ // Reader::ReadRecord but is not computed here.
assert(record != nullptr);
assert(scratch != nullptr);
record->clear();
scratch->clear();
+ if (uncompress_) {
+ uncompress_->Reset();
+ }
uint64_t prospective_record_offset = 0;
uint64_t physical_record_offset = end_of_buffer_offset_ - buffer_.size();
*record = fragment;
prospective_record_offset = physical_record_offset;
last_record_offset_ = prospective_record_offset;
+ first_record_read_ = true;
in_fragmented_record_ = false;
return true;
fragments_.clear();
*record = Slice(*scratch);
last_record_offset_ = prospective_record_offset;
+ first_record_read_ = true;
in_fragmented_record_ = false;
return true;
}
}
break;
+ case kSetCompressionType: {
+ if (compression_type_record_read_) {
+ ReportCorruption(fragment.size(),
+ "read multiple SetCompressionType records");
+ }
+ if (first_record_read_) {
+ ReportCorruption(fragment.size(),
+ "SetCompressionType not the first record");
+ }
+ // Drop any partially buffered fragments; this record type resets the
+ // in-progress record state.
+ fragments_.clear();
+ prospective_record_offset = physical_record_offset;
+ last_record_offset_ = prospective_record_offset;
+ in_fragmented_record_ = false;
+ CompressionTypeRecord compression_record(kNoCompression);
+ Status s = compression_record.DecodeFrom(&fragment);
+ if (!s.ok()) {
+ ReportCorruption(fragment.size(),
+ "could not decode SetCompressionType record");
+ } else {
+ InitCompression(compression_record);
+ }
+ break;
+ }
+
default: {
char buf[40];
snprintf(buf, sizeof(buf), "unknown record type %u",
if (!eof_ && !read_error_) {
// Last read was a full read, so this is a trailer to skip
buffer_.clear();
+ // TODO: rate limit log reader with appropriate priority.
+ // TODO: avoid overcharging rate limiter:
+ // Note that the Read here might overcharge SequentialFileReader's internal
+ // rate limiter if priority is not IO_TOTAL, e.g., when there is not enough
+ // content left until EOF to read.
+ Status status = file_->Read(kBlockSize, &buffer_, backing_store_,
+ Env::IO_TOTAL /* rate_limiter_priority */);
end_of_buffer_offset_ += buffer_.size();
if (!status.ok()) {
buffer_.clear();
buffer_.remove_prefix(header_size + length);
- *fragment = Slice(header + header_size, length);
- *fragment_type_or_err = type;
- return true;
+ if (!uncompress_ || type == kSetCompressionType) {
+ *fragment = Slice(header + header_size, length);
+ *fragment_type_or_err = type;
+ return true;
+ } else {
+ // Uncompress compressed records
+ uncompressed_record_.clear();
+ size_t uncompressed_size = 0;
+ int remaining = 0;
+ do {
+ remaining = uncompress_->Uncompress(header + header_size, length,
+ uncompressed_buffer_.get(),
+ &uncompressed_size);
+ if (remaining < 0) {
+ buffer_.clear();
+ *fragment_type_or_err = kBadRecord;
+ return true;
+ }
+ if (uncompressed_size > 0) {
+ uncompressed_record_.append(uncompressed_buffer_.get(),
+ uncompressed_size);
+ }
+ } while (remaining > 0 || uncompressed_size == kBlockSize);
+ // Slice only borrows uncompressed_record_'s bytes (it takes a
+ // const std::string&); a std::move here would be a misleading no-op,
+ // so pass the member directly.
+ *fragment = Slice(uncompressed_record_);
+ *fragment_type_or_err = type;
+ return true;
+ }
}
} // namespace log