]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rocksdb/table/format.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / rocksdb / table / format.cc
index ee3766eb8a889960fe7832ed8efda5b8482d1af8..8fb66f8a1c8fd34bc16881a957bf27cef937ca65 100644 (file)
 
 #include "block_fetcher.h"
 #include "file/random_access_file_reader.h"
-#include "logging/logging.h"
 #include "memory/memory_allocator.h"
 #include "monitoring/perf_context_imp.h"
 #include "monitoring/statistics.h"
 #include "rocksdb/env.h"
+#include "rocksdb/options.h"
 #include "table/block_based/block.h"
 #include "table/block_based/block_based_table_reader.h"
 #include "table/persistent_cache_helper.h"
@@ -41,6 +41,7 @@ extern const uint64_t kPlainTableMagicNumber;
 const uint64_t kLegacyPlainTableMagicNumber = 0;
 const uint64_t kPlainTableMagicNumber = 0;
 #endif
+const char* kHostnameForDbHostId = "__hostname__";
 
 bool ShouldReportDetailedTime(Env* env, Statistics* stats) {
   return env != nullptr && stats != nullptr &&
@@ -281,7 +282,7 @@ std::string Footer::ToString() const {
   return result;
 }
 
-Status ReadFooterFromFile(RandomAccessFileReader* file,
+Status ReadFooterFromFile(const IOOptions& opts, RandomAccessFileReader* file,
                           FilePrefetchBuffer* prefetch_buffer,
                           uint64_t file_size, Footer* footer,
                           uint64_t enforce_table_magic_number) {
@@ -292,18 +293,30 @@ Status ReadFooterFromFile(RandomAccessFileReader* file,
                               file->file_name());
   }
 
-  char footer_space[Footer::kMaxEncodedLength];
+  std::string footer_buf;
+  AlignedBuf internal_buf;
   Slice footer_input;
   size_t read_offset =
       (file_size > Footer::kMaxEncodedLength)
           ? static_cast<size_t>(file_size - Footer::kMaxEncodedLength)
           : 0;
   Status s;
+  // TODO: Need to pass appropriate deadline to TryReadFromCache(). Right now,
+  // there is no readahead for point lookups, so TryReadFromCache will fail if
+  // the required data is not in the prefetch buffer. Once deadline is enabled
+  // for iterator, TryReadFromCache might do a readahead. Revisit to see if we
+  // need to pass a timeout at that point
   if (prefetch_buffer == nullptr ||
-      !prefetch_buffer->TryReadFromCache(read_offset, Footer::kMaxEncodedLength,
-                                         &footer_input)) {
-    s = file->Read(read_offset, Footer::kMaxEncodedLength, &footer_input,
-                   footer_space);
+      !prefetch_buffer->TryReadFromCache(
+          IOOptions(), read_offset, Footer::kMaxEncodedLength, &footer_input)) {
+    if (file->use_direct_io()) {
+      s = file->Read(opts, read_offset, Footer::kMaxEncodedLength,
+                     &footer_input, nullptr, &internal_buf);
+    } else {
+      footer_buf.reserve(Footer::kMaxEncodedLength);
+      s = file->Read(opts, read_offset, Footer::kMaxEncodedLength,
+                     &footer_input, &footer_buf[0], nullptr);
+    }
     if (!s.ok()) return s;
   }
 
@@ -334,104 +347,25 @@ Status UncompressBlockContentsForCompressionType(
     const UncompressionInfo& uncompression_info, const char* data, size_t n,
     BlockContents* contents, uint32_t format_version,
     const ImmutableCFOptions& ioptions, MemoryAllocator* allocator) {
-  CacheAllocationPtr ubuf;
+  Status ret = Status::OK();
 
   assert(uncompression_info.type() != kNoCompression &&
          "Invalid compression type");
 
   StopWatchNano timer(ioptions.env, ShouldReportDetailedTime(
                                         ioptions.env, ioptions.statistics));
-  int decompress_size = 0;
-  switch (uncompression_info.type()) {
-    case kSnappyCompression: {
-      size_t ulength = 0;
-      static char snappy_corrupt_msg[] =
-          "Snappy not supported or corrupted Snappy compressed block contents";
-      if (!Snappy_GetUncompressedLength(data, n, &ulength)) {
-        return Status::Corruption(snappy_corrupt_msg);
-      }
-      ubuf = AllocateBlock(ulength, allocator);
-      if (!Snappy_Uncompress(data, n, ubuf.get())) {
-        return Status::Corruption(snappy_corrupt_msg);
-      }
-      *contents = BlockContents(std::move(ubuf), ulength);
-      break;
-    }
-    case kZlibCompression:
-      ubuf = Zlib_Uncompress(
-          uncompression_info, data, n, &decompress_size,
-          GetCompressFormatForVersion(kZlibCompression, format_version),
-          allocator);
-      if (!ubuf) {
-        static char zlib_corrupt_msg[] =
-            "Zlib not supported or corrupted Zlib compressed block contents";
-        return Status::Corruption(zlib_corrupt_msg);
-      }
-      *contents = BlockContents(std::move(ubuf), decompress_size);
-      break;
-    case kBZip2Compression:
-      ubuf = BZip2_Uncompress(
-          data, n, &decompress_size,
-          GetCompressFormatForVersion(kBZip2Compression, format_version),
-          allocator);
-      if (!ubuf) {
-        static char bzip2_corrupt_msg[] =
-            "Bzip2 not supported or corrupted Bzip2 compressed block contents";
-        return Status::Corruption(bzip2_corrupt_msg);
-      }
-      *contents = BlockContents(std::move(ubuf), decompress_size);
-      break;
-    case kLZ4Compression:
-      ubuf = LZ4_Uncompress(
-          uncompression_info, data, n, &decompress_size,
-          GetCompressFormatForVersion(kLZ4Compression, format_version),
-          allocator);
-      if (!ubuf) {
-        static char lz4_corrupt_msg[] =
-            "LZ4 not supported or corrupted LZ4 compressed block contents";
-        return Status::Corruption(lz4_corrupt_msg);
-      }
-      *contents = BlockContents(std::move(ubuf), decompress_size);
-      break;
-    case kLZ4HCCompression:
-      ubuf = LZ4_Uncompress(
-          uncompression_info, data, n, &decompress_size,
-          GetCompressFormatForVersion(kLZ4HCCompression, format_version),
-          allocator);
-      if (!ubuf) {
-        static char lz4hc_corrupt_msg[] =
-            "LZ4HC not supported or corrupted LZ4HC compressed block contents";
-        return Status::Corruption(lz4hc_corrupt_msg);
-      }
-      *contents = BlockContents(std::move(ubuf), decompress_size);
-      break;
-    case kXpressCompression:
-      // XPRESS allocates memory internally, thus no support for custom
-      // allocator.
-      ubuf.reset(XPRESS_Uncompress(data, n, &decompress_size));
-      if (!ubuf) {
-        static char xpress_corrupt_msg[] =
-            "XPRESS not supported or corrupted XPRESS compressed block "
-            "contents";
-        return Status::Corruption(xpress_corrupt_msg);
-      }
-      *contents = BlockContents(std::move(ubuf), decompress_size);
-      break;
-    case kZSTD:
-    case kZSTDNotFinalCompression:
-      ubuf = ZSTD_Uncompress(uncompression_info, data, n, &decompress_size,
-                             allocator);
-      if (!ubuf) {
-        static char zstd_corrupt_msg[] =
-            "ZSTD not supported or corrupted ZSTD compressed block contents";
-        return Status::Corruption(zstd_corrupt_msg);
-      }
-      *contents = BlockContents(std::move(ubuf), decompress_size);
-      break;
-    default:
-      return Status::Corruption("bad block type");
+  size_t uncompressed_size = 0;
+  CacheAllocationPtr ubuf =
+      UncompressData(uncompression_info, data, n, &uncompressed_size,
+                     GetCompressFormatForVersion(format_version), allocator);
+  if (!ubuf) {
+    return Status::Corruption(
+        "Unsupported compression method or corrupted compressed block contents",
+        CompressionTypeToString(uncompression_info.type()));
   }
 
+  *contents = BlockContents(std::move(ubuf), uncompressed_size);
+
   if (ShouldReportDetailedTime(ioptions.env, ioptions.statistics)) {
     RecordTimeToHistogram(ioptions.statistics, DECOMPRESSION_TIMES_NANOS,
                           timer.ElapsedNanos());
@@ -440,7 +374,15 @@ Status UncompressBlockContentsForCompressionType(
                         contents->data.size());
   RecordTick(ioptions.statistics, NUMBER_BLOCK_DECOMPRESSED);
 
-  return Status::OK();
+  TEST_SYNC_POINT_CALLBACK(
+      "UncompressBlockContentsForCompressionType:TamperWithReturnValue",
+      static_cast<void*>(&ret));
+  TEST_SYNC_POINT_CALLBACK(
+      "UncompressBlockContentsForCompressionType:"
+      "TamperWithDecompressionOutput",
+      static_cast<void*>(contents));
+
+  return ret;
 }
 
 //
@@ -456,10 +398,24 @@ Status UncompressBlockContents(const UncompressionInfo& uncompression_info,
                                const ImmutableCFOptions& ioptions,
                                MemoryAllocator* allocator) {
   assert(data[n] != kNoCompression);
-  assert(data[n] == uncompression_info.type());
+  assert(data[n] == static_cast<char>(uncompression_info.type()));
   return UncompressBlockContentsForCompressionType(uncompression_info, data, n,
                                                    contents, format_version,
                                                    ioptions, allocator);
 }
 
+// Replace the contents of db_host_id with the actual hostname, if db_host_id
+// matches the keyword kHostnameForDbHostId
+Status ReifyDbHostIdProperty(Env* env, std::string* db_host_id) {
+  assert(db_host_id);
+  if (*db_host_id == kHostnameForDbHostId) {
+    Status s = env->GetHostNameString(db_host_id);
+    if (!s.ok()) {
+      db_host_id->clear();
+    }
+    return s;
+  }
+
+  return Status::OK();
+}
 }  // namespace ROCKSDB_NAMESPACE