]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rocksdb/utilities/write_batch_with_index/write_batch_with_index.cc
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / rocksdb / utilities / write_batch_with_index / write_batch_with_index.cc
index 0926c84687cf96f91536b8404f1293c14333c530..2202d6baf75ab1e93b969816393f0522a4fdc5fc 100644 (file)
@@ -1,13 +1,12 @@
 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
-//  This source code is licensed under the BSD-style license found in the
-//  LICENSE file in the root directory of this source tree. An additional grant
-//  of patent rights can be found in the PATENTS file in the same directory.
+//  This source code is licensed under both the GPLv2 (found in the
+//  COPYING file in the root directory) and Apache 2.0 License
+//  (found in the LICENSE.Apache file in the root directory).
 
 #ifndef ROCKSDB_LITE
 
 #include "rocksdb/utilities/write_batch_with_index.h"
 
-#include <limits>
 #include <memory>
 
 #include "db/column_family.h"
@@ -19,6 +18,8 @@
 #include "rocksdb/comparator.h"
 #include "rocksdb/iterator.h"
 #include "util/arena.h"
+#include "util/cast_util.h"
+#include "util/string_util.h"
 #include "utilities/write_batch_with_index/write_batch_with_index_internal.h"
 
 namespace rocksdb {
@@ -78,6 +79,7 @@ class BaseDeltaIterator : public Iterator {
   void Next() override {
     if (!Valid()) {
       status_ = Status::NotSupported("Next() on invalid iterator");
+      return;
     }
 
     if (!forward_) {
@@ -113,6 +115,7 @@ class BaseDeltaIterator : public Iterator {
   void Prev() override {
     if (!Valid()) {
       status_ = Status::NotSupported("Prev() on invalid iterator");
+      return;
     }
 
     if (forward_) {
@@ -169,6 +172,21 @@ class BaseDeltaIterator : public Iterator {
  private:
   void AssertInvariants() {
 #ifndef NDEBUG
+    bool not_ok = false;
+    if (!base_iterator_->status().ok()) {
+      assert(!base_iterator_->Valid());
+      not_ok = true;
+    }
+    if (!delta_iterator_->status().ok()) {
+      assert(!delta_iterator_->Valid());
+      not_ok = true;
+    }
+    if (not_ok) {
+      assert(!Valid());
+      assert(!status().ok());
+      return;
+    }
+
     if (!Valid()) {
       return;
     }
@@ -237,13 +255,25 @@ class BaseDeltaIterator : public Iterator {
   void UpdateCurrent() {
 // Suppress false positive clang analyzer warnings.
 #ifndef __clang_analyzer__
+    status_ = Status::OK();
     while (true) {
       WriteEntry delta_entry;
       if (DeltaValid()) {
+        assert(delta_iterator_->status().ok());
         delta_entry = delta_iterator_->Entry();
+      } else if (!delta_iterator_->status().ok()) {
+        // Expose the error status and stop.
+        current_at_base_ = false;
+        return;
       }
       equal_keys_ = false;
       if (!BaseValid()) {
+        if (!base_iterator_->status().ok()) {
+          // Expose the error status and stop.
+          current_at_base_ = true;
+          return;
+        }
+
         // Base has finished.
         if (!DeltaValid()) {
           // Finished
@@ -322,14 +352,16 @@ class WBWIIteratorImpl : public WBWIIterator {
   }
 
   virtual void SeekToFirst() override {
-    WriteBatchIndexEntry search_entry(WriteBatchIndexEntry::kFlagMin,
-                                      column_family_id_, 0, 0);
+    WriteBatchIndexEntry search_entry(
+        nullptr /* search_key */, column_family_id_,
+        true /* is_forward_direction */, true /* is_seek_to_first */);
     skip_list_iter_.Seek(&search_entry);
   }
 
   virtual void SeekToLast() override {
-    WriteBatchIndexEntry search_entry(WriteBatchIndexEntry::kFlagMin,
-                                      column_family_id_ + 1, 0, 0);
+    WriteBatchIndexEntry search_entry(
+        nullptr /* search_key */, column_family_id_ + 1,
+        true /* is_forward_direction */, true /* is_seek_to_first */);
     skip_list_iter_.Seek(&search_entry);
     if (!skip_list_iter_.Valid()) {
       skip_list_iter_.SeekToLast();
@@ -339,12 +371,16 @@ class WBWIIteratorImpl : public WBWIIterator {
   }
 
   virtual void Seek(const Slice& key) override {
-    WriteBatchIndexEntry search_entry(&key, column_family_id_);
+    WriteBatchIndexEntry search_entry(&key, column_family_id_,
+                                      true /* is_forward_direction */,
+                                      false /* is_seek_to_first */);
     skip_list_iter_.Seek(&search_entry);
   }
 
   virtual void SeekForPrev(const Slice& key) override {
-    WriteBatchIndexEntry search_entry(&key, column_family_id_);
+    WriteBatchIndexEntry search_entry(&key, column_family_id_,
+                                      false /* is_forward_direction */,
+                                      false /* is_seek_to_first */);
     skip_list_iter_.SeekForPrev(&search_entry);
   }
 
@@ -385,19 +421,27 @@ class WBWIIteratorImpl : public WBWIIterator {
 };
 
 struct WriteBatchWithIndex::Rep {
-  Rep(const Comparator* index_comparator, size_t reserved_bytes = 0,
-      size_t max_bytes = 0, bool _overwrite_key = false)
+  explicit Rep(const Comparator* index_comparator, size_t reserved_bytes = 0,
+               size_t max_bytes = 0, bool _overwrite_key = false)
       : write_batch(reserved_bytes, max_bytes),
         comparator(index_comparator, &write_batch),
         skip_list(comparator, &arena),
         overwrite_key(_overwrite_key),
-        last_entry_offset(0) {}
+        last_entry_offset(0),
+        last_sub_batch_offset(0),
+        sub_batch_cnt(1) {}
   ReadableWriteBatch write_batch;
   WriteBatchEntryComparator comparator;
   Arena arena;
   WriteBatchEntrySkipList skip_list;
   bool overwrite_key;
   size_t last_entry_offset;
+  // The starting offset of the last sub-batch. A sub-batch starts right before
+  // inserting a key that is a duplicate of a key in the last sub-batch. Zero,
+  // the default, means that no duplicate key is detected so far.
+  size_t last_sub_batch_offset;
+  // Total number of sub-batches in the write batch. Default is 1.
+  size_t sub_batch_cnt;
 
   // Remember current offset of internal write batch, which is used as
   // the starting offset of the next record.
@@ -449,6 +493,10 @@ bool WriteBatchWithIndex::Rep::UpdateExistingEntryWithCfId(
   }
   WriteBatchIndexEntry* non_const_entry =
       const_cast<WriteBatchIndexEntry*>(iter.GetRawEntry());
+  if (LIKELY(last_sub_batch_offset <= non_const_entry->offset)) {
+    last_sub_batch_offset = last_entry_offset;
+    sub_batch_cnt++;
+  }
   non_const_entry->offset = last_entry_offset;
   return true;
 }
@@ -477,106 +525,114 @@ void WriteBatchWithIndex::Rep::AddNewEntry(uint32_t column_family_id) {
                           wb_data.size() - last_entry_offset);
   // Extract key
   Slice key;
-  bool success __attribute__((__unused__)) =
+  bool success __attribute__((__unused__));
+  success =
       ReadKeyFromWriteBatchEntry(&entry_ptr, &key, column_family_id != 0);
   assert(success);
 
-    auto* mem = arena.Allocate(sizeof(WriteBatchIndexEntry));
-    auto* index_entry =
-        new (mem) WriteBatchIndexEntry(last_entry_offset, column_family_id,
-                                       key.data() - wb_data.data(), key.size());
-    skip_list.Insert(index_entry);
-  }
-
-  void WriteBatchWithIndex::Rep::Clear() {
-    write_batch.Clear();
-    ClearIndex();
-  }
+  auto* mem = arena.Allocate(sizeof(WriteBatchIndexEntry));
+  auto* index_entry =
+      new (mem) WriteBatchIndexEntry(last_entry_offset, column_family_id,
+                                      key.data() - wb_data.data(), key.size());
+  skip_list.Insert(index_entry);
+}
 
-  void WriteBatchWithIndex::Rep::ClearIndex() {
-    skip_list.~WriteBatchEntrySkipList();
-    arena.~Arena();
-    new (&arena) Arena();
-    new (&skip_list) WriteBatchEntrySkipList(comparator, &arena);
-    last_entry_offset = 0;
-  }
+void WriteBatchWithIndex::Rep::Clear() {
+  write_batch.Clear();
+  ClearIndex();
+}
 
-  Status WriteBatchWithIndex::Rep::ReBuildIndex() {
-    Status s;
+void WriteBatchWithIndex::Rep::ClearIndex() {
+  skip_list.~WriteBatchEntrySkipList();
+  arena.~Arena();
+  new (&arena) Arena();
+  new (&skip_list) WriteBatchEntrySkipList(comparator, &arena);
+  last_entry_offset = 0;
+  last_sub_batch_offset = 0;
+  sub_batch_cnt = 1;
+}
 
-    ClearIndex();
+Status WriteBatchWithIndex::Rep::ReBuildIndex() {
+  Status s;
 
-    if (write_batch.Count() == 0) {
-      // Nothing to re-index
-      return s;
-    }
+  ClearIndex();
 
-    size_t offset = WriteBatchInternal::GetFirstOffset(&write_batch);
+  if (write_batch.Count() == 0) {
+    // Nothing to re-index
+    return s;
+  }
 
-    Slice input(write_batch.Data());
-    input.remove_prefix(offset);
+  size_t offset = WriteBatchInternal::GetFirstOffset(&write_batch);
 
-    // Loop through all entries in Rep and add each one to the index
-    int found = 0;
-    while (s.ok() && !input.empty()) {
-      Slice key, value, blob, xid;
-      uint32_t column_family_id = 0;  // default
-      char tag = 0;
+  Slice input(write_batch.Data());
+  input.remove_prefix(offset);
 
-      // set offset of current entry for call to AddNewEntry()
-      last_entry_offset = input.data() - write_batch.Data().data();
+  // Loop through all entries in Rep and add each one to the index
+  int found = 0;
+  while (s.ok() && !input.empty()) {
+    Slice key, value, blob, xid;
+    uint32_t column_family_id = 0;  // default
+    char tag = 0;
 
-      s = ReadRecordFromWriteBatch(&input, &tag, &column_family_id, &key,
-                                   &value, &blob, &xid);
-      if (!s.ok()) {
-        break;
-      }
+    // set offset of current entry for call to AddNewEntry()
+    last_entry_offset = input.data() - write_batch.Data().data();
 
-      switch (tag) {
-        case kTypeColumnFamilyValue:
-        case kTypeValue:
-        case kTypeColumnFamilyDeletion:
-        case kTypeDeletion:
-        case kTypeColumnFamilySingleDeletion:
-        case kTypeSingleDeletion:
-        case kTypeColumnFamilyMerge:
-        case kTypeMerge:
-          found++;
-          if (!UpdateExistingEntryWithCfId(column_family_id, key)) {
-            AddNewEntry(column_family_id);
-          }
-          break;
-        case kTypeLogData:
-        case kTypeBeginPrepareXID:
-        case kTypeEndPrepareXID:
-        case kTypeCommitXID:
-        case kTypeRollbackXID:
-        case kTypeNoop:
-          break;
-        default:
-          return Status::Corruption("unknown WriteBatch tag");
-      }
+    s = ReadRecordFromWriteBatch(&input, &tag, &column_family_id, &key,
+                                  &value, &blob, &xid);
+    if (!s.ok()) {
+      break;
     }
 
-    if (s.ok() && found != write_batch.Count()) {
-      s = Status::Corruption("WriteBatch has wrong count");
+    switch (tag) {
+      case kTypeColumnFamilyValue:
+      case kTypeValue:
+      case kTypeColumnFamilyDeletion:
+      case kTypeDeletion:
+      case kTypeColumnFamilySingleDeletion:
+      case kTypeSingleDeletion:
+      case kTypeColumnFamilyMerge:
+      case kTypeMerge:
+        found++;
+        if (!UpdateExistingEntryWithCfId(column_family_id, key)) {
+          AddNewEntry(column_family_id);
+        }
+        break;
+      case kTypeLogData:
+      case kTypeBeginPrepareXID:
+      case kTypeBeginPersistedPrepareXID:
+      case kTypeBeginUnprepareXID:
+      case kTypeEndPrepareXID:
+      case kTypeCommitXID:
+      case kTypeRollbackXID:
+      case kTypeNoop:
+        break;
+      default:
+        return Status::Corruption("unknown WriteBatch tag in ReBuildIndex",
+                                  ToString(static_cast<unsigned int>(tag)));
     }
+  }
 
-    return s;
+  if (s.ok() && found != write_batch.Count()) {
+    s = Status::Corruption("WriteBatch has wrong count");
   }
 
-  WriteBatchWithIndex::WriteBatchWithIndex(
-      const Comparator* default_index_comparator, size_t reserved_bytes,
-      bool overwrite_key, size_t max_bytes)
-      : rep(new Rep(default_index_comparator, reserved_bytes, max_bytes,
-                    overwrite_key)) {}
+  return s;
+}
+
+WriteBatchWithIndex::WriteBatchWithIndex(
+    const Comparator* default_index_comparator, size_t reserved_bytes,
+    bool overwrite_key, size_t max_bytes)
+    : rep(new Rep(default_index_comparator, reserved_bytes, max_bytes,
+                  overwrite_key)) {}
 
-  WriteBatchWithIndex::~WriteBatchWithIndex() {}
+WriteBatchWithIndex::~WriteBatchWithIndex() {}
 
-  WriteBatch* WriteBatchWithIndex::GetWriteBatch() { return &rep->write_batch; }
+WriteBatch* WriteBatchWithIndex::GetWriteBatch() { return &rep->write_batch; }
 
-  WBWIIterator* WriteBatchWithIndex::NewIterator() {
-    return new WBWIIteratorImpl(0, &(rep->skip_list), &rep->write_batch);
+size_t WriteBatchWithIndex::SubBatchCnt() { return rep->sub_batch_cnt; }
+
+WBWIIterator* WriteBatchWithIndex::NewIterator() {
+  return new WBWIIteratorImpl(0, &(rep->skip_list), &rep->write_batch);
 }
 
 WBWIIterator* WriteBatchWithIndex::NewIterator(
@@ -743,8 +799,23 @@ Status WriteBatchWithIndex::GetFromBatchAndDB(DB* db,
                                               const ReadOptions& read_options,
                                               const Slice& key,
                                               std::string* value) {
+  assert(value != nullptr);
+  PinnableSlice pinnable_val(value);
+  assert(!pinnable_val.IsPinned());
+  auto s = GetFromBatchAndDB(db, read_options, db->DefaultColumnFamily(), key,
+                             &pinnable_val);
+  if (s.ok() && pinnable_val.IsPinned()) {
+    value->assign(pinnable_val.data(), pinnable_val.size());
+  }  // else value is already assigned
+  return s;
+}
+
+Status WriteBatchWithIndex::GetFromBatchAndDB(DB* db,
+                                              const ReadOptions& read_options,
+                                              const Slice& key,
+                                              PinnableSlice* pinnable_val) {
   return GetFromBatchAndDB(db, read_options, db->DefaultColumnFamily(), key,
-                           value);
+                           pinnable_val);
 }
 
 Status WriteBatchWithIndex::GetFromBatchAndDB(DB* db,
@@ -752,19 +823,46 @@ Status WriteBatchWithIndex::GetFromBatchAndDB(DB* db,
                                               ColumnFamilyHandle* column_family,
                                               const Slice& key,
                                               std::string* value) {
+  assert(value != nullptr);
+  PinnableSlice pinnable_val(value);
+  assert(!pinnable_val.IsPinned());
+  auto s =
+      GetFromBatchAndDB(db, read_options, column_family, key, &pinnable_val);
+  if (s.ok() && pinnable_val.IsPinned()) {
+    value->assign(pinnable_val.data(), pinnable_val.size());
+  }  // else value is already assigned
+  return s;
+}
+
+Status WriteBatchWithIndex::GetFromBatchAndDB(DB* db,
+                                              const ReadOptions& read_options,
+                                              ColumnFamilyHandle* column_family,
+                                              const Slice& key,
+                                              PinnableSlice* pinnable_val) {
+  return GetFromBatchAndDB(db, read_options, column_family, key, pinnable_val,
+                           nullptr);
+}
+
+Status WriteBatchWithIndex::GetFromBatchAndDB(
+    DB* db, const ReadOptions& read_options, ColumnFamilyHandle* column_family,
+    const Slice& key, PinnableSlice* pinnable_val, ReadCallback* callback) {
   Status s;
   MergeContext merge_context;
   const ImmutableDBOptions& immuable_db_options =
-      reinterpret_cast<DBImpl*>(db)->immutable_db_options();
+      static_cast_with_check<DBImpl, DB>(db->GetRootDB())
+          ->immutable_db_options();
 
-  std::string batch_value;
+  // Since the lifetime of the WriteBatch is the same as that of the transaction
+  // we cannot pin it as otherwise the returned value will not be available
+  // after the transaction finishes.
+  std::string& batch_value = *pinnable_val->GetSelf();
   WriteBatchWithIndexInternal::Result result =
       WriteBatchWithIndexInternal::GetFromBatch(
           immuable_db_options, this, column_family, key, &merge_context,
           &rep->comparator, &batch_value, rep->overwrite_key, &s);
 
   if (result == WriteBatchWithIndexInternal::Result::kFound) {
-    value->assign(batch_value.data(), batch_value.size());
+    pinnable_val->PinSelf();
     return s;
   }
   if (result == WriteBatchWithIndexInternal::Result::kDeleted) {
@@ -785,7 +883,13 @@ Status WriteBatchWithIndex::GetFromBatchAndDB(DB* db,
          result == WriteBatchWithIndexInternal::Result::kNotFound);
 
   // Did not find key in batch OR could not resolve Merges.  Try DB.
-  s = db->Get(read_options, column_family, key, value);
+  if (!callback) {
+    s = db->Get(read_options, column_family, key, pinnable_val);
+  } else {
+    s = static_cast_with_check<DBImpl, DB>(db->GetRootDB())
+            ->GetImpl(read_options, column_family, key, pinnable_val, nullptr,
+                      callback);
+  }
 
   if (s.ok() || s.IsNotFound()) {  // DB Get Succeeded
     if (result == WriteBatchWithIndexInternal::Result::kMergeInProgress) {
@@ -797,18 +901,18 @@ Status WriteBatchWithIndex::GetFromBatchAndDB(DB* db,
       Env* env = immuable_db_options.env;
       Logger* logger = immuable_db_options.info_log.get();
 
-      Slice db_slice(*value);
       Slice* merge_data;
       if (s.ok()) {
-        merge_data = &db_slice;
+        merge_data = pinnable_val;
       } else {  // Key not present in db (s.IsNotFound())
         merge_data = nullptr;
       }
 
       if (merge_operator) {
-        s = MergeHelper::TimedFullMerge(merge_operator, key, merge_data,
-                                        merge_context.GetOperands(), value,
-                                        logger, statistics, env);
+        s = MergeHelper::TimedFullMerge(
+            merge_operator, key, merge_data, merge_context.GetOperands(),
+            pinnable_val->GetSelf(), logger, statistics, env);
+        pinnable_val->PinSelf();
       } else {
         s = Status::InvalidArgument("Options::merge_operator must be set");
       }
@@ -824,15 +928,25 @@ Status WriteBatchWithIndex::RollbackToSavePoint() {
   Status s = rep->write_batch.RollbackToSavePoint();
 
   if (s.ok()) {
+    rep->sub_batch_cnt = 1;
+    rep->last_sub_batch_offset = 0;
     s = rep->ReBuildIndex();
   }
 
   return s;
 }
 
+Status WriteBatchWithIndex::PopSavePoint() {
+  return rep->write_batch.PopSavePoint();
+}
+
 void WriteBatchWithIndex::SetMaxBytes(size_t max_bytes) {
   rep->write_batch.SetMaxBytes(max_bytes);
 }
 
+size_t WriteBatchWithIndex::GetDataSize() const {
+  return rep->write_batch.GetDataSize();
+}
+
 }  // namespace rocksdb
 #endif  // !ROCKSDB_LITE