// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
-// This source code is licensed under the BSD-style license found in the
-// LICENSE file in the root directory of this source tree. An additional grant
-// of patent rights can be found in the PATENTS file in the same directory.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
#include "db/merge_helper.h"
-#include <stdio.h>
#include <string>
#include "db/dbformat.h"
#include "monitoring/perf_context_imp.h"
#include "monitoring/statistics.h"
+#include "port/likely.h"
#include "rocksdb/comparator.h"
#include "rocksdb/db.h"
#include "rocksdb/merge_operator.h"
+#include "table/format.h"
#include "table/internal_iterator.h"
namespace rocksdb {
+MergeHelper::MergeHelper(Env* env, const Comparator* user_comparator,
+ const MergeOperator* user_merge_operator,
+ const CompactionFilter* compaction_filter,
+ Logger* logger, bool assert_valid_internal_key,
+ SequenceNumber latest_snapshot,
+ const SnapshotChecker* snapshot_checker, int level,
+ Statistics* stats,
+ const std::atomic<bool>* shutting_down)
+ : env_(env),
+ user_comparator_(user_comparator),
+ user_merge_operator_(user_merge_operator),
+ compaction_filter_(compaction_filter),
+ shutting_down_(shutting_down),
+ logger_(logger),
+ assert_valid_internal_key_(assert_valid_internal_key),
+ allow_single_operand_(false),
+ latest_snapshot_(latest_snapshot),
+ snapshot_checker_(snapshot_checker),
+ level_(level),
+ keys_(),
+ filter_timer_(env_),
+ total_filter_time_(0U),
+ stats_(stats) {
+ assert(user_comparator_ != nullptr);
+ if (user_merge_operator_) {
+ allow_single_operand_ = user_merge_operator_->AllowSingleOperand();
+ }
+}
+
Status MergeHelper::TimedFullMerge(const MergeOperator* merge_operator,
const Slice& key, const Slice* value,
const std::vector<Slice>& operands,
std::string* result, Logger* logger,
Statistics* statistics, Env* env,
- Slice* result_operand) {
+ Slice* result_operand,
+ bool update_num_ops_stats) {
assert(merge_operator != nullptr);
if (operands.size() == 0) {
return Status::OK();
}
+ if (update_num_ops_stats) {
+ MeasureTime(statistics, READ_NUM_MERGE_OPERANDS,
+ static_cast<uint64_t>(operands.size()));
+ }
+
bool success;
Slice tmp_result_operand(nullptr, 0);
const MergeOperator::MergeOperationInput merge_in(key, value, operands,
// hit a different user key, stop right here
hit_the_next_user_key = true;
break;
- } else if (stop_before && ikey.sequence <= stop_before) {
+ } else if (stop_before > 0 && ikey.sequence <= stop_before &&
+ LIKELY(snapshot_checker_ == nullptr ||
+ snapshot_checker_->IsInSnapshot(ikey.sequence,
+ stop_before))) {
// hit an entry that's visible by the previous snapshot, can't touch that
break;
}
ikey.sequence <= latest_snapshot_
? CompactionFilter::Decision::kKeep
: FilterMerge(orig_ikey.user_key, value_slice);
- if (range_del_agg != nullptr &&
-
+ if (filter != CompactionFilter::Decision::kRemoveAndSkipUntil &&
+ range_del_agg != nullptr &&
range_del_agg->ShouldDelete(
- iter->key(),
- RangeDelAggregator::RangePositioningMode::kForwardTraversal) &&
- filter != CompactionFilter::Decision::kRemoveAndSkipUntil) {
+ iter->key(), RangeDelPositioningMode::kForwardTraversal)) {
filter = CompactionFilter::Decision::kRemove;
}
if (filter == CompactionFilter::Decision::kKeep ||
// Attempt to use the user's associative merge function to
// merge the stacked merge operands into a single operand.
s = Status::MergeInProgress();
- if (merge_context_.GetNumOperands() >= 2) {
+ if (merge_context_.GetNumOperands() >= 2 ||
+ (allow_single_operand_ && merge_context_.GetNumOperands() == 1)) {
bool merge_success = false;
std::string merge_result;
{
if (compaction_filter_ == nullptr) {
return CompactionFilter::Decision::kKeep;
}
- if (stats_ != nullptr) {
+ if (stats_ != nullptr && ShouldReportDetailedTime(env_, stats_)) {
filter_timer_.Start();
}
compaction_filter_value_.clear();