]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rocksdb/utilities/write_batch_with_index/write_batch_with_index_internal.cc
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / rocksdb / utilities / write_batch_with_index / write_batch_with_index_internal.cc
index de6fc48de09ecc69f17007b7ea65eb599f09291d..3c9205bf78f1e5f248106a29d0d447b152ee29de 100644 (file)
@@ -8,6 +8,7 @@
 #include "utilities/write_batch_with_index/write_batch_with_index_internal.h"
 
 #include "db/column_family.h"
+#include "db/db_impl/db_impl.h"
 #include "db/merge_context.h"
 #include "db/merge_helper.h"
 #include "rocksdb/comparator.h"
 #include "util/string_util.h"
 
 namespace ROCKSDB_NAMESPACE {
+BaseDeltaIterator::BaseDeltaIterator(ColumnFamilyHandle* column_family,
+                                     Iterator* base_iterator,
+                                     WBWIIteratorImpl* delta_iterator,
+                                     const Comparator* comparator,
+                                     const ReadOptions* read_options)
+    : forward_(true),
+      current_at_base_(true),
+      equal_keys_(false),
+      status_(Status::OK()),
+      base_iterator_(base_iterator),
+      delta_iterator_(delta_iterator),
+      comparator_(comparator),
+      iterate_upper_bound_(read_options ? read_options->iterate_upper_bound
+                                        : nullptr) {
+  assert(comparator_);
+  wbwii_.reset(new WriteBatchWithIndexInternal(column_family));
+}
+
+bool BaseDeltaIterator::Valid() const {
+  return status_.ok() ? (current_at_base_ ? BaseValid() : DeltaValid()) : false;
+}
+
+void BaseDeltaIterator::SeekToFirst() {
+  forward_ = true;
+  base_iterator_->SeekToFirst();
+  delta_iterator_->SeekToFirst();
+  UpdateCurrent();
+}
+
+void BaseDeltaIterator::SeekToLast() {
+  forward_ = false;
+  base_iterator_->SeekToLast();
+  delta_iterator_->SeekToLast();
+  UpdateCurrent();
+}
+
+void BaseDeltaIterator::Seek(const Slice& k) {
+  forward_ = true;
+  base_iterator_->Seek(k);
+  delta_iterator_->Seek(k);
+  UpdateCurrent();
+}
+
+void BaseDeltaIterator::SeekForPrev(const Slice& k) {
+  forward_ = false;
+  base_iterator_->SeekForPrev(k);
+  delta_iterator_->SeekForPrev(k);
+  UpdateCurrent();
+}
+
+void BaseDeltaIterator::Next() {
+  if (!Valid()) {
+    status_ = Status::NotSupported("Next() on invalid iterator");
+    return;
+  }
+
+  if (!forward_) {
+    // Need to change direction
+    // if our direction was backward and we're not equal, we have two states:
+    // * both iterators are valid: we're already in a good state (current
+    // shows to smaller)
+    // * only one iterator is valid: we need to advance that iterator
+    forward_ = true;
+    equal_keys_ = false;
+    if (!BaseValid()) {
+      assert(DeltaValid());
+      base_iterator_->SeekToFirst();
+    } else if (!DeltaValid()) {
+      delta_iterator_->SeekToFirst();
+    } else if (current_at_base_) {
+      // Change delta from larger than base to smaller
+      AdvanceDelta();
+    } else {
+      // Change base from larger than delta to smaller
+      AdvanceBase();
+    }
+    if (DeltaValid() && BaseValid()) {
+      if (0 == comparator_->CompareWithoutTimestamp(
+                   delta_iterator_->Entry().key, /*a_has_ts=*/false,
+                   base_iterator_->key(), /*b_has_ts=*/false)) {
+        equal_keys_ = true;
+      }
+    }
+  }
+  Advance();
+}
+
+void BaseDeltaIterator::Prev() {
+  if (!Valid()) {
+    status_ = Status::NotSupported("Prev() on invalid iterator");
+    return;
+  }
+
+  if (forward_) {
+    // Need to change direction
+    // if our direction was forward and we're not equal, we have two states:
+    // * both iterators are valid: we're already in a good state (current
+    // shows to larger)
+    // * only one iterator is valid: we need to advance that iterator
+    forward_ = false;
+    equal_keys_ = false;
+    if (!BaseValid()) {
+      assert(DeltaValid());
+      base_iterator_->SeekToLast();
+    } else if (!DeltaValid()) {
+      delta_iterator_->SeekToLast();
+    } else if (current_at_base_) {
+      // Change delta from less advanced than base to more advanced
+      AdvanceDelta();
+    } else {
+      // Change base from less advanced than delta to more advanced
+      AdvanceBase();
+    }
+    if (DeltaValid() && BaseValid()) {
+      if (0 == comparator_->CompareWithoutTimestamp(
+                   delta_iterator_->Entry().key, /*a_has_ts=*/false,
+                   base_iterator_->key(), /*b_has_ts=*/false)) {
+        equal_keys_ = true;
+      }
+    }
+  }
+
+  Advance();
+}
+
+Slice BaseDeltaIterator::key() const {
+  return current_at_base_ ? base_iterator_->key()
+                          : delta_iterator_->Entry().key;
+}
+
+Slice BaseDeltaIterator::value() const {
+  if (current_at_base_) {
+    return base_iterator_->value();
+  } else {
+    WriteEntry delta_entry = delta_iterator_->Entry();
+    if (wbwii_->GetNumOperands() == 0) {
+      return delta_entry.value;
+    } else if (delta_entry.type == kDeleteRecord ||
+               delta_entry.type == kSingleDeleteRecord) {
+      status_ =
+          wbwii_->MergeKey(delta_entry.key, nullptr, merge_result_.GetSelf());
+    } else if (delta_entry.type == kPutRecord) {
+      status_ = wbwii_->MergeKey(delta_entry.key, &delta_entry.value,
+                                 merge_result_.GetSelf());
+    } else if (delta_entry.type == kMergeRecord) {
+      if (equal_keys_) {
+        Slice base_value = base_iterator_->value();
+        status_ = wbwii_->MergeKey(delta_entry.key, &base_value,
+                                   merge_result_.GetSelf());
+      } else {
+        status_ =
+            wbwii_->MergeKey(delta_entry.key, nullptr, merge_result_.GetSelf());
+      }
+    }
+    merge_result_.PinSelf();
+    return merge_result_;
+  }
+}
 
-class Env;
-class Logger;
-class Statistics;
+Status BaseDeltaIterator::status() const {
+  if (!status_.ok()) {
+    return status_;
+  }
+  if (!base_iterator_->status().ok()) {
+    return base_iterator_->status();
+  }
+  return delta_iterator_->status();
+}
+
+void BaseDeltaIterator::Invalidate(Status s) { status_ = s; }
+
+void BaseDeltaIterator::AssertInvariants() {
+#ifndef NDEBUG
+  bool not_ok = false;
+  if (!base_iterator_->status().ok()) {
+    assert(!base_iterator_->Valid());
+    not_ok = true;
+  }
+  if (!delta_iterator_->status().ok()) {
+    assert(!delta_iterator_->Valid());
+    not_ok = true;
+  }
+  if (not_ok) {
+    assert(!Valid());
+    assert(!status().ok());
+    return;
+  }
+
+  if (!Valid()) {
+    return;
+  }
+  if (!BaseValid()) {
+    assert(!current_at_base_ && delta_iterator_->Valid());
+    return;
+  }
+  if (!DeltaValid()) {
+    assert(current_at_base_ && base_iterator_->Valid());
+    return;
+  }
+  // we don't support those yet
+  assert(delta_iterator_->Entry().type != kMergeRecord &&
+         delta_iterator_->Entry().type != kLogDataRecord);
+  int compare = comparator_->CompareWithoutTimestamp(
+      delta_iterator_->Entry().key, /*a_has_ts=*/false, base_iterator_->key(),
+      /*b_has_ts=*/false);
+  if (forward_) {
+    // current_at_base -> compare < 0
+    assert(!current_at_base_ || compare < 0);
+    // !current_at_base -> compare <= 0
+    assert(current_at_base_ && compare >= 0);
+  } else {
+    // current_at_base -> compare > 0
+    assert(!current_at_base_ || compare > 0);
+    // !current_at_base -> compare <= 0
+    assert(current_at_base_ && compare <= 0);
+  }
+  // equal_keys_ <=> compare == 0
+  assert((equal_keys_ || compare != 0) && (!equal_keys_ || compare == 0));
+#endif
+}
+
+void BaseDeltaIterator::Advance() {
+  if (equal_keys_) {
+    assert(BaseValid() && DeltaValid());
+    AdvanceBase();
+    AdvanceDelta();
+  } else {
+    if (current_at_base_) {
+      assert(BaseValid());
+      AdvanceBase();
+    } else {
+      assert(DeltaValid());
+      AdvanceDelta();
+    }
+  }
+  UpdateCurrent();
+}
+
+void BaseDeltaIterator::AdvanceDelta() {
+  if (forward_) {
+    delta_iterator_->NextKey();
+  } else {
+    delta_iterator_->PrevKey();
+  }
+}
+void BaseDeltaIterator::AdvanceBase() {
+  if (forward_) {
+    base_iterator_->Next();
+  } else {
+    base_iterator_->Prev();
+  }
+}
+
+bool BaseDeltaIterator::BaseValid() const { return base_iterator_->Valid(); }
+bool BaseDeltaIterator::DeltaValid() const { return delta_iterator_->Valid(); }
+void BaseDeltaIterator::UpdateCurrent() {
+// Suppress false positive clang analyzer warnings.
+#ifndef __clang_analyzer__
+  status_ = Status::OK();
+  while (true) {
+    auto delta_result = WBWIIteratorImpl::kNotFound;
+    WriteEntry delta_entry;
+    if (DeltaValid()) {
+      assert(delta_iterator_->status().ok());
+      delta_result =
+          delta_iterator_->FindLatestUpdate(wbwii_->GetMergeContext());
+      delta_entry = delta_iterator_->Entry();
+    } else if (!delta_iterator_->status().ok()) {
+      // Expose the error status and stop.
+      current_at_base_ = false;
+      return;
+    }
+    equal_keys_ = false;
+    if (!BaseValid()) {
+      if (!base_iterator_->status().ok()) {
+        // Expose the error status and stop.
+        current_at_base_ = true;
+        return;
+      }
+
+      // Base has finished.
+      if (!DeltaValid()) {
+        // Finished
+        return;
+      }
+      if (iterate_upper_bound_) {
+        if (comparator_->CompareWithoutTimestamp(
+                delta_entry.key, /*a_has_ts=*/false, *iterate_upper_bound_,
+                /*b_has_ts=*/false) >= 0) {
+          // out of upper bound -> finished.
+          return;
+        }
+      }
+      if (delta_result == WBWIIteratorImpl::kDeleted &&
+          wbwii_->GetNumOperands() == 0) {
+        AdvanceDelta();
+      } else {
+        current_at_base_ = false;
+        return;
+      }
+    } else if (!DeltaValid()) {
+      // Delta has finished.
+      current_at_base_ = true;
+      return;
+    } else {
+      int compare =
+          (forward_ ? 1 : -1) * comparator_->CompareWithoutTimestamp(
+                                    delta_entry.key, /*a_has_ts=*/false,
+                                    base_iterator_->key(), /*b_has_ts=*/false);
+      if (compare <= 0) {  // delta bigger or equal
+        if (compare == 0) {
+          equal_keys_ = true;
+        }
+        if (delta_result != WBWIIteratorImpl::kDeleted ||
+            wbwii_->GetNumOperands() > 0) {
+          current_at_base_ = false;
+          return;
+        }
+        // Delta is less advanced and is delete.
+        AdvanceDelta();
+        if (equal_keys_) {
+          AdvanceBase();
+        }
+      } else {
+        current_at_base_ = true;
+        return;
+      }
+    }
+  }
+
+  AssertInvariants();
+#endif  // __clang_analyzer__
+}
+
+void WBWIIteratorImpl::AdvanceKey(bool forward) {
+  if (Valid()) {
+    Slice key = Entry().key;
+    do {
+      if (forward) {
+        Next();
+      } else {
+        Prev();
+      }
+    } while (MatchesKey(column_family_id_, key));
+  }
+}
+
+void WBWIIteratorImpl::NextKey() { AdvanceKey(true); }
+
+void WBWIIteratorImpl::PrevKey() {
+  AdvanceKey(false);  // Move to the tail of the previous key
+  if (Valid()) {
+    AdvanceKey(false);  // Move back another key.  Now we are at the start of
+                        // the previous key
+    if (Valid()) {      // Still a valid entry
+      Next();           // Move forward one onto this key
+    } else {
+      SeekToFirst();  // Not valid, move to the start
+    }
+  }
+}
+
+WBWIIteratorImpl::Result WBWIIteratorImpl::FindLatestUpdate(
+    MergeContext* merge_context) {
+  if (Valid()) {
+    Slice key = Entry().key;
+    return FindLatestUpdate(key, merge_context);
+  } else {
+    merge_context->Clear();  // Clear any entries in the MergeContext
+    return WBWIIteratorImpl::kNotFound;
+  }
+}
+
+WBWIIteratorImpl::Result WBWIIteratorImpl::FindLatestUpdate(
+    const Slice& key, MergeContext* merge_context) {
+  Result result = WBWIIteratorImpl::kNotFound;
+  merge_context->Clear();  // Clear any entries in the MergeContext
+  // TODO(agiardullo): consider adding support for reverse iteration
+  if (!Valid()) {
+    return result;
+  } else if (comparator_->CompareKey(column_family_id_, Entry().key, key) !=
+             0) {
+    return result;
+  } else {
+    // We want to iterate in the reverse order that the writes were added to the
+    // batch.  Since we don't have a reverse iterator, we must seek past the
+    // end. We do this by seeking to the next key, and then back one step
+    NextKey();
+    if (Valid()) {
+      Prev();
+    } else {
+      SeekToLast();
+    }
+
+    // We are at the end of the iterator for this key.  Search backwards for the
+    // last Put or Delete, accumulating merges along the way.
+    while (Valid()) {
+      const WriteEntry entry = Entry();
+      if (comparator_->CompareKey(column_family_id_, entry.key, key) != 0) {
+        break;  // Unexpected error or we've reached a different next key
+      }
+
+      switch (entry.type) {
+        case kPutRecord:
+          return WBWIIteratorImpl::kFound;
+        case kDeleteRecord:
+          return WBWIIteratorImpl::kDeleted;
+        case kSingleDeleteRecord:
+          return WBWIIteratorImpl::kDeleted;
+        case kMergeRecord:
+          result = WBWIIteratorImpl::kMergeInProgress;
+          merge_context->PushOperand(entry.value);
+          break;
+        case kLogDataRecord:
+          break;  // ignore
+        case kXIDRecord:
+          break;  // ignore
+        default:
+          return WBWIIteratorImpl::kError;
+      }  // end switch statement
+      Prev();
+    }  // End while Valid()
+    // At this point, we have been through the whole list and found no Puts or
+    // Deletes. The iterator points to the previous key.  Move the iterator back
+    // onto this one.
+    if (Valid()) {
+      Next();
+    } else {
+      SeekToFirst();
+    }
+  }
+  return result;
+}
 
 Status ReadableWriteBatch::GetEntryFromDataOffset(size_t data_offset,
                                                   WriteType* type, Slice* Key,
@@ -84,7 +514,7 @@ Status ReadableWriteBatch::GetEntryFromDataOffset(size_t data_offset,
       break;
     default:
       return Status::Corruption("unknown WriteBatch tag ",
-                                ToString(static_cast<unsigned int>(tag)));
+                                std::to_string(static_cast<unsigned int>(tag)));
   }
   return Status::OK();
 }
