1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
10 #include "rocksdb/comparator.h"
19 #include "db/dbformat.h"
20 #include "port/lang.h"
21 #include "port/port.h"
22 #include "rocksdb/convenience.h"
23 #include "rocksdb/slice.h"
24 #include "rocksdb/utilities/customizable_util.h"
25 #include "rocksdb/utilities/object_registry.h"
27 namespace ROCKSDB_NAMESPACE
{
30 class BytewiseComparatorImpl
: public Comparator
{
32 BytewiseComparatorImpl() {}
33 static const char* kClassName() { return "leveldb.BytewiseComparator"; }
34 const char* Name() const override
{ return kClassName(); }
36 int Compare(const Slice
& a
, const Slice
& b
) const override
{
40 bool Equal(const Slice
& a
, const Slice
& b
) const override
{ return a
== b
; }
42 void FindShortestSeparator(std::string
* start
,
43 const Slice
& limit
) const override
{
44 // Find length of common prefix
45 size_t min_length
= std::min(start
->size(), limit
.size());
46 size_t diff_index
= 0;
47 while ((diff_index
< min_length
) &&
48 ((*start
)[diff_index
] == limit
[diff_index
])) {
52 if (diff_index
>= min_length
) {
53 // Do not shorten if one string is a prefix of the other
55 uint8_t start_byte
= static_cast<uint8_t>((*start
)[diff_index
]);
56 uint8_t limit_byte
= static_cast<uint8_t>(limit
[diff_index
]);
57 if (start_byte
>= limit_byte
) {
58 // Cannot shorten since limit is smaller than start or start is
59 // already the shortest possible.
62 assert(start_byte
< limit_byte
);
64 if (diff_index
< limit
.size() - 1 || start_byte
+ 1 < limit_byte
) {
65 (*start
)[diff_index
]++;
66 start
->resize(diff_index
+ 1);
72 // Incrementing the current byte will make start bigger than limit, we
73 // will skip this byte, and find the first non 0xFF byte in start and
77 while (diff_index
< start
->size()) {
78 // Keep moving until we find the first non 0xFF byte to
80 if (static_cast<uint8_t>((*start
)[diff_index
]) <
81 static_cast<uint8_t>(0xff)) {
82 (*start
)[diff_index
]++;
83 start
->resize(diff_index
+ 1);
89 assert(Compare(*start
, limit
) < 0);
93 void FindShortSuccessor(std::string
* key
) const override
{
94 // Find first character that can be incremented
95 size_t n
= key
->size();
96 for (size_t i
= 0; i
< n
; i
++) {
97 const uint8_t byte
= (*key
)[i
];
98 if (byte
!= static_cast<uint8_t>(0xff)) {
104 // *key is a run of 0xffs. Leave it alone.
107 bool IsSameLengthImmediateSuccessor(const Slice
& s
,
108 const Slice
& t
) const override
{
109 if (s
.size() != t
.size() || s
.size() == 0) {
112 size_t diff_ind
= s
.difference_offset(t
);
114 if (diff_ind
>= s
.size()) return false;
115 uint8_t byte_s
= static_cast<uint8_t>(s
[diff_ind
]);
116 uint8_t byte_t
= static_cast<uint8_t>(t
[diff_ind
]);
117 // first different byte must be consecutive, and remaining bytes must be
118 // 0xff for s and 0x00 for t
119 if (byte_s
!= uint8_t{0xff} && byte_s
+ 1 == byte_t
) {
120 for (size_t i
= diff_ind
+ 1; i
< s
.size(); ++i
) {
121 byte_s
= static_cast<uint8_t>(s
[i
]);
122 byte_t
= static_cast<uint8_t>(t
[i
]);
123 if (byte_s
!= uint8_t{0xff} || byte_t
!= uint8_t{0x00}) {
133 bool CanKeysWithDifferentByteContentsBeEqual() const override
{
137 using Comparator::CompareWithoutTimestamp
;
138 int CompareWithoutTimestamp(const Slice
& a
, bool /*a_has_ts*/, const Slice
& b
,
139 bool /*b_has_ts*/) const override
{
143 bool EqualWithoutTimestamp(const Slice
& a
, const Slice
& b
) const override
{
148 class ReverseBytewiseComparatorImpl
: public BytewiseComparatorImpl
{
150 ReverseBytewiseComparatorImpl() {}
152 static const char* kClassName() {
153 return "rocksdb.ReverseBytewiseComparator";
155 const char* Name() const override
{ return kClassName(); }
157 int Compare(const Slice
& a
, const Slice
& b
) const override
{
158 return -a
.compare(b
);
161 void FindShortestSeparator(std::string
* start
,
162 const Slice
& limit
) const override
{
163 // Find length of common prefix
164 size_t min_length
= std::min(start
->size(), limit
.size());
165 size_t diff_index
= 0;
166 while ((diff_index
< min_length
) &&
167 ((*start
)[diff_index
] == limit
[diff_index
])) {
171 assert(diff_index
<= min_length
);
172 if (diff_index
== min_length
) {
173 // Do not shorten if one string is a prefix of the other
175 // We could handle cases like:
179 // in a similar way as BytewiseComparator::FindShortestSeparator().
180 // We keep it simple by not implementing it. We can come back to it
181 // later when needed.
183 uint8_t start_byte
= static_cast<uint8_t>((*start
)[diff_index
]);
184 uint8_t limit_byte
= static_cast<uint8_t>(limit
[diff_index
]);
185 if (start_byte
> limit_byte
&& diff_index
< start
->size() - 1) {
195 // In this case "AA2" will be good.
197 std::string old_start
= *start
;
199 start
->resize(diff_index
+ 1);
201 assert(old_start
>= *start
);
203 assert(Slice(*start
).compare(limit
) > 0);
208 void FindShortSuccessor(std::string
* /*key*/) const override
{
209 // Don't do anything for simplicity.
212 bool IsSameLengthImmediateSuccessor(const Slice
& s
,
213 const Slice
& t
) const override
{
214 // Always returning false to prevent surfacing design flaws in
218 // "Correct" implementation:
219 // return BytewiseComparatorImpl::IsSameLengthImmediateSuccessor(t, s);
222 bool CanKeysWithDifferentByteContentsBeEqual() const override
{
226 using Comparator::CompareWithoutTimestamp
;
227 int CompareWithoutTimestamp(const Slice
& a
, bool /*a_has_ts*/, const Slice
& b
,
228 bool /*b_has_ts*/) const override
{
229 return -a
.compare(b
);
234 // Comparator with 64-bit integer timestamp.
235 // We did not performance test this yet.
236 template <typename TComparator
>
237 class ComparatorWithU64TsImpl
: public Comparator
{
238 static_assert(std::is_base_of
<Comparator
, TComparator
>::value
,
239 "template type must be a inherited type of comparator");
242 explicit ComparatorWithU64TsImpl() : Comparator(/*ts_sz=*/sizeof(uint64_t)) {
243 assert(cmp_without_ts_
.timestamp_size() == 0);
246 static const char* kClassName() {
247 static std::string class_name
= kClassNameInternal();
248 return class_name
.c_str();
251 const char* Name() const override
{ return kClassName(); }
253 void FindShortSuccessor(std::string
*) const override
{}
254 void FindShortestSeparator(std::string
*, const Slice
&) const override
{}
255 int Compare(const Slice
& a
, const Slice
& b
) const override
{
256 int ret
= CompareWithoutTimestamp(a
, b
);
257 size_t ts_sz
= timestamp_size();
261 // Compare timestamp.
262 // For the same user key with different timestamps, larger (newer) timestamp
264 return -CompareTimestamp(ExtractTimestampFromUserKey(a
, ts_sz
),
265 ExtractTimestampFromUserKey(b
, ts_sz
));
267 using Comparator::CompareWithoutTimestamp
;
268 int CompareWithoutTimestamp(const Slice
& a
, bool a_has_ts
, const Slice
& b
,
269 bool b_has_ts
) const override
{
270 const size_t ts_sz
= timestamp_size();
271 assert(!a_has_ts
|| a
.size() >= ts_sz
);
272 assert(!b_has_ts
|| b
.size() >= ts_sz
);
273 Slice lhs
= a_has_ts
? StripTimestampFromUserKey(a
, ts_sz
) : a
;
274 Slice rhs
= b_has_ts
? StripTimestampFromUserKey(b
, ts_sz
) : b
;
275 return cmp_without_ts_
.Compare(lhs
, rhs
);
277 int CompareTimestamp(const Slice
& ts1
, const Slice
& ts2
) const override
{
278 assert(ts1
.size() == sizeof(uint64_t));
279 assert(ts2
.size() == sizeof(uint64_t));
280 uint64_t lhs
= DecodeFixed64(ts1
.data());
281 uint64_t rhs
= DecodeFixed64(ts2
.data());
284 } else if (lhs
> rhs
) {
292 static std::string
kClassNameInternal() {
293 std::stringstream ss
;
294 ss
<< TComparator::kClassName() << ".u64ts";
298 TComparator cmp_without_ts_
;
303 const Comparator
* BytewiseComparator() {
304 STATIC_AVOID_DESTRUCTION(BytewiseComparatorImpl
, bytewise
);
308 const Comparator
* ReverseBytewiseComparator() {
309 STATIC_AVOID_DESTRUCTION(ReverseBytewiseComparatorImpl
, rbytewise
);
313 const Comparator
* BytewiseComparatorWithU64Ts() {
314 STATIC_AVOID_DESTRUCTION(ComparatorWithU64TsImpl
<BytewiseComparatorImpl
>,
316 return &comp_with_u64_ts
;
320 static int RegisterBuiltinComparators(ObjectLibrary
& library
,
321 const std::string
& /*arg*/) {
322 library
.AddFactory
<const Comparator
>(
323 BytewiseComparatorImpl::kClassName(),
324 [](const std::string
& /*uri*/,
325 std::unique_ptr
<const Comparator
>* /*guard */,
326 std::string
* /* errmsg */) { return BytewiseComparator(); });
327 library
.AddFactory
<const Comparator
>(
328 ReverseBytewiseComparatorImpl::kClassName(),
329 [](const std::string
& /*uri*/,
330 std::unique_ptr
<const Comparator
>* /*guard */,
331 std::string
* /* errmsg */) { return ReverseBytewiseComparator(); });
332 library
.AddFactory
<const Comparator
>(
333 ComparatorWithU64TsImpl
<BytewiseComparatorImpl
>::kClassName(),
334 [](const std::string
& /*uri*/,
335 std::unique_ptr
<const Comparator
>* /*guard */,
336 std::string
* /* errmsg */) { return BytewiseComparatorWithU64Ts(); });
339 #endif // ROCKSDB_LITE
341 Status
Comparator::CreateFromString(const ConfigOptions
& config_options
,
342 const std::string
& value
,
343 const Comparator
** result
) {
345 static std::once_flag once
;
346 std::call_once(once
, [&]() {
347 RegisterBuiltinComparators(*(ObjectLibrary::Default().get()), "");
349 #endif // ROCKSDB_LITE
351 std::unordered_map
<std::string
, std::string
> opt_map
;
352 Status status
= Customizable::GetOptionsMap(config_options
, *result
, value
,
354 if (!status
.ok()) { // GetOptionsMap failed
357 if (id
== BytewiseComparatorImpl::kClassName()) {
358 *result
= BytewiseComparator();
359 } else if (id
== ReverseBytewiseComparatorImpl::kClassName()) {
360 *result
= ReverseBytewiseComparator();
362 ComparatorWithU64TsImpl
<BytewiseComparatorImpl
>::kClassName()) {
363 *result
= BytewiseComparatorWithU64Ts();
364 } else if (value
.empty()) {
365 // No Id and no options. Clear the object
368 } else if (id
.empty()) { // We have no Id but have options. Not good
369 return Status::NotSupported("Cannot reset object ", id
);
372 status
= config_options
.registry
->NewStaticObject(id
, result
);
374 status
= Status::NotSupported("Cannot load object in LITE mode ", id
);
375 #endif // ROCKSDB_LITE
377 if (config_options
.ignore_unsupported_options
&&
378 status
.IsNotSupported()) {
384 Comparator
* comparator
= const_cast<Comparator
*>(*result
);
386 Customizable::ConfigureNewObject(config_options
, comparator
, opt_map
);
391 } // namespace ROCKSDB_NAMESPACE