ceph.git blobdiff (git.proxmox.com): import quincy beta 17.1.0

diff --git a/ceph/src/rocksdb/table/block_fetcher.cc b/ceph/src/rocksdb/table/block_fetcher.cc
index 9ddee72ccc2cd6988f7888924320d126c571217e..b0880d516a249e9f6d369b804e5d8df40d258b04 100644
 #include <cinttypes>
 #include <string>
 
+#include "file/file_util.h"
 #include "logging/logging.h"
 #include "memory/memory_allocator.h"
 #include "monitoring/perf_context_imp.h"
 #include "rocksdb/env.h"
 #include "table/block_based/block.h"
 #include "table/block_based/block_based_table_reader.h"
+#include "table/block_based/reader_common.h"
 #include "table/format.h"
 #include "table/persistent_cache_helper.h"
-#include "util/coding.h"
 #include "util/compression.h"
-#include "util/crc32c.h"
 #include "util/stop_watch.h"
-#include "util/string_util.h"
-#include "util/xxhash.h"
 
 namespace ROCKSDB_NAMESPACE {
 
 inline void BlockFetcher::CheckBlockChecksum() {
   // Check the crc of the type and the block contents
   if (read_options_.verify_checksums) {
-    const char* data = slice_.data();  // Pointer to where Read put the data
-    PERF_TIMER_GUARD(block_checksum_time);
-    uint32_t value = DecodeFixed32(data + block_size_ + 1);
-    uint32_t actual = 0;
-    switch (footer_.checksum()) {
-      case kNoChecksum:
-        break;
-      case kCRC32c:
-        value = crc32c::Unmask(value);
-        actual = crc32c::Value(data, block_size_ + 1);
-        break;
-      case kxxHash:
-        actual = XXH32(data, static_cast<int>(block_size_) + 1, 0);
-        break;
-      case kxxHash64:
-        actual = static_cast<uint32_t>(
-            XXH64(data, static_cast<int>(block_size_) + 1, 0) &
-            uint64_t{0xffffffff});
-        break;
-      default:
-        status_ = Status::Corruption(
-            "unknown checksum type " + ToString(footer_.checksum()) + " in " +
-            file_->file_name() + " offset " + ToString(handle_.offset()) +
-            " size " + ToString(block_size_));
-    }
-    if (status_.ok() && actual != value) {
-      status_ = Status::Corruption(
-          "block checksum mismatch: expected " + ToString(actual) + ", got " +
-          ToString(value) + "  in " + file_->file_name() + " offset " +
-          ToString(handle_.offset()) + " size " + ToString(block_size_));
-    }
+    status_ = ROCKSDB_NAMESPACE::VerifyBlockChecksum(
+        footer_.checksum(), slice_.data(), block_size_, file_->file_name(),
+        handle_.offset());
   }
 }
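For reference, the per-type verification that the removed lines performed inline is now delegated to VerifyBlockChecksum() from the newly included table/block_based/reader_common.h. A minimal sketch of an equivalent check, reconstructed from the removed code above (it relies on the util/coding.h, util/crc32c.h and util/xxhash.h helpers this diff stops including; the real helper may differ in detail):

Status VerifyBlockChecksumSketch(ChecksumType type, const char* data,
                                 size_t block_size, const std::string& fname,
                                 uint64_t offset) {
  // Trailer layout: one compression-type byte, then a 32-bit checksum that
  // covers the payload plus that byte.
  uint32_t stored = DecodeFixed32(data + block_size + 1);
  uint32_t computed = 0;
  switch (type) {
    case kNoChecksum:
      break;
    case kCRC32c:
      stored = crc32c::Unmask(stored);
      computed = crc32c::Value(data, block_size + 1);
      break;
    case kxxHash:
      computed = XXH32(data, static_cast<int>(block_size) + 1, 0);
      break;
    case kxxHash64:
      computed = static_cast<uint32_t>(
          XXH64(data, static_cast<int>(block_size) + 1, 0) &
          uint64_t{0xffffffff});
      break;
    default:
      return Status::Corruption("unknown checksum type " + ToString(type) +
                                " in " + fname);
  }
  if (stored != computed) {
    return Status::Corruption("block checksum mismatch: expected " +
                              ToString(computed) + ", got " + ToString(stored) +
                              " in " + fname + " offset " + ToString(offset) +
                              " size " + ToString(block_size));
  }
  return Status::OK();
}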
 
@@ -88,18 +58,19 @@ inline bool BlockFetcher::TryGetUncompressBlockFromPersistentCache() {
 }
 
 inline bool BlockFetcher::TryGetFromPrefetchBuffer() {
-  if (prefetch_buffer_ != nullptr &&
-      prefetch_buffer_->TryReadFromCache(
-          handle_.offset(),
-          static_cast<size_t>(handle_.size()) + kBlockTrailerSize, &slice_,
-          for_compaction_)) {
-    block_size_ = static_cast<size_t>(handle_.size());
-    CheckBlockChecksum();
-    if (!status_.ok()) {
-      return true;
+  if (prefetch_buffer_ != nullptr) {
+    IOOptions opts;
+    Status s = PrepareIOFromReadOptions(read_options_, file_->env(), opts);
+    if (s.ok() && prefetch_buffer_->TryReadFromCache(
+                      opts, handle_.offset(), block_size_with_trailer_, &slice_,
+                      for_compaction_)) {
+      CheckBlockChecksum();
+      if (!status_.ok()) {
+        return true;
+      }
+      got_from_prefetch_buffer_ = true;
+      used_buf_ = const_cast<char*>(slice_.data());
     }
-    got_from_prefetch_buffer_ = true;
-    used_buf_ = const_cast<char*>(slice_.data());
   }
   return got_from_prefetch_buffer_;
 }
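Two related simplifications show up here and below: block_size_ + kBlockTrailerSize is replaced by a block_size_with_trailer_ member, and the per-call block_size_ = static_cast<size_t>(handle_.size()) assignments disappear from the read paths, while PrepareIOFromReadOptions() (presumably from the newly added file/file_util.h include) fills the IOOptions handed to the reader from the ReadOptions. A hedged sketch of the one-time initialization this implies, with member names taken from their uses in this file (the real code lives in block_fetcher.h and may differ):

// In the BlockFetcher constructor (hypothetical placement):
block_size_ = static_cast<size_t>(handle_.size());
block_size_with_trailer_ = block_size_ + kBlockTrailerSize;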
@@ -110,7 +81,7 @@ inline bool BlockFetcher::TryGetCompressedBlockFromPersistentCache() {
     // lookup uncompressed cache mode p-cache
     std::unique_ptr<char[]> raw_data;
     status_ = PersistentCacheHelper::LookupRawPage(
-        cache_options_, handle_, &raw_data, block_size_ + kBlockTrailerSize);
+        cache_options_, handle_, &raw_data, block_size_with_trailer_);
     if (status_.ok()) {
       heap_buf_ = CacheAllocationPtr(raw_data.release());
       used_buf_ = heap_buf_.get();
@@ -128,18 +99,36 @@ inline bool BlockFetcher::TryGetCompressedBlockFromPersistentCache() {
 
 inline void BlockFetcher::PrepareBufferForBlockFromFile() {
   // cache miss read from device
-  if (do_uncompress_ &&
-      block_size_ + kBlockTrailerSize < kDefaultStackBufferSize) {
+  if ((do_uncompress_ || ioptions_.allow_mmap_reads) &&
+      block_size_with_trailer_ < kDefaultStackBufferSize) {
     // If we've got a small enough hunk of data, read it in to the
     // trivially allocated stack buffer instead of needing a full malloc()
+    //
+    // `GetBlockContents()` cannot return this data as its lifetime is tied to
+    // this `BlockFetcher`'s lifetime. That is fine because this is only used
+    // in cases where we do not expect the `GetBlockContents()` result to be the
+    // same buffer we are assigning here. If we guess incorrectly, there will be
+    // a heap allocation and memcpy in `GetBlockContents()` to obtain the final
+    // result. Considering we are eliding a heap allocation here by using the
+    // stack buffer, the cost of guessing incorrectly here is one extra memcpy.
+    //
+    // When `do_uncompress_` is true, we expect the uncompression step will
+    // allocate heap memory for the final result. However this expectation will
+    // be wrong if the block turns out to already be uncompressed, which we
+    // won't know for sure until after reading it.
+    //
+    // When `ioptions_.allow_mmap_reads` is true, we do not expect the file
+    // reader to use the scratch buffer at all, but instead return a pointer
+    // into the mapped memory. This expectation will be wrong when using a
+    // file reader that does not implement mmap reads properly.
     used_buf_ = &stack_buf_[0];
   } else if (maybe_compressed_ && !do_uncompress_) {
-    compressed_buf_ = AllocateBlock(block_size_ + kBlockTrailerSize,
+    compressed_buf_ = AllocateBlock(block_size_with_trailer_,
                                     memory_allocator_compressed_);
     used_buf_ = compressed_buf_.get();
   } else {
     heap_buf_ =
-        AllocateBlock(block_size_ + kBlockTrailerSize, memory_allocator_);
+        AllocateBlock(block_size_with_trailer_, memory_allocator_);
     used_buf_ = heap_buf_.get();
   }
 }
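The comment above explains the guess being made; as an illustration only, the choice it implements can be restated as a standalone function (the enum and function below are hypothetical, the real logic is the member function in this hunk):

enum class ReadBuf { kStack, kCompressedHeap, kHeap };

inline ReadBuf ChooseReadBuffer(bool do_uncompress, bool allow_mmap_reads,
                                bool maybe_compressed,
                                size_t block_size_with_trailer,
                                size_t stack_buf_size) {
  if ((do_uncompress || allow_mmap_reads) &&
      block_size_with_trailer < stack_buf_size) {
    // Small block: guess that the final result will not live in this buffer,
    // so a stack read avoids a heap allocation; a wrong guess costs one memcpy.
    return ReadBuf::kStack;
  } else if (maybe_compressed && !do_uncompress) {
    // Keep the compressed bytes as-is, using the compressed-block allocator.
    return ReadBuf::kCompressedHeap;
  } else {
    // The result will be handed out from heap_buf_ directly.
    return ReadBuf::kHeap;
  }
}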
@@ -150,7 +139,7 @@ inline void BlockFetcher::InsertCompressedBlockToPersistentCacheIfNeeded() {
       cache_options_.persistent_cache->IsCompressed()) {
     // insert to raw cache
     PersistentCacheHelper::InsertRawPage(cache_options_, handle_, used_buf_,
-                                         block_size_ + kBlockTrailerSize);
+                                         block_size_with_trailer_);
   }
 }
 
@@ -164,12 +153,35 @@ inline void BlockFetcher::InsertUncompressedBlockToPersistentCacheIfNeeded() {
   }
 }
 
-inline void BlockFetcher::CopyBufferToHeap() {
+inline void BlockFetcher::CopyBufferToHeapBuf() {
   assert(used_buf_ != heap_buf_.get());
-  heap_buf_ = AllocateBlock(block_size_ + kBlockTrailerSize, memory_allocator_);
-  memcpy(heap_buf_.get(), used_buf_, block_size_ + kBlockTrailerSize);
+  heap_buf_ = AllocateBlock(block_size_with_trailer_, memory_allocator_);
+  memcpy(heap_buf_.get(), used_buf_, block_size_with_trailer_);
+#ifndef NDEBUG
+  num_heap_buf_memcpy_++;
+#endif
+}
+
+inline void BlockFetcher::CopyBufferToCompressedBuf() {
+  assert(used_buf_ != compressed_buf_.get());
+  compressed_buf_ = AllocateBlock(block_size_with_trailer_,
+                                  memory_allocator_compressed_);
+  memcpy(compressed_buf_.get(), used_buf_, block_size_with_trailer_);
+#ifndef NDEBUG
+  num_compressed_buf_memcpy_++;
+#endif
 }
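The two copy helpers above, together with the direct-I/O path added further down, shuffle data between several BlockFetcher members. For orientation, a hedged sketch of the relevant declarations with types inferred from how they are used in this file (the real declarations are in block_fetcher.h and may differ):

// Hypothetical reconstruction for orientation only:
char stack_buf_[kDefaultStackBufferSize];  // small-block fast path
CacheAllocationPtr heap_buf_;              // where the final uncompressed block ends up
CacheAllocationPtr compressed_buf_;        // compressed bytes kept as-is
AlignedBuf direct_io_buf_;                 // filled by the direct-I/O Read() overload
char* used_buf_ = nullptr;                 // whichever buffer the read actually used
size_t block_size_ = 0;                    // handle_.size()
size_t block_size_with_trailer_ = 0;       // block_size_ + kBlockTrailerSize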
 
+// Entering this method means the block either is not compressed or does not
+// need to be uncompressed. The block can be in one of the following buffers:
+// 1. prefetch buffer, if prefetching is enabled and the block was prefetched
+// 2. stack_buf_, if the block is small enough for the stack buffer and is
+//    not compressed
+// 3. heap_buf_, if the block is not compressed
+// 4. compressed_buf_, if the block is compressed
+// 5. direct_io_buf_, if direct IO is enabled
+// After this method, if the block is compressed, it should be in
+// compressed_buf_; otherwise it should be in heap_buf_.
 inline void BlockFetcher::GetBlockContents() {
   if (slice_.data() != used_buf_) {
     // the slice content is not the buffer provided
@@ -178,12 +190,19 @@ inline void BlockFetcher::GetBlockContents() {
     // page can be either uncompressed or compressed, the buffer either stack
     // or heap provided. Refer to https://github.com/facebook/rocksdb/pull/4096
     if (got_from_prefetch_buffer_ || used_buf_ == &stack_buf_[0]) {
-      CopyBufferToHeap();
+      CopyBufferToHeapBuf();
     } else if (used_buf_ == compressed_buf_.get()) {
       if (compression_type_ == kNoCompression &&
           memory_allocator_ != memory_allocator_compressed_) {
-        CopyBufferToHeap();
+        CopyBufferToHeapBuf();
+      } else {
+        heap_buf_ = std::move(compressed_buf_);
+      }
+    } else if (direct_io_buf_.get() != nullptr) {
+      if (compression_type_ == kNoCompression) {
+        CopyBufferToHeapBuf();
       } else {
+        CopyBufferToCompressedBuf();
         heap_buf_ = std::move(compressed_buf_);
       }
     }
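Net effect of the branches above, per source buffer (a summary mirroring the code in this hunk, not additional behaviour):

// prefetch buffer or stack_buf_                           -> memcpy into heap_buf_
// compressed_buf_, data not compressed, allocators differ -> memcpy into heap_buf_
// compressed_buf_, otherwise                              -> heap_buf_ = std::move(compressed_buf_)
// direct_io_buf_, data not compressed                     -> memcpy into heap_buf_
// direct_io_buf_, data compressed                         -> memcpy into compressed_buf_, then moved into heap_buf_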
@@ -195,8 +214,6 @@ inline void BlockFetcher::GetBlockContents() {
 }
 
 Status BlockFetcher::ReadBlockContents() {
-  block_size_ = static_cast<size_t>(handle_.size());
-
   if (TryGetUncompressBlockFromPersistentCache()) {
     compression_type_ = kNoCompression;
 #ifndef NDEBUG
@@ -209,16 +226,34 @@ Status BlockFetcher::ReadBlockContents() {
       return status_;
     }
   } else if (!TryGetCompressedBlockFromPersistentCache()) {
-    PrepareBufferForBlockFromFile();
-    Status s;
-
-    {
-      PERF_TIMER_GUARD(block_read_time);
-      // Actual file read
-      status_ = file_->Read(handle_.offset(), block_size_ + kBlockTrailerSize,
-                            &slice_, used_buf_, for_compaction_);
+    IOOptions opts;
+    status_ = PrepareIOFromReadOptions(read_options_, file_->env(), opts);
+    // Actual file read
+    if (status_.ok()) {
+      if (file_->use_direct_io()) {
+        PERF_TIMER_GUARD(block_read_time);
+        status_ =
+            file_->Read(opts, handle_.offset(), block_size_with_trailer_,
+                        &slice_, nullptr, &direct_io_buf_, for_compaction_);
+        PERF_COUNTER_ADD(block_read_count, 1);
+        used_buf_ = const_cast<char*>(slice_.data());
+      } else {
+        PrepareBufferForBlockFromFile();
+        PERF_TIMER_GUARD(block_read_time);
+        status_ = file_->Read(opts, handle_.offset(), block_size_with_trailer_,
+                              &slice_, used_buf_, nullptr, for_compaction_);
+        PERF_COUNTER_ADD(block_read_count, 1);
+#ifndef NDEBUG
+        if (slice_.data() == &stack_buf_[0]) {
+          num_stack_buf_memcpy_++;
+        } else if (slice_.data() == heap_buf_.get()) {
+          num_heap_buf_memcpy_++;
+        } else if (slice_.data() == compressed_buf_.get()) {
+          num_compressed_buf_memcpy_++;
+        }
+#endif
+      }
     }
-    PERF_COUNTER_ADD(block_read_count, 1);
 
     // TODO: introduce dedicated perf counter for range tombstones
     switch (block_type_) {
@@ -239,16 +274,16 @@ Status BlockFetcher::ReadBlockContents() {
         break;
     }
 
-    PERF_COUNTER_ADD(block_read_byte, block_size_ + kBlockTrailerSize);
+    PERF_COUNTER_ADD(block_read_byte, block_size_with_trailer_);
     if (!status_.ok()) {
       return status_;
     }
 
-    if (slice_.size() != block_size_ + kBlockTrailerSize) {
+    if (slice_.size() != block_size_with_trailer_) {
       return Status::Corruption("truncated block read from " +
                                 file_->file_name() + " offset " +
                                 ToString(handle_.offset()) + ", expected " +
-                                ToString(block_size_ + kBlockTrailerSize) +
+                                ToString(block_size_with_trailer_) +
                                 " bytes, got " + ToString(slice_.size()));
     }
 
@@ -260,17 +295,19 @@ Status BlockFetcher::ReadBlockContents() {
     }
   }
 
-  PERF_TIMER_GUARD(block_decompress_time);
-
   compression_type_ = get_block_compression_type(slice_.data(), block_size_);
 
   if (do_uncompress_ && compression_type_ != kNoCompression) {
+    PERF_TIMER_GUARD(block_decompress_time);
     // compressed page, uncompress, update cache
     UncompressionContext context(compression_type_);
     UncompressionInfo info(context, uncompression_dict_, compression_type_);
     status_ = UncompressBlockContents(info, slice_.data(), block_size_,
                                       contents_, footer_.version(), ioptions_,
                                       memory_allocator_);
+#ifndef NDEBUG
+    num_heap_buf_memcpy_++;
+#endif
     compression_type_ = kNoCompression;
   } else {
     GetBlockContents();
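get_block_compression_type() above reads the compression-type byte that sits immediately before the 32-bit checksum in the block trailer, the same byte the checksum code also covers. A minimal sketch of an equivalent lookup (the real helper is declared in table/format.h and may differ in detail):

inline CompressionType GetBlockCompressionTypeSketch(const char* block_data,
                                                     size_t block_size) {
  // Trailer layout: [1-byte compression type][4-byte checksum]
  return static_cast<CompressionType>(block_data[block_size]);
}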