1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
10 #include "port/stack_trace.h"
11 #include "rocksdb/cache.h"
12 #include "rocksdb/comparator.h"
13 #include "rocksdb/db.h"
14 #include "rocksdb/env.h"
15 #include "rocksdb/merge_operator.h"
16 #include "rocksdb/utilities/db_ttl.h"
17 #include "db/dbformat.h"
18 #include "db/db_impl.h"
19 #include "db/write_batch_internal.h"
20 #include "utilities/merge_operators.h"
21 #include "util/testharness.h"
27 class MergeTest
: public testing::Test
{};
// Number of times CountMergeOperator::Merge has been invoked since the last
// reset. The merge operator increments it; the tests read it to verify how
// many merges were actually performed.
size_t num_merge_operator_calls;

// Sets num_merge_operator_calls back to zero so that a test can count the
// merge-operator invocations triggered by the operations that follow.
void resetNumMergeOperatorCalls() {
  num_merge_operator_calls = 0U;
}
// Number of times CountMergeOperator::PartialMergeMulti has been invoked since
// the last reset. The merge operator increments it; testPartialMerge compares
// it against the expected number of partial merges.
size_t num_partial_merge_calls;

// Sets num_partial_merge_calls back to zero so that a test can count the
// partial-merge invocations triggered by the operations that follow.
void resetNumPartialMergeCalls() {
  num_partial_merge_calls = 0U;
}
35 class CountMergeOperator
: public AssociativeMergeOperator
{
37 CountMergeOperator() {
38 mergeOperator_
= MergeOperators::CreateUInt64AddOperator();
41 virtual bool Merge(const Slice
& key
,
42 const Slice
* existing_value
,
44 std::string
* new_value
,
45 Logger
* logger
) const override
{
46 assert(new_value
->empty());
47 ++num_merge_operator_calls
;
48 if (existing_value
== nullptr) {
49 new_value
->assign(value
.data(), value
.size());
53 return mergeOperator_
->PartialMerge(
61 virtual bool PartialMergeMulti(const Slice
& key
,
62 const std::deque
<Slice
>& operand_list
,
63 std::string
* new_value
,
64 Logger
* logger
) const override
{
65 assert(new_value
->empty());
66 ++num_partial_merge_calls
;
67 return mergeOperator_
->PartialMergeMulti(key
, operand_list
, new_value
,
71 virtual const char* Name() const override
{
72 return "UInt64AddOperator";
76 std::shared_ptr
<MergeOperator
> mergeOperator_
;
79 std::shared_ptr
<DB
> OpenDb(const std::string
& dbname
, const bool ttl
= false,
80 const size_t max_successive_merges
= 0) {
83 options
.create_if_missing
= true;
84 options
.merge_operator
= std::make_shared
<CountMergeOperator
>();
85 options
.max_successive_merges
= max_successive_merges
;
87 DestroyDB(dbname
, Options());
88 // DBWithTTL is not supported in ROCKSDB_LITE
91 DBWithTTL
* db_with_ttl
;
92 s
= DBWithTTL::Open(options
, dbname
, &db_with_ttl
);
95 s
= DB::Open(options
, dbname
, &db
);
99 s
= DB::Open(options
, dbname
, &db
);
100 #endif // !ROCKSDB_LITE
102 std::cerr
<< s
.ToString() << std::endl
;
105 return std::shared_ptr
<DB
>(db
);
108 // Imagine we are maintaining a set of uint64 counters.
109 // Each counter has a distinct name. And we would like
110 // to support four high level operations:
111 // set, add, get and remove
112 // This is a quick implementation without a Merge operation.
116 std::shared_ptr
<DB
> db_
;
118 WriteOptions put_option_
;
119 ReadOptions get_option_
;
120 WriteOptions delete_option_
;
125 explicit Counters(std::shared_ptr
<DB
> db
, uint64_t defaultCount
= 0)
130 default_(defaultCount
) {
134 virtual ~Counters() {}
136 // public interface of Counters.
137 // All four functions return false
138 // if the underlying level db operation failed.
140 // mapped to a rocksdb Put
141 bool set(const std::string
& key
, uint64_t value
) {
142 // just treat the internal rep of int64 as the string
143 char buf
[sizeof(value
)];
144 EncodeFixed64(buf
, value
);
145 Slice
slice(buf
, sizeof(value
));
146 auto s
= db_
->Put(put_option_
, key
, slice
);
151 std::cerr
<< s
.ToString() << std::endl
;
156 // mapped to a rocksdb Delete
157 bool remove(const std::string
& key
) {
158 auto s
= db_
->Delete(delete_option_
, key
);
163 std::cerr
<< s
.ToString() << std::endl
;
168 // mapped to a rocksdb Get
169 bool get(const std::string
& key
, uint64_t* value
) {
171 auto s
= db_
->Get(get_option_
, key
, &str
);
173 if (s
.IsNotFound()) {
174 // return default value if not found;
179 if (str
.size() != sizeof(uint64_t)) {
180 std::cerr
<< "value corruption\n";
183 *value
= DecodeFixed64(&str
[0]);
186 std::cerr
<< s
.ToString() << std::endl
;
191 // 'add' is implemented as get -> modify -> set
192 // An alternative is a single merge operation, see MergeBasedCounters
193 virtual bool add(const std::string
& key
, uint64_t value
) {
194 uint64_t base
= default_
;
195 return get(key
, &base
) && set(key
, base
+ value
);
199 // convenience functions for testing
200 void assert_set(const std::string
& key
, uint64_t value
) {
201 assert(set(key
, value
));
204 void assert_remove(const std::string
& key
) { assert(remove(key
)); }
206 uint64_t assert_get(const std::string
& key
) {
207 uint64_t value
= default_
;
208 int result
= get(key
, &value
);
210 if (result
== 0) exit(1); // Disable unused variable warning.
214 void assert_add(const std::string
& key
, uint64_t value
) {
215 int result
= add(key
, value
);
217 if (result
== 0) exit(1); // Disable unused variable warning.
221 // Implement 'add' directly with the new Merge operation
222 class MergeBasedCounters
: public Counters
{
224 WriteOptions merge_option_
; // for merge
227 explicit MergeBasedCounters(std::shared_ptr
<DB
> db
, uint64_t defaultCount
= 0)
228 : Counters(db
, defaultCount
),
232 // mapped to a rocksdb Merge operation
233 virtual bool add(const std::string
& key
, uint64_t value
) override
{
234 char encoded
[sizeof(uint64_t)];
235 EncodeFixed64(encoded
, value
);
236 Slice
slice(encoded
, sizeof(uint64_t));
237 auto s
= db_
->Merge(merge_option_
, key
, slice
);
242 std::cerr
<< s
.ToString() << std::endl
;
248 void dumpDb(DB
* db
) {
249 auto it
= unique_ptr
<Iterator
>(db
->NewIterator(ReadOptions()));
250 for (it
->SeekToFirst(); it
->Valid(); it
->Next()) {
251 //uint64_t value = DecodeFixed64(it->value().data());
252 //std::cout << it->key().ToString() << ": " << value << std::endl;
254 assert(it
->status().ok()); // Check for any errors found during the scan
257 void testCounters(Counters
& counters
, DB
* db
, bool test_compaction
) {
262 counters
.assert_set("a", 1);
264 if (test_compaction
) db
->Flush(o
);
266 assert(counters
.assert_get("a") == 1);
268 counters
.assert_remove("b");
270 // default value is 0 if non-existent
271 assert(counters
.assert_get("b") == 0);
273 counters
.assert_add("a", 2);
275 if (test_compaction
) db
->Flush(o
);
278 assert(counters
.assert_get("a")== 3);
284 for (int i
= 1; i
< 50; i
++) {
285 counters
.assert_add("b", i
);
288 assert(counters
.assert_get("b") == sum
);
292 if (test_compaction
) {
295 db
->CompactRange(CompactRangeOptions(), nullptr, nullptr);
299 assert(counters
.assert_get("a")== 3);
300 assert(counters
.assert_get("b") == sum
);
304 void testSuccessiveMerge(Counters
& counters
, size_t max_num_merges
,
307 counters
.assert_remove("z");
310 for (size_t i
= 1; i
<= num_merges
; ++i
) {
311 resetNumMergeOperatorCalls();
312 counters
.assert_add("z", i
);
315 if (i
% (max_num_merges
+ 1) == 0) {
316 assert(num_merge_operator_calls
== max_num_merges
+ 1);
318 assert(num_merge_operator_calls
== 0);
321 resetNumMergeOperatorCalls();
322 assert(counters
.assert_get("z") == sum
);
323 assert(num_merge_operator_calls
== i
% (max_num_merges
+ 1));
327 void testPartialMerge(Counters
* counters
, DB
* db
, size_t max_merge
,
328 size_t min_merge
, size_t count
) {
332 // Test case 1: partial merge should be called when the number of merge
333 // operands exceeds the threshold.
334 uint64_t tmp_sum
= 0;
335 resetNumPartialMergeCalls();
336 for (size_t i
= 1; i
<= count
; i
++) {
337 counters
->assert_add("b", i
);
341 db
->CompactRange(CompactRangeOptions(), nullptr, nullptr);
342 ASSERT_EQ(tmp_sum
, counters
->assert_get("b"));
343 if (count
> max_merge
) {
344 // in this case, FullMerge should be called instead.
345 ASSERT_EQ(num_partial_merge_calls
, 0U);
347 // if count >= min_merge, then partial merge should be called once.
348 ASSERT_EQ((count
>= min_merge
), (num_partial_merge_calls
== 1));
351 // Test case 2: partial merge should not be called when a put is found.
352 resetNumPartialMergeCalls();
354 db
->Put(rocksdb::WriteOptions(), "c", "10");
355 for (size_t i
= 1; i
<= count
; i
++) {
356 counters
->assert_add("c", i
);
360 db
->CompactRange(CompactRangeOptions(), nullptr, nullptr);
361 ASSERT_EQ(tmp_sum
, counters
->assert_get("c"));
362 ASSERT_EQ(num_partial_merge_calls
, 0U);
365 void testSingleBatchSuccessiveMerge(DB
* db
, size_t max_num_merges
,
367 assert(num_merges
> max_num_merges
);
369 Slice
key("BatchSuccessiveMerge");
370 uint64_t merge_value
= 1;
371 char buf
[sizeof(merge_value
)];
372 EncodeFixed64(buf
, merge_value
);
373 Slice
merge_value_slice(buf
, sizeof(merge_value
));
377 for (size_t i
= 0; i
< num_merges
; ++i
) {
378 batch
.Merge(key
, merge_value_slice
);
381 // Apply to memtable and count the number of merges
382 resetNumMergeOperatorCalls();
384 Status s
= db
->Write(WriteOptions(), &batch
);
388 num_merge_operator_calls
,
389 static_cast<size_t>(num_merges
- (num_merges
% (max_num_merges
+ 1))));
392 resetNumMergeOperatorCalls();
393 std::string get_value_str
;
395 Status s
= db
->Get(ReadOptions(), key
, &get_value_str
);
398 assert(get_value_str
.size() == sizeof(uint64_t));
399 uint64_t get_value
= DecodeFixed64(&get_value_str
[0]);
400 ASSERT_EQ(get_value
, num_merges
* merge_value
);
401 ASSERT_EQ(num_merge_operator_calls
,
402 static_cast<size_t>((num_merges
% (max_num_merges
+ 1))));
405 void runTest(const std::string
& dbname
, const bool use_ttl
= false) {
408 auto db
= OpenDb(dbname
, use_ttl
);
411 Counters
counters(db
, 0);
412 testCounters(counters
, db
.get(), true);
416 MergeBasedCounters
counters(db
, 0);
417 testCounters(counters
, db
.get(), use_compression
);
421 DestroyDB(dbname
, Options());
424 size_t max_merge
= 5;
425 auto db
= OpenDb(dbname
, use_ttl
, max_merge
);
426 MergeBasedCounters
counters(db
, 0);
427 testCounters(counters
, db
.get(), use_compression
);
428 testSuccessiveMerge(counters
, max_merge
, max_merge
* 2);
429 testSingleBatchSuccessiveMerge(db
.get(), 5, 7);
430 DestroyDB(dbname
, Options());
434 size_t max_merge
= 100;
435 // Min merge is hard-coded to 2.
436 uint32_t min_merge
= 2;
437 for (uint32_t count
= min_merge
- 1; count
<= min_merge
+ 1; count
++) {
438 auto db
= OpenDb(dbname
, use_ttl
, max_merge
);
439 MergeBasedCounters
counters(db
, 0);
440 testPartialMerge(&counters
, db
.get(), max_merge
, min_merge
, count
);
441 DestroyDB(dbname
, Options());
444 auto db
= OpenDb(dbname
, use_ttl
, max_merge
);
445 MergeBasedCounters
counters(db
, 0);
446 testPartialMerge(&counters
, db
.get(), max_merge
, min_merge
,
448 DestroyDB(dbname
, Options());
454 auto db
= OpenDb(dbname
);
455 MergeBasedCounters
counters(db
, 0);
456 counters
.add("test-key", 1);
457 counters
.add("test-key", 1);
458 counters
.add("test-key", 1);
459 db
->CompactRange(CompactRangeOptions(), nullptr, nullptr);
463 ASSERT_OK(DB::Open(Options(), dbname
, &reopen_db
));
465 ASSERT_TRUE(!(reopen_db
->Get(ReadOptions(), "test-key", &value
).ok()));
467 DestroyDB(dbname
, Options());
470 /* Temporarily remove this test
472 std::cout << "Test merge-operator not set after reopen (recovery case)\n";
474 auto db = OpenDb(dbname);
475 MergeBasedCounters counters(db, 0);
476 counters.add("test-key", 1);
477 counters.add("test-key", 1);
478 counters.add("test-key", 1);
482 ASSERT_TRUE(DB::Open(Options(), dbname, &reopen_db).IsInvalidArgument());
487 TEST_F(MergeTest
, MergeDbTest
) {
488 runTest(test::PerThreadDBPath("merge_testdb"));
492 TEST_F(MergeTest
, MergeDbTtlTest
) {
493 runTest(test::PerThreadDBPath("merge_testdbttl"),
494 true); // Run test on TTL database
496 #endif // !ROCKSDB_LITE
498 } // namespace rocksdb
500 int main(int argc
, char** argv
) {
501 rocksdb::use_compression
= false;
503 rocksdb::use_compression
= true;
506 rocksdb::port::InstallStackTraceHandler();
507 ::testing::InitGoogleTest(&argc
, argv
);
508 return RUN_ALL_TESTS();