1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under the BSD-style license found in the
3 // LICENSE file in the root directory of this source tree. An additional grant
4 // of patent rights can be found in the PATENTS file in the same directory.
10 #include "port/stack_trace.h"
11 #include "rocksdb/cache.h"
12 #include "rocksdb/comparator.h"
13 #include "rocksdb/db.h"
14 #include "rocksdb/env.h"
15 #include "rocksdb/merge_operator.h"
16 #include "rocksdb/utilities/db_ttl.h"
17 #include "db/dbformat.h"
18 #include "db/db_impl.h"
19 #include "db/write_batch_internal.h"
20 #include "utilities/merge_operators.h"
21 #include "util/testharness.h"
23 using namespace rocksdb
;
// Running count of CountMergeOperator::Merge invocations. The operator
// increments it on every call; tests compare it against expected values.
26 size_t num_merge_operator_calls
;
// Sets num_merge_operator_calls back to zero so the next measurement starts
// from a clean slate.
27 void resetNumMergeOperatorCalls() { num_merge_operator_calls
= 0; }
// Running count of CountMergeOperator::PartialMergeMulti invocations.
29 size_t num_partial_merge_calls
;
// Sets num_partial_merge_calls back to zero before a measurement.
30 void resetNumPartialMergeCalls() { num_partial_merge_calls
= 0; }
// AssociativeMergeOperator subclass used by these tests to count how often
// merging happens. It keeps the operator returned by
// MergeOperators::CreateUInt64AddOperator() in mergeOperator_, forwards work
// to it, and increments the file-level call counters on each invocation.
// NOTE(review): parts of this class (the declaration of `value` used in
// Merge, access specifiers, several closing braces and call arguments) are
// not visible in this listing; the comments cover only visible statements.
33 class CountMergeOperator
: public AssociativeMergeOperator
{
// Stores the operator produced by MergeOperators::CreateUInt64AddOperator().
35 CountMergeOperator() {
36 mergeOperator_
= MergeOperators::CreateUInt64AddOperator();
// Counts the call in num_merge_operator_calls. When existing_value is null
// the operand `value` is copied into *new_value; the visible tail hands off
// to mergeOperator_->PartialMerge. new_value must be empty on entry
// (asserted).
39 virtual bool Merge(const Slice
& key
,
40 const Slice
* existing_value
,
42 std::string
* new_value
,
43 Logger
* logger
) const override
{
44 assert(new_value
->empty());
45 ++num_merge_operator_calls
;
46 if (existing_value
== nullptr) {
47 new_value
->assign(value
.data(), value
.size());
51 return mergeOperator_
->PartialMerge(
// Counts the call in num_partial_merge_calls and forwards key, operand_list
// and new_value to mergeOperator_->PartialMergeMulti. new_value must be
// empty on entry (asserted).
59 virtual bool PartialMergeMulti(const Slice
& key
,
60 const std::deque
<Slice
>& operand_list
,
61 std::string
* new_value
,
62 Logger
* logger
) const override
{
63 assert(new_value
->empty());
64 ++num_partial_merge_calls
;
65 return mergeOperator_
->PartialMergeMulti(key
, operand_list
, new_value
,
// Reports the operator name "UInt64AddOperator".
69 virtual const char* Name() const override
{
70 return "UInt64AddOperator";
// Operator that does the actual merging; assigned in the constructor.
74 std::shared_ptr
<MergeOperator
> mergeOperator_
;
// Destroys whatever database exists at `dbname`, then opens a fresh one and
// returns it wrapped in a std::shared_ptr<DB>.
//   dbname                 - path of the database to (re)create.
//   ttl                    - defaults to false; the visible code has a
//                            DBWithTTL::Open path, presumably selected by
//                            this flag (the `if` line is not visible).
//   max_successive_merges  - copied into options.max_successive_merges.
// NOTE(review): declarations of `db`, `options`, `s` and the
// #ifndef/if/else lines are missing from this listing.
78 std::shared_ptr
<DB
> OpenDb(const std::string
& dbname
, const bool ttl
= false,
79 const size_t max_successive_merges
= 0) {
// Create the DB when absent and install the call-counting merge operator.
82 options
.create_if_missing
= true;
83 options
.merge_operator
= std::make_shared
<CountMergeOperator
>();
84 options
.max_successive_merges
= max_successive_merges
;
// Start from an empty database: remove anything left at this path.
86 DestroyDB(dbname
, Options());
87 // DBWithTTL is not supported in ROCKSDB_LITE
90 std::cout
<< "Opening database with TTL\n";
91 DBWithTTL
* db_with_ttl
;
92 s
= DBWithTTL::Open(options
, dbname
, &db_with_ttl
);
// Plain (non-TTL) open.
95 s
= DB::Open(options
, dbname
, &db
);
// Second plain open; presumably the ROCKSDB_LITE (#else) branch, given the
// #endif below -- confirm against the full file.
99 s
= DB::Open(options
, dbname
, &db
);
100 #endif // !ROCKSDB_LITE
// Print the open status to stderr (presumably only on failure; the guarding
// condition is not visible).
102 std::cerr
<< s
.ToString() << std::endl
;
// The returned shared_ptr is constructed from the raw DB pointer.
105 return std::shared_ptr
<DB
>(db
);
109 // Imagine we are maintaining a set of uint64 counters.
110 // Each counter has a distinct name. And we would like
111 // to support four high level operations:
112 // set, add, get and remove
113 // This is a quick implementation without a Merge operation.
// Database handle that set/remove/get (and subclasses) operate on.
117 std::shared_ptr
<DB
> db_
;
// Options passed to db_->Put in set().
119 WriteOptions put_option_
;
// Options passed to db_->Get in get().
120 ReadOptions get_option_
;
// Options passed to db_->Delete in remove().
121 WriteOptions delete_option_
;
// Builds a counter set on top of `db`. `defaultCount` (0 when omitted)
// initialises default_, which add() and assert_get() use as the starting
// value before calling get().
// NOTE(review): the other member initialisers and the closing brace are not
// visible in this listing.
126 explicit Counters(std::shared_ptr
<DB
> db
, uint64_t defaultCount
= 0)
131 default_(defaultCount
) {
// Virtual, empty destructor; the class is subclassed (MergeBasedCounters)
// and has a virtual add().
135 virtual ~Counters() {}
137 // public interface of Counters.
138 // All four functions return false
139 // if the underlying level db operation failed.
// Stores `value` under `key`.
141 // mapped to a rocksdb Put
142 bool set(const std::string
& key
, uint64_t value
) {
143 // Encode the integer with EncodeFixed64 and store those raw bytes as the value.
144 char buf
[sizeof(value
)];
145 EncodeFixed64(buf
, value
);
146 Slice
slice(buf
, sizeof(value
));
147 auto s
= db_
->Put(put_option_
, key
, slice
);
// Print the Put status to stderr. NOTE(review): the status check and the
// return statements are not visible in this listing -- presumably this runs
// only on failure.
152 std::cerr
<< s
.ToString() << std::endl
;
// Deletes the entry stored under `key`.
157 // mapped to a rocksdb Delete
158 bool remove(const std::string
& key
) {
159 auto s
= db_
->Delete(delete_option_
, key
);
// Print the Delete status to stderr. NOTE(review): the status check and the
// return statements are not visible in this listing -- presumably this runs
// only on failure.
164 std::cerr
<< s
.ToString() << std::endl
;
// Reads the counter stored under `key` and writes it through `value`.
// NOTE(review): the declaration of `str` and all return statements are
// missing from this listing.
169 // mapped to a rocksdb Get
170 bool get(const std::string
& key
, uint64_t* value
) {
172 auto s
= db_
->Get(get_option_
, key
, &str
);
// A missing key is handled separately from other failures.
174 if (s
.IsNotFound()) {
175 // return default value if not found;
// A stored value must be exactly sizeof(uint64_t) bytes; any other length is
// reported as corruption.
180 if (str
.size() != sizeof(uint64_t)) {
181 std::cerr
<< "value corruption\n";
// Decode the fixed-width 64-bit integer from the stored bytes.
184 *value
= DecodeFixed64(&str
[0]);
// Print the Get status to stderr (presumably for errors other than NotFound;
// the guarding condition is not visible).
187 std::cerr
<< s
.ToString() << std::endl
;
// Adds `value` to the counter stored under `key`.
192 // 'add' is implemented as get -> modify -> set
193 // An alternative is a single merge operation, see MergeBasedCounters
194 virtual bool add(const std::string
& key
, uint64_t value
) {
// Start from default_ before calling get().
195 uint64_t base
= default_
;
// Short-circuit: set() runs only if get() succeeded; the result is true only
// when both succeed.
196 return get(key
, &base
) && set(key
, base
+ value
);
200 // convenience functions for testing
// Calls set(key, value) and asserts that it succeeded.
// NOTE(review): the set() call sits inside assert(); when NDEBUG is defined
// the assert expression is not evaluated, so the write would be skipped.
201 void assert_set(const std::string
& key
, uint64_t value
) {
202 assert(set(key
, value
));
205 void assert_remove(const std::string
& key
) { assert(remove(key
)); }
// Reads the counter under `key`, starting from default_, and terminates the
// process with exit(1) if get() reports failure.
// NOTE(review): the trailing lines (including the return of `value`) are not
// visible in this listing.
207 uint64_t assert_get(const std::string
& key
) {
208 uint64_t value
= default_
;
// get()'s bool result is stored as int.
209 int result
= get(key
, &value
);
211 if (result
== 0) exit(1); // Disable unused variable warning.
// Calls add(key, value) -- virtual, so subclasses' add() is used -- and
// terminates the process with exit(1) if it reports failure.
215 void assert_add(const std::string
& key
, uint64_t value
) {
216 int result
= add(key
, value
);
218 if (result
== 0) exit(1); // Disable unused variable warning.
222 // Implement 'add' directly with the new Merge operation
// Counters subclass whose add() issues a single db_->Merge instead of the
// base class's get-then-set sequence.
// NOTE(review): access specifiers, the rest of the constructor and closing
// braces are not visible in this listing.
223 class MergeBasedCounters
: public Counters
{
225 WriteOptions merge_option_
; // for merge
// Forwards `db` and `defaultCount` to the Counters constructor.
228 explicit MergeBasedCounters(std::shared_ptr
<DB
> db
, uint64_t defaultCount
= 0)
229 : Counters(db
, defaultCount
),
233 // mapped to a rocksdb Merge operation
234 virtual bool add(const std::string
& key
, uint64_t value
) override
{
// Encode the operand as a fixed-width 64-bit buffer and hand it to Merge.
235 char encoded
[sizeof(uint64_t)];
236 EncodeFixed64(encoded
, value
);
237 Slice
slice(encoded
, sizeof(uint64_t));
238 auto s
= db_
->Merge(merge_option_
, key
, slice
);
// Print the Merge status to stderr (presumably only on failure; the status
// check and return statements are not visible).
243 std::cerr
<< s
.ToString() << std::endl
;
// Prints every entry of `db` to stdout as "<key>: <value>", decoding each
// value with DecodeFixed64.
250 void dumpDb(DB
* db
) {
// The iterator is held in a unique_ptr.
251 auto it
= unique_ptr
<Iterator
>(db
->NewIterator(ReadOptions()));
252 for (it
->SeekToFirst(); it
->Valid(); it
->Next()) {
253 uint64_t value
= DecodeFixed64(it
->value().data());
254 std::cout
<< it
->key().ToString() << ": " << value
<< std::endl
;
256 assert(it
->status().ok()); // Check for any errors found during the scan
// Exercises set/get/remove/add on `counters` using keys "a" and "b".
//   counters         - implementation under test (base or merge-based).
//   db               - database behind `counters`, used for Flush/CompactRange.
//   test_compaction  - when true, flushes between steps and compacts the
//                      whole range before the final checks.
// NOTE(review): declarations of `o` (passed to Flush) and `sum` are on lines
// missing from this listing.
259 void testCounters(Counters
& counters
, DB
* db
, bool test_compaction
) {
264 counters
.assert_set("a", 1);
266 if (test_compaction
) db
->Flush(o
);
268 assert(counters
.assert_get("a") == 1);
270 counters
.assert_remove("b");
272 // default value is 0 if non-existent
273 assert(counters
.assert_get("b") == 0);
275 counters
.assert_add("a", 2);
277 if (test_compaction
) db
->Flush(o
);
// "a" was set to 1 and then had 2 added.
280 assert(counters
.assert_get("a")== 3);
// Add 1..49 to "b", then compare against `sum`.
288 for (int i
= 1; i
< 50; i
++) {
289 counters
.assert_add("b", i
);
292 assert(counters
.assert_get("b") == sum
);
299 if (test_compaction
) {
302 std::cout
<< "Compaction started ...\n";
// nullptr begin/end: compact the whole key range.
303 db
->CompactRange(CompactRangeOptions(), nullptr, nullptr);
304 std::cout
<< "Compaction ended\n";
// Re-check both counters.
308 assert(counters
.assert_get("a")== 3);
309 assert(counters
.assert_get("b") == sum
);
// Checks how many merge-operator calls successive adds to key "z" trigger,
// using num_merge_operator_calls.
// NOTE(review): the rest of the signature (which declares `num_merges`), the
// declaration/update of `sum`, and the `else` line are not visible here.
313 void testSuccessiveMerge(Counters
& counters
, size_t max_num_merges
,
// Start with "z" removed.
316 counters
.assert_remove("z");
319 for (size_t i
= 1; i
<= num_merges
; ++i
) {
// Count only the calls caused by this single add.
320 resetNumMergeOperatorCalls();
321 counters
.assert_add("z", i
);
// On every (max_num_merges + 1)-th add, that many merge-operator calls are
// expected from the add; on the other adds (presumably the else branch),
// none.
324 if (i
% (max_num_merges
+ 1) == 0) {
325 assert(num_merge_operator_calls
== max_num_merges
+ 1);
327 assert(num_merge_operator_calls
== 0);
// Count only the calls caused by the read; it is expected to trigger
// i % (max_num_merges + 1) of them.
330 resetNumMergeOperatorCalls();
331 assert(counters
.assert_get("z") == sum
);
332 assert(num_merge_operator_calls
== i
% (max_num_merges
+ 1));
// Verifies when PartialMergeMulti is invoked during compaction, using
// num_partial_merge_calls.
//   counters   - counter implementation used to issue the adds.
//   db         - database to compact (and to Put into for case 2).
//   max_merge  - above this operand count no partial merge is expected.
//   min_merge  - from this operand count on, one partial merge is expected.
//   count      - number of adds issued per test case.
// NOTE(review): the statements that update tmp_sum, and the `else` line,
// are not visible in this listing.
336 void testPartialMerge(Counters
* counters
, DB
* db
, size_t max_merge
,
337 size_t min_merge
, size_t count
) {
341 // Test case 1: partial merge should be called when the number of merge
342 // operands exceeds the threshold.
343 uint64_t tmp_sum
= 0;
344 resetNumPartialMergeCalls();
345 for (size_t i
= 1; i
<= count
; i
++) {
346 counters
->assert_add("b", i
);
// Compact the full key range, then check the value and the call count.
350 db
->CompactRange(CompactRangeOptions(), nullptr, nullptr);
351 ASSERT_EQ(tmp_sum
, counters
->assert_get("b"));
352 if (count
> max_merge
) {
353 // in this case, FullMerge should be called instead.
354 ASSERT_EQ(num_partial_merge_calls
, 0U);
356 // if count >= min_merge, then partial merge should be called once.
357 ASSERT_EQ((count
>= min_merge
), (num_partial_merge_calls
== 1));
360 // Test case 2: partial merge should not be called when a put is found.
361 resetNumPartialMergeCalls();
// Write "c" with a plain Put before the adds.
363 db
->Put(rocksdb::WriteOptions(), "c", "10");
364 for (size_t i
= 1; i
<= count
; i
++) {
365 counters
->assert_add("c", i
);
369 db
->CompactRange(CompactRangeOptions(), nullptr, nullptr);
370 ASSERT_EQ(tmp_sum
, counters
->assert_get("c"));
371 ASSERT_EQ(num_partial_merge_calls
, 0U);
// Puts `num_merges` Merge records for one key into a single write batch,
// writes it, and checks the merge-operator call counts at write time and at
// read time, plus the final value.
// NOTE(review): the rest of the signature (declaring `num_merges`), the
// declaration of `batch`, scoping braces and part of one ASSERT_EQ are not
// visible in this listing.
374 void testSingleBatchSuccessiveMerge(DB
* db
, size_t max_num_merges
,
376 assert(num_merges
> max_num_merges
);
// Every merge operand is the value 1, encoded with EncodeFixed64.
378 Slice
key("BatchSuccessiveMerge");
379 uint64_t merge_value
= 1;
380 char buf
[sizeof(merge_value
)];
381 EncodeFixed64(buf
, merge_value
);
382 Slice
merge_value_slice(buf
, sizeof(merge_value
));
// Queue all merges in one batch.
386 for (size_t i
= 0; i
< num_merges
; ++i
) {
387 batch
.Merge(key
, merge_value_slice
);
390 // Apply to memtable and count the number of merges
391 resetNumMergeOperatorCalls();
393 Status s
= db
->Write(WriteOptions(), &batch
);
// Expected calls during the write: num_merges rounded down to a multiple of
// (max_num_merges + 1).
397 num_merge_operator_calls
,
398 static_cast<size_t>(num_merges
- (num_merges
% (max_num_merges
+ 1))));
// Read the key back and count the merge-operator calls made by the read.
401 resetNumMergeOperatorCalls();
402 std::string get_value_str
;
404 Status s
= db
->Get(ReadOptions(), key
, &get_value_str
);
407 assert(get_value_str
.size() == sizeof(uint64_t));
408 uint64_t get_value
= DecodeFixed64(&get_value_str
[0]);
// The final value is the sum of all operands; the read is expected to
// perform the remaining num_merges % (max_num_merges + 1) calls.
409 ASSERT_EQ(get_value
, num_merges
* merge_value
);
410 ASSERT_EQ(num_merge_operator_calls
,
411 static_cast<size_t>((num_merges
% (max_num_merges
+ 1))));
// Runs the whole suite against a database at `dbname`.
//   argc     - NOTE(review): its use is not visible in this listing;
//              presumably it decides whether `compact` is turned on.
//   dbname   - path of the test database; destroyed between sections.
//   use_ttl  - forwarded to OpenDb as its `ttl` argument.
// NOTE(review): the scoping braces that separate the sections below (each
// redeclares `db` / `counters`) are missing from this listing.
414 void runTest(int argc
, const std::string
& dbname
, const bool use_ttl
= false) {
415 bool compact
= false;
418 std::cout
<< "Turn on Compaction\n";
// Section 1: base and merge-based counters on the same database.
422 auto db
= OpenDb(dbname
, use_ttl
);
425 std::cout
<< "Test read-modify-write counters... \n";
426 Counters
counters(db
, 0);
// The read-modify-write variant is always run with test_compaction == true.
427 testCounters(counters
, db
.get(), true);
431 std::cout
<< "Test merge-based counters... \n";
432 MergeBasedCounters
counters(db
, 0);
433 testCounters(counters
, db
.get(), compact
);
437 DestroyDB(dbname
, Options());
// Section 2: successive merges with max_successive_merges set to 5.
440 std::cout
<< "Test merge in memtable... \n";
441 size_t max_merge
= 5;
442 auto db
= OpenDb(dbname
, use_ttl
, max_merge
);
443 MergeBasedCounters
counters(db
, 0);
444 testCounters(counters
, db
.get(), compact
);
445 testSuccessiveMerge(counters
, max_merge
, max_merge
* 2);
446 testSingleBatchSuccessiveMerge(db
.get(), 5, 7);
447 DestroyDB(dbname
, Options());
// Section 3: partial merge, with operand counts min_merge-1 .. min_merge+1,
// each on a freshly opened database.
451 std::cout
<< "Test Partial-Merge\n";
452 size_t max_merge
= 100;
453 // Min merge is hard-coded to 2.
454 uint32_t min_merge
= 2;
455 for (uint32_t count
= min_merge
- 1; count
<= min_merge
+ 1; count
++) {
456 auto db
= OpenDb(dbname
, use_ttl
, max_merge
);
457 MergeBasedCounters
counters(db
, 0);
458 testPartialMerge(&counters
, db
.get(), max_merge
, min_merge
, count
);
459 DestroyDB(dbname
, Options());
// One more partial-merge run; its final argument is on a line not visible
// here.
462 auto db
= OpenDb(dbname
, use_ttl
, max_merge
);
463 MergeBasedCounters
counters(db
, 0);
464 testPartialMerge(&counters
, db
.get(), max_merge
, min_merge
,
466 DestroyDB(dbname
, Options());
// Section 4: reopen with default Options() (no merge operator configured
// here) and expect the Get on the merged key not to succeed.
471 std::cout
<< "Test merge-operator not set after reopen\n";
473 auto db
= OpenDb(dbname
);
474 MergeBasedCounters
counters(db
, 0);
475 counters
.add("test-key", 1);
476 counters
.add("test-key", 1);
477 counters
.add("test-key", 1);
478 db
->CompactRange(CompactRangeOptions(), nullptr, nullptr);
// NOTE(review): declarations of `reopen_db` and `value` are not visible.
482 ASSERT_OK(DB::Open(Options(), dbname
, &reopen_db
));
484 ASSERT_TRUE(!(reopen_db
->Get(ReadOptions(), "test-key", &value
).ok()));
486 DestroyDB(dbname
, Options());
489 /* Temporarily removed this test
491 std::cout << "Test merge-operator not set after reopen (recovery case)\n";
493 auto db = OpenDb(dbname);
494 MergeBasedCounters counters(db, 0);
495 counters.add("test-key", 1);
496 counters.add("test-key", 1);
497 counters.add("test-key", 1);
501 ASSERT_TRUE(DB::Open(Options(), dbname, &reopen_db).IsInvalidArgument());
// Entry point: installs the stack-trace handler, runs the suite on a plain
// database under test::TmpDir(), then on a TTL database, and prints a
// success message.
// NOTE(review): the preprocessor line that opens the conditional closed by
// the #endif below, and the function's closing lines, are not visible here.
507 int main(int argc
, char *argv
[]) {
508 //TODO: Make this test like a general rocksdb unit-test
509 rocksdb::port::InstallStackTraceHandler();
510 runTest(argc
, test::TmpDir() + "/merge_testdb");
511 // DBWithTTL is not supported in ROCKSDB_LITE
513 runTest(argc
, test::TmpDir() + "/merge_testdbttl", true); // Run test on TTL database
514 #endif // !ROCKSDB_LITE
515 printf("Passed all tests!\n");