]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rocksdb/db/write_batch.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / rocksdb / db / write_batch.cc
index d578db59bac0161e48074b5ec5f21176bf8284ae..c7f2628f7bd305dc0da6329eea3c183b3e7fc689 100644 (file)
 #include "db/write_batch_internal.h"
 #include "monitoring/perf_context_imp.h"
 #include "monitoring/statistics.h"
+#include "port/lang.h"
 #include "rocksdb/merge_operator.h"
 #include "util/autovector.h"
 #include "util/cast_util.h"
 #include "util/coding.h"
 #include "util/duplicate_detector.h"
 #include "util/string_util.h"
-#include "util/util.h"
 
 namespace ROCKSDB_NAMESPACE {
 
@@ -340,7 +340,8 @@ uint32_t WriteBatch::ComputeContentFlags() const {
   auto rv = content_flags_.load(std::memory_order_relaxed);
   if ((rv & ContentFlags::DEFERRED) != 0) {
     BatchContentClassifier classifier;
-    Iterate(&classifier);
+    // Should we handle status here?
+    Iterate(&classifier).PermitUncheckedError();
     rv = classifier.content_flags;
 
     // this method is conceptually const, because it is performing a lazy
@@ -639,7 +640,8 @@ Status WriteBatchInternal::Iterate(const WriteBatch* wb,
       case kTypeBeginPrepareXID:
         assert(wb->content_flags_.load(std::memory_order_relaxed) &
                (ContentFlags::DEFERRED | ContentFlags::HAS_BEGIN_PREPARE));
-        handler->MarkBeginPrepare();
+        s = handler->MarkBeginPrepare();
+        assert(s.ok());
         empty_batch = false;
         if (!handler->WriteAfterCommit()) {
           s = Status::NotSupported(
@@ -658,7 +660,8 @@ Status WriteBatchInternal::Iterate(const WriteBatch* wb,
       case kTypeBeginPersistedPrepareXID:
         assert(wb->content_flags_.load(std::memory_order_relaxed) &
                (ContentFlags::DEFERRED | ContentFlags::HAS_BEGIN_PREPARE));
-        handler->MarkBeginPrepare();
+        s = handler->MarkBeginPrepare();
+        assert(s.ok());
         empty_batch = false;
         if (handler->WriteAfterCommit()) {
           s = Status::NotSupported(
@@ -671,7 +674,8 @@ Status WriteBatchInternal::Iterate(const WriteBatch* wb,
       case kTypeBeginUnprepareXID:
         assert(wb->content_flags_.load(std::memory_order_relaxed) &
                (ContentFlags::DEFERRED | ContentFlags::HAS_BEGIN_UNPREPARE));
-        handler->MarkBeginPrepare(true /* unprepared */);
+        s = handler->MarkBeginPrepare(true /* unprepared */);
+        assert(s.ok());
         empty_batch = false;
         if (handler->WriteAfterCommit()) {
           s = Status::NotSupported(
@@ -690,23 +694,27 @@ Status WriteBatchInternal::Iterate(const WriteBatch* wb,
       case kTypeEndPrepareXID:
         assert(wb->content_flags_.load(std::memory_order_relaxed) &
                (ContentFlags::DEFERRED | ContentFlags::HAS_END_PREPARE));
-        handler->MarkEndPrepare(xid);
+        s = handler->MarkEndPrepare(xid);
+        assert(s.ok());
         empty_batch = true;
         break;
       case kTypeCommitXID:
         assert(wb->content_flags_.load(std::memory_order_relaxed) &
                (ContentFlags::DEFERRED | ContentFlags::HAS_COMMIT));
-        handler->MarkCommit(xid);
+        s = handler->MarkCommit(xid);
+        assert(s.ok());
         empty_batch = true;
         break;
       case kTypeRollbackXID:
         assert(wb->content_flags_.load(std::memory_order_relaxed) &
                (ContentFlags::DEFERRED | ContentFlags::HAS_ROLLBACK));
-        handler->MarkRollback(xid);
+        s = handler->MarkRollback(xid);
+        assert(s.ok());
         empty_batch = true;
         break;
       case kTypeNoop:
-        handler->MarkNoop(empty_batch);
+        s = handler->MarkNoop(empty_batch);
+        assert(s.ok());
         empty_batch = true;
         break;
       default:
@@ -908,7 +916,14 @@ Status WriteBatchInternal::Delete(WriteBatch* b, uint32_t column_family_id,
     b->rep_.push_back(static_cast<char>(kTypeColumnFamilyDeletion));
     PutVarint32(&b->rep_, column_family_id);
   }
-  PutLengthPrefixedSlice(&b->rep_, key);
+  if (0 == b->timestamp_size_) {
+    PutLengthPrefixedSlice(&b->rep_, key);
+  } else {
+    PutVarint32(&b->rep_,
+                static_cast<uint32_t>(key.size() + b->timestamp_size_));
+    b->rep_.append(key.data(), key.size());
+    b->rep_.append(b->timestamp_size_, '\0');
+  }
   b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                               ContentFlags::HAS_DELETE,
                           std::memory_order_relaxed);
@@ -930,7 +945,11 @@ Status WriteBatchInternal::Delete(WriteBatch* b, uint32_t column_family_id,
     b->rep_.push_back(static_cast<char>(kTypeColumnFamilyDeletion));
     PutVarint32(&b->rep_, column_family_id);
   }
-  PutLengthPrefixedSliceParts(&b->rep_, key);
+  if (0 == b->timestamp_size_) {
+    PutLengthPrefixedSliceParts(&b->rep_, key);
+  } else {
+    PutLengthPrefixedSlicePartsWithPadding(&b->rep_, key, b->timestamp_size_);
+  }
   b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                               ContentFlags::HAS_DELETE,
                           std::memory_order_relaxed);
@@ -1281,7 +1300,7 @@ class MemTableInserter : public WriteBatch::Handler {
         ignore_missing_column_families_(ignore_missing_column_families),
         recovering_log_number_(recovering_log_number),
         log_number_ref_(0),
-        db_(static_cast_with_check<DBImpl, DB>(db)),
+        db_(static_cast_with_check<DBImpl>(db)),
         concurrent_memtable_writes_(concurrent_memtable_writes),
         post_info_created_(false),
         has_valid_writes_(has_valid_writes),
@@ -1399,25 +1418,26 @@ class MemTableInserter : public WriteBatch::Handler {
                    const Slice& value, ValueType value_type) {
     // optimize for non-recovery mode
     if (UNLIKELY(write_after_commit_ && rebuilding_trx_ != nullptr)) {
-      WriteBatchInternal::Put(rebuilding_trx_, column_family_id, key, value);
-      return Status::OK();
+      return WriteBatchInternal::Put(rebuilding_trx_, column_family_id, key,
+                                     value);
       // else insert the values to the memtable right away
     }
 
-    Status seek_status;
-    if (UNLIKELY(!SeekToColumnFamily(column_family_id, &seek_status))) {
+    Status ret_status;
+    if (UNLIKELY(!SeekToColumnFamily(column_family_id, &ret_status))) {
       bool batch_boundry = false;
       if (rebuilding_trx_ != nullptr) {
         assert(!write_after_commit_);
         // The CF is probably flushed and hence no need for insert but we still
         // need to keep track of the keys for upcoming rollback/commit.
-        WriteBatchInternal::Put(rebuilding_trx_, column_family_id, key, value);
+        ret_status = WriteBatchInternal::Put(rebuilding_trx_, column_family_id,
+                                             key, value);
+        assert(ret_status.ok());
         batch_boundry = IsDuplicateKeySeq(column_family_id, key);
       }
       MaybeAdvanceSeq(batch_boundry);
-      return seek_status;
+      return ret_status;
     }
-    Status ret_status;
 
     MemTable* mem = cf_mems_->GetMemTable();
     auto* moptions = mem->GetImmutableMemTableOptions();
@@ -1490,7 +1510,9 @@ class MemTableInserter : public WriteBatch::Handler {
       assert(!write_after_commit_);
       // If the ret_status is TryAgain then let the next try add the key to
       // the rebuilding transaction object.
-      WriteBatchInternal::Put(rebuilding_trx_, column_family_id, key, value);
+      ret_status = WriteBatchInternal::Put(rebuilding_trx_, column_family_id,
+                                           key, value);
+      assert(ret_status.ok());
     }
     // Since all Puts are logged in transaction logs (if enabled), always bump
     // sequence number. Even if the update eventually fails and does not result
@@ -1527,32 +1549,41 @@ class MemTableInserter : public WriteBatch::Handler {
   // Handler callback for a point deletion of `key` in `column_family_id`.
   //
   // - When write_after_commit_ is set and rebuilding_trx_ is non-null, the
   //   deletion is forwarded to rebuilding_trx_ through
   //   WriteBatchInternal::Delete and that call's status is returned.
   // - When SeekToColumnFamily() reports failure, nothing is passed to
   //   DeleteImpl; the key is still recorded in rebuilding_trx_ (if any) and
   //   MaybeAdvanceSeq() is called before returning.
   // - Otherwise the deletion goes through DeleteImpl and, unless the result
   //   is TryAgain, is also recorded in rebuilding_trx_ (if any).
   Status DeleteCF(uint32_t column_family_id, const Slice& key) override {
     // optimize for non-recovery mode
     if (UNLIKELY(write_after_commit_ && rebuilding_trx_ != nullptr)) {
-      WriteBatchInternal::Delete(rebuilding_trx_, column_family_id, key);
-      return Status::OK();
+      return WriteBatchInternal::Delete(rebuilding_trx_, column_family_id, key);
       // else insert the values to the memtable right away
     }
 
-    Status seek_status;
-    if (UNLIKELY(!SeekToColumnFamily(column_family_id, &seek_status))) {
+    Status ret_status;
+    if (UNLIKELY(!SeekToColumnFamily(column_family_id, &ret_status))) {
       bool batch_boundry = false;
       if (rebuilding_trx_ != nullptr) {
         assert(!write_after_commit_);
         // The CF is probably flushed and hence no need for insert but we still
         // need to keep track of the keys for upcoming rollback/commit.
         // NOTE(review): whatever SeekToColumnFamily stored in the status
         // object is replaced by the result of this Delete call.
-        WriteBatchInternal::Delete(rebuilding_trx_, column_family_id, key);
+        ret_status =
+            WriteBatchInternal::Delete(rebuilding_trx_, column_family_id, key);
+        assert(ret_status.ok());
         batch_boundry = IsDuplicateKeySeq(column_family_id, key);
       }
       MaybeAdvanceSeq(batch_boundry);
-      return seek_status;
+      return ret_status;
     }
 
     // Choose the deletion record type from the timestamp size reported by the
     // current column family's user comparator: kTypeDeletion when it is 0
     // (or when there is no current cfd), kTypeDeletionWithTimestamp otherwise.
-    auto ret_status = DeleteImpl(column_family_id, key, Slice(), kTypeDeletion);
+    ColumnFamilyData* cfd = cf_mems_->current();
+    assert(!cfd || cfd->user_comparator());
+    const size_t ts_sz = (cfd && cfd->user_comparator())
+                             ? cfd->user_comparator()->timestamp_size()
+                             : 0;
+    const ValueType delete_type =
+        (0 == ts_sz) ? kTypeDeletion : kTypeDeletionWithTimestamp;
+    ret_status = DeleteImpl(column_family_id, key, Slice(), delete_type);
     // optimize for non-recovery mode
     if (UNLIKELY(!ret_status.IsTryAgain() && rebuilding_trx_ != nullptr)) {
       assert(!write_after_commit_);
       // If the ret_status is TryAgain then let the next try add the key to
       // the rebuilding transaction object.
       // NOTE(review): the assignment below replaces the status returned by
       // DeleteImpl, so a non-TryAgain failure from DeleteImpl is not what the
       // caller sees when this Delete succeeds -- confirm this is intended.
-      WriteBatchInternal::Delete(rebuilding_trx_, column_family_id, key);
+      ret_status =
+          WriteBatchInternal::Delete(rebuilding_trx_, column_family_id, key);
     }
     return ret_status;
   }
@@ -1560,34 +1591,36 @@ class MemTableInserter : public WriteBatch::Handler {
   // Handler callback for a single-deletion of `key` in `column_family_id`.
   //
   // - When write_after_commit_ is set and rebuilding_trx_ is non-null, the
   //   operation is forwarded to rebuilding_trx_ through
   //   WriteBatchInternal::SingleDelete and that call's status is returned.
   // - When SeekToColumnFamily() reports failure, nothing is passed to
   //   DeleteImpl; the key is still recorded in rebuilding_trx_ (if any) and
   //   MaybeAdvanceSeq() is called before returning.
   // - Otherwise the operation goes through DeleteImpl with
   //   kTypeSingleDeletion and, unless the result is TryAgain, is also
   //   recorded in rebuilding_trx_ (if any).
   Status SingleDeleteCF(uint32_t column_family_id, const Slice& key) override {
     // optimize for non-recovery mode
     if (UNLIKELY(write_after_commit_ && rebuilding_trx_ != nullptr)) {
-      WriteBatchInternal::SingleDelete(rebuilding_trx_, column_family_id, key);
-      return Status::OK();
+      return WriteBatchInternal::SingleDelete(rebuilding_trx_, column_family_id,
+                                              key);
       // else insert the values to the memtable right away
     }
 
-    Status seek_status;
-    if (UNLIKELY(!SeekToColumnFamily(column_family_id, &seek_status))) {
+    Status ret_status;
+    if (UNLIKELY(!SeekToColumnFamily(column_family_id, &ret_status))) {
       bool batch_boundry = false;
       if (rebuilding_trx_ != nullptr) {
         assert(!write_after_commit_);
         // The CF is probably flushed and hence no need for insert but we still
         // need to keep track of the keys for upcoming rollback/commit.
         // NOTE(review): whatever SeekToColumnFamily stored in the status
         // object is replaced by the result of this SingleDelete call.
-        WriteBatchInternal::SingleDelete(rebuilding_trx_, column_family_id,
-                                         key);
+        ret_status = WriteBatchInternal::SingleDelete(rebuilding_trx_,
+                                                      column_family_id, key);
+        assert(ret_status.ok());
         batch_boundry = IsDuplicateKeySeq(column_family_id, key);
       }
       MaybeAdvanceSeq(batch_boundry);
-      return seek_status;
+      return ret_status;
     }
 
-    auto ret_status =
+    ret_status =
         DeleteImpl(column_family_id, key, Slice(), kTypeSingleDeletion);
     // optimize for non-recovery mode
     if (UNLIKELY(!ret_status.IsTryAgain() && rebuilding_trx_ != nullptr)) {
       assert(!write_after_commit_);
       // If the ret_status is TryAgain then let the next try add the key to
       // the rebuilding transaction object.
       // NOTE(review): the assignment below replaces the status returned by
       // DeleteImpl, so a non-TryAgain failure from DeleteImpl is not what the
       // caller sees when this SingleDelete succeeds -- confirm this is
       // intended.
-      WriteBatchInternal::SingleDelete(rebuilding_trx_, column_family_id, key);
+      ret_status = WriteBatchInternal::SingleDelete(rebuilding_trx_,
+                                                    column_family_id, key);
     }
     return ret_status;
   }
@@ -1596,51 +1629,61 @@ class MemTableInserter : public WriteBatch::Handler {
                        const Slice& end_key) override {
     // optimize for non-recovery mode
     if (UNLIKELY(write_after_commit_ && rebuilding_trx_ != nullptr)) {
-      WriteBatchInternal::DeleteRange(rebuilding_trx_, column_family_id,
-                                      begin_key, end_key);
-      return Status::OK();
+      return WriteBatchInternal::DeleteRange(rebuilding_trx_, column_family_id,
+                                             begin_key, end_key);
       // else insert the values to the memtable right away
     }
 
-    Status seek_status;
-    if (UNLIKELY(!SeekToColumnFamily(column_family_id, &seek_status))) {
+    Status ret_status;
+    if (UNLIKELY(!SeekToColumnFamily(column_family_id, &ret_status))) {
       bool batch_boundry = false;
       if (rebuilding_trx_ != nullptr) {
         assert(!write_after_commit_);
         // The CF is probably flushed and hence no need for insert but we still
         // need to keep track of the keys for upcoming rollback/commit.
-        WriteBatchInternal::DeleteRange(rebuilding_trx_, column_family_id,
-                                        begin_key, end_key);
+        ret_status = WriteBatchInternal::DeleteRange(
+            rebuilding_trx_, column_family_id, begin_key, end_key);
+        assert(ret_status.ok());
         // TODO(myabandeh): when transactional DeleteRange support is added,
         // check if end_key must also be added.
         batch_boundry = IsDuplicateKeySeq(column_family_id, begin_key);
       }
       MaybeAdvanceSeq(batch_boundry);
-      return seek_status;
+      return ret_status;
     }
     if (db_ != nullptr) {
       auto cf_handle = cf_mems_->GetColumnFamilyHandle();
       if (cf_handle == nullptr) {
         cf_handle = db_->DefaultColumnFamily();
       }
-      auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(cf_handle)->cfd();
+      auto* cfd =
+          static_cast_with_check<ColumnFamilyHandleImpl>(cf_handle)->cfd();
       if (!cfd->is_delete_range_supported()) {
         return Status::NotSupported(
             std::string("DeleteRange not supported for table type ") +
             cfd->ioptions()->table_factory->Name() + " in CF " +
             cfd->GetName());
       }
+      int cmp = cfd->user_comparator()->Compare(begin_key, end_key);
+      if (cmp > 0) {
+        // It's an empty range where endpoints appear mistaken. Don't bother
+        // applying it to the DB, and return an error to the user.
+        return Status::InvalidArgument("end key comes before start key");
+      } else if (cmp == 0) {
+        // It's an empty range. Don't bother applying it to the DB.
+        return Status::OK();
+      }
     }
 
-    auto ret_status =
+    ret_status =
         DeleteImpl(column_family_id, begin_key, end_key, kTypeRangeDeletion);
     // optimize for non-recovery mode
     if (UNLIKELY(!ret_status.IsTryAgain() && rebuilding_trx_ != nullptr)) {
       assert(!write_after_commit_);
       // If the ret_status is TryAgain then let the next try add the key to
       // the rebuilding transaction object.
-      WriteBatchInternal::DeleteRange(rebuilding_trx_, column_family_id,
-                                      begin_key, end_key);
+      ret_status = WriteBatchInternal::DeleteRange(
+          rebuilding_trx_, column_family_id, begin_key, end_key);
     }
     return ret_status;
   }
@@ -1649,27 +1692,27 @@ class MemTableInserter : public WriteBatch::Handler {
                  const Slice& value) override {
     // optimize for non-recovery mode
     if (UNLIKELY(write_after_commit_ && rebuilding_trx_ != nullptr)) {
-      WriteBatchInternal::Merge(rebuilding_trx_, column_family_id, key, value);
-      return Status::OK();
+      return WriteBatchInternal::Merge(rebuilding_trx_, column_family_id, key,
+                                       value);
       // else insert the values to the memtable right away
     }
 
-    Status seek_status;
-    if (UNLIKELY(!SeekToColumnFamily(column_family_id, &seek_status))) {
+    Status ret_status;
+    if (UNLIKELY(!SeekToColumnFamily(column_family_id, &ret_status))) {
       bool batch_boundry = false;
       if (rebuilding_trx_ != nullptr) {
         assert(!write_after_commit_);
         // The CF is probably flushed and hence no need for insert but we still
         // need to keep track of the keys for upcoming rollback/commit.
-        WriteBatchInternal::Merge(rebuilding_trx_, column_family_id, key,
-                                  value);
+        ret_status = WriteBatchInternal::Merge(rebuilding_trx_,
+                                               column_family_id, key, value);
+        assert(ret_status.ok());
         batch_boundry = IsDuplicateKeySeq(column_family_id, key);
       }
       MaybeAdvanceSeq(batch_boundry);
-      return seek_status;
+      return ret_status;
     }
 
-    Status ret_status;
     MemTable* mem = cf_mems_->GetMemTable();
     auto* moptions = mem->GetImmutableMemTableOptions();
     bool perform_merge = false;
@@ -1757,7 +1800,9 @@ class MemTableInserter : public WriteBatch::Handler {
       assert(!write_after_commit_);
       // If the ret_status is TryAgain then let the next try add the key to
       // the rebuilding transaction object.
-      WriteBatchInternal::Merge(rebuilding_trx_, column_family_id, key, value);
+      ret_status = WriteBatchInternal::Merge(rebuilding_trx_, column_family_id,
+                                             key, value);
+      assert(ret_status.ok());
     }
     MaybeAdvanceSeq();
     CheckMemtableFull();