#include "block_fetcher.h"
#include "file/random_access_file_reader.h"
-#include "logging/logging.h"
#include "memory/memory_allocator.h"
#include "monitoring/perf_context_imp.h"
#include "monitoring/statistics.h"
#include "rocksdb/env.h"
+#include "rocksdb/options.h"
#include "table/block_based/block.h"
#include "table/block_based/block_based_table_reader.h"
#include "table/persistent_cache_helper.h"
const uint64_t kLegacyPlainTableMagicNumber = 0;
const uint64_t kPlainTableMagicNumber = 0;
#endif
+const char* kHostnameForDbHostId = "__hostname__";
bool ShouldReportDetailedTime(Env* env, Statistics* stats) {
return env != nullptr && stats != nullptr &&
return result;
}
-Status ReadFooterFromFile(RandomAccessFileReader* file,
+Status ReadFooterFromFile(const IOOptions& opts, RandomAccessFileReader* file,
FilePrefetchBuffer* prefetch_buffer,
uint64_t file_size, Footer* footer,
uint64_t enforce_table_magic_number) {
file->file_name());
}
- char footer_space[Footer::kMaxEncodedLength];
+ std::string footer_buf;
+ AlignedBuf internal_buf;
Slice footer_input;
size_t read_offset =
(file_size > Footer::kMaxEncodedLength)
? static_cast<size_t>(file_size - Footer::kMaxEncodedLength)
: 0;
Status s;
+ // TODO: Need to pass appropriate deadline to TryReadFromCache(). Right now,
+ // there is no readahead for point lookups, so TryReadFromCache will fail if
+ // the required data is not in the prefetch buffer. Once deadline is enabled
+ // for iterator, TryReadFromCache might do a readahead. Revisit to see if we
+ // need to pass a timeout at that point
if (prefetch_buffer == nullptr ||
- !prefetch_buffer->TryReadFromCache(read_offset, Footer::kMaxEncodedLength,
- &footer_input)) {
- s = file->Read(read_offset, Footer::kMaxEncodedLength, &footer_input,
- footer_space);
+ !prefetch_buffer->TryReadFromCache(
+ IOOptions(), read_offset, Footer::kMaxEncodedLength, &footer_input)) {
+ if (file->use_direct_io()) {
+ s = file->Read(opts, read_offset, Footer::kMaxEncodedLength,
+ &footer_input, nullptr, &internal_buf);
+ } else {
+ footer_buf.reserve(Footer::kMaxEncodedLength);
+ s = file->Read(opts, read_offset, Footer::kMaxEncodedLength,
+ &footer_input, &footer_buf[0], nullptr);
+ }
if (!s.ok()) return s;
}
const UncompressionInfo& uncompression_info, const char* data, size_t n,
BlockContents* contents, uint32_t format_version,
const ImmutableCFOptions& ioptions, MemoryAllocator* allocator) {
- CacheAllocationPtr ubuf;
+ Status ret = Status::OK();
assert(uncompression_info.type() != kNoCompression &&
"Invalid compression type");
StopWatchNano timer(ioptions.env, ShouldReportDetailedTime(
ioptions.env, ioptions.statistics));
- int decompress_size = 0;
- switch (uncompression_info.type()) {
- case kSnappyCompression: {
- size_t ulength = 0;
- static char snappy_corrupt_msg[] =
- "Snappy not supported or corrupted Snappy compressed block contents";
- if (!Snappy_GetUncompressedLength(data, n, &ulength)) {
- return Status::Corruption(snappy_corrupt_msg);
- }
- ubuf = AllocateBlock(ulength, allocator);
- if (!Snappy_Uncompress(data, n, ubuf.get())) {
- return Status::Corruption(snappy_corrupt_msg);
- }
- *contents = BlockContents(std::move(ubuf), ulength);
- break;
- }
- case kZlibCompression:
- ubuf = Zlib_Uncompress(
- uncompression_info, data, n, &decompress_size,
- GetCompressFormatForVersion(kZlibCompression, format_version),
- allocator);
- if (!ubuf) {
- static char zlib_corrupt_msg[] =
- "Zlib not supported or corrupted Zlib compressed block contents";
- return Status::Corruption(zlib_corrupt_msg);
- }
- *contents = BlockContents(std::move(ubuf), decompress_size);
- break;
- case kBZip2Compression:
- ubuf = BZip2_Uncompress(
- data, n, &decompress_size,
- GetCompressFormatForVersion(kBZip2Compression, format_version),
- allocator);
- if (!ubuf) {
- static char bzip2_corrupt_msg[] =
- "Bzip2 not supported or corrupted Bzip2 compressed block contents";
- return Status::Corruption(bzip2_corrupt_msg);
- }
- *contents = BlockContents(std::move(ubuf), decompress_size);
- break;
- case kLZ4Compression:
- ubuf = LZ4_Uncompress(
- uncompression_info, data, n, &decompress_size,
- GetCompressFormatForVersion(kLZ4Compression, format_version),
- allocator);
- if (!ubuf) {
- static char lz4_corrupt_msg[] =
- "LZ4 not supported or corrupted LZ4 compressed block contents";
- return Status::Corruption(lz4_corrupt_msg);
- }
- *contents = BlockContents(std::move(ubuf), decompress_size);
- break;
- case kLZ4HCCompression:
- ubuf = LZ4_Uncompress(
- uncompression_info, data, n, &decompress_size,
- GetCompressFormatForVersion(kLZ4HCCompression, format_version),
- allocator);
- if (!ubuf) {
- static char lz4hc_corrupt_msg[] =
- "LZ4HC not supported or corrupted LZ4HC compressed block contents";
- return Status::Corruption(lz4hc_corrupt_msg);
- }
- *contents = BlockContents(std::move(ubuf), decompress_size);
- break;
- case kXpressCompression:
- // XPRESS allocates memory internally, thus no support for custom
- // allocator.
- ubuf.reset(XPRESS_Uncompress(data, n, &decompress_size));
- if (!ubuf) {
- static char xpress_corrupt_msg[] =
- "XPRESS not supported or corrupted XPRESS compressed block "
- "contents";
- return Status::Corruption(xpress_corrupt_msg);
- }
- *contents = BlockContents(std::move(ubuf), decompress_size);
- break;
- case kZSTD:
- case kZSTDNotFinalCompression:
- ubuf = ZSTD_Uncompress(uncompression_info, data, n, &decompress_size,
- allocator);
- if (!ubuf) {
- static char zstd_corrupt_msg[] =
- "ZSTD not supported or corrupted ZSTD compressed block contents";
- return Status::Corruption(zstd_corrupt_msg);
- }
- *contents = BlockContents(std::move(ubuf), decompress_size);
- break;
- default:
- return Status::Corruption("bad block type");
+ size_t uncompressed_size = 0;
+ CacheAllocationPtr ubuf =
+ UncompressData(uncompression_info, data, n, &uncompressed_size,
+ GetCompressFormatForVersion(format_version), allocator);
+ if (!ubuf) {
+ return Status::Corruption(
+ "Unsupported compression method or corrupted compressed block contents",
+ CompressionTypeToString(uncompression_info.type()));
}
+ *contents = BlockContents(std::move(ubuf), uncompressed_size);
+
if (ShouldReportDetailedTime(ioptions.env, ioptions.statistics)) {
RecordTimeToHistogram(ioptions.statistics, DECOMPRESSION_TIMES_NANOS,
timer.ElapsedNanos());
contents->data.size());
RecordTick(ioptions.statistics, NUMBER_BLOCK_DECOMPRESSED);
- return Status::OK();
+ TEST_SYNC_POINT_CALLBACK(
+ "UncompressBlockContentsForCompressionType:TamperWithReturnValue",
+ static_cast<void*>(&ret));
+ TEST_SYNC_POINT_CALLBACK(
+ "UncompressBlockContentsForCompressionType:"
+ "TamperWithDecompressionOutput",
+ static_cast<void*>(contents));
+
+ return ret;
}
//
const ImmutableCFOptions& ioptions,
MemoryAllocator* allocator) {
assert(data[n] != kNoCompression);
- assert(data[n] == uncompression_info.type());
+ assert(data[n] == static_cast<char>(uncompression_info.type()));
return UncompressBlockContentsForCompressionType(uncompression_info, data, n,
contents, format_version,
ioptions, allocator);
}
+// Replace the contents of db_host_id with the actual hostname, if db_host_id
+// matches the keyword kHostnameForDbHostId
+Status ReifyDbHostIdProperty(Env* env, std::string* db_host_id) {
+ assert(db_host_id);
+ if (*db_host_id == kHostnameForDbHostId) {
+ Status s = env->GetHostNameString(db_host_id);
+ if (!s.ok()) {
+ db_host_id->clear();
+ }
+ return s;
+ }
+
+ return Status::OK();
+}
} // namespace ROCKSDB_NAMESPACE