@@ -94,7 +524,7 @@ Status ReadableWriteBatch::GetEntryFromDataOffset(size_t data_offset,
 // 1. first compare the column family, the one with larger CF will be larger;
 // 2. Inside the same CF, we first decode the entry to find the key of the entry
 //    and the entry with larger key will be larger;
-// 3. If two entries are of the same CF and offset, the one with larger offset
+// 3. If two entries are of the same CF and key, the one with larger offset
 //    will be larger.
 // Some times either `entry1` or `entry2` is dummy entry, which is actually
 // a search key. In this case, in step 2, we don't go ahead and decode the
@@ -149,142 +579,154 @@ int WriteBatchEntryComparator::CompareKey(uint32_t column_family,
                                           const Slice& key2) const {
   if (column_family < cf_comparators_.size() &&
       cf_comparators_[column_family] != nullptr) {
-    return cf_comparators_[column_family]->Compare(key1, key2);
+    return cf_comparators_[column_family]->CompareWithoutTimestamp(
+        key1, /*a_has_ts=*/false, key2, /*b_has_ts=*/false);
   } else {
-    return default_comparator_->Compare(key1, key2);
+    return default_comparator_->CompareWithoutTimestamp(
+        key1, /*a_has_ts=*/false, key2, /*b_has_ts=*/false);
   }
 }
 
-WriteBatchWithIndexInternal::Result WriteBatchWithIndexInternal::GetFromBatch(
-    const ImmutableDBOptions& immuable_db_options, WriteBatchWithIndex* batch,
-    ColumnFamilyHandle* column_family, const Slice& key,
-    MergeContext* merge_context, WriteBatchEntryComparator* cmp,
-    std::string* value, bool overwrite_key, Status* s) {
-  uint32_t cf_id = GetColumnFamilyID(column_family);
-  *s = Status::OK();
-  WriteBatchWithIndexInternal::Result result =
-      WriteBatchWithIndexInternal::Result::kNotFound;
-
-  std::unique_ptr<WBWIIterator> iter =
-      std::unique_ptr<WBWIIterator>(batch->NewIterator(column_family));
-
-  // We want to iterate in the reverse order that the writes were added to the
-  // batch.  Since we don't have a reverse iterator, we must seek past the end.
-  // TODO(agiardullo): consider adding support for reverse iteration
-  iter->Seek(key);
-  while (iter->Valid()) {
-    const WriteEntry entry = iter->Entry();
-    if (cmp->CompareKey(cf_id, entry.key, key) != 0) {
-      break;
-    }
+const Comparator* WriteBatchEntryComparator::GetComparator(
+    const ColumnFamilyHandle* column_family) const {
+  return column_family ? column_family->GetComparator() : default_comparator_;
+}
 
-    iter->Next();
+const Comparator* WriteBatchEntryComparator::GetComparator(
+    uint32_t column_family) const {
+  if (column_family < cf_comparators_.size() &&
+      cf_comparators_[column_family]) {
+    return cf_comparators_[column_family];
   }
+  return default_comparator_;
+}
 
-  if (!(*s).ok()) {
-    return WriteBatchWithIndexInternal::Result::kError;
+WriteEntry WBWIIteratorImpl::Entry() const {
+  WriteEntry ret;
+  Slice blob, xid;
+  const WriteBatchIndexEntry* iter_entry = skip_list_iter_.key();
+  // this is guaranteed with Valid()
+  assert(iter_entry != nullptr &&
+         iter_entry->column_family == column_family_id_);
+  auto s = write_batch_->GetEntryFromDataOffset(
+      iter_entry->offset, &ret.type, &ret.key, &ret.value, &blob, &xid);
+  assert(s.ok());
+  assert(ret.type == kPutRecord || ret.type == kDeleteRecord ||
+         ret.type == kSingleDeleteRecord || ret.type == kDeleteRangeRecord ||
+         ret.type == kMergeRecord);
+  // Make sure entry.key does not include user-defined timestamp.
+  const Comparator* const ucmp = comparator_->GetComparator(column_family_id_);
+  size_t ts_sz = ucmp->timestamp_size();
+  if (ts_sz > 0) {
+    ret.key = StripTimestampFromUserKey(ret.key, ts_sz);
   }
+  return ret;
+}
 
-  if (!iter->Valid()) {
-    // Read past end of results.  Reposition on last result.
-    iter->SeekToLast();
+bool WBWIIteratorImpl::MatchesKey(uint32_t cf_id, const Slice& key) {
+  if (Valid()) {
+    return comparator_->CompareKey(cf_id, key, Entry().key) == 0;
   } else {
-    iter->Prev();
+    return false;
   }
+}
 
-  Slice entry_value;
-  while (iter->Valid()) {
-    const WriteEntry entry = iter->Entry();
-    if (cmp->CompareKey(cf_id, entry.key, key) != 0) {
-      // Unexpected error or we've reached a different next key
-      break;
-    }
+WriteBatchWithIndexInternal::WriteBatchWithIndexInternal(
+    ColumnFamilyHandle* column_family)
+    : db_(nullptr), db_options_(nullptr), column_family_(column_family) {}
 
-    switch (entry.type) {
-      case kPutRecord: {
-        result = WriteBatchWithIndexInternal::Result::kFound;
-        entry_value = entry.value;
-        break;
-      }
-      case kMergeRecord: {
-        result = WriteBatchWithIndexInternal::Result::kMergeInProgress;
-        merge_context->PushOperand(entry.value);
-        break;
-      }
-      case kDeleteRecord:
-      case kSingleDeleteRecord: {
-        result = WriteBatchWithIndexInternal::Result::kDeleted;
-        break;
-      }
-      case kLogDataRecord:
-      case kXIDRecord: {
-        // ignore
-        break;
-      }
-      default: {
-        result = WriteBatchWithIndexInternal::Result::kError;
-        (*s) = Status::Corruption("Unexpected entry in WriteBatchWithIndex:",
-                                  ToString(entry.type));
-        break;
-      }
-    }
-    if (result == WriteBatchWithIndexInternal::Result::kFound ||
-        result == WriteBatchWithIndexInternal::Result::kDeleted ||
-        result == WriteBatchWithIndexInternal::Result::kError) {
-      // We can stop iterating once we find a PUT or DELETE
-      break;
-    }
-    if (result == WriteBatchWithIndexInternal::Result::kMergeInProgress &&
-        overwrite_key == true) {
-      // Since we've overwritten keys, we do not know what other operations are
-      // in this batch for this key, so we cannot do a Merge to compute the
-      // result.  Instead, we will simply return MergeInProgress.
-      break;
-    }
+WriteBatchWithIndexInternal::WriteBatchWithIndexInternal(
+    DB* db, ColumnFamilyHandle* column_family)
+    : db_(db), db_options_(nullptr), column_family_(column_family) {
+  if (db_ != nullptr && column_family_ == nullptr) {
+    column_family_ = db_->DefaultColumnFamily();
+  }
+}
 
-    iter->Prev();
+WriteBatchWithIndexInternal::WriteBatchWithIndexInternal(
+    const DBOptions* db_options, ColumnFamilyHandle* column_family)
+    : db_(nullptr), db_options_(db_options), column_family_(column_family) {}
+
+Status WriteBatchWithIndexInternal::MergeKey(const Slice& key,
+                                             const Slice* value,
+                                             const MergeContext& context,
+                                             std::string* result) const {
+  if (column_family_ != nullptr) {
+    auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family_);
+    const auto merge_operator = cfh->cfd()->ioptions()->merge_operator.get();
+    if (merge_operator == nullptr) {
+      return Status::InvalidArgument(
+          "Merge_operator must be set for column_family");
+    } else if (db_ != nullptr) {
+      const ImmutableDBOptions& immutable_db_options =
+          static_cast_with_check<DBImpl>(db_->GetRootDB())
+              ->immutable_db_options();
+      Statistics* statistics = immutable_db_options.statistics.get();
+      Logger* logger = immutable_db_options.info_log.get();
+      SystemClock* clock = immutable_db_options.clock;
+      return MergeHelper::TimedFullMerge(
+          merge_operator, key, value, context.GetOperands(), result, logger,
+          statistics, clock, /* result_operand */ nullptr,
+          /* update_num_ops_stats */ false);
+    } else if (db_options_ != nullptr) {
+      Statistics* statistics = db_options_->statistics.get();
+      Env* env = db_options_->env;
+      Logger* logger = db_options_->info_log.get();
+      SystemClock* clock = env->GetSystemClock().get();
+      return MergeHelper::TimedFullMerge(
+          merge_operator, key, value, context.GetOperands(), result, logger,
+          statistics, clock, /* result_operand */ nullptr,
+          /* update_num_ops_stats */ false);
+    } else {
+      const auto cf_opts = cfh->cfd()->ioptions();
+      return MergeHelper::TimedFullMerge(
+          merge_operator, key, value, context.GetOperands(), result,
+          cf_opts->logger, cf_opts->stats, cf_opts->clock,
+          /* result_operand */ nullptr, /* update_num_ops_stats */ false);
+    }
+  } else {
+    return Status::InvalidArgument("Must provide a column_family");
   }
+}
 
-  if ((*s).ok()) {
-    if (result == WriteBatchWithIndexInternal::Result::kFound ||
-        result == WriteBatchWithIndexInternal::Result::kDeleted) {
-      // Found a Put or Delete.  Merge if necessary.
-      if (merge_context->GetNumOperands() > 0) {
-        const MergeOperator* merge_operator;
+WBWIIteratorImpl::Result WriteBatchWithIndexInternal::GetFromBatch(
+    WriteBatchWithIndex* batch, const Slice& key, MergeContext* context,
+    std::string* value, Status* s) {
+  *s = Status::OK();
 
-        if (column_family != nullptr) {
-          auto cfh =
-              static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
-          merge_operator = cfh->cfd()->ioptions()->merge_operator;
-        } else {
-          *s = Status::InvalidArgument("Must provide a column_family");
-          result = WriteBatchWithIndexInternal::Result::kError;
-          return result;
-        }
-        Statistics* statistics = immuable_db_options.statistics.get();
-        Env* env = immuable_db_options.env;
-        Logger* logger = immuable_db_options.info_log.get();
-
-        if (merge_operator) {
-          *s = MergeHelper::TimedFullMerge(merge_operator, key, &entry_value,
-                                           merge_context->GetOperands(), value,
-                                           logger, statistics, env);
-        } else {
-          *s = Status::InvalidArgument("Options::merge_operator must be set");
-        }
-        if ((*s).ok()) {
-          result = WriteBatchWithIndexInternal::Result::kFound;
-        } else {
-          result = WriteBatchWithIndexInternal::Result::kError;
-        }
-      } else {  // nothing to merge
-        if (result == WriteBatchWithIndexInternal::Result::kFound) {  // PUT
-          value->assign(entry_value.data(), entry_value.size());
-        }
+  std::unique_ptr<WBWIIteratorImpl> iter(
+      static_cast_with_check<WBWIIteratorImpl>(
+          batch->NewIterator(column_family_)));
+
+  // Search the iterator for this key, and updates/merges to it.
+  iter->Seek(key);
+  auto result = iter->FindLatestUpdate(key, context);
+  if (result == WBWIIteratorImpl::kError) {
+    (*s) = Status::Corruption("Unexpected entry in WriteBatchWithIndex:",
+                              std::to_string(iter->Entry().type));
+    return result;
+  } else if (result == WBWIIteratorImpl::kNotFound) {
+    return result;
+  } else if (result == WBWIIteratorImpl::Result::kFound) {  // PUT
+    Slice entry_value = iter->Entry().value;
+    if (context->GetNumOperands() > 0) {
+      *s = MergeKey(key, &entry_value, *context, value);
+      if (!s->ok()) {
+        result = WBWIIteratorImpl::Result::kError;
+      }
+    } else {
+      value->assign(entry_value.data(), entry_value.size());
+    }
+  } else if (result == WBWIIteratorImpl::kDeleted) {
+    if (context->GetNumOperands() > 0) {
+      *s = MergeKey(key, nullptr, *context, value);
+      if (s->ok()) {
+        result = WBWIIteratorImpl::Result::kFound;
+      } else {
+        result = WBWIIteratorImpl::Result::kError;
       }
     }
   }
-
   return result;
 }