#include "memory/memory_allocator.h"
#include "monitoring/perf_context_imp.h"
#include "monitoring/statistics.h"
+#include "options/options_helper.h"
#include "rocksdb/env.h"
#include "rocksdb/options.h"
+#include "rocksdb/table.h"
#include "table/block_based/block.h"
#include "table/block_based/block_based_table_reader.h"
#include "table/persistent_cache_helper.h"
+#include "util/cast_util.h"
#include "util/coding.h"
#include "util/compression.h"
#include "util/crc32c.h"
+#include "util/hash.h"
#include "util/stop_watch.h"
#include "util/string_util.h"
+#include "util/xxhash.h"
namespace ROCKSDB_NAMESPACE {
void BlockHandle::EncodeTo(std::string* dst) const {
// Sanity check that all fields have been set
- assert(offset_ != ~static_cast<uint64_t>(0));
- assert(size_ != ~static_cast<uint64_t>(0));
+ assert(offset_ != ~uint64_t{0});
+ assert(size_ != ~uint64_t{0});
PutVarint64Varint64(dst, offset_, size_);
}
+char* BlockHandle::EncodeTo(char* dst) const {
+ // Sanity check that all fields have been set
+ assert(offset_ != ~uint64_t{0});
+ assert(size_ != ~uint64_t{0});
+ char* cur = EncodeVarint64(dst, offset_);
+ cur = EncodeVarint64(cur, size_);
+ return cur;
+}
+
Status BlockHandle::DecodeFrom(Slice* input) {
if (GetVarint64(input, &offset_) && GetVarint64(input, &size_)) {
return Status::OK();
void IndexValue::EncodeTo(std::string* dst, bool have_first_key,
const BlockHandle* previous_handle) const {
if (previous_handle) {
+ // WART: this is specific to Block-based table
assert(handle.offset() == previous_handle->offset() +
- previous_handle->size() + kBlockTrailerSize);
+ previous_handle->size() +
+ BlockBasedTable::kBlockTrailerSize);
PutVarsignedint64(dst, handle.size() - previous_handle->size());
} else {
handle.EncodeTo(dst);
if (!GetVarsignedint64(input, &delta)) {
return Status::Corruption("bad delta-encoded index value");
}
- handle = BlockHandle(
- previous_handle->offset() + previous_handle->size() + kBlockTrailerSize,
- previous_handle->size() + delta);
+ // WART: this is specific to Block-based table
+ handle = BlockHandle(previous_handle->offset() + previous_handle->size() +
+ BlockBasedTable::kBlockTrailerSize,
+ previous_handle->size() + delta);
} else {
Status s = handle.DecodeFrom(input);
if (!s.ok()) {
return kPlainTableMagicNumber;
}
assert(false);
- return 0;
+ return magic_number;
+}
+inline uint64_t DownconvertToLegacyFooterFormat(uint64_t magic_number) {
+ if (magic_number == kBlockBasedTableMagicNumber) {
+ return kLegacyBlockBasedTableMagicNumber;
+ }
+ if (magic_number == kPlainTableMagicNumber) {
+ return kLegacyPlainTableMagicNumber;
+ }
+ assert(false);
+ return magic_number;
+}
+inline uint8_t BlockTrailerSizeForMagicNumber(uint64_t magic_number) {
+ if (magic_number == kBlockBasedTableMagicNumber ||
+ magic_number == kLegacyBlockBasedTableMagicNumber) {
+ return static_cast<uint8_t>(BlockBasedTable::kBlockTrailerSize);
+ } else {
+ return 0;
+ }
}
+
+// Footer format, in three parts:
+// * Part1
+// -> format_version == 0 (inferred from legacy magic number)
+// <empty> (0 bytes)
+// -> format_version >= 1
+// checksum type (char, 1 byte)
+// * Part2
+// metaindex handle (varint64 offset, varint64 size)
+// index handle (varint64 offset, varint64 size)
+// <zero padding> for part2 size = 2 * BlockHandle::kMaxEncodedLength = 40
+// * Part3
+// -> format_version == 0 (inferred from legacy magic number)
+// legacy magic number (8 bytes)
+// -> format_version >= 1 (inferred from NOT legacy magic number)
+// format_version (uint32LE, 4 bytes), also called "footer version"
+// newer magic number (8 bytes)
+
+constexpr size_t kFooterPart2Size = 2 * BlockHandle::kMaxEncodedLength;
} // namespace
-// legacy footer format:
-// metaindex handle (varint64 offset, varint64 size)
-// index handle (varint64 offset, varint64 size)
-// <padding> to make the total size 2 * BlockHandle::kMaxEncodedLength
-// table_magic_number (8 bytes)
-// new footer format:
-// checksum type (char, 1 byte)
-// metaindex handle (varint64 offset, varint64 size)
-// index handle (varint64 offset, varint64 size)
-// <padding> to make the total size 2 * BlockHandle::kMaxEncodedLength + 1
-// footer version (4 bytes)
-// table_magic_number (8 bytes)
-void Footer::EncodeTo(std::string* dst) const {
- assert(HasInitializedTableMagicNumber());
- if (IsLegacyFooterFormat(table_magic_number())) {
- // has to be default checksum with legacy footer
- assert(checksum_ == kCRC32c);
- const size_t original_size = dst->size();
- metaindex_handle_.EncodeTo(dst);
- index_handle_.EncodeTo(dst);
- dst->resize(original_size + 2 * BlockHandle::kMaxEncodedLength); // Padding
- PutFixed32(dst, static_cast<uint32_t>(table_magic_number() & 0xffffffffu));
- PutFixed32(dst, static_cast<uint32_t>(table_magic_number() >> 32));
- assert(dst->size() == original_size + kVersion0EncodedLength);
+void FooterBuilder::Build(uint64_t magic_number, uint32_t format_version,
+ uint64_t footer_offset, ChecksumType checksum_type,
+ const BlockHandle& metaindex_handle,
+ const BlockHandle& index_handle) {
+ (void)footer_offset; // Future use
+
+ assert(magic_number != Footer::kNullTableMagicNumber);
+ assert(IsSupportedFormatVersion(format_version));
+
+ char* part2;
+ char* part3;
+ if (format_version > 0) {
+ slice_ = Slice(data_.data(), Footer::kNewVersionsEncodedLength);
+ // Generate parts 1 and 3
+ char* cur = data_.data();
+ // Part 1
+ *(cur++) = checksum_type;
+ // Part 2
+ part2 = cur;
+ // Skip over part 2 for now
+ cur += kFooterPart2Size;
+ // Part 3
+ part3 = cur;
+ EncodeFixed32(cur, format_version);
+ cur += 4;
+ EncodeFixed64(cur, magic_number);
+ assert(cur + 8 == slice_.data() + slice_.size());
} else {
- const size_t original_size = dst->size();
- dst->push_back(static_cast<char>(checksum_));
- metaindex_handle_.EncodeTo(dst);
- index_handle_.EncodeTo(dst);
- dst->resize(original_size + kNewVersionsEncodedLength - 12); // Padding
- PutFixed32(dst, version());
- PutFixed32(dst, static_cast<uint32_t>(table_magic_number() & 0xffffffffu));
- PutFixed32(dst, static_cast<uint32_t>(table_magic_number() >> 32));
- assert(dst->size() == original_size + kNewVersionsEncodedLength);
+ slice_ = Slice(data_.data(), Footer::kVersion0EncodedLength);
+ // Legacy SST files use kCRC32c checksum but it's not stored in footer.
+ assert(checksum_type == kNoChecksum || checksum_type == kCRC32c);
+ // Generate part 3 (part 1 empty, skip part 2 for now)
+ part2 = data_.data();
+ part3 = part2 + kFooterPart2Size;
+ char* cur = part3;
+ // Use legacy magic numbers to indicate format_version=0, for
+ // compatibility. No other cases should use format_version=0.
+ EncodeFixed64(cur, DownconvertToLegacyFooterFormat(magic_number));
+ assert(cur + 8 == slice_.data() + slice_.size());
}
-}
-Footer::Footer(uint64_t _table_magic_number, uint32_t _version)
- : version_(_version),
- checksum_(kCRC32c),
- table_magic_number_(_table_magic_number) {
- // This should be guaranteed by constructor callers
- assert(!IsLegacyFooterFormat(_table_magic_number) || version_ == 0);
+ {
+ char* cur = part2;
+ cur = metaindex_handle.EncodeTo(cur);
+ cur = index_handle.EncodeTo(cur);
+ // Zero pad remainder
+ std::fill(cur, part3, char{0});
+ }
}
-Status Footer::DecodeFrom(Slice* input) {
- assert(!HasInitializedTableMagicNumber());
+Status Footer::DecodeFrom(Slice input, uint64_t input_offset) {
+ (void)input_offset; // Future use
+
+ // Only decode to unused Footer
+ assert(table_magic_number_ == kNullTableMagicNumber);
assert(input != nullptr);
- assert(input->size() >= kMinEncodedLength);
+ assert(input.size() >= kMinEncodedLength);
- const char* magic_ptr =
- input->data() + input->size() - kMagicNumberLengthByte;
- const uint32_t magic_lo = DecodeFixed32(magic_ptr);
- const uint32_t magic_hi = DecodeFixed32(magic_ptr + 4);
- uint64_t magic = ((static_cast<uint64_t>(magic_hi) << 32) |
- (static_cast<uint64_t>(magic_lo)));
+ const char* magic_ptr = input.data() + input.size() - kMagicNumberLengthByte;
+ uint64_t magic = DecodeFixed64(magic_ptr);
// We check for legacy formats here and silently upconvert them
bool legacy = IsLegacyFooterFormat(magic);
if (legacy) {
magic = UpconvertLegacyFooterFormat(magic);
}
- set_table_magic_number(magic);
+ table_magic_number_ = magic;
+ block_trailer_size_ = BlockTrailerSizeForMagicNumber(magic);
+ // Parse Part3
if (legacy) {
// The size is already asserted to be at least kMinEncodedLength
// at the beginning of the function
- input->remove_prefix(input->size() - kVersion0EncodedLength);
- version_ = 0 /* legacy */;
- checksum_ = kCRC32c;
+ input.remove_prefix(input.size() - kVersion0EncodedLength);
+ format_version_ = 0 /* legacy */;
+ checksum_type_ = kCRC32c;
} else {
- version_ = DecodeFixed32(magic_ptr - 4);
- // Footer version 1 and higher will always occupy exactly this many bytes.
- // It consists of the checksum type, two block handles, padding,
- // a version number, and a magic number
- if (input->size() < kNewVersionsEncodedLength) {
- return Status::Corruption("input is too short to be an sstable");
- } else {
- input->remove_prefix(input->size() - kNewVersionsEncodedLength);
+ const char* part3_ptr = magic_ptr - 4;
+ format_version_ = DecodeFixed32(part3_ptr);
+ if (!IsSupportedFormatVersion(format_version_)) {
+ return Status::Corruption("Corrupt or unsupported format_version: " +
+ std::to_string(format_version_));
}
- uint32_t chksum;
- if (!GetVarint32(input, &chksum)) {
- return Status::Corruption("bad checksum type");
+ // All known format versions >= 1 occupy exactly this many bytes.
+ if (input.size() < kNewVersionsEncodedLength) {
+ return Status::Corruption("Input is too short to be an SST file");
}
- checksum_ = static_cast<ChecksumType>(chksum);
+ uint64_t adjustment = input.size() - kNewVersionsEncodedLength;
+ input.remove_prefix(adjustment);
+
+ // Parse Part1
+ char chksum = input.data()[0];
+ checksum_type_ = lossless_cast<ChecksumType>(chksum);
+ if (!IsSupportedChecksumType(checksum_type())) {
+ return Status::Corruption("Corrupt or unsupported checksum type: " +
+ std::to_string(lossless_cast<uint8_t>(chksum)));
+ }
+ // Consume checksum type field
+ input.remove_prefix(1);
}
- Status result = metaindex_handle_.DecodeFrom(input);
- if (result.ok()) {
- result = index_handle_.DecodeFrom(input);
- }
+ // Parse Part2
+ Status result = metaindex_handle_.DecodeFrom(&input);
if (result.ok()) {
- // We skip over any leftover data (just padding for now) in "input"
- const char* end = magic_ptr + kMagicNumberLengthByte;
- *input = Slice(end, input->data() + input->size() - end);
+ result = index_handle_.DecodeFrom(&input);
}
return result;
+ // Padding in part2 is ignored
}
std::string Footer::ToString() const {
if (legacy) {
result.append("metaindex handle: " + metaindex_handle_.ToString() + "\n ");
result.append("index handle: " + index_handle_.ToString() + "\n ");
- result.append("table_magic_number: " +
- ROCKSDB_NAMESPACE::ToString(table_magic_number_) + "\n ");
- } else {
- result.append("checksum: " + ROCKSDB_NAMESPACE::ToString(checksum_) +
+ result.append("table_magic_number: " + std::to_string(table_magic_number_) +
"\n ");
+ } else {
result.append("metaindex handle: " + metaindex_handle_.ToString() + "\n ");
result.append("index handle: " + index_handle_.ToString() + "\n ");
- result.append("footer version: " + ROCKSDB_NAMESPACE::ToString(version_) +
+ result.append("table_magic_number: " + std::to_string(table_magic_number_) +
+ "\n ");
+ result.append("format version: " + std::to_string(format_version_) +
"\n ");
- result.append("table_magic_number: " +
- ROCKSDB_NAMESPACE::ToString(table_magic_number_) + "\n ");
}
return result;
}
uint64_t file_size, Footer* footer,
uint64_t enforce_table_magic_number) {
if (file_size < Footer::kMinEncodedLength) {
- return Status::Corruption("file is too short (" + ToString(file_size) +
+ return Status::Corruption("file is too short (" +
+ std::to_string(file_size) +
" bytes) to be an "
"sstable: " +
file->file_name());
std::string footer_buf;
AlignedBuf internal_buf;
Slice footer_input;
- size_t read_offset =
- (file_size > Footer::kMaxEncodedLength)
- ? static_cast<size_t>(file_size - Footer::kMaxEncodedLength)
- : 0;
+ uint64_t read_offset = (file_size > Footer::kMaxEncodedLength)
+ ? file_size - Footer::kMaxEncodedLength
+ : 0;
Status s;
// TODO: Need to pass appropriate deadline to TryReadFromCache(). Right now,
// there is no readahead for point lookups, so TryReadFromCache will fail if
// the required data is not in the prefetch buffer. Once deadline is enabled
// for iterator, TryReadFromCache might do a readahead. Revisit to see if we
// need to pass a timeout at that point
+ // TODO: rate limit footer reads.
if (prefetch_buffer == nullptr ||
!prefetch_buffer->TryReadFromCache(
- IOOptions(), read_offset, Footer::kMaxEncodedLength, &footer_input)) {
+ opts, file, read_offset, Footer::kMaxEncodedLength, &footer_input,
+ nullptr, opts.rate_limiter_priority)) {
if (file->use_direct_io()) {
s = file->Read(opts, read_offset, Footer::kMaxEncodedLength,
- &footer_input, nullptr, &internal_buf);
+ &footer_input, nullptr, &internal_buf,
+ opts.rate_limiter_priority);
} else {
footer_buf.reserve(Footer::kMaxEncodedLength);
s = file->Read(opts, read_offset, Footer::kMaxEncodedLength,
- &footer_input, &footer_buf[0], nullptr);
+ &footer_input, &footer_buf[0], nullptr,
+ opts.rate_limiter_priority);
}
if (!s.ok()) return s;
}
// Check that we actually read the whole footer from the file. It may be
// that size isn't correct.
if (footer_input.size() < Footer::kMinEncodedLength) {
- return Status::Corruption("file is too short (" + ToString(file_size) +
+ // FIXME: this error message is bad. We should be checking whether the
+ // provided file_size matches what's on disk, at least in this case.
+ // Unfortunately FileSystem/Env does not provide a way to get the size
+ // of an open file, so getting file size requires a full path seek.
+ return Status::Corruption("file is too short (" +
+ std::to_string(file_size) +
" bytes) to be an "
"sstable" +
file->file_name());
}
- s = footer->DecodeFrom(&footer_input);
+ s = footer->DecodeFrom(footer_input, read_offset);
if (!s.ok()) {
return s;
}
if (enforce_table_magic_number != 0 &&
enforce_table_magic_number != footer->table_magic_number()) {
- return Status::Corruption(
- "Bad table magic number: expected " +
- ToString(enforce_table_magic_number) + ", found " +
- ToString(footer->table_magic_number()) + " in " + file->file_name());
+ return Status::Corruption("Bad table magic number: expected " +
+ std::to_string(enforce_table_magic_number) +
+ ", found " +
+ std::to_string(footer->table_magic_number()) +
+ " in " + file->file_name());
}
return Status::OK();
}
-Status UncompressBlockContentsForCompressionType(
- const UncompressionInfo& uncompression_info, const char* data, size_t n,
- BlockContents* contents, uint32_t format_version,
- const ImmutableCFOptions& ioptions, MemoryAllocator* allocator) {
+namespace {
+// Custom handling for the last byte of a block, to avoid invoking streaming
+// API to get an effective block checksum. This function is its own inverse
+// because it uses xor.
+inline uint32_t ModifyChecksumForLastByte(uint32_t checksum, char last_byte) {
+ // This strategy bears some resemblance to extending a CRC checksum by one
+ // more byte, except we don't need to re-mix the input checksum as long as
+ // we do this step only once (per checksum).
+ const uint32_t kRandomPrime = 0x6b9083d9;
+ return checksum ^ lossless_cast<uint8_t>(last_byte) * kRandomPrime;
+}
+} // namespace
+
+uint32_t ComputeBuiltinChecksum(ChecksumType type, const char* data,
+ size_t data_size) {
+ switch (type) {
+ case kCRC32c:
+ return crc32c::Mask(crc32c::Value(data, data_size));
+ case kxxHash:
+ return XXH32(data, data_size, /*seed*/ 0);
+ case kxxHash64:
+ return Lower32of64(XXH64(data, data_size, /*seed*/ 0));
+ case kXXH3: {
+ if (data_size == 0) {
+ // Special case because of special handling for last byte, not
+ // present in this case. Can be any value different from other
+ // small input size checksums.
+ return 0;
+ } else {
+ // See corresponding code in ComputeBuiltinChecksumWithLastByte
+ uint32_t v = Lower32of64(XXH3_64bits(data, data_size - 1));
+ return ModifyChecksumForLastByte(v, data[data_size - 1]);
+ }
+ }
+ default: // including kNoChecksum
+ return 0;
+ }
+}
+
+uint32_t ComputeBuiltinChecksumWithLastByte(ChecksumType type, const char* data,
+ size_t data_size, char last_byte) {
+ switch (type) {
+ case kCRC32c: {
+ uint32_t crc = crc32c::Value(data, data_size);
+ // Extend to cover last byte (compression type)
+ crc = crc32c::Extend(crc, &last_byte, 1);
+ return crc32c::Mask(crc);
+ }
+ case kxxHash: {
+ XXH32_state_t* const state = XXH32_createState();
+ XXH32_reset(state, 0);
+ XXH32_update(state, data, data_size);
+ // Extend to cover last byte (compression type)
+ XXH32_update(state, &last_byte, 1);
+ uint32_t v = XXH32_digest(state);
+ XXH32_freeState(state);
+ return v;
+ }
+ case kxxHash64: {
+ XXH64_state_t* const state = XXH64_createState();
+ XXH64_reset(state, 0);
+ XXH64_update(state, data, data_size);
+ // Extend to cover last byte (compression type)
+ XXH64_update(state, &last_byte, 1);
+ uint32_t v = Lower32of64(XXH64_digest(state));
+ XXH64_freeState(state);
+ return v;
+ }
+ case kXXH3: {
+ // XXH3 is a complicated hash function that is extremely fast on
+ // contiguous input, but that makes its streaming support rather
+ // complex. It is worth custom handling of the last byte (`type`)
+ // in order to avoid allocating a large state object and bringing
+ // that code complexity into CPU working set.
+ uint32_t v = Lower32of64(XXH3_64bits(data, data_size));
+ return ModifyChecksumForLastByte(v, last_byte);
+ }
+ default: // including kNoChecksum
+ return 0;
+ }
+}
+
+Status UncompressBlockData(const UncompressionInfo& uncompression_info,
+ const char* data, size_t size,
+ BlockContents* out_contents, uint32_t format_version,
+ const ImmutableOptions& ioptions,
+ MemoryAllocator* allocator) {
Status ret = Status::OK();
assert(uncompression_info.type() != kNoCompression &&
"Invalid compression type");
- StopWatchNano timer(ioptions.env, ShouldReportDetailedTime(
- ioptions.env, ioptions.statistics));
+ StopWatchNano timer(ioptions.clock,
+ ShouldReportDetailedTime(ioptions.env, ioptions.stats));
size_t uncompressed_size = 0;
CacheAllocationPtr ubuf =
- UncompressData(uncompression_info, data, n, &uncompressed_size,
+ UncompressData(uncompression_info, data, size, &uncompressed_size,
GetCompressFormatForVersion(format_version), allocator);
if (!ubuf) {
- return Status::Corruption(
- "Unsupported compression method or corrupted compressed block contents",
- CompressionTypeToString(uncompression_info.type()));
+ if (!CompressionTypeSupported(uncompression_info.type())) {
+ return Status::NotSupported(
+ "Unsupported compression method for this build",
+ CompressionTypeToString(uncompression_info.type()));
+ } else {
+ return Status::Corruption(
+ "Corrupted compressed block contents",
+ CompressionTypeToString(uncompression_info.type()));
+ }
}
- *contents = BlockContents(std::move(ubuf), uncompressed_size);
+ *out_contents = BlockContents(std::move(ubuf), uncompressed_size);
- if (ShouldReportDetailedTime(ioptions.env, ioptions.statistics)) {
- RecordTimeToHistogram(ioptions.statistics, DECOMPRESSION_TIMES_NANOS,
+ if (ShouldReportDetailedTime(ioptions.env, ioptions.stats)) {
+ RecordTimeToHistogram(ioptions.stats, DECOMPRESSION_TIMES_NANOS,
timer.ElapsedNanos());
}
- RecordTimeToHistogram(ioptions.statistics, BYTES_DECOMPRESSED,
- contents->data.size());
- RecordTick(ioptions.statistics, NUMBER_BLOCK_DECOMPRESSED);
+ RecordTimeToHistogram(ioptions.stats, BYTES_DECOMPRESSED,
+ out_contents->data.size());
+ RecordTick(ioptions.stats, NUMBER_BLOCK_DECOMPRESSED);
+ TEST_SYNC_POINT_CALLBACK("UncompressBlockData:TamperWithReturnValue",
+ static_cast<void*>(&ret));
TEST_SYNC_POINT_CALLBACK(
- "UncompressBlockContentsForCompressionType:TamperWithReturnValue",
- static_cast<void*>(&ret));
- TEST_SYNC_POINT_CALLBACK(
- "UncompressBlockContentsForCompressionType:"
+ "UncompressBlockData:"
"TamperWithDecompressionOutput",
- static_cast<void*>(contents));
+ static_cast<void*>(out_contents));
return ret;
}
-//
-// The 'data' points to the raw block contents that was read in from file.
-// This method allocates a new heap buffer and the raw block
-// contents are uncompresed into this buffer. This
-// buffer is returned via 'result' and it is upto the caller to
-// free this buffer.
-// format_version is the block format as defined in include/rocksdb/table.h
-Status UncompressBlockContents(const UncompressionInfo& uncompression_info,
- const char* data, size_t n,
- BlockContents* contents, uint32_t format_version,
- const ImmutableCFOptions& ioptions,
- MemoryAllocator* allocator) {
- assert(data[n] != kNoCompression);
- assert(data[n] == static_cast<char>(uncompression_info.type()));
- return UncompressBlockContentsForCompressionType(uncompression_info, data, n,
- contents, format_version,
- ioptions, allocator);
+Status UncompressSerializedBlock(const UncompressionInfo& uncompression_info,
+ const char* data, size_t size,
+ BlockContents* out_contents,
+ uint32_t format_version,
+ const ImmutableOptions& ioptions,
+ MemoryAllocator* allocator) {
+ assert(data[size] != kNoCompression);
+ assert(data[size] == static_cast<char>(uncompression_info.type()));
+ return UncompressBlockData(uncompression_info, data, size, out_contents,
+ format_version, ioptions, allocator);
}
// Replace the contents of db_host_id with the actual hostname, if db_host_id