]> git.proxmox.com Git - ceph.git/blame - ceph/src/rocksdb/db/merge_test.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / rocksdb / db / merge_test.cc
CommitLineData
7c673cae 1// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
11fdf7f2
TL
2// This source code is licensed under both the GPLv2 (found in the
3// COPYING file in the root directory) and Apache 2.0 License
4// (found in the LICENSE.Apache file in the root directory).
7c673cae
FG
5//
6#include <assert.h>
20effc67 7
7c673cae 8#include <iostream>
20effc67 9#include <memory>
7c673cae 10
f67539c2
TL
11#include "db/db_impl/db_impl.h"
12#include "db/dbformat.h"
13#include "db/write_batch_internal.h"
7c673cae
FG
14#include "port/stack_trace.h"
15#include "rocksdb/cache.h"
16#include "rocksdb/comparator.h"
17#include "rocksdb/db.h"
18#include "rocksdb/env.h"
19#include "rocksdb/merge_operator.h"
20#include "rocksdb/utilities/db_ttl.h"
f67539c2 21#include "test_util/testharness.h"
20effc67 22#include "util/coding.h"
7c673cae 23#include "utilities/merge_operators.h"
7c673cae 24
f67539c2 25namespace ROCKSDB_NAMESPACE {
11fdf7f2
TL
26
27bool use_compression;
28
29class MergeTest : public testing::Test {};
7c673cae 30
7c673cae
FG
// Number of CountMergeOperator::Merge() invocations since the last reset.
size_t num_merge_operator_calls = 0;

// Number of CountMergeOperator::PartialMergeMulti() invocations since the
// last reset.
size_t num_partial_merge_calls = 0;

void resetNumMergeOperatorCalls() {
  num_merge_operator_calls = size_t{0};
}

void resetNumPartialMergeCalls() {
  num_partial_merge_calls = size_t{0};
}
7c673cae
FG
36
37class CountMergeOperator : public AssociativeMergeOperator {
38 public:
39 CountMergeOperator() {
40 mergeOperator_ = MergeOperators::CreateUInt64AddOperator();
41 }
42
494da23a
TL
43 bool Merge(const Slice& key, const Slice* existing_value, const Slice& value,
44 std::string* new_value, Logger* logger) const override {
7c673cae
FG
45 assert(new_value->empty());
46 ++num_merge_operator_calls;
47 if (existing_value == nullptr) {
48 new_value->assign(value.data(), value.size());
49 return true;
50 }
51
52 return mergeOperator_->PartialMerge(
53 key,
54 *existing_value,
55 value,
56 new_value,
57 logger);
58 }
59
494da23a
TL
60 bool PartialMergeMulti(const Slice& key,
61 const std::deque<Slice>& operand_list,
62 std::string* new_value,
63 Logger* logger) const override {
7c673cae
FG
64 assert(new_value->empty());
65 ++num_partial_merge_calls;
66 return mergeOperator_->PartialMergeMulti(key, operand_list, new_value,
67 logger);
68 }
69
494da23a 70 const char* Name() const override { return "UInt64AddOperator"; }
7c673cae
FG
71
72 private:
73 std::shared_ptr<MergeOperator> mergeOperator_;
74};
75
7c673cae
FG
76std::shared_ptr<DB> OpenDb(const std::string& dbname, const bool ttl = false,
77 const size_t max_successive_merges = 0) {
78 DB* db;
79 Options options;
80 options.create_if_missing = true;
81 options.merge_operator = std::make_shared<CountMergeOperator>();
82 options.max_successive_merges = max_successive_merges;
83 Status s;
84 DestroyDB(dbname, Options());
85// DBWithTTL is not supported in ROCKSDB_LITE
86#ifndef ROCKSDB_LITE
87 if (ttl) {
7c673cae
FG
88 DBWithTTL* db_with_ttl;
89 s = DBWithTTL::Open(options, dbname, &db_with_ttl);
90 db = db_with_ttl;
91 } else {
92 s = DB::Open(options, dbname, &db);
93 }
94#else
95 assert(!ttl);
96 s = DB::Open(options, dbname, &db);
97#endif // !ROCKSDB_LITE
98 if (!s.ok()) {
99 std::cerr << s.ToString() << std::endl;
100 assert(false);
101 }
102 return std::shared_ptr<DB>(db);
103}
7c673cae
FG
104
105// Imagine we are maintaining a set of uint64 counters.
106// Each counter has a distinct name. And we would like
107// to support four high level operations:
108// set, add, get and remove
109// This is a quick implementation without a Merge operation.
110class Counters {
111
112 protected:
113 std::shared_ptr<DB> db_;
114
115 WriteOptions put_option_;
116 ReadOptions get_option_;
117 WriteOptions delete_option_;
118
119 uint64_t default_;
120
121 public:
122 explicit Counters(std::shared_ptr<DB> db, uint64_t defaultCount = 0)
123 : db_(db),
124 put_option_(),
125 get_option_(),
126 delete_option_(),
127 default_(defaultCount) {
128 assert(db_);
129 }
130
131 virtual ~Counters() {}
132
133 // public interface of Counters.
134 // All four functions return false
135 // if the underlying level db operation failed.
136
137 // mapped to a levedb Put
138 bool set(const std::string& key, uint64_t value) {
139 // just treat the internal rep of int64 as the string
140 char buf[sizeof(value)];
141 EncodeFixed64(buf, value);
142 Slice slice(buf, sizeof(value));
143 auto s = db_->Put(put_option_, key, slice);
144
145 if (s.ok()) {
146 return true;
147 } else {
148 std::cerr << s.ToString() << std::endl;
149 return false;
150 }
151 }
152
153 // mapped to a rocksdb Delete
154 bool remove(const std::string& key) {
155 auto s = db_->Delete(delete_option_, key);
156
157 if (s.ok()) {
158 return true;
159 } else {
160 std::cerr << s.ToString() << std::endl;
161 return false;
162 }
163 }
164
165 // mapped to a rocksdb Get
166 bool get(const std::string& key, uint64_t* value) {
167 std::string str;
168 auto s = db_->Get(get_option_, key, &str);
169
170 if (s.IsNotFound()) {
171 // return default value if not found;
172 *value = default_;
173 return true;
174 } else if (s.ok()) {
175 // deserialization
176 if (str.size() != sizeof(uint64_t)) {
177 std::cerr << "value corruption\n";
178 return false;
179 }
180 *value = DecodeFixed64(&str[0]);
181 return true;
182 } else {
183 std::cerr << s.ToString() << std::endl;
184 return false;
185 }
186 }
187
188 // 'add' is implemented as get -> modify -> set
189 // An alternative is a single merge operation, see MergeBasedCounters
190 virtual bool add(const std::string& key, uint64_t value) {
191 uint64_t base = default_;
192 return get(key, &base) && set(key, base + value);
193 }
194
195
196 // convenience functions for testing
197 void assert_set(const std::string& key, uint64_t value) {
198 assert(set(key, value));
199 }
200
201 void assert_remove(const std::string& key) { assert(remove(key)); }
202
203 uint64_t assert_get(const std::string& key) {
204 uint64_t value = default_;
205 int result = get(key, &value);
206 assert(result);
207 if (result == 0) exit(1); // Disable unused variable warning.
208 return value;
209 }
210
211 void assert_add(const std::string& key, uint64_t value) {
212 int result = add(key, value);
213 assert(result);
214 if (result == 0) exit(1); // Disable unused variable warning.
215 }
216};
217
218// Implement 'add' directly with the new Merge operation
219class MergeBasedCounters : public Counters {
220 private:
221 WriteOptions merge_option_; // for merge
222
223 public:
224 explicit MergeBasedCounters(std::shared_ptr<DB> db, uint64_t defaultCount = 0)
225 : Counters(db, defaultCount),
226 merge_option_() {
227 }
228
229 // mapped to a rocksdb Merge operation
494da23a 230 bool add(const std::string& key, uint64_t value) override {
7c673cae
FG
231 char encoded[sizeof(uint64_t)];
232 EncodeFixed64(encoded, value);
233 Slice slice(encoded, sizeof(uint64_t));
234 auto s = db_->Merge(merge_option_, key, slice);
235
236 if (s.ok()) {
237 return true;
238 } else {
239 std::cerr << s.ToString() << std::endl;
240 return false;
241 }
242 }
243};
244
7c673cae 245void dumpDb(DB* db) {
494da23a 246 auto it = std::unique_ptr<Iterator>(db->NewIterator(ReadOptions()));
7c673cae 247 for (it->SeekToFirst(); it->Valid(); it->Next()) {
11fdf7f2
TL
248 //uint64_t value = DecodeFixed64(it->value().data());
249 //std::cout << it->key().ToString() << ": " << value << std::endl;
7c673cae
FG
250 }
251 assert(it->status().ok()); // Check for any errors found during the scan
252}
253
254void testCounters(Counters& counters, DB* db, bool test_compaction) {
255
256 FlushOptions o;
257 o.wait = true;
258
259 counters.assert_set("a", 1);
260
261 if (test_compaction) db->Flush(o);
262
263 assert(counters.assert_get("a") == 1);
264
265 counters.assert_remove("b");
266
267 // defaut value is 0 if non-existent
268 assert(counters.assert_get("b") == 0);
269
270 counters.assert_add("a", 2);
271
272 if (test_compaction) db->Flush(o);
273
274 // 1+2 = 3
275 assert(counters.assert_get("a")== 3);
276
277 dumpDb(db);
278
7c673cae
FG
279 // 1+...+49 = ?
280 uint64_t sum = 0;
281 for (int i = 1; i < 50; i++) {
282 counters.assert_add("b", i);
283 sum += i;
284 }
285 assert(counters.assert_get("b") == sum);
286
7c673cae
FG
287 dumpDb(db);
288
7c673cae
FG
289 if (test_compaction) {
290 db->Flush(o);
291
7c673cae 292 db->CompactRange(CompactRangeOptions(), nullptr, nullptr);
7c673cae
FG
293
294 dumpDb(db);
295
296 assert(counters.assert_get("a")== 3);
297 assert(counters.assert_get("b") == sum);
298 }
299}
300
20effc67
TL
// Races one Merge-based counter update against a SetOptions() call, a manual
// compaction and a flush, all of which reach VersionSet::LogAndApply.
// Sync-point callbacks and dependencies force a fixed interleaving of those
// threads. The test then checks that "test-key" reads back as a single
// increment of 1.
void testCountersWithFlushAndCompaction(Counters& counters, DB* db) {
  // Write and flush one unrelated key first (presumably so the compaction
  // below has a file to work on).
  ASSERT_OK(db->Put({}, "1", "1"));
  ASSERT_OK(db->Flush(FlushOptions()));

  // Hands out ids 0, 1, 2, ... to threads in the order in which each thread
  // first calls this lambda (the thread_local is initialized once per thread).
  std::atomic<int> cnt{0};
  const auto get_thread_id = [&cnt]() {
    thread_local int thread_id{cnt++};
    return thread_id;
  };
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
  // Before waiting to become the MANIFEST writer, thread 1 (presumably the
  // compaction) and thread 2 (presumably the flush) each hit their own marker.
  SyncPoint::GetInstance()->SetCallBack(
      "VersionSet::LogAndApply:BeforeWriterWaiting", [&](void* /*arg*/) {
        int thread_id = get_thread_id();
        if (1 == thread_id) {
          TEST_SYNC_POINT(
              "testCountersWithFlushAndCompaction::bg_compact_thread:0");
        } else if (2 == thread_id) {
          TEST_SYNC_POINT(
              "testCountersWithFlushAndCompaction::bg_flush_thread:0");
        }
      });
  // While writing the MANIFEST, thread 0 (presumably SetOptions) passes two
  // markers; the dependencies below hold it at ":1" until the flush thread
  // has reached bg_flush_thread:0.
  SyncPoint::GetInstance()->SetCallBack(
      "VersionSet::LogAndApply:WriteManifest", [&](void* /*arg*/) {
        int thread_id = get_thread_id();
        if (0 == thread_id) {
          TEST_SYNC_POINT(
              "testCountersWithFlushAndCompaction::set_options_thread:0");
          TEST_SYNC_POINT(
              "testCountersWithFlushAndCompaction::set_options_thread:1");
        }
      });
  // Only thread 2 is expected here. It releases the mutex passed as `arg`,
  // waits until the main thread's Get() is done (bg_flush_thread:2 depends on
  // AfterGet), then re-acquires the mutex.
  SyncPoint::GetInstance()->SetCallBack(
      "VersionSet::LogAndApply:WakeUpAndDone", [&](void* arg) {
        auto* mutex = reinterpret_cast<InstrumentedMutex*>(arg);
        mutex->AssertHeld();
        int thread_id = get_thread_id();
        ASSERT_EQ(2, thread_id);
        mutex->Unlock();
        TEST_SYNC_POINT(
            "testCountersWithFlushAndCompaction::bg_flush_thread:1");
        TEST_SYNC_POINT(
            "testCountersWithFlushAndCompaction::bg_flush_thread:2");
        mutex->Lock();
      });
  // Each pair {A, B}: sync point B waits until sync point A has been passed.
  // Note the "testCountersWithCompactionAndFlush:BeforeCompact" spelling; it
  // matches the TEST_SYNC_POINT below, so the dependency is consistent.
  SyncPoint::GetInstance()->LoadDependency({
      {"testCountersWithFlushAndCompaction::set_options_thread:0",
       "testCountersWithCompactionAndFlush:BeforeCompact"},
      {"testCountersWithFlushAndCompaction::bg_compact_thread:0",
       "testCountersWithFlushAndCompaction:BeforeIncCounters"},
      {"testCountersWithFlushAndCompaction::bg_flush_thread:0",
       "testCountersWithFlushAndCompaction::set_options_thread:1"},
      {"testCountersWithFlushAndCompaction::bg_flush_thread:1",
       "testCountersWithFlushAndCompaction:BeforeVerification"},
      {"testCountersWithFlushAndCompaction:AfterGet",
       "testCountersWithFlushAndCompaction::bg_flush_thread:2"},
  });
  SyncPoint::GetInstance()->EnableProcessing();

  port::Thread set_options_thread([&]() {
    ASSERT_OK(reinterpret_cast<DBImpl*>(db)->SetOptions(
        {{"disable_auto_compactions", "false"}}));
  });
  // Start the compaction only once SetOptions is inside its MANIFEST write.
  TEST_SYNC_POINT("testCountersWithCompactionAndFlush:BeforeCompact");
  port::Thread compact_thread([&]() {
    ASSERT_OK(reinterpret_cast<DBImpl*>(db)->CompactRange(
        CompactRangeOptions(), db->DefaultColumnFamily(), nullptr, nullptr));
  });

  // Merge the counter only after the compaction is waiting on the MANIFEST.
  TEST_SYNC_POINT("testCountersWithFlushAndCompaction:BeforeIncCounters");
  counters.add("test-key", 1);

  // Non-blocking flush; its completion is coordinated via the sync points.
  FlushOptions flush_opts;
  flush_opts.wait = false;
  ASSERT_OK(db->Flush(flush_opts));

  // Read while the flush thread is parked inside WakeUpAndDone (mutex
  // released), then let it continue.
  TEST_SYNC_POINT("testCountersWithFlushAndCompaction:BeforeVerification");
  std::string expected;
  PutFixed64(&expected, 1);
  std::string actual;
  Status s = db->Get(ReadOptions(), "test-key", &actual);
  TEST_SYNC_POINT("testCountersWithFlushAndCompaction:AfterGet");
  set_options_thread.join();
  compact_thread.join();
  ASSERT_OK(s);
  ASSERT_EQ(expected, actual);
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
}
390
7c673cae
FG
391void testSuccessiveMerge(Counters& counters, size_t max_num_merges,
392 size_t num_merges) {
393
394 counters.assert_remove("z");
395 uint64_t sum = 0;
396
397 for (size_t i = 1; i <= num_merges; ++i) {
398 resetNumMergeOperatorCalls();
399 counters.assert_add("z", i);
400 sum += i;
401
402 if (i % (max_num_merges + 1) == 0) {
403 assert(num_merge_operator_calls == max_num_merges + 1);
404 } else {
405 assert(num_merge_operator_calls == 0);
406 }
407
408 resetNumMergeOperatorCalls();
409 assert(counters.assert_get("z") == sum);
410 assert(num_merge_operator_calls == i % (max_num_merges + 1));
411 }
412}
413
414void testPartialMerge(Counters* counters, DB* db, size_t max_merge,
415 size_t min_merge, size_t count) {
416 FlushOptions o;
417 o.wait = true;
418
419 // Test case 1: partial merge should be called when the number of merge
420 // operands exceeds the threshold.
421 uint64_t tmp_sum = 0;
422 resetNumPartialMergeCalls();
423 for (size_t i = 1; i <= count; i++) {
424 counters->assert_add("b", i);
425 tmp_sum += i;
426 }
427 db->Flush(o);
428 db->CompactRange(CompactRangeOptions(), nullptr, nullptr);
429 ASSERT_EQ(tmp_sum, counters->assert_get("b"));
430 if (count > max_merge) {
431 // in this case, FullMerge should be called instead.
432 ASSERT_EQ(num_partial_merge_calls, 0U);
433 } else {
434 // if count >= min_merge, then partial merge should be called once.
435 ASSERT_EQ((count >= min_merge), (num_partial_merge_calls == 1));
436 }
437
438 // Test case 2: partial merge should not be called when a put is found.
439 resetNumPartialMergeCalls();
440 tmp_sum = 0;
f67539c2 441 db->Put(ROCKSDB_NAMESPACE::WriteOptions(), "c", "10");
7c673cae
FG
442 for (size_t i = 1; i <= count; i++) {
443 counters->assert_add("c", i);
444 tmp_sum += i;
445 }
446 db->Flush(o);
447 db->CompactRange(CompactRangeOptions(), nullptr, nullptr);
448 ASSERT_EQ(tmp_sum, counters->assert_get("c"));
449 ASSERT_EQ(num_partial_merge_calls, 0U);
450}
451
452void testSingleBatchSuccessiveMerge(DB* db, size_t max_num_merges,
453 size_t num_merges) {
454 assert(num_merges > max_num_merges);
455
456 Slice key("BatchSuccessiveMerge");
457 uint64_t merge_value = 1;
458 char buf[sizeof(merge_value)];
459 EncodeFixed64(buf, merge_value);
460 Slice merge_value_slice(buf, sizeof(merge_value));
461
462 // Create the batch
463 WriteBatch batch;
464 for (size_t i = 0; i < num_merges; ++i) {
465 batch.Merge(key, merge_value_slice);
466 }
467
468 // Apply to memtable and count the number of merges
469 resetNumMergeOperatorCalls();
470 {
471 Status s = db->Write(WriteOptions(), &batch);
472 assert(s.ok());
473 }
474 ASSERT_EQ(
475 num_merge_operator_calls,
476 static_cast<size_t>(num_merges - (num_merges % (max_num_merges + 1))));
477
478 // Get the value
479 resetNumMergeOperatorCalls();
480 std::string get_value_str;
481 {
482 Status s = db->Get(ReadOptions(), key, &get_value_str);
483 assert(s.ok());
484 }
485 assert(get_value_str.size() == sizeof(uint64_t));
486 uint64_t get_value = DecodeFixed64(&get_value_str[0]);
487 ASSERT_EQ(get_value, num_merges * merge_value);
488 ASSERT_EQ(num_merge_operator_calls,
489 static_cast<size_t>((num_merges % (max_num_merges + 1))));
490}
491
11fdf7f2 492void runTest(const std::string& dbname, const bool use_ttl = false) {
7c673cae
FG
493
494 {
495 auto db = OpenDb(dbname, use_ttl);
496
497 {
7c673cae
FG
498 Counters counters(db, 0);
499 testCounters(counters, db.get(), true);
500 }
501
502 {
7c673cae 503 MergeBasedCounters counters(db, 0);
11fdf7f2 504 testCounters(counters, db.get(), use_compression);
7c673cae
FG
505 }
506 }
507
508 DestroyDB(dbname, Options());
509
510 {
7c673cae
FG
511 size_t max_merge = 5;
512 auto db = OpenDb(dbname, use_ttl, max_merge);
513 MergeBasedCounters counters(db, 0);
11fdf7f2 514 testCounters(counters, db.get(), use_compression);
7c673cae
FG
515 testSuccessiveMerge(counters, max_merge, max_merge * 2);
516 testSingleBatchSuccessiveMerge(db.get(), 5, 7);
517 DestroyDB(dbname, Options());
518 }
519
520 {
7c673cae
FG
521 size_t max_merge = 100;
522 // Min merge is hard-coded to 2.
523 uint32_t min_merge = 2;
524 for (uint32_t count = min_merge - 1; count <= min_merge + 1; count++) {
525 auto db = OpenDb(dbname, use_ttl, max_merge);
526 MergeBasedCounters counters(db, 0);
527 testPartialMerge(&counters, db.get(), max_merge, min_merge, count);
528 DestroyDB(dbname, Options());
529 }
530 {
531 auto db = OpenDb(dbname, use_ttl, max_merge);
532 MergeBasedCounters counters(db, 0);
533 testPartialMerge(&counters, db.get(), max_merge, min_merge,
534 min_merge * 10);
535 DestroyDB(dbname, Options());
536 }
537 }
538
539 {
7c673cae
FG
540 {
541 auto db = OpenDb(dbname);
542 MergeBasedCounters counters(db, 0);
543 counters.add("test-key", 1);
544 counters.add("test-key", 1);
545 counters.add("test-key", 1);
546 db->CompactRange(CompactRangeOptions(), nullptr, nullptr);
547 }
548
549 DB* reopen_db;
550 ASSERT_OK(DB::Open(Options(), dbname, &reopen_db));
551 std::string value;
552 ASSERT_TRUE(!(reopen_db->Get(ReadOptions(), "test-key", &value).ok()));
553 delete reopen_db;
554 DestroyDB(dbname, Options());
555 }
556
557 /* Temporary remove this test
558 {
559 std::cout << "Test merge-operator not set after reopen (recovery case)\n";
560 {
561 auto db = OpenDb(dbname);
562 MergeBasedCounters counters(db, 0);
563 counters.add("test-key", 1);
564 counters.add("test-key", 1);
565 counters.add("test-key", 1);
566 }
567
568 DB* reopen_db;
569 ASSERT_TRUE(DB::Open(Options(), dbname, &reopen_db).IsInvalidArgument());
570 }
571 */
572}
7c673cae 573
11fdf7f2
TL
574TEST_F(MergeTest, MergeDbTest) {
575 runTest(test::PerThreadDBPath("merge_testdb"));
576}
577
7c673cae 578#ifndef ROCKSDB_LITE
11fdf7f2
TL
579TEST_F(MergeTest, MergeDbTtlTest) {
580 runTest(test::PerThreadDBPath("merge_testdbttl"),
581 true); // Run test on TTL database
582}
20effc67
TL
583
584TEST_F(MergeTest, MergeWithCompactionAndFlush) {
585 const std::string dbname =
586 test::PerThreadDBPath("merge_with_compaction_and_flush");
587 {
588 auto db = OpenDb(dbname);
589 {
590 MergeBasedCounters counters(db, 0);
591 testCountersWithFlushAndCompaction(counters, db.get());
592 }
593 }
594 DestroyDB(dbname, Options());
595}
7c673cae 596#endif // !ROCKSDB_LITE
11fdf7f2 597
f67539c2 598} // namespace ROCKSDB_NAMESPACE
11fdf7f2
TL
599
600int main(int argc, char** argv) {
f67539c2 601 ROCKSDB_NAMESPACE::use_compression = false;
11fdf7f2 602 if (argc > 1) {
f67539c2 603 ROCKSDB_NAMESPACE::use_compression = true;
11fdf7f2
TL
604 }
605
f67539c2 606 ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
11fdf7f2
TL
607 ::testing::InitGoogleTest(&argc, argv);
608 return RUN_ALL_TESTS();
7c673cae 609}