]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rocksdb/db/merge_helper.cc
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / rocksdb / db / merge_helper.cc
index aa5570579e5e6dd572991e176107d54df02d8074..dc6baa9630202f80c10c4fd1d97046b4f0a20782 100644 (file)
@@ -1,29 +1,60 @@
 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
-//  This source code is licensed under the BSD-style license found in the
-//  LICENSE file in the root directory of this source tree. An additional grant
-//  of patent rights can be found in the PATENTS file in the same directory.
+//  This source code is licensed under both the GPLv2 (found in the
+//  COPYING file in the root directory) and Apache 2.0 License
+//  (found in the LICENSE.Apache file in the root directory).
 
 #include "db/merge_helper.h"
 
-#include <stdio.h>
 #include <string>
 
 #include "db/dbformat.h"
 #include "monitoring/perf_context_imp.h"
 #include "monitoring/statistics.h"
+#include "port/likely.h"
 #include "rocksdb/comparator.h"
 #include "rocksdb/db.h"
 #include "rocksdb/merge_operator.h"
+#include "table/format.h"
 #include "table/internal_iterator.h"
 
 namespace rocksdb {
 
+MergeHelper::MergeHelper(Env* env, const Comparator* user_comparator,
+                         const MergeOperator* user_merge_operator,
+                         const CompactionFilter* compaction_filter,
+                         Logger* logger, bool assert_valid_internal_key,
+                         SequenceNumber latest_snapshot,
+                         const SnapshotChecker* snapshot_checker, int level,
+                         Statistics* stats,
+                         const std::atomic<bool>* shutting_down)
+    : env_(env),
+      user_comparator_(user_comparator),
+      user_merge_operator_(user_merge_operator),
+      compaction_filter_(compaction_filter),
+      shutting_down_(shutting_down),
+      logger_(logger),
+      assert_valid_internal_key_(assert_valid_internal_key),
+      allow_single_operand_(false),
+      latest_snapshot_(latest_snapshot),
+      snapshot_checker_(snapshot_checker),
+      level_(level),
+      keys_(),
+      filter_timer_(env_),
+      total_filter_time_(0U),
+      stats_(stats) {
+  assert(user_comparator_ != nullptr);
+  if (user_merge_operator_) {
+    allow_single_operand_ = user_merge_operator_->AllowSingleOperand();
+  }
+}
+
 Status MergeHelper::TimedFullMerge(const MergeOperator* merge_operator,
                                    const Slice& key, const Slice* value,
                                    const std::vector<Slice>& operands,
                                    std::string* result, Logger* logger,
                                    Statistics* statistics, Env* env,
-                                   Slice* result_operand) {
+                                   Slice* result_operand,
+                                   bool update_num_ops_stats) {
   assert(merge_operator != nullptr);
 
   if (operands.size() == 0) {
@@ -32,6 +63,11 @@ Status MergeHelper::TimedFullMerge(const MergeOperator* merge_operator,
     return Status::OK();
   }
 
+  if (update_num_ops_stats) {
+    MeasureTime(statistics, READ_NUM_MERGE_OPERANDS,
+                static_cast<uint64_t>(operands.size()));
+  }
+
   bool success;
   Slice tmp_result_operand(nullptr, 0);
   const MergeOperator::MergeOperationInput merge_in(key, value, operands,
@@ -125,7 +161,10 @@ Status MergeHelper::MergeUntil(InternalIterator* iter,
       // hit a different user key, stop right here
       hit_the_next_user_key = true;
       break;
-    } else if (stop_before && ikey.sequence <= stop_before) {
+    } else if (stop_before > 0 && ikey.sequence <= stop_before &&
+               LIKELY(snapshot_checker_ == nullptr ||
+                      snapshot_checker_->IsInSnapshot(ikey.sequence,
+                                                      stop_before))) {
       // hit an entry that's visible by the previous snapshot, can't touch that
       break;
     }
@@ -195,12 +234,10 @@ Status MergeHelper::MergeUntil(InternalIterator* iter,
           ikey.sequence <= latest_snapshot_
               ? CompactionFilter::Decision::kKeep
               : FilterMerge(orig_ikey.user_key, value_slice);
-      if (range_del_agg != nullptr &&
-
+      if (filter != CompactionFilter::Decision::kRemoveAndSkipUntil &&
+          range_del_agg != nullptr &&
           range_del_agg->ShouldDelete(
-              iter->key(),
-              RangeDelAggregator::RangePositioningMode::kForwardTraversal) &&
-          filter != CompactionFilter::Decision::kRemoveAndSkipUntil) {
+              iter->key(), RangeDelPositioningMode::kForwardTraversal)) {
         filter = CompactionFilter::Decision::kRemove;
       }
       if (filter == CompactionFilter::Decision::kKeep ||
@@ -283,7 +320,8 @@ Status MergeHelper::MergeUntil(InternalIterator* iter,
     // Attempt to use the user's associative merge function to
     // merge the stacked merge operands into a single operand.
     s = Status::MergeInProgress();
-    if (merge_context_.GetNumOperands() >= 2) {
+    if (merge_context_.GetNumOperands() >= 2 ||
+        (allow_single_operand_ && merge_context_.GetNumOperands() == 1)) {
       bool merge_success = false;
       std::string merge_result;
       {
@@ -334,7 +372,7 @@ CompactionFilter::Decision MergeHelper::FilterMerge(const Slice& user_key,
   if (compaction_filter_ == nullptr) {
     return CompactionFilter::Decision::kKeep;
   }
-  if (stats_ != nullptr) {
+  if (stats_ != nullptr && ShouldReportDetailedTime(env_, stats_)) {
     filter_timer_.Start();
   }
   compaction_filter_value_.clear();