git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rocksdb/db/compaction.cc
import 14.2.4 nautilus point release
[ceph.git] / ceph / src / rocksdb / db / compaction.cc
index 4ea92d5cc78ba3388ac395f9586ff8963487c61c..f8805376f1dd7198bbe2eea8e5abd05a50af5dbf 100644 (file)
 
 namespace rocksdb {
 
+const uint64_t kRangeTombstoneSentinel =  // Key footer built from the max sequence number and the range-deletion type.
+    PackSequenceAndType(kMaxSequenceNumber, kTypeRangeDeletion);  // sstableKeyCompare tests key footers against this value.
+
+int sstableKeyCompare(const Comparator* user_cmp, const InternalKey& a,
+                      const InternalKey& b) {  // Three-way compare: user keys first, then sentinel-footer status.
+  auto c = user_cmp->Compare(a.user_key(), b.user_key());
+  if (c != 0) {  // User keys differ, so their order decides the result.
+    return c;
+  }
+  auto a_footer = ExtractInternalKeyFooter(a.Encode());
+  auto b_footer = ExtractInternalKeyFooter(b.Encode());
+  if (a_footer == kRangeTombstoneSentinel) {
+    if (b_footer != kRangeTombstoneSentinel) {
+      return -1;  // Only a carries the sentinel footer: a orders first.
+    }
+  } else if (b_footer == kRangeTombstoneSentinel) {
+    return 1;  // Only b carries the sentinel footer: b orders first.
+  }
+  return 0;  // Same user key, and both or neither are sentinels; other footer differences are ignored.
+}
+
+int sstableKeyCompare(const Comparator* user_cmp, const InternalKey* a,
+                      const InternalKey& b) {  // Overload accepting a nullable left-hand key.
+  if (a == nullptr) {
+    return -1;  // A null a compares less than every b (presumably "no lower bound" -- confirm with callers).
+  }
+  return sstableKeyCompare(user_cmp, *a, b);  // Non-null: defer to the reference overload.
+}
+
+int sstableKeyCompare(const Comparator* user_cmp, const InternalKey& a,
+                      const InternalKey* b) {  // Overload accepting a nullable right-hand key.
+  if (b == nullptr) {
+    return -1;  // Every a compares less than a null b (presumably "no upper bound" -- confirm with callers).
+  }
+  return sstableKeyCompare(user_cmp, a, *b);  // Non-null: defer to the reference overload.
+}
+
 uint64_t TotalFileSize(const std::vector<FileMetaData*>& files) {
   uint64_t sum = 0;
   for (size_t i = 0; i < files.size() && files[i]; i++) {
@@ -81,6 +118,49 @@ void Compaction::GetBoundaryKeys(
   }
 }
 
+std::vector<CompactionInputFiles> Compaction::PopulateWithAtomicBoundaries(
+    VersionStorageInfo* vstorage, std::vector<CompactionInputFiles> inputs) {  // Returns inputs with one boundary entry recorded per file.
+  const Comparator* ucmp = vstorage->InternalComparator()->user_comparator();
+  for (size_t i = 0; i < inputs.size(); i++) {
+    if (inputs[i].level == 0 || inputs[i].files.empty()) {  // Level 0 and empty levels receive no boundaries.
+      continue;
+    }
+    inputs[i].atomic_compaction_unit_boundaries.reserve(inputs[i].files.size());  // One entry per file is pushed below.
+    AtomicCompactionUnitBoundary cur_boundary;  // Smallest/largest key pointers of the unit being built.
+    size_t first_atomic_idx = 0;  // Index of the first file that has not been given a boundary yet.
+    auto add_unit_boundary = [&](size_t to) {  // Records cur_boundary for files [first_atomic_idx, to).
+      if (first_atomic_idx == to) return;  // Nothing pending.
+      for (size_t k = first_atomic_idx; k < to; k++) {
+        inputs[i].atomic_compaction_unit_boundaries.push_back(cur_boundary);
+      }
+      first_atomic_idx = to;
+    };
+    for (size_t j = 0; j < inputs[i].files.size(); j++) {
+      const auto* f = inputs[i].files[j];
+      if (j == 0) {
+        // First file in a level.
+        cur_boundary.smallest = &f->smallest;
+        cur_boundary.largest = &f->largest;
+      } else if (sstableKeyCompare(ucmp, *cur_boundary.largest, f->smallest) ==
+                 0) {  // 0: same user key and matching sentinel status, per sstableKeyCompare.
+        // SSTs overlap but the end key of the previous file was not
+        // artificially extended by a range tombstone. Extend the current
+        // boundary.
+        cur_boundary.largest = &f->largest;
+      } else {
+        // Atomic compaction unit has ended.
+        add_unit_boundary(j);  // Files before j keep the finished unit's boundary.
+        cur_boundary.smallest = &f->smallest;
+        cur_boundary.largest = &f->largest;
+      }
+    }
+    add_unit_boundary(inputs[i].files.size());  // Flush the final unit.
+    assert(inputs[i].files.size() ==
+           inputs[i].atomic_compaction_unit_boundaries.size());  // Every file must have exactly one boundary entry.
+  }
+  return inputs;
+}
+
 // helper function to determine if compaction is creating files at the
 // bottommost level
 bool Compaction::IsBottommostLevel(
@@ -155,7 +235,7 @@ Compaction::Compaction(VersionStorageInfo* vstorage,
       output_compression_(_compression),
       output_compression_opts_(_compression_opts),
       deletion_compaction_(_deletion_compaction),
-      inputs_(std::move(_inputs)),
+      inputs_(PopulateWithAtomicBoundaries(vstorage, std::move(_inputs))),
       grandparents_(std::move(_grandparents)),
       score_(_score),
       bottommost_level_(IsBottommostLevel(output_level_, vstorage, inputs_)),
@@ -170,6 +250,12 @@ Compaction::Compaction(VersionStorageInfo* vstorage,
   if (max_subcompactions_ == 0) {
     max_subcompactions_ = immutable_cf_options_.max_subcompactions;
   }
+  if (!bottommost_level_) {
+    // Currently we only enable dictionary compression during compaction to the
+    // bottommost level.
+    output_compression_opts_.max_dict_bytes = 0;
+    output_compression_opts_.zstd_max_train_bytes = 0;
+  }
 
 #ifndef NDEBUG
   for (size_t i = 1; i < inputs_.size(); ++i) {
@@ -293,8 +379,8 @@ bool Compaction::KeyNotExistsBeyondOutputLevel(
         if (user_cmp->Compare(user_key, f->largest.user_key()) <= 0) {
           // We've advanced far enough
           if (user_cmp->Compare(user_key, f->smallest.user_key()) >= 0) {
-            // Key falls in this file's range, so definitely
-            // exists beyond output level
+            // Key falls in this file's range, so it may
+            // exist beyond output level
             return false;
           }
           break;
@@ -331,12 +417,14 @@ const char* Compaction::InputLevelSummary(
     if (!is_first) {
       len +=
           snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len, " + ");
+      len = std::min(len, static_cast<int>(sizeof(scratch->buffer)));
     } else {
       is_first = false;
     }
     len += snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len,
                     "%" ROCKSDB_PRIszt "@%d", input_level.size(),
                     input_level.level);
+    len = std::min(len, static_cast<int>(sizeof(scratch->buffer)));
   }
   snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len,
            " files to L%d", output_level());