]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/util/comparator.cc
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / rocksdb / util / comparator.cc
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9
10 #include "rocksdb/comparator.h"
11
12 #include <stdint.h>
13
14 #include <algorithm>
15 #include <memory>
16 #include <mutex>
17 #include <sstream>
18
19 #include "db/dbformat.h"
20 #include "port/lang.h"
21 #include "port/port.h"
22 #include "rocksdb/convenience.h"
23 #include "rocksdb/slice.h"
24 #include "rocksdb/utilities/customizable_util.h"
25 #include "rocksdb/utilities/object_registry.h"
26
27 namespace ROCKSDB_NAMESPACE {
28
29 namespace {
30 class BytewiseComparatorImpl : public Comparator {
31 public:
32 BytewiseComparatorImpl() {}
33 static const char* kClassName() { return "leveldb.BytewiseComparator"; }
34 const char* Name() const override { return kClassName(); }
35
36 int Compare(const Slice& a, const Slice& b) const override {
37 return a.compare(b);
38 }
39
40 bool Equal(const Slice& a, const Slice& b) const override { return a == b; }
41
42 void FindShortestSeparator(std::string* start,
43 const Slice& limit) const override {
44 // Find length of common prefix
45 size_t min_length = std::min(start->size(), limit.size());
46 size_t diff_index = 0;
47 while ((diff_index < min_length) &&
48 ((*start)[diff_index] == limit[diff_index])) {
49 diff_index++;
50 }
51
52 if (diff_index >= min_length) {
53 // Do not shorten if one string is a prefix of the other
54 } else {
55 uint8_t start_byte = static_cast<uint8_t>((*start)[diff_index]);
56 uint8_t limit_byte = static_cast<uint8_t>(limit[diff_index]);
57 if (start_byte >= limit_byte) {
58 // Cannot shorten since limit is smaller than start or start is
59 // already the shortest possible.
60 return;
61 }
62 assert(start_byte < limit_byte);
63
64 if (diff_index < limit.size() - 1 || start_byte + 1 < limit_byte) {
65 (*start)[diff_index]++;
66 start->resize(diff_index + 1);
67 } else {
68 // v
69 // A A 1 A A A
70 // A A 2
71 //
72 // Incrementing the current byte will make start bigger than limit, we
73 // will skip this byte, and find the first non 0xFF byte in start and
74 // increment it.
75 diff_index++;
76
77 while (diff_index < start->size()) {
78 // Keep moving until we find the first non 0xFF byte to
79 // increment it
80 if (static_cast<uint8_t>((*start)[diff_index]) <
81 static_cast<uint8_t>(0xff)) {
82 (*start)[diff_index]++;
83 start->resize(diff_index + 1);
84 break;
85 }
86 diff_index++;
87 }
88 }
89 assert(Compare(*start, limit) < 0);
90 }
91 }
92
93 void FindShortSuccessor(std::string* key) const override {
94 // Find first character that can be incremented
95 size_t n = key->size();
96 for (size_t i = 0; i < n; i++) {
97 const uint8_t byte = (*key)[i];
98 if (byte != static_cast<uint8_t>(0xff)) {
99 (*key)[i] = byte + 1;
100 key->resize(i + 1);
101 return;
102 }
103 }
104 // *key is a run of 0xffs. Leave it alone.
105 }
106
107 bool IsSameLengthImmediateSuccessor(const Slice& s,
108 const Slice& t) const override {
109 if (s.size() != t.size() || s.size() == 0) {
110 return false;
111 }
112 size_t diff_ind = s.difference_offset(t);
113 // same slice
114 if (diff_ind >= s.size()) return false;
115 uint8_t byte_s = static_cast<uint8_t>(s[diff_ind]);
116 uint8_t byte_t = static_cast<uint8_t>(t[diff_ind]);
117 // first different byte must be consecutive, and remaining bytes must be
118 // 0xff for s and 0x00 for t
119 if (byte_s != uint8_t{0xff} && byte_s + 1 == byte_t) {
120 for (size_t i = diff_ind + 1; i < s.size(); ++i) {
121 byte_s = static_cast<uint8_t>(s[i]);
122 byte_t = static_cast<uint8_t>(t[i]);
123 if (byte_s != uint8_t{0xff} || byte_t != uint8_t{0x00}) {
124 return false;
125 }
126 }
127 return true;
128 } else {
129 return false;
130 }
131 }
132
133 bool CanKeysWithDifferentByteContentsBeEqual() const override {
134 return false;
135 }
136
137 using Comparator::CompareWithoutTimestamp;
138 int CompareWithoutTimestamp(const Slice& a, bool /*a_has_ts*/, const Slice& b,
139 bool /*b_has_ts*/) const override {
140 return a.compare(b);
141 }
142
143 bool EqualWithoutTimestamp(const Slice& a, const Slice& b) const override {
144 return a == b;
145 }
146 };
147
148 class ReverseBytewiseComparatorImpl : public BytewiseComparatorImpl {
149 public:
150 ReverseBytewiseComparatorImpl() {}
151
152 static const char* kClassName() {
153 return "rocksdb.ReverseBytewiseComparator";
154 }
155 const char* Name() const override { return kClassName(); }
156
157 int Compare(const Slice& a, const Slice& b) const override {
158 return -a.compare(b);
159 }
160
161 void FindShortestSeparator(std::string* start,
162 const Slice& limit) const override {
163 // Find length of common prefix
164 size_t min_length = std::min(start->size(), limit.size());
165 size_t diff_index = 0;
166 while ((diff_index < min_length) &&
167 ((*start)[diff_index] == limit[diff_index])) {
168 diff_index++;
169 }
170
171 assert(diff_index <= min_length);
172 if (diff_index == min_length) {
173 // Do not shorten if one string is a prefix of the other
174 //
175 // We could handle cases like:
176 // V
177 // A A 2 X Y
178 // A A 2
179 // in a similar way as BytewiseComparator::FindShortestSeparator().
180 // We keep it simple by not implementing it. We can come back to it
181 // later when needed.
182 } else {
183 uint8_t start_byte = static_cast<uint8_t>((*start)[diff_index]);
184 uint8_t limit_byte = static_cast<uint8_t>(limit[diff_index]);
185 if (start_byte > limit_byte && diff_index < start->size() - 1) {
186 // Case like
187 // V
188 // A A 3 A A
189 // A A 1 B B
190 //
191 // or
192 // v
193 // A A 2 A A
194 // A A 1 B B
195 // In this case "AA2" will be good.
196 #ifndef NDEBUG
197 std::string old_start = *start;
198 #endif
199 start->resize(diff_index + 1);
200 #ifndef NDEBUG
201 assert(old_start >= *start);
202 #endif
203 assert(Slice(*start).compare(limit) > 0);
204 }
205 }
206 }
207
208 void FindShortSuccessor(std::string* /*key*/) const override {
209 // Don't do anything for simplicity.
210 }
211
212 bool IsSameLengthImmediateSuccessor(const Slice& s,
213 const Slice& t) const override {
214 // Always returning false to prevent surfacing design flaws in
215 // auto_prefix_mode
216 (void)s, (void)t;
217 return false;
218 // "Correct" implementation:
219 // return BytewiseComparatorImpl::IsSameLengthImmediateSuccessor(t, s);
220 }
221
222 bool CanKeysWithDifferentByteContentsBeEqual() const override {
223 return false;
224 }
225
226 using Comparator::CompareWithoutTimestamp;
227 int CompareWithoutTimestamp(const Slice& a, bool /*a_has_ts*/, const Slice& b,
228 bool /*b_has_ts*/) const override {
229 return -a.compare(b);
230 }
231 };
232
233 // EXPERIMENTAL
234 // Comparator with 64-bit integer timestamp.
235 // We did not performance test this yet.
236 template <typename TComparator>
237 class ComparatorWithU64TsImpl : public Comparator {
238 static_assert(std::is_base_of<Comparator, TComparator>::value,
239 "template type must be a inherited type of comparator");
240
241 public:
242 explicit ComparatorWithU64TsImpl() : Comparator(/*ts_sz=*/sizeof(uint64_t)) {
243 assert(cmp_without_ts_.timestamp_size() == 0);
244 }
245
246 static const char* kClassName() {
247 static std::string class_name = kClassNameInternal();
248 return class_name.c_str();
249 }
250
251 const char* Name() const override { return kClassName(); }
252
253 void FindShortSuccessor(std::string*) const override {}
254 void FindShortestSeparator(std::string*, const Slice&) const override {}
255 int Compare(const Slice& a, const Slice& b) const override {
256 int ret = CompareWithoutTimestamp(a, b);
257 size_t ts_sz = timestamp_size();
258 if (ret != 0) {
259 return ret;
260 }
261 // Compare timestamp.
262 // For the same user key with different timestamps, larger (newer) timestamp
263 // comes first.
264 return -CompareTimestamp(ExtractTimestampFromUserKey(a, ts_sz),
265 ExtractTimestampFromUserKey(b, ts_sz));
266 }
267 using Comparator::CompareWithoutTimestamp;
268 int CompareWithoutTimestamp(const Slice& a, bool a_has_ts, const Slice& b,
269 bool b_has_ts) const override {
270 const size_t ts_sz = timestamp_size();
271 assert(!a_has_ts || a.size() >= ts_sz);
272 assert(!b_has_ts || b.size() >= ts_sz);
273 Slice lhs = a_has_ts ? StripTimestampFromUserKey(a, ts_sz) : a;
274 Slice rhs = b_has_ts ? StripTimestampFromUserKey(b, ts_sz) : b;
275 return cmp_without_ts_.Compare(lhs, rhs);
276 }
277 int CompareTimestamp(const Slice& ts1, const Slice& ts2) const override {
278 assert(ts1.size() == sizeof(uint64_t));
279 assert(ts2.size() == sizeof(uint64_t));
280 uint64_t lhs = DecodeFixed64(ts1.data());
281 uint64_t rhs = DecodeFixed64(ts2.data());
282 if (lhs < rhs) {
283 return -1;
284 } else if (lhs > rhs) {
285 return 1;
286 } else {
287 return 0;
288 }
289 }
290
291 private:
292 static std::string kClassNameInternal() {
293 std::stringstream ss;
294 ss << TComparator::kClassName() << ".u64ts";
295 return ss.str();
296 }
297
298 TComparator cmp_without_ts_;
299 };
300
301 } // namespace
302
303 const Comparator* BytewiseComparator() {
304 STATIC_AVOID_DESTRUCTION(BytewiseComparatorImpl, bytewise);
305 return &bytewise;
306 }
307
308 const Comparator* ReverseBytewiseComparator() {
309 STATIC_AVOID_DESTRUCTION(ReverseBytewiseComparatorImpl, rbytewise);
310 return &rbytewise;
311 }
312
313 const Comparator* BytewiseComparatorWithU64Ts() {
314 STATIC_AVOID_DESTRUCTION(ComparatorWithU64TsImpl<BytewiseComparatorImpl>,
315 comp_with_u64_ts);
316 return &comp_with_u64_ts;
317 }
318
319 #ifndef ROCKSDB_LITE
320 static int RegisterBuiltinComparators(ObjectLibrary& library,
321 const std::string& /*arg*/) {
322 library.AddFactory<const Comparator>(
323 BytewiseComparatorImpl::kClassName(),
324 [](const std::string& /*uri*/,
325 std::unique_ptr<const Comparator>* /*guard */,
326 std::string* /* errmsg */) { return BytewiseComparator(); });
327 library.AddFactory<const Comparator>(
328 ReverseBytewiseComparatorImpl::kClassName(),
329 [](const std::string& /*uri*/,
330 std::unique_ptr<const Comparator>* /*guard */,
331 std::string* /* errmsg */) { return ReverseBytewiseComparator(); });
332 library.AddFactory<const Comparator>(
333 ComparatorWithU64TsImpl<BytewiseComparatorImpl>::kClassName(),
334 [](const std::string& /*uri*/,
335 std::unique_ptr<const Comparator>* /*guard */,
336 std::string* /* errmsg */) { return BytewiseComparatorWithU64Ts(); });
337 return 3;
338 }
339 #endif // ROCKSDB_LITE
340
341 Status Comparator::CreateFromString(const ConfigOptions& config_options,
342 const std::string& value,
343 const Comparator** result) {
344 #ifndef ROCKSDB_LITE
345 static std::once_flag once;
346 std::call_once(once, [&]() {
347 RegisterBuiltinComparators(*(ObjectLibrary::Default().get()), "");
348 });
349 #endif // ROCKSDB_LITE
350 std::string id;
351 std::unordered_map<std::string, std::string> opt_map;
352 Status status = Customizable::GetOptionsMap(config_options, *result, value,
353 &id, &opt_map);
354 if (!status.ok()) { // GetOptionsMap failed
355 return status;
356 }
357 if (id == BytewiseComparatorImpl::kClassName()) {
358 *result = BytewiseComparator();
359 } else if (id == ReverseBytewiseComparatorImpl::kClassName()) {
360 *result = ReverseBytewiseComparator();
361 } else if (id ==
362 ComparatorWithU64TsImpl<BytewiseComparatorImpl>::kClassName()) {
363 *result = BytewiseComparatorWithU64Ts();
364 } else if (value.empty()) {
365 // No Id and no options. Clear the object
366 *result = nullptr;
367 return Status::OK();
368 } else if (id.empty()) { // We have no Id but have options. Not good
369 return Status::NotSupported("Cannot reset object ", id);
370 } else {
371 #ifndef ROCKSDB_LITE
372 status = config_options.registry->NewStaticObject(id, result);
373 #else
374 status = Status::NotSupported("Cannot load object in LITE mode ", id);
375 #endif // ROCKSDB_LITE
376 if (!status.ok()) {
377 if (config_options.ignore_unsupported_options &&
378 status.IsNotSupported()) {
379 return Status::OK();
380 } else {
381 return status;
382 }
383 } else {
384 Comparator* comparator = const_cast<Comparator*>(*result);
385 status =
386 Customizable::ConfigureNewObject(config_options, comparator, opt_map);
387 }
388 }
389 return status;
390 }
391 } // namespace ROCKSDB_NAMESPACE