// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
-// This source code is licensed under the BSD-style license found in the
-// LICENSE file in the root directory of this source tree. An additional grant
-// of patent rights can be found in the PATENTS file in the same directory.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
#include "table/format.h"
+#include <cinttypes>
#include <string>
-#include <inttypes.h>
+#include "block_fetcher.h"
+#include "file/random_access_file_reader.h"
+#include "memory/memory_allocator.h"
#include "monitoring/perf_context_imp.h"
#include "monitoring/statistics.h"
#include "rocksdb/env.h"
-#include "table/block.h"
-#include "table/block_based_table_reader.h"
+#include "rocksdb/options.h"
+#include "table/block_based/block.h"
+#include "table/block_based/block_based_table_reader.h"
#include "table/persistent_cache_helper.h"
#include "util/coding.h"
#include "util/compression.h"
#include "util/crc32c.h"
-#include "util/file_reader_writer.h"
-#include "util/logging.h"
#include "util/stop_watch.h"
#include "util/string_util.h"
-#include "util/xxhash.h"
-namespace rocksdb {
+namespace ROCKSDB_NAMESPACE {
extern const uint64_t kLegacyBlockBasedTableMagicNumber;
extern const uint64_t kBlockBasedTableMagicNumber;
const uint64_t kLegacyPlainTableMagicNumber = 0;
const uint64_t kPlainTableMagicNumber = 0;
#endif
-const uint32_t DefaultStackBufferSize = 5000;
+const char* kHostnameForDbHostId = "__hostname__";
bool ShouldReportDetailedTime(Env* env, Statistics* stats) {
return env != nullptr && stats != nullptr &&
- stats->stats_level_ > kExceptDetailedTimers;
+ stats->get_stats_level() > kExceptDetailedTimers;
}
void BlockHandle::EncodeTo(std::string* dst) const {
}
Status BlockHandle::DecodeFrom(Slice* input) {
- if (GetVarint64(input, &offset_) &&
- GetVarint64(input, &size_)) {
+ if (GetVarint64(input, &offset_) && GetVarint64(input, &size_)) {
+ return Status::OK();
+ } else {
+ // reset in case failure after partially decoding
+ offset_ = 0;
+ size_ = 0;
+ return Status::Corruption("bad block handle");
+ }
+}
+
+Status BlockHandle::DecodeSizeFrom(uint64_t _offset, Slice* input) {
+ if (GetVarint64(input, &size_)) {
+ offset_ = _offset;
return Status::OK();
} else {
// reset in case failure after partially decoding
const BlockHandle BlockHandle::kNullBlockHandle(0, 0);
+void IndexValue::EncodeTo(std::string* dst, bool have_first_key,
+ const BlockHandle* previous_handle) const {
+ if (previous_handle) {
+ assert(handle.offset() == previous_handle->offset() +
+ previous_handle->size() + kBlockTrailerSize);
+ PutVarsignedint64(dst, handle.size() - previous_handle->size());
+ } else {
+ handle.EncodeTo(dst);
+ }
+ assert(dst->size() != 0);
+
+ if (have_first_key) {
+ PutLengthPrefixedSlice(dst, first_internal_key);
+ }
+}
+
+Status IndexValue::DecodeFrom(Slice* input, bool have_first_key,
+ const BlockHandle* previous_handle) {
+ if (previous_handle) {
+ int64_t delta;
+ if (!GetVarsignedint64(input, &delta)) {
+ return Status::Corruption("bad delta-encoded index value");
+ }
+ handle = BlockHandle(
+ previous_handle->offset() + previous_handle->size() + kBlockTrailerSize,
+ previous_handle->size() + delta);
+ } else {
+ Status s = handle.DecodeFrom(input);
+ if (!s.ok()) {
+ return s;
+ }
+ }
+
+ if (!have_first_key) {
+ first_internal_key = Slice();
+ } else if (!GetLengthPrefixedSlice(input, &first_internal_key)) {
+ return Status::Corruption("bad first key in block info");
+ }
+
+ return Status::OK();
+}
+
+std::string IndexValue::ToString(bool hex, bool have_first_key) const {
+ std::string s;
+ EncodeTo(&s, have_first_key, nullptr);
+ if (hex) {
+ return Slice(s).ToString(true);
+ } else {
+ return s;
+ }
+}
+
namespace {
inline bool IsLegacyFooterFormat(uint64_t magic_number) {
return magic_number == kLegacyBlockBasedTableMagicNumber ||
// <padding> to make the total size 2 * BlockHandle::kMaxEncodedLength
// table_magic_number (8 bytes)
// new footer format:
-// checksum (char, 1 byte)
+// checksum type (char, 1 byte)
// metaindex handle (varint64 offset, varint64 size)
// index handle (varint64 offset, varint64 size)
// <padding> to make the total size 2 * BlockHandle::kMaxEncodedLength + 1
assert(input != nullptr);
assert(input->size() >= kMinEncodedLength);
- const char *magic_ptr =
+ const char* magic_ptr =
input->data() + input->size() - kMagicNumberLengthByte;
const uint32_t magic_lo = DecodeFixed32(magic_ptr);
const uint32_t magic_hi = DecodeFixed32(magic_ptr + 4);
}
std::string Footer::ToString() const {
- std::string result, handle_;
+ std::string result;
result.reserve(1024);
bool legacy = IsLegacyFooterFormat(table_magic_number_);
result.append("metaindex handle: " + metaindex_handle_.ToString() + "\n ");
result.append("index handle: " + index_handle_.ToString() + "\n ");
result.append("table_magic_number: " +
- rocksdb::ToString(table_magic_number_) + "\n ");
+ ROCKSDB_NAMESPACE::ToString(table_magic_number_) + "\n ");
} else {
- result.append("checksum: " + rocksdb::ToString(checksum_) + "\n ");
+ result.append("checksum: " + ROCKSDB_NAMESPACE::ToString(checksum_) +
+ "\n ");
result.append("metaindex handle: " + metaindex_handle_.ToString() + "\n ");
result.append("index handle: " + index_handle_.ToString() + "\n ");
- result.append("footer version: " + rocksdb::ToString(version_) + "\n ");
+ result.append("footer version: " + ROCKSDB_NAMESPACE::ToString(version_) +
+ "\n ");
result.append("table_magic_number: " +
- rocksdb::ToString(table_magic_number_) + "\n ");
+ ROCKSDB_NAMESPACE::ToString(table_magic_number_) + "\n ");
}
return result;
}
-Status ReadFooterFromFile(RandomAccessFileReader* file, uint64_t file_size,
- Footer* footer, uint64_t enforce_table_magic_number) {
+Status ReadFooterFromFile(const IOOptions& opts, RandomAccessFileReader* file,
+ FilePrefetchBuffer* prefetch_buffer,
+ uint64_t file_size, Footer* footer,
+ uint64_t enforce_table_magic_number) {
if (file_size < Footer::kMinEncodedLength) {
- return Status::Corruption("file is too short to be an sstable");
+ return Status::Corruption("file is too short (" + ToString(file_size) +
+ " bytes) to be an "
+ "sstable: " +
+ file->file_name());
}
- char footer_space[Footer::kMaxEncodedLength];
+ std::string footer_buf;
+ AlignedBuf internal_buf;
Slice footer_input;
size_t read_offset =
(file_size > Footer::kMaxEncodedLength)
? static_cast<size_t>(file_size - Footer::kMaxEncodedLength)
: 0;
- Status s = file->Read(read_offset, Footer::kMaxEncodedLength, &footer_input,
- footer_space);
- if (!s.ok()) return s;
+ Status s;
+ // TODO: Need to pass appropriate deadline to TryReadFromCache(). Right now,
+ // there is no readahead for point lookups, so TryReadFromCache will fail if
+ // the required data is not in the prefetch buffer. Once deadline is enabled
+ // for iterator, TryReadFromCache might do a readahead. Revisit to see if we
+ // need to pass a timeout at that point
+ if (prefetch_buffer == nullptr ||
+ !prefetch_buffer->TryReadFromCache(
+ IOOptions(), read_offset, Footer::kMaxEncodedLength, &footer_input)) {
+ if (file->use_direct_io()) {
+ s = file->Read(opts, read_offset, Footer::kMaxEncodedLength,
+ &footer_input, nullptr, &internal_buf);
+ } else {
+ footer_buf.reserve(Footer::kMaxEncodedLength);
+ s = file->Read(opts, read_offset, Footer::kMaxEncodedLength,
+ &footer_input, &footer_buf[0], nullptr);
+ }
+ if (!s.ok()) return s;
+ }
// Check that we actually read the whole footer from the file. It may be
// that size isn't correct.
if (footer_input.size() < Footer::kMinEncodedLength) {
- return Status::Corruption("file is too short to be an sstable");
+ return Status::Corruption("file is too short (" + ToString(file_size) +
+ " bytes) to be an "
+ "sstable" +
+ file->file_name());
}
s = footer->DecodeFrom(&footer_input);
}
if (enforce_table_magic_number != 0 &&
enforce_table_magic_number != footer->table_magic_number()) {
- return Status::Corruption("Bad table magic number");
+ return Status::Corruption(
+ "Bad table magic number: expected " +
+ ToString(enforce_table_magic_number) + ", found " +
+ ToString(footer->table_magic_number()) + " in " + file->file_name());
}
return Status::OK();
}
-// Without anonymous namespace here, we fail the warning -Wmissing-prototypes
-namespace {
-
-// Read a block and check its CRC
-// contents is the result of reading.
-// According to the implementation of file->Read, contents may not point to buf
-Status ReadBlock(RandomAccessFileReader* file, const Footer& footer,
- const ReadOptions& options, const BlockHandle& handle,
- Slice* contents, /* result of reading */ char* buf) {
- size_t n = static_cast<size_t>(handle.size());
- Status s;
-
- {
- PERF_TIMER_GUARD(block_read_time);
- s = file->Read(handle.offset(), n + kBlockTrailerSize, contents, buf);
- }
-
- PERF_COUNTER_ADD(block_read_count, 1);
- PERF_COUNTER_ADD(block_read_byte, n + kBlockTrailerSize);
-
- if (!s.ok()) {
- return s;
- }
- if (contents->size() != n + kBlockTrailerSize) {
- return Status::Corruption("truncated block read");
- }
-
- // Check the crc of the type and the block contents
- const char* data = contents->data(); // Pointer to where Read put the data
- if (options.verify_checksums) {
- PERF_TIMER_GUARD(block_checksum_time);
- uint32_t value = DecodeFixed32(data + n + 1);
- uint32_t actual = 0;
- switch (footer.checksum()) {
- case kCRC32c:
- value = crc32c::Unmask(value);
- actual = crc32c::Value(data, n + 1);
- break;
- case kxxHash:
- actual = XXH32(data, static_cast<int>(n) + 1, 0);
- break;
- default:
- s = Status::Corruption("unknown checksum type");
- }
- if (s.ok() && actual != value) {
- s = Status::Corruption("block checksum mismatch");
- }
- if (!s.ok()) {
- return s;
- }
- }
- return s;
-}
-
-} // namespace
-
-Status ReadBlockContents(RandomAccessFileReader* file, const Footer& footer,
- const ReadOptions& read_options,
- const BlockHandle& handle, BlockContents* contents,
- const ImmutableCFOptions &ioptions,
- bool decompression_requested,
- const Slice& compression_dict,
- const PersistentCacheOptions& cache_options) {
- Status status;
- Slice slice;
- size_t n = static_cast<size_t>(handle.size());
- std::unique_ptr<char[]> heap_buf;
- char stack_buf[DefaultStackBufferSize];
- char* used_buf = nullptr;
- rocksdb::CompressionType compression_type;
-
- if (cache_options.persistent_cache &&
- !cache_options.persistent_cache->IsCompressed()) {
- status = PersistentCacheHelper::LookupUncompressedPage(cache_options,
- handle, contents);
- if (status.ok()) {
- // uncompressed page is found for the block handle
- return status;
- } else {
- // uncompressed page is not found
- if (ioptions.info_log && !status.IsNotFound()) {
- assert(!status.ok());
- ROCKS_LOG_INFO(ioptions.info_log,
- "Error reading from persistent cache. %s",
- status.ToString().c_str());
- }
- }
- }
-
- if (cache_options.persistent_cache &&
- cache_options.persistent_cache->IsCompressed()) {
- // lookup uncompressed cache mode p-cache
- status = PersistentCacheHelper::LookupRawPage(
- cache_options, handle, &heap_buf, n + kBlockTrailerSize);
- } else {
- status = Status::NotFound();
- }
-
- if (status.ok()) {
- // cache hit
- used_buf = heap_buf.get();
- slice = Slice(heap_buf.get(), n);
- } else {
- if (ioptions.info_log && !status.IsNotFound()) {
- assert(!status.ok());
- ROCKS_LOG_INFO(ioptions.info_log,
- "Error reading from persistent cache. %s",
- status.ToString().c_str());
- }
- // cache miss read from device
- if (decompression_requested &&
- n + kBlockTrailerSize < DefaultStackBufferSize) {
- // If we've got a small enough hunk of data, read it in to the
- // trivially allocated stack buffer instead of needing a full malloc()
- used_buf = &stack_buf[0];
- } else {
- heap_buf = std::unique_ptr<char[]>(new char[n + kBlockTrailerSize]);
- used_buf = heap_buf.get();
- }
-
- status = ReadBlock(file, footer, read_options, handle, &slice, used_buf);
- if (status.ok() && read_options.fill_cache &&
- cache_options.persistent_cache &&
- cache_options.persistent_cache->IsCompressed()) {
- // insert to raw cache
- PersistentCacheHelper::InsertRawPage(cache_options, handle, used_buf,
- n + kBlockTrailerSize);
- }
- }
-
- if (!status.ok()) {
- return status;
- }
-
- PERF_TIMER_GUARD(block_decompress_time);
-
- compression_type = static_cast<rocksdb::CompressionType>(slice.data()[n]);
-
- if (decompression_requested && compression_type != kNoCompression) {
- // compressed page, uncompress, update cache
- status = UncompressBlockContents(slice.data(), n, contents,
- footer.version(), compression_dict,
- ioptions);
- } else if (slice.data() != used_buf) {
- // the slice content is not the buffer provided
- *contents = BlockContents(Slice(slice.data(), n), false, compression_type);
- } else {
- // page is uncompressed, the buffer either stack or heap provided
- if (used_buf == &stack_buf[0]) {
- heap_buf = std::unique_ptr<char[]>(new char[n]);
- memcpy(heap_buf.get(), stack_buf, n);
- }
- *contents = BlockContents(std::move(heap_buf), n, true, compression_type);
- }
-
- if (status.ok() && read_options.fill_cache &&
- cache_options.persistent_cache &&
- !cache_options.persistent_cache->IsCompressed()) {
- // insert to uncompressed cache
- PersistentCacheHelper::InsertUncompressedPage(cache_options, handle,
- *contents);
- }
-
- return status;
-}
-
Status UncompressBlockContentsForCompressionType(
- const char* data, size_t n, BlockContents* contents,
- uint32_t format_version, const Slice& compression_dict,
- CompressionType compression_type, const ImmutableCFOptions &ioptions) {
- std::unique_ptr<char[]> ubuf;
-
- assert(compression_type != kNoCompression && "Invalid compression type");
-
- StopWatchNano timer(ioptions.env,
- ShouldReportDetailedTime(ioptions.env, ioptions.statistics));
- int decompress_size = 0;
- switch (compression_type) {
- case kSnappyCompression: {
- size_t ulength = 0;
- static char snappy_corrupt_msg[] =
- "Snappy not supported or corrupted Snappy compressed block contents";
- if (!Snappy_GetUncompressedLength(data, n, &ulength)) {
- return Status::Corruption(snappy_corrupt_msg);
- }
- ubuf.reset(new char[ulength]);
- if (!Snappy_Uncompress(data, n, ubuf.get())) {
- return Status::Corruption(snappy_corrupt_msg);
- }
- *contents = BlockContents(std::move(ubuf), ulength, true, kNoCompression);
- break;
- }
- case kZlibCompression:
- ubuf.reset(Zlib_Uncompress(
- data, n, &decompress_size,
- GetCompressFormatForVersion(kZlibCompression, format_version),
- compression_dict));
- if (!ubuf) {
- static char zlib_corrupt_msg[] =
- "Zlib not supported or corrupted Zlib compressed block contents";
- return Status::Corruption(zlib_corrupt_msg);
- }
- *contents =
- BlockContents(std::move(ubuf), decompress_size, true, kNoCompression);
- break;
- case kBZip2Compression:
- ubuf.reset(BZip2_Uncompress(
- data, n, &decompress_size,
- GetCompressFormatForVersion(kBZip2Compression, format_version)));
- if (!ubuf) {
- static char bzip2_corrupt_msg[] =
- "Bzip2 not supported or corrupted Bzip2 compressed block contents";
- return Status::Corruption(bzip2_corrupt_msg);
- }
- *contents =
- BlockContents(std::move(ubuf), decompress_size, true, kNoCompression);
- break;
- case kLZ4Compression:
- ubuf.reset(LZ4_Uncompress(
- data, n, &decompress_size,
- GetCompressFormatForVersion(kLZ4Compression, format_version),
- compression_dict));
- if (!ubuf) {
- static char lz4_corrupt_msg[] =
- "LZ4 not supported or corrupted LZ4 compressed block contents";
- return Status::Corruption(lz4_corrupt_msg);
- }
- *contents =
- BlockContents(std::move(ubuf), decompress_size, true, kNoCompression);
- break;
- case kLZ4HCCompression:
- ubuf.reset(LZ4_Uncompress(
- data, n, &decompress_size,
- GetCompressFormatForVersion(kLZ4HCCompression, format_version),
- compression_dict));
- if (!ubuf) {
- static char lz4hc_corrupt_msg[] =
- "LZ4HC not supported or corrupted LZ4HC compressed block contents";
- return Status::Corruption(lz4hc_corrupt_msg);
- }
- *contents =
- BlockContents(std::move(ubuf), decompress_size, true, kNoCompression);
- break;
- case kXpressCompression:
- ubuf.reset(XPRESS_Uncompress(data, n, &decompress_size));
- if (!ubuf) {
- static char xpress_corrupt_msg[] =
- "XPRESS not supported or corrupted XPRESS compressed block contents";
- return Status::Corruption(xpress_corrupt_msg);
- }
- *contents =
- BlockContents(std::move(ubuf), decompress_size, true, kNoCompression);
- break;
- case kZSTD:
- case kZSTDNotFinalCompression:
- ubuf.reset(ZSTD_Uncompress(data, n, &decompress_size, compression_dict));
- if (!ubuf) {
- static char zstd_corrupt_msg[] =
- "ZSTD not supported or corrupted ZSTD compressed block contents";
- return Status::Corruption(zstd_corrupt_msg);
- }
- *contents =
- BlockContents(std::move(ubuf), decompress_size, true, kNoCompression);
- break;
- default:
- return Status::Corruption("bad block type");
+ const UncompressionInfo& uncompression_info, const char* data, size_t n,
+ BlockContents* contents, uint32_t format_version,
+ const ImmutableCFOptions& ioptions, MemoryAllocator* allocator) {
+ Status ret = Status::OK();
+
+ assert(uncompression_info.type() != kNoCompression &&
+ "Invalid compression type");
+
+ StopWatchNano timer(ioptions.env, ShouldReportDetailedTime(
+ ioptions.env, ioptions.statistics));
+ size_t uncompressed_size = 0;
+ CacheAllocationPtr ubuf =
+ UncompressData(uncompression_info, data, n, &uncompressed_size,
+ GetCompressFormatForVersion(format_version), allocator);
+ if (!ubuf) {
+ return Status::Corruption(
+ "Unsupported compression method or corrupted compressed block contents",
+ CompressionTypeToString(uncompression_info.type()));
}
- if(ShouldReportDetailedTime(ioptions.env, ioptions.statistics)){
- MeasureTime(ioptions.statistics, DECOMPRESSION_TIMES_NANOS,
- timer.ElapsedNanos());
- MeasureTime(ioptions.statistics, BYTES_DECOMPRESSED, contents->data.size());
- RecordTick(ioptions.statistics, NUMBER_BLOCK_DECOMPRESSED);
- }
+ *contents = BlockContents(std::move(ubuf), uncompressed_size);
- return Status::OK();
+ if (ShouldReportDetailedTime(ioptions.env, ioptions.statistics)) {
+ RecordTimeToHistogram(ioptions.statistics, DECOMPRESSION_TIMES_NANOS,
+ timer.ElapsedNanos());
+ }
+ RecordTimeToHistogram(ioptions.statistics, BYTES_DECOMPRESSED,
+ contents->data.size());
+ RecordTick(ioptions.statistics, NUMBER_BLOCK_DECOMPRESSED);
+
+ TEST_SYNC_POINT_CALLBACK(
+ "UncompressBlockContentsForCompressionType:TamperWithReturnValue",
+ static_cast<void*>(&ret));
+ TEST_SYNC_POINT_CALLBACK(
+ "UncompressBlockContentsForCompressionType:"
+ "TamperWithDecompressionOutput",
+ static_cast<void*>(contents));
+
+ return ret;
}
//
// buffer is returned via 'result' and it is upto the caller to
// free this buffer.
// format_version is the block format as defined in include/rocksdb/table.h
-Status UncompressBlockContents(const char* data, size_t n,
+Status UncompressBlockContents(const UncompressionInfo& uncompression_info,
+ const char* data, size_t n,
BlockContents* contents, uint32_t format_version,
- const Slice& compression_dict,
- const ImmutableCFOptions &ioptions) {
+ const ImmutableCFOptions& ioptions,
+ MemoryAllocator* allocator) {
assert(data[n] != kNoCompression);
- return UncompressBlockContentsForCompressionType(
- data, n, contents, format_version, compression_dict,
- (CompressionType)data[n], ioptions);
+ assert(data[n] == static_cast<char>(uncompression_info.type()));
+ return UncompressBlockContentsForCompressionType(uncompression_info, data, n,
+ contents, format_version,
+ ioptions, allocator);
}
-} // namespace rocksdb
+// Replace the contents of db_host_id with the actual hostname, if db_host_id
+// matches the keyword kHostnameForDbHostId
+Status ReifyDbHostIdProperty(Env* env, std::string* db_host_id) {
+ assert(db_host_id);
+ if (*db_host_id == kHostnameForDbHostId) {
+ Status s = env->GetHostNameString(db_host_id);
+ if (!s.ok()) {
+ db_host_id->clear();
+ }
+ return s;
+ }
+
+ return Status::OK();
+}
+} // namespace ROCKSDB_NAMESPACE