]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rocksdb/db/log_reader.cc
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / rocksdb / db / log_reader.cc
index 84082e8ea8a8bae1b2ca98521d455d8f2f7ea585..a21868776141f6ec780c62bb56e55f8515839fff 100644 (file)
@@ -10,6 +10,7 @@
 #include "db/log_reader.h"
 
 #include <stdio.h>
+
 #include "file/sequence_file_reader.h"
 #include "port/lang.h"
 #include "rocksdb/env.h"
@@ -20,8 +21,7 @@
 namespace ROCKSDB_NAMESPACE {
 namespace log {
 
-Reader::Reporter::~Reporter() {
-}
+Reader::Reporter::~Reporter() {}
 
 Reader::Reader(std::shared_ptr<Logger> info_log,
                std::unique_ptr<SequentialFileReader>&& _file,
@@ -38,10 +38,25 @@ Reader::Reader(std::shared_ptr<Logger> info_log,
       last_record_offset_(0),
       end_of_buffer_offset_(0),
       log_number_(log_num),
-      recycled_(false) {}
+      recycled_(false),
+      first_record_read_(false),
+      compression_type_(kNoCompression),
+      compression_type_record_read_(false),
+      uncompress_(nullptr),
+      hash_state_(nullptr),
+      uncompress_hash_state_(nullptr){};
 
 Reader::~Reader() {
   delete[] backing_store_;
+  if (uncompress_) {
+    delete uncompress_;
+  }
+  if (hash_state_) {
+    XXH3_freeState(hash_state_);
+  }
+  if (uncompress_hash_state_) {
+    XXH3_freeState(uncompress_hash_state_);
+  }
 }
 
 // For kAbsoluteConsistency, on clean shutdown we don't expect any error
@@ -52,9 +67,19 @@ Reader::~Reader() {
 // TODO krad: Evaluate if we need to move to a more strict mode where we
 // restrict the inconsistency to only the last log
 bool Reader::ReadRecord(Slice* record, std::string* scratch,
-                        WALRecoveryMode wal_recovery_mode) {
+                        WALRecoveryMode wal_recovery_mode,
+                        uint64_t* record_checksum) {
   scratch->clear();
   record->clear();
+  if (record_checksum != nullptr) {
+    if (hash_state_ == nullptr) {
+      hash_state_ = XXH3_createState();
+    }
+    XXH3_64bits_reset(hash_state_);
+  }
+  if (uncompress_) {
+    uncompress_->Reset();
+  }
   bool in_fragmented_record = false;
   // Record offset of the logical record that we're reading
   // 0 is a dummy value to make compilers happy
@@ -64,7 +89,8 @@ bool Reader::ReadRecord(Slice* record, std::string* scratch,
   while (true) {
     uint64_t physical_record_offset = end_of_buffer_offset_ - buffer_.size();
     size_t drop_size = 0;
-    const unsigned int record_type = ReadPhysicalRecord(&fragment, &drop_size);
+    const unsigned int record_type =
+        ReadPhysicalRecord(&fragment, &drop_size, record_checksum);
     switch (record_type) {
       case kFullType:
       case kRecyclableFullType:
@@ -75,10 +101,18 @@ bool Reader::ReadRecord(Slice* record, std::string* scratch,
           // at the beginning of the next block.
           ReportCorruption(scratch->size(), "partial record without end(1)");
         }
+        // No need to compute record_checksum since the record
+        // consists of a single fragment and the checksum is computed
+        // in ReadPhysicalRecord() if WAL compression is enabled
+        if (record_checksum != nullptr && uncompress_ == nullptr) {
+          // No need to stream since the record is a single fragment
+          *record_checksum = XXH3_64bits(fragment.data(), fragment.size());
+        }
         prospective_record_offset = physical_record_offset;
         scratch->clear();
         *record = fragment;
         last_record_offset_ = prospective_record_offset;
+        first_record_read_ = true;
         return true;
 
       case kFirstType:
@@ -89,6 +123,10 @@ bool Reader::ReadRecord(Slice* record, std::string* scratch,
           // of a block followed by a kFullType or kFirstType record
           // at the beginning of the next block.
           ReportCorruption(scratch->size(), "partial record without end(2)");
+          XXH3_64bits_reset(hash_state_);
+        }
+        if (record_checksum != nullptr) {
+          XXH3_64bits_update(hash_state_, fragment.data(), fragment.size());
         }
         prospective_record_offset = physical_record_offset;
         scratch->assign(fragment.data(), fragment.size());
@@ -101,6 +139,9 @@ bool Reader::ReadRecord(Slice* record, std::string* scratch,
           ReportCorruption(fragment.size(),
                            "missing start of fragmented record(1)");
         } else {
+          if (record_checksum != nullptr) {
+            XXH3_64bits_update(hash_state_, fragment.data(), fragment.size());
+          }
           scratch->append(fragment.data(), fragment.size());
         }
         break;
@@ -111,9 +152,14 @@ bool Reader::ReadRecord(Slice* record, std::string* scratch,
           ReportCorruption(fragment.size(),
                            "missing start of fragmented record(2)");
         } else {
+          if (record_checksum != nullptr) {
+            XXH3_64bits_update(hash_state_, fragment.data(), fragment.size());
+            *record_checksum = XXH3_64bits_digest(hash_state_);
+          }
           scratch->append(fragment.data(), fragment.size());
           *record = Slice(*scratch);
           last_record_offset_ = prospective_record_offset;
+          first_record_read_ = true;
           return true;
         }
         break;
@@ -194,9 +240,8 @@ bool Reader::ReadRecord(Slice* record, std::string* scratch,
         FALLTHROUGH_INTENDED;
 
       case kBadRecordChecksum:
-        if (recycled_ &&
-            wal_recovery_mode ==
-                WALRecoveryMode::kTolerateCorruptedTailRecords) {
+        if (recycled_ && wal_recovery_mode ==
+                             WALRecoveryMode::kTolerateCorruptedTailRecords) {
           scratch->clear();
           return false;
         }
@@ -212,6 +257,29 @@ bool Reader::ReadRecord(Slice* record, std::string* scratch,
         }
         break;
 
+      case kSetCompressionType: {
+        if (compression_type_record_read_) {
+          ReportCorruption(fragment.size(),
+                           "read multiple SetCompressionType records");
+        }
+        if (first_record_read_) {
+          ReportCorruption(fragment.size(),
+                           "SetCompressionType not the first record");
+        }
+        prospective_record_offset = physical_record_offset;
+        scratch->clear();
+        last_record_offset_ = prospective_record_offset;
+        CompressionTypeRecord compression_record(kNoCompression);
+        Status s = compression_record.DecodeFrom(&fragment);
+        if (!s.ok()) {
+          ReportCorruption(fragment.size(),
+                           "could not decode SetCompressionType record");
+        } else {
+          InitCompression(compression_record);
+        }
+        break;
+      }
+
       default: {
         char buf[40];
         snprintf(buf, sizeof(buf), "unknown record type %u", record_type);
@@ -227,9 +295,7 @@ bool Reader::ReadRecord(Slice* record, std::string* scratch,
   return false;
 }
 
-uint64_t Reader::LastRecordOffset() {
-  return last_record_offset_;
-}
+uint64_t Reader::LastRecordOffset() { return last_record_offset_; }
 
 uint64_t Reader::LastRecordEnd() {
   return end_of_buffer_offset_ - buffer_.size();
@@ -267,8 +333,14 @@ void Reader::UnmarkEOFInternal() {
   }
 
   Slice read_buffer;
-  Status status = file_->Read(remaining, &read_buffer,
-    backing_store_ + eof_offset_);
+  // TODO: rate limit log reader with appropriate priority.
+  // TODO: avoid overcharging rate limiter:
+  // Note that the Read here might overcharge SequentialFileReader's internal
+  // rate limiter if priority is not IO_TOTAL, e.g., when there is not enough
+  // content left until EOF to read.
+  Status status =
+      file_->Read(remaining, &read_buffer, backing_store_ + eof_offset_,
+                  Env::IO_TOTAL /* rate_limiter_priority */);
 
   size_t added = read_buffer.size();
   end_of_buffer_offset_ += added;
@@ -285,11 +357,11 @@ void Reader::UnmarkEOFInternal() {
   if (read_buffer.data() != backing_store_ + eof_offset_) {
     // Read did not write to backing_store_
     memmove(backing_store_ + eof_offset_, read_buffer.data(),
-      read_buffer.size());
+            read_buffer.size());
   }
 
   buffer_ = Slice(backing_store_ + consumed_bytes,
-    eof_offset_ + added - consumed_bytes);
+                  eof_offset_ + added - consumed_bytes);
 
   if (added < remaining) {
     eof_ = true;
@@ -309,11 +381,17 @@ void Reader::ReportDrop(size_t bytes, const Status& reason) {
   }
 }
 
-bool Reader::ReadMore(size_t* drop_size, int *error) {
+bool Reader::ReadMore(size_t* drop_size, int* error) {
   if (!eof_ && !read_error_) {
     // Last read was a full read, so this is a trailer to skip
     buffer_.clear();
-    Status status = file_->Read(kBlockSize, &buffer_, backing_store_);
+    // TODO: rate limit log reader with appropriate priority.
+    // TODO: avoid overcharging rate limiter:
+    // Note that the Read here might overcharge SequentialFileReader's internal
+    // rate limiter if priority is not IO_TOTAL, e.g., when there is not enough
+    // content left until EOF to read.
+    Status status = file_->Read(kBlockSize, &buffer_, backing_store_,
+                                Env::IO_TOTAL /* rate_limiter_priority */);
     TEST_SYNC_POINT_CALLBACK("LogReader::ReadMore:AfterReadFile", &status);
     end_of_buffer_offset_ += buffer_.size();
     if (!status.ok()) {
@@ -344,7 +422,8 @@ bool Reader::ReadMore(size_t* drop_size, int *error) {
   }
 }
 
-unsigned int Reader::ReadPhysicalRecord(Slice* result, size_t* drop_size) {
+unsigned int Reader::ReadPhysicalRecord(Slice* result, size_t* drop_size,
+                                        uint64_t* fragment_checksum) {
   while (true) {
     // We need at least the minimum header size
     if (buffer_.size() < static_cast<size_t>(kHeaderSize)) {
@@ -421,17 +500,82 @@ unsigned int Reader::ReadPhysicalRecord(Slice* result, size_t* drop_size) {
 
     buffer_.remove_prefix(header_size + length);
 
-    *result = Slice(header + header_size, length);
-    return type;
+    if (!uncompress_ || type == kSetCompressionType) {
+      *result = Slice(header + header_size, length);
+      return type;
+    } else {
+      // Uncompress compressed records
+      uncompressed_record_.clear();
+      if (fragment_checksum != nullptr) {
+        if (uncompress_hash_state_ == nullptr) {
+          uncompress_hash_state_ = XXH3_createState();
+        }
+        XXH3_64bits_reset(uncompress_hash_state_);
+      }
+
+      size_t uncompressed_size = 0;
+      int remaining = 0;
+      do {
+        remaining = uncompress_->Uncompress(header + header_size, length,
+                                            uncompressed_buffer_.get(),
+                                            &uncompressed_size);
+        if (remaining < 0) {
+          buffer_.clear();
+          return kBadRecord;
+        }
+        if (uncompressed_size > 0) {
+          if (fragment_checksum != nullptr) {
+            XXH3_64bits_update(uncompress_hash_state_,
+                               uncompressed_buffer_.get(), uncompressed_size);
+          }
+          uncompressed_record_.append(uncompressed_buffer_.get(),
+                                      uncompressed_size);
+        }
+      } while (remaining > 0 || uncompressed_size == kBlockSize);
+
+      if (fragment_checksum != nullptr) {
+        // We can remove this check by updating hash_state_ directly,
+        // but that requires resetting hash_state_ for full and first types
+        // for edge cases like consecutive first type records.
+        // Leaving the check as is since it is cleaner and can revert to the
+        // above approach if it causes performance impact.
+        *fragment_checksum = XXH3_64bits_digest(uncompress_hash_state_);
+        uint64_t actual_checksum = XXH3_64bits(uncompressed_record_.data(),
+                                               uncompressed_record_.size());
+        if (*fragment_checksum != actual_checksum) {
+          // uncompressed_record_ contains bad content that does not match
+          // actual decompressed content
+          return kBadRecord;
+        }
+      }
+      *result = Slice(uncompressed_record_);
+      return type;
+    }
   }
 }
 
+// Initialize uncompress related fields
+void Reader::InitCompression(const CompressionTypeRecord& compression_record) {
+  compression_type_ = compression_record.GetCompressionType();
+  compression_type_record_read_ = true;
+  constexpr uint32_t compression_format_version = 2;
+  uncompress_ = StreamingUncompress::Create(
+      compression_type_, compression_format_version, kBlockSize);
+  assert(uncompress_ != nullptr);
+  uncompressed_buffer_ = std::unique_ptr<char[]>(new char[kBlockSize]);
+  assert(uncompressed_buffer_);
+}
+
 bool FragmentBufferedReader::ReadRecord(Slice* record, std::string* scratch,
-                                        WALRecoveryMode /*unused*/) {
+                                        WALRecoveryMode /*unused*/,
+                                        uint64_t* /* checksum */) {
   assert(record != nullptr);
   assert(scratch != nullptr);
   record->clear();
   scratch->clear();
+  if (uncompress_) {
+    uncompress_->Reset();
+  }
 
   uint64_t prospective_record_offset = 0;
   uint64_t physical_record_offset = end_of_buffer_offset_ - buffer_.size();
@@ -449,6 +593,7 @@ bool FragmentBufferedReader::ReadRecord(Slice* record, std::string* scratch,
         *record = fragment;
         prospective_record_offset = physical_record_offset;
         last_record_offset_ = prospective_record_offset;
+        first_record_read_ = true;
         in_fragmented_record_ = false;
         return true;
 
@@ -483,6 +628,7 @@ bool FragmentBufferedReader::ReadRecord(Slice* record, std::string* scratch,
           fragments_.clear();
           *record = Slice(*scratch);
           last_record_offset_ = prospective_record_offset;
+          first_record_read_ = true;
           in_fragmented_record_ = false;
           return true;
         }
@@ -512,6 +658,30 @@ bool FragmentBufferedReader::ReadRecord(Slice* record, std::string* scratch,
         }
         break;
 
+      case kSetCompressionType: {
+        if (compression_type_record_read_) {
+          ReportCorruption(fragment.size(),
+                           "read multiple SetCompressionType records");
+        }
+        if (first_record_read_) {
+          ReportCorruption(fragment.size(),
+                           "SetCompressionType not the first record");
+        }
+        fragments_.clear();
+        prospective_record_offset = physical_record_offset;
+        last_record_offset_ = prospective_record_offset;
+        in_fragmented_record_ = false;
+        CompressionTypeRecord compression_record(kNoCompression);
+        Status s = compression_record.DecodeFrom(&fragment);
+        if (!s.ok()) {
+          ReportCorruption(fragment.size(),
+                           "could not decode SetCompressionType record");
+        } else {
+          InitCompression(compression_record);
+        }
+        break;
+      }
+
       default: {
         char buf[40];
         snprintf(buf, sizeof(buf), "unknown record type %u",
@@ -540,7 +710,13 @@ bool FragmentBufferedReader::TryReadMore(size_t* drop_size, int* error) {
   if (!eof_ && !read_error_) {
     // Last read was a full read, so this is a trailer to skip
     buffer_.clear();
-    Status status = file_->Read(kBlockSize, &buffer_, backing_store_);
+    // TODO: rate limit log reader with appropriate priority.
+    // TODO: avoid overcharging rate limiter:
+    // Note that the Read here might overcharge SequentialFileReader's internal
+    // rate limiter if priority is not IO_TOTAL, e.g., when there is not enough
+    // content left until EOF to read.
+    Status status = file_->Read(kBlockSize, &buffer_, backing_store_,
+                                Env::IO_TOTAL /* rate_limiter_priority */);
     end_of_buffer_offset_ += buffer_.size();
     if (!status.ok()) {
       buffer_.clear();
@@ -645,9 +821,33 @@ bool FragmentBufferedReader::TryReadFragment(
 
   buffer_.remove_prefix(header_size + length);
 
-  *fragment = Slice(header + header_size, length);
-  *fragment_type_or_err = type;
-  return true;
+  if (!uncompress_ || type == kSetCompressionType) {
+    *fragment = Slice(header + header_size, length);
+    *fragment_type_or_err = type;
+    return true;
+  } else {
+    // Uncompress compressed records
+    uncompressed_record_.clear();
+    size_t uncompressed_size = 0;
+    int remaining = 0;
+    do {
+      remaining = uncompress_->Uncompress(header + header_size, length,
+                                          uncompressed_buffer_.get(),
+                                          &uncompressed_size);
+      if (remaining < 0) {
+        buffer_.clear();
+        *fragment_type_or_err = kBadRecord;
+        return true;
+      }
+      if (uncompressed_size > 0) {
+        uncompressed_record_.append(uncompressed_buffer_.get(),
+                                    uncompressed_size);
+      }
+    } while (remaining > 0 || uncompressed_size == kBlockSize);
+    *fragment = Slice(std::move(uncompressed_record_));
+    *fragment_type_or_err = type;
+    return true;
+  }
 }
 
 }  // namespace log