// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "rocksdb/comparator.h"
+
#include <stdint.h>
+
#include <algorithm>
#include <memory>
+#include <mutex>
+#include <sstream>
+
+#include "db/dbformat.h"
+#include "port/lang.h"
#include "port/port.h"
+#include "rocksdb/convenience.h"
#include "rocksdb/slice.h"
+#include "rocksdb/utilities/customizable_util.h"
+#include "rocksdb/utilities/object_registry.h"
namespace ROCKSDB_NAMESPACE {
namespace {
class BytewiseComparatorImpl : public Comparator {
public:
- BytewiseComparatorImpl() { }
-
- const char* Name() const override { return "leveldb.BytewiseComparator"; }
+ BytewiseComparatorImpl() {}
+ static const char* kClassName() { return "leveldb.BytewiseComparator"; }
+ const char* Name() const override { return kClassName(); }
int Compare(const Slice& a, const Slice& b) const override {
return a.compare(b);
const uint8_t byte = (*key)[i];
if (byte != static_cast<uint8_t>(0xff)) {
(*key)[i] = byte + 1;
- key->resize(i+1);
+ key->resize(i + 1);
return;
}
}
bool /*b_has_ts*/) const override {
return a.compare(b);
}
+
+ bool EqualWithoutTimestamp(const Slice& a, const Slice& b) const override {
+ return a == b;
+ }
};
class ReverseBytewiseComparatorImpl : public BytewiseComparatorImpl {
public:
- ReverseBytewiseComparatorImpl() { }
+ ReverseBytewiseComparatorImpl() {}
- const char* Name() const override {
+ static const char* kClassName() {
return "rocksdb.ReverseBytewiseComparator";
}
+ const char* Name() const override { return kClassName(); }
int Compare(const Slice& a, const Slice& b) const override {
return -a.compare(b);
// Don't do anything for simplicity.
}
+ bool IsSameLengthImmediateSuccessor(const Slice& s,
+ const Slice& t) const override {
+ // Always returning false to prevent surfacing design flaws in
+ // auto_prefix_mode
+ (void)s, (void)t;
+ return false;
+ // "Correct" implementation:
+ // return BytewiseComparatorImpl::IsSameLengthImmediateSuccessor(t, s);
+ }
+
bool CanKeysWithDifferentByteContentsBeEqual() const override {
return false;
}
return -a.compare(b);
}
};
-}// namespace
+
+// EXPERIMENTAL
+// Comparator with 64-bit integer timestamp.
+// We did not performance test this yet.
+template <typename TComparator>
+class ComparatorWithU64TsImpl : public Comparator {
+ static_assert(std::is_base_of<Comparator, TComparator>::value,
+ "template type must be a inherited type of comparator");
+
+ public:
+ explicit ComparatorWithU64TsImpl() : Comparator(/*ts_sz=*/sizeof(uint64_t)) {
+ assert(cmp_without_ts_.timestamp_size() == 0);
+ }
+
+ static const char* kClassName() {
+ static std::string class_name = kClassNameInternal();
+ return class_name.c_str();
+ }
+
+ const char* Name() const override { return kClassName(); }
+
+ void FindShortSuccessor(std::string*) const override {}
+ void FindShortestSeparator(std::string*, const Slice&) const override {}
+ int Compare(const Slice& a, const Slice& b) const override {
+ int ret = CompareWithoutTimestamp(a, b);
+ size_t ts_sz = timestamp_size();
+ if (ret != 0) {
+ return ret;
+ }
+ // Compare timestamp.
+ // For the same user key with different timestamps, larger (newer) timestamp
+ // comes first.
+ return -CompareTimestamp(ExtractTimestampFromUserKey(a, ts_sz),
+ ExtractTimestampFromUserKey(b, ts_sz));
+ }
+ using Comparator::CompareWithoutTimestamp;
+ int CompareWithoutTimestamp(const Slice& a, bool a_has_ts, const Slice& b,
+ bool b_has_ts) const override {
+ const size_t ts_sz = timestamp_size();
+ assert(!a_has_ts || a.size() >= ts_sz);
+ assert(!b_has_ts || b.size() >= ts_sz);
+ Slice lhs = a_has_ts ? StripTimestampFromUserKey(a, ts_sz) : a;
+ Slice rhs = b_has_ts ? StripTimestampFromUserKey(b, ts_sz) : b;
+ return cmp_without_ts_.Compare(lhs, rhs);
+ }
+ int CompareTimestamp(const Slice& ts1, const Slice& ts2) const override {
+ assert(ts1.size() == sizeof(uint64_t));
+ assert(ts2.size() == sizeof(uint64_t));
+ uint64_t lhs = DecodeFixed64(ts1.data());
+ uint64_t rhs = DecodeFixed64(ts2.data());
+ if (lhs < rhs) {
+ return -1;
+ } else if (lhs > rhs) {
+ return 1;
+ } else {
+ return 0;
+ }
+ }
+
+ private:
+ static std::string kClassNameInternal() {
+ std::stringstream ss;
+ ss << TComparator::kClassName() << ".u64ts";
+ return ss.str();
+ }
+
+ TComparator cmp_without_ts_;
+};
+
+} // namespace
const Comparator* BytewiseComparator() {
- static BytewiseComparatorImpl bytewise;
+ STATIC_AVOID_DESTRUCTION(BytewiseComparatorImpl, bytewise);
return &bytewise;
}
const Comparator* ReverseBytewiseComparator() {
- static ReverseBytewiseComparatorImpl rbytewise;
+ STATIC_AVOID_DESTRUCTION(ReverseBytewiseComparatorImpl, rbytewise);
return &rbytewise;
}
+const Comparator* BytewiseComparatorWithU64Ts() {
+ STATIC_AVOID_DESTRUCTION(ComparatorWithU64TsImpl<BytewiseComparatorImpl>,
+ comp_with_u64_ts);
+ return &comp_with_u64_ts;
+}
+
+#ifndef ROCKSDB_LITE
+static int RegisterBuiltinComparators(ObjectLibrary& library,
+ const std::string& /*arg*/) {
+ library.AddFactory<const Comparator>(
+ BytewiseComparatorImpl::kClassName(),
+ [](const std::string& /*uri*/,
+ std::unique_ptr<const Comparator>* /*guard */,
+ std::string* /* errmsg */) { return BytewiseComparator(); });
+ library.AddFactory<const Comparator>(
+ ReverseBytewiseComparatorImpl::kClassName(),
+ [](const std::string& /*uri*/,
+ std::unique_ptr<const Comparator>* /*guard */,
+ std::string* /* errmsg */) { return ReverseBytewiseComparator(); });
+ library.AddFactory<const Comparator>(
+ ComparatorWithU64TsImpl<BytewiseComparatorImpl>::kClassName(),
+ [](const std::string& /*uri*/,
+ std::unique_ptr<const Comparator>* /*guard */,
+ std::string* /* errmsg */) { return BytewiseComparatorWithU64Ts(); });
+ return 3;
+}
+#endif // ROCKSDB_LITE
+
+Status Comparator::CreateFromString(const ConfigOptions& config_options,
+ const std::string& value,
+ const Comparator** result) {
+#ifndef ROCKSDB_LITE
+ static std::once_flag once;
+ std::call_once(once, [&]() {
+ RegisterBuiltinComparators(*(ObjectLibrary::Default().get()), "");
+ });
+#endif // ROCKSDB_LITE
+ std::string id;
+ std::unordered_map<std::string, std::string> opt_map;
+ Status status = Customizable::GetOptionsMap(config_options, *result, value,
+ &id, &opt_map);
+ if (!status.ok()) { // GetOptionsMap failed
+ return status;
+ }
+ if (id == BytewiseComparatorImpl::kClassName()) {
+ *result = BytewiseComparator();
+ } else if (id == ReverseBytewiseComparatorImpl::kClassName()) {
+ *result = ReverseBytewiseComparator();
+ } else if (id ==
+ ComparatorWithU64TsImpl<BytewiseComparatorImpl>::kClassName()) {
+ *result = BytewiseComparatorWithU64Ts();
+ } else if (value.empty()) {
+ // No Id and no options. Clear the object
+ *result = nullptr;
+ return Status::OK();
+ } else if (id.empty()) { // We have no Id but have options. Not good
+ return Status::NotSupported("Cannot reset object ", id);
+ } else {
+#ifndef ROCKSDB_LITE
+ status = config_options.registry->NewStaticObject(id, result);
+#else
+ status = Status::NotSupported("Cannot load object in LITE mode ", id);
+#endif // ROCKSDB_LITE
+ if (!status.ok()) {
+ if (config_options.ignore_unsupported_options &&
+ status.IsNotSupported()) {
+ return Status::OK();
+ } else {
+ return status;
+ }
+ } else {
+ Comparator* comparator = const_cast<Comparator*>(*result);
+ status =
+ Customizable::ConfigureNewObject(config_options, comparator, opt_map);
+ }
+ }
+ return status;
+}
} // namespace ROCKSDB_NAMESPACE