]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/db/merge_test.cc
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / rocksdb / db / merge_test.cc
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6 #include <assert.h>
7 #include <memory>
8 #include <iostream>
9
10 #include "port/stack_trace.h"
11 #include "rocksdb/cache.h"
12 #include "rocksdb/comparator.h"
13 #include "rocksdb/db.h"
14 #include "rocksdb/env.h"
15 #include "rocksdb/merge_operator.h"
16 #include "rocksdb/utilities/db_ttl.h"
17 #include "db/dbformat.h"
18 #include "db/db_impl.h"
19 #include "db/write_batch_internal.h"
20 #include "utilities/merge_operators.h"
21 #include "util/testharness.h"
22
23 namespace rocksdb {
24
25 bool use_compression;
26
27 class MergeTest : public testing::Test {};
28
// Incremented by CountMergeOperator::Merge on every call; tests reset it
// before an operation and inspect it afterwards.
size_t num_merge_operator_calls;
void resetNumMergeOperatorCalls() {
  num_merge_operator_calls = static_cast<size_t>(0);
}

// Incremented by CountMergeOperator::PartialMergeMulti on every call.
size_t num_partial_merge_calls;
void resetNumPartialMergeCalls() {
  num_partial_merge_calls = static_cast<size_t>(0);
}
34
35 class CountMergeOperator : public AssociativeMergeOperator {
36 public:
37 CountMergeOperator() {
38 mergeOperator_ = MergeOperators::CreateUInt64AddOperator();
39 }
40
41 virtual bool Merge(const Slice& key,
42 const Slice* existing_value,
43 const Slice& value,
44 std::string* new_value,
45 Logger* logger) const override {
46 assert(new_value->empty());
47 ++num_merge_operator_calls;
48 if (existing_value == nullptr) {
49 new_value->assign(value.data(), value.size());
50 return true;
51 }
52
53 return mergeOperator_->PartialMerge(
54 key,
55 *existing_value,
56 value,
57 new_value,
58 logger);
59 }
60
61 virtual bool PartialMergeMulti(const Slice& key,
62 const std::deque<Slice>& operand_list,
63 std::string* new_value,
64 Logger* logger) const override {
65 assert(new_value->empty());
66 ++num_partial_merge_calls;
67 return mergeOperator_->PartialMergeMulti(key, operand_list, new_value,
68 logger);
69 }
70
71 virtual const char* Name() const override {
72 return "UInt64AddOperator";
73 }
74
75 private:
76 std::shared_ptr<MergeOperator> mergeOperator_;
77 };
78
79 std::shared_ptr<DB> OpenDb(const std::string& dbname, const bool ttl = false,
80 const size_t max_successive_merges = 0) {
81 DB* db;
82 Options options;
83 options.create_if_missing = true;
84 options.merge_operator = std::make_shared<CountMergeOperator>();
85 options.max_successive_merges = max_successive_merges;
86 Status s;
87 DestroyDB(dbname, Options());
88 // DBWithTTL is not supported in ROCKSDB_LITE
89 #ifndef ROCKSDB_LITE
90 if (ttl) {
91 DBWithTTL* db_with_ttl;
92 s = DBWithTTL::Open(options, dbname, &db_with_ttl);
93 db = db_with_ttl;
94 } else {
95 s = DB::Open(options, dbname, &db);
96 }
97 #else
98 assert(!ttl);
99 s = DB::Open(options, dbname, &db);
100 #endif // !ROCKSDB_LITE
101 if (!s.ok()) {
102 std::cerr << s.ToString() << std::endl;
103 assert(false);
104 }
105 return std::shared_ptr<DB>(db);
106 }
107
108 // Imagine we are maintaining a set of uint64 counters.
109 // Each counter has a distinct name. And we would like
110 // to support four high level operations:
111 // set, add, get and remove
112 // This is a quick implementation without a Merge operation.
113 class Counters {
114
115 protected:
116 std::shared_ptr<DB> db_;
117
118 WriteOptions put_option_;
119 ReadOptions get_option_;
120 WriteOptions delete_option_;
121
122 uint64_t default_;
123
124 public:
125 explicit Counters(std::shared_ptr<DB> db, uint64_t defaultCount = 0)
126 : db_(db),
127 put_option_(),
128 get_option_(),
129 delete_option_(),
130 default_(defaultCount) {
131 assert(db_);
132 }
133
134 virtual ~Counters() {}
135
136 // public interface of Counters.
137 // All four functions return false
138 // if the underlying level db operation failed.
139
140 // mapped to a levedb Put
141 bool set(const std::string& key, uint64_t value) {
142 // just treat the internal rep of int64 as the string
143 char buf[sizeof(value)];
144 EncodeFixed64(buf, value);
145 Slice slice(buf, sizeof(value));
146 auto s = db_->Put(put_option_, key, slice);
147
148 if (s.ok()) {
149 return true;
150 } else {
151 std::cerr << s.ToString() << std::endl;
152 return false;
153 }
154 }
155
156 // mapped to a rocksdb Delete
157 bool remove(const std::string& key) {
158 auto s = db_->Delete(delete_option_, key);
159
160 if (s.ok()) {
161 return true;
162 } else {
163 std::cerr << s.ToString() << std::endl;
164 return false;
165 }
166 }
167
168 // mapped to a rocksdb Get
169 bool get(const std::string& key, uint64_t* value) {
170 std::string str;
171 auto s = db_->Get(get_option_, key, &str);
172
173 if (s.IsNotFound()) {
174 // return default value if not found;
175 *value = default_;
176 return true;
177 } else if (s.ok()) {
178 // deserialization
179 if (str.size() != sizeof(uint64_t)) {
180 std::cerr << "value corruption\n";
181 return false;
182 }
183 *value = DecodeFixed64(&str[0]);
184 return true;
185 } else {
186 std::cerr << s.ToString() << std::endl;
187 return false;
188 }
189 }
190
191 // 'add' is implemented as get -> modify -> set
192 // An alternative is a single merge operation, see MergeBasedCounters
193 virtual bool add(const std::string& key, uint64_t value) {
194 uint64_t base = default_;
195 return get(key, &base) && set(key, base + value);
196 }
197
198
199 // convenience functions for testing
200 void assert_set(const std::string& key, uint64_t value) {
201 assert(set(key, value));
202 }
203
204 void assert_remove(const std::string& key) { assert(remove(key)); }
205
206 uint64_t assert_get(const std::string& key) {
207 uint64_t value = default_;
208 int result = get(key, &value);
209 assert(result);
210 if (result == 0) exit(1); // Disable unused variable warning.
211 return value;
212 }
213
214 void assert_add(const std::string& key, uint64_t value) {
215 int result = add(key, value);
216 assert(result);
217 if (result == 0) exit(1); // Disable unused variable warning.
218 }
219 };
220
221 // Implement 'add' directly with the new Merge operation
222 class MergeBasedCounters : public Counters {
223 private:
224 WriteOptions merge_option_; // for merge
225
226 public:
227 explicit MergeBasedCounters(std::shared_ptr<DB> db, uint64_t defaultCount = 0)
228 : Counters(db, defaultCount),
229 merge_option_() {
230 }
231
232 // mapped to a rocksdb Merge operation
233 virtual bool add(const std::string& key, uint64_t value) override {
234 char encoded[sizeof(uint64_t)];
235 EncodeFixed64(encoded, value);
236 Slice slice(encoded, sizeof(uint64_t));
237 auto s = db_->Merge(merge_option_, key, slice);
238
239 if (s.ok()) {
240 return true;
241 } else {
242 std::cerr << s.ToString() << std::endl;
243 return false;
244 }
245 }
246 };
247
248 void dumpDb(DB* db) {
249 auto it = unique_ptr<Iterator>(db->NewIterator(ReadOptions()));
250 for (it->SeekToFirst(); it->Valid(); it->Next()) {
251 //uint64_t value = DecodeFixed64(it->value().data());
252 //std::cout << it->key().ToString() << ": " << value << std::endl;
253 }
254 assert(it->status().ok()); // Check for any errors found during the scan
255 }
256
257 void testCounters(Counters& counters, DB* db, bool test_compaction) {
258
259 FlushOptions o;
260 o.wait = true;
261
262 counters.assert_set("a", 1);
263
264 if (test_compaction) db->Flush(o);
265
266 assert(counters.assert_get("a") == 1);
267
268 counters.assert_remove("b");
269
270 // defaut value is 0 if non-existent
271 assert(counters.assert_get("b") == 0);
272
273 counters.assert_add("a", 2);
274
275 if (test_compaction) db->Flush(o);
276
277 // 1+2 = 3
278 assert(counters.assert_get("a")== 3);
279
280 dumpDb(db);
281
282 // 1+...+49 = ?
283 uint64_t sum = 0;
284 for (int i = 1; i < 50; i++) {
285 counters.assert_add("b", i);
286 sum += i;
287 }
288 assert(counters.assert_get("b") == sum);
289
290 dumpDb(db);
291
292 if (test_compaction) {
293 db->Flush(o);
294
295 db->CompactRange(CompactRangeOptions(), nullptr, nullptr);
296
297 dumpDb(db);
298
299 assert(counters.assert_get("a")== 3);
300 assert(counters.assert_get("b") == sum);
301 }
302 }
303
304 void testSuccessiveMerge(Counters& counters, size_t max_num_merges,
305 size_t num_merges) {
306
307 counters.assert_remove("z");
308 uint64_t sum = 0;
309
310 for (size_t i = 1; i <= num_merges; ++i) {
311 resetNumMergeOperatorCalls();
312 counters.assert_add("z", i);
313 sum += i;
314
315 if (i % (max_num_merges + 1) == 0) {
316 assert(num_merge_operator_calls == max_num_merges + 1);
317 } else {
318 assert(num_merge_operator_calls == 0);
319 }
320
321 resetNumMergeOperatorCalls();
322 assert(counters.assert_get("z") == sum);
323 assert(num_merge_operator_calls == i % (max_num_merges + 1));
324 }
325 }
326
327 void testPartialMerge(Counters* counters, DB* db, size_t max_merge,
328 size_t min_merge, size_t count) {
329 FlushOptions o;
330 o.wait = true;
331
332 // Test case 1: partial merge should be called when the number of merge
333 // operands exceeds the threshold.
334 uint64_t tmp_sum = 0;
335 resetNumPartialMergeCalls();
336 for (size_t i = 1; i <= count; i++) {
337 counters->assert_add("b", i);
338 tmp_sum += i;
339 }
340 db->Flush(o);
341 db->CompactRange(CompactRangeOptions(), nullptr, nullptr);
342 ASSERT_EQ(tmp_sum, counters->assert_get("b"));
343 if (count > max_merge) {
344 // in this case, FullMerge should be called instead.
345 ASSERT_EQ(num_partial_merge_calls, 0U);
346 } else {
347 // if count >= min_merge, then partial merge should be called once.
348 ASSERT_EQ((count >= min_merge), (num_partial_merge_calls == 1));
349 }
350
351 // Test case 2: partial merge should not be called when a put is found.
352 resetNumPartialMergeCalls();
353 tmp_sum = 0;
354 db->Put(rocksdb::WriteOptions(), "c", "10");
355 for (size_t i = 1; i <= count; i++) {
356 counters->assert_add("c", i);
357 tmp_sum += i;
358 }
359 db->Flush(o);
360 db->CompactRange(CompactRangeOptions(), nullptr, nullptr);
361 ASSERT_EQ(tmp_sum, counters->assert_get("c"));
362 ASSERT_EQ(num_partial_merge_calls, 0U);
363 }
364
365 void testSingleBatchSuccessiveMerge(DB* db, size_t max_num_merges,
366 size_t num_merges) {
367 assert(num_merges > max_num_merges);
368
369 Slice key("BatchSuccessiveMerge");
370 uint64_t merge_value = 1;
371 char buf[sizeof(merge_value)];
372 EncodeFixed64(buf, merge_value);
373 Slice merge_value_slice(buf, sizeof(merge_value));
374
375 // Create the batch
376 WriteBatch batch;
377 for (size_t i = 0; i < num_merges; ++i) {
378 batch.Merge(key, merge_value_slice);
379 }
380
381 // Apply to memtable and count the number of merges
382 resetNumMergeOperatorCalls();
383 {
384 Status s = db->Write(WriteOptions(), &batch);
385 assert(s.ok());
386 }
387 ASSERT_EQ(
388 num_merge_operator_calls,
389 static_cast<size_t>(num_merges - (num_merges % (max_num_merges + 1))));
390
391 // Get the value
392 resetNumMergeOperatorCalls();
393 std::string get_value_str;
394 {
395 Status s = db->Get(ReadOptions(), key, &get_value_str);
396 assert(s.ok());
397 }
398 assert(get_value_str.size() == sizeof(uint64_t));
399 uint64_t get_value = DecodeFixed64(&get_value_str[0]);
400 ASSERT_EQ(get_value, num_merges * merge_value);
401 ASSERT_EQ(num_merge_operator_calls,
402 static_cast<size_t>((num_merges % (max_num_merges + 1))));
403 }
404
405 void runTest(const std::string& dbname, const bool use_ttl = false) {
406
407 {
408 auto db = OpenDb(dbname, use_ttl);
409
410 {
411 Counters counters(db, 0);
412 testCounters(counters, db.get(), true);
413 }
414
415 {
416 MergeBasedCounters counters(db, 0);
417 testCounters(counters, db.get(), use_compression);
418 }
419 }
420
421 DestroyDB(dbname, Options());
422
423 {
424 size_t max_merge = 5;
425 auto db = OpenDb(dbname, use_ttl, max_merge);
426 MergeBasedCounters counters(db, 0);
427 testCounters(counters, db.get(), use_compression);
428 testSuccessiveMerge(counters, max_merge, max_merge * 2);
429 testSingleBatchSuccessiveMerge(db.get(), 5, 7);
430 DestroyDB(dbname, Options());
431 }
432
433 {
434 size_t max_merge = 100;
435 // Min merge is hard-coded to 2.
436 uint32_t min_merge = 2;
437 for (uint32_t count = min_merge - 1; count <= min_merge + 1; count++) {
438 auto db = OpenDb(dbname, use_ttl, max_merge);
439 MergeBasedCounters counters(db, 0);
440 testPartialMerge(&counters, db.get(), max_merge, min_merge, count);
441 DestroyDB(dbname, Options());
442 }
443 {
444 auto db = OpenDb(dbname, use_ttl, max_merge);
445 MergeBasedCounters counters(db, 0);
446 testPartialMerge(&counters, db.get(), max_merge, min_merge,
447 min_merge * 10);
448 DestroyDB(dbname, Options());
449 }
450 }
451
452 {
453 {
454 auto db = OpenDb(dbname);
455 MergeBasedCounters counters(db, 0);
456 counters.add("test-key", 1);
457 counters.add("test-key", 1);
458 counters.add("test-key", 1);
459 db->CompactRange(CompactRangeOptions(), nullptr, nullptr);
460 }
461
462 DB* reopen_db;
463 ASSERT_OK(DB::Open(Options(), dbname, &reopen_db));
464 std::string value;
465 ASSERT_TRUE(!(reopen_db->Get(ReadOptions(), "test-key", &value).ok()));
466 delete reopen_db;
467 DestroyDB(dbname, Options());
468 }
469
470 /* Temporary remove this test
471 {
472 std::cout << "Test merge-operator not set after reopen (recovery case)\n";
473 {
474 auto db = OpenDb(dbname);
475 MergeBasedCounters counters(db, 0);
476 counters.add("test-key", 1);
477 counters.add("test-key", 1);
478 counters.add("test-key", 1);
479 }
480
481 DB* reopen_db;
482 ASSERT_TRUE(DB::Open(Options(), dbname, &reopen_db).IsInvalidArgument());
483 }
484 */
485 }
486
487 TEST_F(MergeTest, MergeDbTest) {
488 runTest(test::PerThreadDBPath("merge_testdb"));
489 }
490
491 #ifndef ROCKSDB_LITE
492 TEST_F(MergeTest, MergeDbTtlTest) {
493 runTest(test::PerThreadDBPath("merge_testdbttl"),
494 true); // Run test on TTL database
495 }
496 #endif // !ROCKSDB_LITE
497
498 } // namespace rocksdb
499
500 int main(int argc, char** argv) {
501 rocksdb::use_compression = false;
502 if (argc > 1) {
503 rocksdb::use_compression = true;
504 }
505
506 rocksdb::port::InstallStackTraceHandler();
507 ::testing::InitGoogleTest(&argc, argv);
508 return RUN_ALL_TESTS();
509 }