]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rocksdb/util/comparator.cc
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / rocksdb / util / comparator.cc
index 96e770489b49d14cce3b295f1c78d604a9cea81b..f85ed69ee6c9ca7b7b2c6d8d6defcf0f17c7f450 100644 (file)
@@ -8,20 +8,30 @@
 // found in the LICENSE file. See the AUTHORS file for names of contributors.
 
 #include "rocksdb/comparator.h"
+
 #include <stdint.h>
+
 #include <algorithm>
 #include <memory>
+#include <mutex>
+#include <sstream>
+
+#include "db/dbformat.h"
+#include "port/lang.h"
 #include "port/port.h"
+#include "rocksdb/convenience.h"
 #include "rocksdb/slice.h"
+#include "rocksdb/utilities/customizable_util.h"
+#include "rocksdb/utilities/object_registry.h"
 
 namespace ROCKSDB_NAMESPACE {
 
 namespace {
 class BytewiseComparatorImpl : public Comparator {
  public:
-  BytewiseComparatorImpl() { }
-
-  const char* Name() const override { return "leveldb.BytewiseComparator"; }
+  BytewiseComparatorImpl() {}
+  static const char* kClassName() { return "leveldb.BytewiseComparator"; }
+  const char* Name() const override { return kClassName(); }
 
   int Compare(const Slice& a, const Slice& b) const override {
     return a.compare(b);
@@ -87,7 +97,7 @@ class BytewiseComparatorImpl : public Comparator {
       const uint8_t byte = (*key)[i];
       if (byte != static_cast<uint8_t>(0xff)) {
         (*key)[i] = byte + 1;
-        key->resize(i+1);
+        key->resize(i + 1);
         return;
       }
     }
@@ -129,15 +139,20 @@ class BytewiseComparatorImpl : public Comparator {
                               bool /*b_has_ts*/) const override {
     return a.compare(b);
   }
+
+  bool EqualWithoutTimestamp(const Slice& a, const Slice& b) const override {
+    return a == b;
+  }
 };
 
 class ReverseBytewiseComparatorImpl : public BytewiseComparatorImpl {
  public:
-  ReverseBytewiseComparatorImpl() { }
+  ReverseBytewiseComparatorImpl() {}
 
-  const char* Name() const override {
+  static const char* kClassName() {
     return "rocksdb.ReverseBytewiseComparator";
   }
+  const char* Name() const override { return kClassName(); }
 
   int Compare(const Slice& a, const Slice& b) const override {
     return -a.compare(b);
@@ -194,6 +209,16 @@ class ReverseBytewiseComparatorImpl : public BytewiseComparatorImpl {
     // Don't do anything for simplicity.
   }
 
+  bool IsSameLengthImmediateSuccessor(const Slice& s,
+                                      const Slice& t) const override {
+    // Always returning false to prevent surfacing design flaws in
+    // auto_prefix_mode
+    (void)s, (void)t;
+    return false;
+    // "Correct" implementation:
+    // return BytewiseComparatorImpl::IsSameLengthImmediateSuccessor(t, s);
+  }
+
   bool CanKeysWithDifferentByteContentsBeEqual() const override {
     return false;
   }
@@ -204,16 +229,163 @@ class ReverseBytewiseComparatorImpl : public BytewiseComparatorImpl {
     return -a.compare(b);
   }
 };
-}// namespace
+
+// EXPERIMENTAL
+// Comparator with 64-bit integer timestamp.
+// We did not performance test this yet.
+template <typename TComparator>
+class ComparatorWithU64TsImpl : public Comparator {
+  static_assert(std::is_base_of<Comparator, TComparator>::value,
+                "template type must be a inherited type of comparator");
+
+ public:
+  explicit ComparatorWithU64TsImpl() : Comparator(/*ts_sz=*/sizeof(uint64_t)) {
+    assert(cmp_without_ts_.timestamp_size() == 0);
+  }
+
+  static const char* kClassName() {
+    static std::string class_name = kClassNameInternal();
+    return class_name.c_str();
+  }
+
+  const char* Name() const override { return kClassName(); }
+
+  void FindShortSuccessor(std::string*) const override {}
+  void FindShortestSeparator(std::string*, const Slice&) const override {}
+  int Compare(const Slice& a, const Slice& b) const override {
+    int ret = CompareWithoutTimestamp(a, b);
+    size_t ts_sz = timestamp_size();
+    if (ret != 0) {
+      return ret;
+    }
+    // Compare timestamp.
+    // For the same user key with different timestamps, larger (newer) timestamp
+    // comes first.
+    return -CompareTimestamp(ExtractTimestampFromUserKey(a, ts_sz),
+                             ExtractTimestampFromUserKey(b, ts_sz));
+  }
+  using Comparator::CompareWithoutTimestamp;
+  int CompareWithoutTimestamp(const Slice& a, bool a_has_ts, const Slice& b,
+                              bool b_has_ts) const override {
+    const size_t ts_sz = timestamp_size();
+    assert(!a_has_ts || a.size() >= ts_sz);
+    assert(!b_has_ts || b.size() >= ts_sz);
+    Slice lhs = a_has_ts ? StripTimestampFromUserKey(a, ts_sz) : a;
+    Slice rhs = b_has_ts ? StripTimestampFromUserKey(b, ts_sz) : b;
+    return cmp_without_ts_.Compare(lhs, rhs);
+  }
+  int CompareTimestamp(const Slice& ts1, const Slice& ts2) const override {
+    assert(ts1.size() == sizeof(uint64_t));
+    assert(ts2.size() == sizeof(uint64_t));
+    uint64_t lhs = DecodeFixed64(ts1.data());
+    uint64_t rhs = DecodeFixed64(ts2.data());
+    if (lhs < rhs) {
+      return -1;
+    } else if (lhs > rhs) {
+      return 1;
+    } else {
+      return 0;
+    }
+  }
+
+ private:
+  static std::string kClassNameInternal() {
+    std::stringstream ss;
+    ss << TComparator::kClassName() << ".u64ts";
+    return ss.str();
+  }
+
+  TComparator cmp_without_ts_;
+};
+
+}  // namespace
 
 const Comparator* BytewiseComparator() {
-  static BytewiseComparatorImpl bytewise;
+  STATIC_AVOID_DESTRUCTION(BytewiseComparatorImpl, bytewise);
   return &bytewise;
 }
 
 const Comparator* ReverseBytewiseComparator() {
-  static ReverseBytewiseComparatorImpl rbytewise;
+  STATIC_AVOID_DESTRUCTION(ReverseBytewiseComparatorImpl, rbytewise);
   return &rbytewise;
 }
 
+const Comparator* BytewiseComparatorWithU64Ts() {
+  STATIC_AVOID_DESTRUCTION(ComparatorWithU64TsImpl<BytewiseComparatorImpl>,
+                           comp_with_u64_ts);
+  return &comp_with_u64_ts;
+}
+
+#ifndef ROCKSDB_LITE
+static int RegisterBuiltinComparators(ObjectLibrary& library,
+                                      const std::string& /*arg*/) {
+  library.AddFactory<const Comparator>(
+      BytewiseComparatorImpl::kClassName(),
+      [](const std::string& /*uri*/,
+         std::unique_ptr<const Comparator>* /*guard */,
+         std::string* /* errmsg */) { return BytewiseComparator(); });
+  library.AddFactory<const Comparator>(
+      ReverseBytewiseComparatorImpl::kClassName(),
+      [](const std::string& /*uri*/,
+         std::unique_ptr<const Comparator>* /*guard */,
+         std::string* /* errmsg */) { return ReverseBytewiseComparator(); });
+  library.AddFactory<const Comparator>(
+      ComparatorWithU64TsImpl<BytewiseComparatorImpl>::kClassName(),
+      [](const std::string& /*uri*/,
+         std::unique_ptr<const Comparator>* /*guard */,
+         std::string* /* errmsg */) { return BytewiseComparatorWithU64Ts(); });
+  return 3;
+}
+#endif  // ROCKSDB_LITE
+
+Status Comparator::CreateFromString(const ConfigOptions& config_options,
+                                    const std::string& value,
+                                    const Comparator** result) {
+#ifndef ROCKSDB_LITE
+  static std::once_flag once;
+  std::call_once(once, [&]() {
+    RegisterBuiltinComparators(*(ObjectLibrary::Default().get()), "");
+  });
+#endif  // ROCKSDB_LITE
+  std::string id;
+  std::unordered_map<std::string, std::string> opt_map;
+  Status status = Customizable::GetOptionsMap(config_options, *result, value,
+                                              &id, &opt_map);
+  if (!status.ok()) {  // GetOptionsMap failed
+    return status;
+  }
+  if (id == BytewiseComparatorImpl::kClassName()) {
+    *result = BytewiseComparator();
+  } else if (id == ReverseBytewiseComparatorImpl::kClassName()) {
+    *result = ReverseBytewiseComparator();
+  } else if (id ==
+             ComparatorWithU64TsImpl<BytewiseComparatorImpl>::kClassName()) {
+    *result = BytewiseComparatorWithU64Ts();
+  } else if (value.empty()) {
+    // No Id and no options.  Clear the object
+    *result = nullptr;
+    return Status::OK();
+  } else if (id.empty()) {  // We have no Id but have options.  Not good
+    return Status::NotSupported("Cannot reset object ", id);
+  } else {
+#ifndef ROCKSDB_LITE
+    status = config_options.registry->NewStaticObject(id, result);
+#else
+    status = Status::NotSupported("Cannot load object in LITE mode ", id);
+#endif  // ROCKSDB_LITE
+    if (!status.ok()) {
+      if (config_options.ignore_unsupported_options &&
+          status.IsNotSupported()) {
+        return Status::OK();
+      } else {
+        return status;
+      }
+    } else {
+      Comparator* comparator = const_cast<Comparator*>(*result);
+      status =
+          Customizable::ConfigureNewObject(config_options, comparator, opt_map);
+    }
+  }
+  return status;
+}
 }  // namespace ROCKSDB_NAMESPACE