X-Git-Url: https://git.proxmox.com/?a=blobdiff_plain;f=ceph%2Fsrc%2Frocksdb%2Ftable%2Fformat.cc;h=8fb66f8a1c8fd34bc16881a957bf27cef937ca65;hb=20effc670b57271cb089376d6d0800990e5218d5;hp=2d03316b4f0e325b90b0a6acadd4de609966b79a;hpb=7c673caec407dd16107e56e4b51a6d00f021315c;p=ceph.git diff --git a/ceph/src/rocksdb/table/format.cc b/ceph/src/rocksdb/table/format.cc index 2d03316b4..8fb66f8a1 100644 --- a/ceph/src/rocksdb/table/format.cc +++ b/ceph/src/rocksdb/table/format.cc @@ -1,7 +1,7 @@ // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. -// This source code is licensed under the BSD-style license found in the -// LICENSE file in the root directory of this source tree. An additional grant -// of patent rights can be found in the PATENTS file in the same directory. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). // // Copyright (c) 2011 The LevelDB Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be @@ -9,25 +9,26 @@ #include "table/format.h" +#include #include -#include +#include "block_fetcher.h" +#include "file/random_access_file_reader.h" +#include "memory/memory_allocator.h" #include "monitoring/perf_context_imp.h" #include "monitoring/statistics.h" #include "rocksdb/env.h" -#include "table/block.h" -#include "table/block_based_table_reader.h" +#include "rocksdb/options.h" +#include "table/block_based/block.h" +#include "table/block_based/block_based_table_reader.h" #include "table/persistent_cache_helper.h" #include "util/coding.h" #include "util/compression.h" #include "util/crc32c.h" -#include "util/file_reader_writer.h" -#include "util/logging.h" #include "util/stop_watch.h" #include "util/string_util.h" -#include "util/xxhash.h" -namespace rocksdb { +namespace ROCKSDB_NAMESPACE { extern const uint64_t kLegacyBlockBasedTableMagicNumber; extern const uint64_t kBlockBasedTableMagicNumber; @@ -40,11 +41,11 @@ extern const uint64_t kPlainTableMagicNumber; const uint64_t kLegacyPlainTableMagicNumber = 0; const uint64_t kPlainTableMagicNumber = 0; #endif -const uint32_t DefaultStackBufferSize = 5000; +const char* kHostnameForDbHostId = "__hostname__"; bool ShouldReportDetailedTime(Env* env, Statistics* stats) { return env != nullptr && stats != nullptr && - stats->stats_level_ > kExceptDetailedTimers; + stats->get_stats_level() > kExceptDetailedTimers; } void BlockHandle::EncodeTo(std::string* dst) const { @@ -55,8 +56,19 @@ void BlockHandle::EncodeTo(std::string* dst) const { } Status BlockHandle::DecodeFrom(Slice* input) { - if (GetVarint64(input, &offset_) && - GetVarint64(input, &size_)) { + if (GetVarint64(input, &offset_) && GetVarint64(input, &size_)) { + return Status::OK(); + } else { + // reset in case failure after partially decoding + offset_ = 0; + size_ = 0; + return Status::Corruption("bad block handle"); + } +} + +Status BlockHandle::DecodeSizeFrom(uint64_t _offset, Slice* input) { + if (GetVarint64(input, &size_)) { + offset_ = _offset; return Status::OK(); } else { // reset in case failure after partially decoding @@ -79,6 +91,58 @@ std::string BlockHandle::ToString(bool hex) const { const BlockHandle BlockHandle::kNullBlockHandle(0, 0); +void IndexValue::EncodeTo(std::string* dst, bool have_first_key, + const BlockHandle* previous_handle) const { + if (previous_handle) { + assert(handle.offset() == previous_handle->offset() + + previous_handle->size() + kBlockTrailerSize); + PutVarsignedint64(dst, handle.size() - previous_handle->size()); + } else { + handle.EncodeTo(dst); + } + assert(dst->size() != 0); + + if (have_first_key) { + PutLengthPrefixedSlice(dst, first_internal_key); + } +} + +Status IndexValue::DecodeFrom(Slice* input, bool have_first_key, + const BlockHandle* previous_handle) { + if (previous_handle) { + int64_t delta; + if (!GetVarsignedint64(input, &delta)) { + return Status::Corruption("bad delta-encoded index value"); + } + handle = BlockHandle( + previous_handle->offset() + previous_handle->size() + kBlockTrailerSize, + previous_handle->size() + delta); + } else { + Status s = handle.DecodeFrom(input); + if (!s.ok()) { + return s; + } + } + + if (!have_first_key) { + first_internal_key = Slice(); + } else if (!GetLengthPrefixedSlice(input, &first_internal_key)) { + return Status::Corruption("bad first key in block info"); + } + + return Status::OK(); +} + +std::string IndexValue::ToString(bool hex, bool have_first_key) const { + std::string s; + EncodeTo(&s, have_first_key, nullptr); + if (hex) { + return Slice(s).ToString(true); + } else { + return s; + } +} + namespace { inline bool IsLegacyFooterFormat(uint64_t magic_number) { return magic_number == kLegacyBlockBasedTableMagicNumber || @@ -102,7 +166,7 @@ inline uint64_t UpconvertLegacyFooterFormat(uint64_t magic_number) { // to make the total size 2 * BlockHandle::kMaxEncodedLength // table_magic_number (8 bytes) // new footer format: -// checksum (char, 1 byte) +// checksum type (char, 1 byte) // metaindex handle (varint64 offset, varint64 size) // index handle (varint64 offset, varint64 size) // to make the total size 2 * BlockHandle::kMaxEncodedLength + 1 @@ -146,7 +210,7 @@ Status Footer::DecodeFrom(Slice* input) { assert(input != nullptr); assert(input->size() >= kMinEncodedLength); - const char *magic_ptr = + const char* magic_ptr = input->data() + input->size() - kMagicNumberLengthByte; const uint32_t magic_lo = DecodeFixed32(magic_ptr); const uint32_t magic_hi = DecodeFixed32(magic_ptr + 4); @@ -196,7 +260,7 @@ Status Footer::DecodeFrom(Slice* input) { } std::string Footer::ToString() const { - std::string result, handle_; + std::string result; result.reserve(1024); bool legacy = IsLegacyFooterFormat(table_magic_number_); @@ -204,38 +268,65 @@ std::string Footer::ToString() const { result.append("metaindex handle: " + metaindex_handle_.ToString() + "\n "); result.append("index handle: " + index_handle_.ToString() + "\n "); result.append("table_magic_number: " + - rocksdb::ToString(table_magic_number_) + "\n "); + ROCKSDB_NAMESPACE::ToString(table_magic_number_) + "\n "); } else { - result.append("checksum: " + rocksdb::ToString(checksum_) + "\n "); + result.append("checksum: " + ROCKSDB_NAMESPACE::ToString(checksum_) + + "\n "); result.append("metaindex handle: " + metaindex_handle_.ToString() + "\n "); result.append("index handle: " + index_handle_.ToString() + "\n "); - result.append("footer version: " + rocksdb::ToString(version_) + "\n "); + result.append("footer version: " + ROCKSDB_NAMESPACE::ToString(version_) + + "\n "); result.append("table_magic_number: " + - rocksdb::ToString(table_magic_number_) + "\n "); + ROCKSDB_NAMESPACE::ToString(table_magic_number_) + "\n "); } return result; } -Status ReadFooterFromFile(RandomAccessFileReader* file, uint64_t file_size, - Footer* footer, uint64_t enforce_table_magic_number) { +Status ReadFooterFromFile(const IOOptions& opts, RandomAccessFileReader* file, + FilePrefetchBuffer* prefetch_buffer, + uint64_t file_size, Footer* footer, + uint64_t enforce_table_magic_number) { if (file_size < Footer::kMinEncodedLength) { - return Status::Corruption("file is too short to be an sstable"); + return Status::Corruption("file is too short (" + ToString(file_size) + + " bytes) to be an " + "sstable: " + + file->file_name()); } - char footer_space[Footer::kMaxEncodedLength]; + std::string footer_buf; + AlignedBuf internal_buf; Slice footer_input; size_t read_offset = (file_size > Footer::kMaxEncodedLength) ? static_cast(file_size - Footer::kMaxEncodedLength) : 0; - Status s = file->Read(read_offset, Footer::kMaxEncodedLength, &footer_input, - footer_space); - if (!s.ok()) return s; + Status s; + // TODO: Need to pass appropriate deadline to TryReadFromCache(). Right now, + // there is no readahead for point lookups, so TryReadFromCache will fail if + // the required data is not in the prefetch buffer. Once deadline is enabled + // for iterator, TryReadFromCache might do a readahead. Revisit to see if we + // need to pass a timeout at that point + if (prefetch_buffer == nullptr || + !prefetch_buffer->TryReadFromCache( + IOOptions(), read_offset, Footer::kMaxEncodedLength, &footer_input)) { + if (file->use_direct_io()) { + s = file->Read(opts, read_offset, Footer::kMaxEncodedLength, + &footer_input, nullptr, &internal_buf); + } else { + footer_buf.reserve(Footer::kMaxEncodedLength); + s = file->Read(opts, read_offset, Footer::kMaxEncodedLength, + &footer_input, &footer_buf[0], nullptr); + } + if (!s.ok()) return s; + } // Check that we actually read the whole footer from the file. It may be // that size isn't correct. if (footer_input.size() < Footer::kMinEncodedLength) { - return Status::Corruption("file is too short to be an sstable"); + return Status::Corruption("file is too short (" + ToString(file_size) + + " bytes) to be an " + "sstable" + + file->file_name()); } s = footer->DecodeFrom(&footer_input); @@ -244,287 +335,54 @@ Status ReadFooterFromFile(RandomAccessFileReader* file, uint64_t file_size, } if (enforce_table_magic_number != 0 && enforce_table_magic_number != footer->table_magic_number()) { - return Status::Corruption("Bad table magic number"); + return Status::Corruption( + "Bad table magic number: expected " + + ToString(enforce_table_magic_number) + ", found " + + ToString(footer->table_magic_number()) + " in " + file->file_name()); } return Status::OK(); } -// Without anonymous namespace here, we fail the warning -Wmissing-prototypes -namespace { - -// Read a block and check its CRC -// contents is the result of reading. -// According to the implementation of file->Read, contents may not point to buf -Status ReadBlock(RandomAccessFileReader* file, const Footer& footer, - const ReadOptions& options, const BlockHandle& handle, - Slice* contents, /* result of reading */ char* buf) { - size_t n = static_cast(handle.size()); - Status s; - - { - PERF_TIMER_GUARD(block_read_time); - s = file->Read(handle.offset(), n + kBlockTrailerSize, contents, buf); - } - - PERF_COUNTER_ADD(block_read_count, 1); - PERF_COUNTER_ADD(block_read_byte, n + kBlockTrailerSize); - - if (!s.ok()) { - return s; - } - if (contents->size() != n + kBlockTrailerSize) { - return Status::Corruption("truncated block read"); - } - - // Check the crc of the type and the block contents - const char* data = contents->data(); // Pointer to where Read put the data - if (options.verify_checksums) { - PERF_TIMER_GUARD(block_checksum_time); - uint32_t value = DecodeFixed32(data + n + 1); - uint32_t actual = 0; - switch (footer.checksum()) { - case kCRC32c: - value = crc32c::Unmask(value); - actual = crc32c::Value(data, n + 1); - break; - case kxxHash: - actual = XXH32(data, static_cast(n) + 1, 0); - break; - default: - s = Status::Corruption("unknown checksum type"); - } - if (s.ok() && actual != value) { - s = Status::Corruption("block checksum mismatch"); - } - if (!s.ok()) { - return s; - } - } - return s; -} - -} // namespace - -Status ReadBlockContents(RandomAccessFileReader* file, const Footer& footer, - const ReadOptions& read_options, - const BlockHandle& handle, BlockContents* contents, - const ImmutableCFOptions &ioptions, - bool decompression_requested, - const Slice& compression_dict, - const PersistentCacheOptions& cache_options) { - Status status; - Slice slice; - size_t n = static_cast(handle.size()); - std::unique_ptr heap_buf; - char stack_buf[DefaultStackBufferSize]; - char* used_buf = nullptr; - rocksdb::CompressionType compression_type; - - if (cache_options.persistent_cache && - !cache_options.persistent_cache->IsCompressed()) { - status = PersistentCacheHelper::LookupUncompressedPage(cache_options, - handle, contents); - if (status.ok()) { - // uncompressed page is found for the block handle - return status; - } else { - // uncompressed page is not found - if (ioptions.info_log && !status.IsNotFound()) { - assert(!status.ok()); - ROCKS_LOG_INFO(ioptions.info_log, - "Error reading from persistent cache. %s", - status.ToString().c_str()); - } - } - } - - if (cache_options.persistent_cache && - cache_options.persistent_cache->IsCompressed()) { - // lookup uncompressed cache mode p-cache - status = PersistentCacheHelper::LookupRawPage( - cache_options, handle, &heap_buf, n + kBlockTrailerSize); - } else { - status = Status::NotFound(); - } - - if (status.ok()) { - // cache hit - used_buf = heap_buf.get(); - slice = Slice(heap_buf.get(), n); - } else { - if (ioptions.info_log && !status.IsNotFound()) { - assert(!status.ok()); - ROCKS_LOG_INFO(ioptions.info_log, - "Error reading from persistent cache. %s", - status.ToString().c_str()); - } - // cache miss read from device - if (decompression_requested && - n + kBlockTrailerSize < DefaultStackBufferSize) { - // If we've got a small enough hunk of data, read it in to the - // trivially allocated stack buffer instead of needing a full malloc() - used_buf = &stack_buf[0]; - } else { - heap_buf = std::unique_ptr(new char[n + kBlockTrailerSize]); - used_buf = heap_buf.get(); - } - - status = ReadBlock(file, footer, read_options, handle, &slice, used_buf); - if (status.ok() && read_options.fill_cache && - cache_options.persistent_cache && - cache_options.persistent_cache->IsCompressed()) { - // insert to raw cache - PersistentCacheHelper::InsertRawPage(cache_options, handle, used_buf, - n + kBlockTrailerSize); - } - } - - if (!status.ok()) { - return status; - } - - PERF_TIMER_GUARD(block_decompress_time); - - compression_type = static_cast(slice.data()[n]); - - if (decompression_requested && compression_type != kNoCompression) { - // compressed page, uncompress, update cache - status = UncompressBlockContents(slice.data(), n, contents, - footer.version(), compression_dict, - ioptions); - } else if (slice.data() != used_buf) { - // the slice content is not the buffer provided - *contents = BlockContents(Slice(slice.data(), n), false, compression_type); - } else { - // page is uncompressed, the buffer either stack or heap provided - if (used_buf == &stack_buf[0]) { - heap_buf = std::unique_ptr(new char[n]); - memcpy(heap_buf.get(), stack_buf, n); - } - *contents = BlockContents(std::move(heap_buf), n, true, compression_type); - } - - if (status.ok() && read_options.fill_cache && - cache_options.persistent_cache && - !cache_options.persistent_cache->IsCompressed()) { - // insert to uncompressed cache - PersistentCacheHelper::InsertUncompressedPage(cache_options, handle, - *contents); - } - - return status; -} - Status UncompressBlockContentsForCompressionType( - const char* data, size_t n, BlockContents* contents, - uint32_t format_version, const Slice& compression_dict, - CompressionType compression_type, const ImmutableCFOptions &ioptions) { - std::unique_ptr ubuf; - - assert(compression_type != kNoCompression && "Invalid compression type"); - - StopWatchNano timer(ioptions.env, - ShouldReportDetailedTime(ioptions.env, ioptions.statistics)); - int decompress_size = 0; - switch (compression_type) { - case kSnappyCompression: { - size_t ulength = 0; - static char snappy_corrupt_msg[] = - "Snappy not supported or corrupted Snappy compressed block contents"; - if (!Snappy_GetUncompressedLength(data, n, &ulength)) { - return Status::Corruption(snappy_corrupt_msg); - } - ubuf.reset(new char[ulength]); - if (!Snappy_Uncompress(data, n, ubuf.get())) { - return Status::Corruption(snappy_corrupt_msg); - } - *contents = BlockContents(std::move(ubuf), ulength, true, kNoCompression); - break; - } - case kZlibCompression: - ubuf.reset(Zlib_Uncompress( - data, n, &decompress_size, - GetCompressFormatForVersion(kZlibCompression, format_version), - compression_dict)); - if (!ubuf) { - static char zlib_corrupt_msg[] = - "Zlib not supported or corrupted Zlib compressed block contents"; - return Status::Corruption(zlib_corrupt_msg); - } - *contents = - BlockContents(std::move(ubuf), decompress_size, true, kNoCompression); - break; - case kBZip2Compression: - ubuf.reset(BZip2_Uncompress( - data, n, &decompress_size, - GetCompressFormatForVersion(kBZip2Compression, format_version))); - if (!ubuf) { - static char bzip2_corrupt_msg[] = - "Bzip2 not supported or corrupted Bzip2 compressed block contents"; - return Status::Corruption(bzip2_corrupt_msg); - } - *contents = - BlockContents(std::move(ubuf), decompress_size, true, kNoCompression); - break; - case kLZ4Compression: - ubuf.reset(LZ4_Uncompress( - data, n, &decompress_size, - GetCompressFormatForVersion(kLZ4Compression, format_version), - compression_dict)); - if (!ubuf) { - static char lz4_corrupt_msg[] = - "LZ4 not supported or corrupted LZ4 compressed block contents"; - return Status::Corruption(lz4_corrupt_msg); - } - *contents = - BlockContents(std::move(ubuf), decompress_size, true, kNoCompression); - break; - case kLZ4HCCompression: - ubuf.reset(LZ4_Uncompress( - data, n, &decompress_size, - GetCompressFormatForVersion(kLZ4HCCompression, format_version), - compression_dict)); - if (!ubuf) { - static char lz4hc_corrupt_msg[] = - "LZ4HC not supported or corrupted LZ4HC compressed block contents"; - return Status::Corruption(lz4hc_corrupt_msg); - } - *contents = - BlockContents(std::move(ubuf), decompress_size, true, kNoCompression); - break; - case kXpressCompression: - ubuf.reset(XPRESS_Uncompress(data, n, &decompress_size)); - if (!ubuf) { - static char xpress_corrupt_msg[] = - "XPRESS not supported or corrupted XPRESS compressed block contents"; - return Status::Corruption(xpress_corrupt_msg); - } - *contents = - BlockContents(std::move(ubuf), decompress_size, true, kNoCompression); - break; - case kZSTD: - case kZSTDNotFinalCompression: - ubuf.reset(ZSTD_Uncompress(data, n, &decompress_size, compression_dict)); - if (!ubuf) { - static char zstd_corrupt_msg[] = - "ZSTD not supported or corrupted ZSTD compressed block contents"; - return Status::Corruption(zstd_corrupt_msg); - } - *contents = - BlockContents(std::move(ubuf), decompress_size, true, kNoCompression); - break; - default: - return Status::Corruption("bad block type"); + const UncompressionInfo& uncompression_info, const char* data, size_t n, + BlockContents* contents, uint32_t format_version, + const ImmutableCFOptions& ioptions, MemoryAllocator* allocator) { + Status ret = Status::OK(); + + assert(uncompression_info.type() != kNoCompression && + "Invalid compression type"); + + StopWatchNano timer(ioptions.env, ShouldReportDetailedTime( + ioptions.env, ioptions.statistics)); + size_t uncompressed_size = 0; + CacheAllocationPtr ubuf = + UncompressData(uncompression_info, data, n, &uncompressed_size, + GetCompressFormatForVersion(format_version), allocator); + if (!ubuf) { + return Status::Corruption( + "Unsupported compression method or corrupted compressed block contents", + CompressionTypeToString(uncompression_info.type())); } - if(ShouldReportDetailedTime(ioptions.env, ioptions.statistics)){ - MeasureTime(ioptions.statistics, DECOMPRESSION_TIMES_NANOS, - timer.ElapsedNanos()); - MeasureTime(ioptions.statistics, BYTES_DECOMPRESSED, contents->data.size()); - RecordTick(ioptions.statistics, NUMBER_BLOCK_DECOMPRESSED); - } + *contents = BlockContents(std::move(ubuf), uncompressed_size); - return Status::OK(); + if (ShouldReportDetailedTime(ioptions.env, ioptions.statistics)) { + RecordTimeToHistogram(ioptions.statistics, DECOMPRESSION_TIMES_NANOS, + timer.ElapsedNanos()); + } + RecordTimeToHistogram(ioptions.statistics, BYTES_DECOMPRESSED, + contents->data.size()); + RecordTick(ioptions.statistics, NUMBER_BLOCK_DECOMPRESSED); + + TEST_SYNC_POINT_CALLBACK( + "UncompressBlockContentsForCompressionType:TamperWithReturnValue", + static_cast(&ret)); + TEST_SYNC_POINT_CALLBACK( + "UncompressBlockContentsForCompressionType:" + "TamperWithDecompressionOutput", + static_cast(contents)); + + return ret; } // @@ -534,14 +392,30 @@ Status UncompressBlockContentsForCompressionType( // buffer is returned via 'result' and it is upto the caller to // free this buffer. // format_version is the block format as defined in include/rocksdb/table.h -Status UncompressBlockContents(const char* data, size_t n, +Status UncompressBlockContents(const UncompressionInfo& uncompression_info, + const char* data, size_t n, BlockContents* contents, uint32_t format_version, - const Slice& compression_dict, - const ImmutableCFOptions &ioptions) { + const ImmutableCFOptions& ioptions, + MemoryAllocator* allocator) { assert(data[n] != kNoCompression); - return UncompressBlockContentsForCompressionType( - data, n, contents, format_version, compression_dict, - (CompressionType)data[n], ioptions); + assert(data[n] == static_cast(uncompression_info.type())); + return UncompressBlockContentsForCompressionType(uncompression_info, data, n, + contents, format_version, + ioptions, allocator); } -} // namespace rocksdb +// Replace the contents of db_host_id with the actual hostname, if db_host_id +// matches the keyword kHostnameForDbHostId +Status ReifyDbHostIdProperty(Env* env, std::string* db_host_id) { + assert(db_host_id); + if (*db_host_id == kHostnameForDbHostId) { + Status s = env->GetHostNameString(db_host_id); + if (!s.ok()) { + db_host_id->clear(); + } + return s; + } + + return Status::OK(); +} +} // namespace ROCKSDB_NAMESPACE