]> git.proxmox.com Git - ceph.git/blame - ceph/src/rocksdb/db/merge_test.cc
add subtree-ish sources for 12.0.3
[ceph.git] / ceph / src / rocksdb / db / merge_test.cc
CommitLineData
7c673cae
FG
1// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2// This source code is licensed under the BSD-style license found in the
3// LICENSE file in the root directory of this source tree. An additional grant
4// of patent rights can be found in the PATENTS file in the same directory.
5//
6#include <assert.h>
7#include <memory>
8#include <iostream>
9
10#include "port/stack_trace.h"
11#include "rocksdb/cache.h"
12#include "rocksdb/comparator.h"
13#include "rocksdb/db.h"
14#include "rocksdb/env.h"
15#include "rocksdb/merge_operator.h"
16#include "rocksdb/utilities/db_ttl.h"
17#include "db/dbformat.h"
18#include "db/db_impl.h"
19#include "db/write_batch_internal.h"
20#include "utilities/merge_operators.h"
21#include "util/testharness.h"
22
23using namespace rocksdb;
24
namespace {
// Number of CountMergeOperator::Merge invocations since the last reset.
size_t num_merge_operator_calls = 0;
// Number of CountMergeOperator::PartialMergeMulti invocations since the last
// reset.
size_t num_partial_merge_calls = 0;

// Zeroes the Merge() call counter.
void resetNumMergeOperatorCalls() {
  num_merge_operator_calls = 0;
}

// Zeroes the PartialMergeMulti() call counter.
void resetNumPartialMergeCalls() {
  num_partial_merge_calls = 0;
}
}  // namespace
32
33class CountMergeOperator : public AssociativeMergeOperator {
34 public:
35 CountMergeOperator() {
36 mergeOperator_ = MergeOperators::CreateUInt64AddOperator();
37 }
38
39 virtual bool Merge(const Slice& key,
40 const Slice* existing_value,
41 const Slice& value,
42 std::string* new_value,
43 Logger* logger) const override {
44 assert(new_value->empty());
45 ++num_merge_operator_calls;
46 if (existing_value == nullptr) {
47 new_value->assign(value.data(), value.size());
48 return true;
49 }
50
51 return mergeOperator_->PartialMerge(
52 key,
53 *existing_value,
54 value,
55 new_value,
56 logger);
57 }
58
59 virtual bool PartialMergeMulti(const Slice& key,
60 const std::deque<Slice>& operand_list,
61 std::string* new_value,
62 Logger* logger) const override {
63 assert(new_value->empty());
64 ++num_partial_merge_calls;
65 return mergeOperator_->PartialMergeMulti(key, operand_list, new_value,
66 logger);
67 }
68
69 virtual const char* Name() const override {
70 return "UInt64AddOperator";
71 }
72
73 private:
74 std::shared_ptr<MergeOperator> mergeOperator_;
75};
76
77namespace {
78std::shared_ptr<DB> OpenDb(const std::string& dbname, const bool ttl = false,
79 const size_t max_successive_merges = 0) {
80 DB* db;
81 Options options;
82 options.create_if_missing = true;
83 options.merge_operator = std::make_shared<CountMergeOperator>();
84 options.max_successive_merges = max_successive_merges;
85 Status s;
86 DestroyDB(dbname, Options());
87// DBWithTTL is not supported in ROCKSDB_LITE
88#ifndef ROCKSDB_LITE
89 if (ttl) {
90 std::cout << "Opening database with TTL\n";
91 DBWithTTL* db_with_ttl;
92 s = DBWithTTL::Open(options, dbname, &db_with_ttl);
93 db = db_with_ttl;
94 } else {
95 s = DB::Open(options, dbname, &db);
96 }
97#else
98 assert(!ttl);
99 s = DB::Open(options, dbname, &db);
100#endif // !ROCKSDB_LITE
101 if (!s.ok()) {
102 std::cerr << s.ToString() << std::endl;
103 assert(false);
104 }
105 return std::shared_ptr<DB>(db);
106}
107} // namespace
108
109// Imagine we are maintaining a set of uint64 counters.
110// Each counter has a distinct name. And we would like
111// to support four high level operations:
112// set, add, get and remove
113// This is a quick implementation without a Merge operation.
114class Counters {
115
116 protected:
117 std::shared_ptr<DB> db_;
118
119 WriteOptions put_option_;
120 ReadOptions get_option_;
121 WriteOptions delete_option_;
122
123 uint64_t default_;
124
125 public:
126 explicit Counters(std::shared_ptr<DB> db, uint64_t defaultCount = 0)
127 : db_(db),
128 put_option_(),
129 get_option_(),
130 delete_option_(),
131 default_(defaultCount) {
132 assert(db_);
133 }
134
135 virtual ~Counters() {}
136
137 // public interface of Counters.
138 // All four functions return false
139 // if the underlying level db operation failed.
140
141 // mapped to a levedb Put
142 bool set(const std::string& key, uint64_t value) {
143 // just treat the internal rep of int64 as the string
144 char buf[sizeof(value)];
145 EncodeFixed64(buf, value);
146 Slice slice(buf, sizeof(value));
147 auto s = db_->Put(put_option_, key, slice);
148
149 if (s.ok()) {
150 return true;
151 } else {
152 std::cerr << s.ToString() << std::endl;
153 return false;
154 }
155 }
156
157 // mapped to a rocksdb Delete
158 bool remove(const std::string& key) {
159 auto s = db_->Delete(delete_option_, key);
160
161 if (s.ok()) {
162 return true;
163 } else {
164 std::cerr << s.ToString() << std::endl;
165 return false;
166 }
167 }
168
169 // mapped to a rocksdb Get
170 bool get(const std::string& key, uint64_t* value) {
171 std::string str;
172 auto s = db_->Get(get_option_, key, &str);
173
174 if (s.IsNotFound()) {
175 // return default value if not found;
176 *value = default_;
177 return true;
178 } else if (s.ok()) {
179 // deserialization
180 if (str.size() != sizeof(uint64_t)) {
181 std::cerr << "value corruption\n";
182 return false;
183 }
184 *value = DecodeFixed64(&str[0]);
185 return true;
186 } else {
187 std::cerr << s.ToString() << std::endl;
188 return false;
189 }
190 }
191
192 // 'add' is implemented as get -> modify -> set
193 // An alternative is a single merge operation, see MergeBasedCounters
194 virtual bool add(const std::string& key, uint64_t value) {
195 uint64_t base = default_;
196 return get(key, &base) && set(key, base + value);
197 }
198
199
200 // convenience functions for testing
201 void assert_set(const std::string& key, uint64_t value) {
202 assert(set(key, value));
203 }
204
205 void assert_remove(const std::string& key) { assert(remove(key)); }
206
207 uint64_t assert_get(const std::string& key) {
208 uint64_t value = default_;
209 int result = get(key, &value);
210 assert(result);
211 if (result == 0) exit(1); // Disable unused variable warning.
212 return value;
213 }
214
215 void assert_add(const std::string& key, uint64_t value) {
216 int result = add(key, value);
217 assert(result);
218 if (result == 0) exit(1); // Disable unused variable warning.
219 }
220};
221
222// Implement 'add' directly with the new Merge operation
223class MergeBasedCounters : public Counters {
224 private:
225 WriteOptions merge_option_; // for merge
226
227 public:
228 explicit MergeBasedCounters(std::shared_ptr<DB> db, uint64_t defaultCount = 0)
229 : Counters(db, defaultCount),
230 merge_option_() {
231 }
232
233 // mapped to a rocksdb Merge operation
234 virtual bool add(const std::string& key, uint64_t value) override {
235 char encoded[sizeof(uint64_t)];
236 EncodeFixed64(encoded, value);
237 Slice slice(encoded, sizeof(uint64_t));
238 auto s = db_->Merge(merge_option_, key, slice);
239
240 if (s.ok()) {
241 return true;
242 } else {
243 std::cerr << s.ToString() << std::endl;
244 return false;
245 }
246 }
247};
248
249namespace {
250void dumpDb(DB* db) {
251 auto it = unique_ptr<Iterator>(db->NewIterator(ReadOptions()));
252 for (it->SeekToFirst(); it->Valid(); it->Next()) {
253 uint64_t value = DecodeFixed64(it->value().data());
254 std::cout << it->key().ToString() << ": " << value << std::endl;
255 }
256 assert(it->status().ok()); // Check for any errors found during the scan
257}
258
259void testCounters(Counters& counters, DB* db, bool test_compaction) {
260
261 FlushOptions o;
262 o.wait = true;
263
264 counters.assert_set("a", 1);
265
266 if (test_compaction) db->Flush(o);
267
268 assert(counters.assert_get("a") == 1);
269
270 counters.assert_remove("b");
271
272 // defaut value is 0 if non-existent
273 assert(counters.assert_get("b") == 0);
274
275 counters.assert_add("a", 2);
276
277 if (test_compaction) db->Flush(o);
278
279 // 1+2 = 3
280 assert(counters.assert_get("a")== 3);
281
282 dumpDb(db);
283
284 std::cout << "1\n";
285
286 // 1+...+49 = ?
287 uint64_t sum = 0;
288 for (int i = 1; i < 50; i++) {
289 counters.assert_add("b", i);
290 sum += i;
291 }
292 assert(counters.assert_get("b") == sum);
293
294 std::cout << "2\n";
295 dumpDb(db);
296
297 std::cout << "3\n";
298
299 if (test_compaction) {
300 db->Flush(o);
301
302 std::cout << "Compaction started ...\n";
303 db->CompactRange(CompactRangeOptions(), nullptr, nullptr);
304 std::cout << "Compaction ended\n";
305
306 dumpDb(db);
307
308 assert(counters.assert_get("a")== 3);
309 assert(counters.assert_get("b") == sum);
310 }
311}
312
313void testSuccessiveMerge(Counters& counters, size_t max_num_merges,
314 size_t num_merges) {
315
316 counters.assert_remove("z");
317 uint64_t sum = 0;
318
319 for (size_t i = 1; i <= num_merges; ++i) {
320 resetNumMergeOperatorCalls();
321 counters.assert_add("z", i);
322 sum += i;
323
324 if (i % (max_num_merges + 1) == 0) {
325 assert(num_merge_operator_calls == max_num_merges + 1);
326 } else {
327 assert(num_merge_operator_calls == 0);
328 }
329
330 resetNumMergeOperatorCalls();
331 assert(counters.assert_get("z") == sum);
332 assert(num_merge_operator_calls == i % (max_num_merges + 1));
333 }
334}
335
336void testPartialMerge(Counters* counters, DB* db, size_t max_merge,
337 size_t min_merge, size_t count) {
338 FlushOptions o;
339 o.wait = true;
340
341 // Test case 1: partial merge should be called when the number of merge
342 // operands exceeds the threshold.
343 uint64_t tmp_sum = 0;
344 resetNumPartialMergeCalls();
345 for (size_t i = 1; i <= count; i++) {
346 counters->assert_add("b", i);
347 tmp_sum += i;
348 }
349 db->Flush(o);
350 db->CompactRange(CompactRangeOptions(), nullptr, nullptr);
351 ASSERT_EQ(tmp_sum, counters->assert_get("b"));
352 if (count > max_merge) {
353 // in this case, FullMerge should be called instead.
354 ASSERT_EQ(num_partial_merge_calls, 0U);
355 } else {
356 // if count >= min_merge, then partial merge should be called once.
357 ASSERT_EQ((count >= min_merge), (num_partial_merge_calls == 1));
358 }
359
360 // Test case 2: partial merge should not be called when a put is found.
361 resetNumPartialMergeCalls();
362 tmp_sum = 0;
363 db->Put(rocksdb::WriteOptions(), "c", "10");
364 for (size_t i = 1; i <= count; i++) {
365 counters->assert_add("c", i);
366 tmp_sum += i;
367 }
368 db->Flush(o);
369 db->CompactRange(CompactRangeOptions(), nullptr, nullptr);
370 ASSERT_EQ(tmp_sum, counters->assert_get("c"));
371 ASSERT_EQ(num_partial_merge_calls, 0U);
372}
373
374void testSingleBatchSuccessiveMerge(DB* db, size_t max_num_merges,
375 size_t num_merges) {
376 assert(num_merges > max_num_merges);
377
378 Slice key("BatchSuccessiveMerge");
379 uint64_t merge_value = 1;
380 char buf[sizeof(merge_value)];
381 EncodeFixed64(buf, merge_value);
382 Slice merge_value_slice(buf, sizeof(merge_value));
383
384 // Create the batch
385 WriteBatch batch;
386 for (size_t i = 0; i < num_merges; ++i) {
387 batch.Merge(key, merge_value_slice);
388 }
389
390 // Apply to memtable and count the number of merges
391 resetNumMergeOperatorCalls();
392 {
393 Status s = db->Write(WriteOptions(), &batch);
394 assert(s.ok());
395 }
396 ASSERT_EQ(
397 num_merge_operator_calls,
398 static_cast<size_t>(num_merges - (num_merges % (max_num_merges + 1))));
399
400 // Get the value
401 resetNumMergeOperatorCalls();
402 std::string get_value_str;
403 {
404 Status s = db->Get(ReadOptions(), key, &get_value_str);
405 assert(s.ok());
406 }
407 assert(get_value_str.size() == sizeof(uint64_t));
408 uint64_t get_value = DecodeFixed64(&get_value_str[0]);
409 ASSERT_EQ(get_value, num_merges * merge_value);
410 ASSERT_EQ(num_merge_operator_calls,
411 static_cast<size_t>((num_merges % (max_num_merges + 1))));
412}
413
414void runTest(int argc, const std::string& dbname, const bool use_ttl = false) {
415 bool compact = false;
416 if (argc > 1) {
417 compact = true;
418 std::cout << "Turn on Compaction\n";
419 }
420
421 {
422 auto db = OpenDb(dbname, use_ttl);
423
424 {
425 std::cout << "Test read-modify-write counters... \n";
426 Counters counters(db, 0);
427 testCounters(counters, db.get(), true);
428 }
429
430 {
431 std::cout << "Test merge-based counters... \n";
432 MergeBasedCounters counters(db, 0);
433 testCounters(counters, db.get(), compact);
434 }
435 }
436
437 DestroyDB(dbname, Options());
438
439 {
440 std::cout << "Test merge in memtable... \n";
441 size_t max_merge = 5;
442 auto db = OpenDb(dbname, use_ttl, max_merge);
443 MergeBasedCounters counters(db, 0);
444 testCounters(counters, db.get(), compact);
445 testSuccessiveMerge(counters, max_merge, max_merge * 2);
446 testSingleBatchSuccessiveMerge(db.get(), 5, 7);
447 DestroyDB(dbname, Options());
448 }
449
450 {
451 std::cout << "Test Partial-Merge\n";
452 size_t max_merge = 100;
453 // Min merge is hard-coded to 2.
454 uint32_t min_merge = 2;
455 for (uint32_t count = min_merge - 1; count <= min_merge + 1; count++) {
456 auto db = OpenDb(dbname, use_ttl, max_merge);
457 MergeBasedCounters counters(db, 0);
458 testPartialMerge(&counters, db.get(), max_merge, min_merge, count);
459 DestroyDB(dbname, Options());
460 }
461 {
462 auto db = OpenDb(dbname, use_ttl, max_merge);
463 MergeBasedCounters counters(db, 0);
464 testPartialMerge(&counters, db.get(), max_merge, min_merge,
465 min_merge * 10);
466 DestroyDB(dbname, Options());
467 }
468 }
469
470 {
471 std::cout << "Test merge-operator not set after reopen\n";
472 {
473 auto db = OpenDb(dbname);
474 MergeBasedCounters counters(db, 0);
475 counters.add("test-key", 1);
476 counters.add("test-key", 1);
477 counters.add("test-key", 1);
478 db->CompactRange(CompactRangeOptions(), nullptr, nullptr);
479 }
480
481 DB* reopen_db;
482 ASSERT_OK(DB::Open(Options(), dbname, &reopen_db));
483 std::string value;
484 ASSERT_TRUE(!(reopen_db->Get(ReadOptions(), "test-key", &value).ok()));
485 delete reopen_db;
486 DestroyDB(dbname, Options());
487 }
488
489 /* Temporary remove this test
490 {
491 std::cout << "Test merge-operator not set after reopen (recovery case)\n";
492 {
493 auto db = OpenDb(dbname);
494 MergeBasedCounters counters(db, 0);
495 counters.add("test-key", 1);
496 counters.add("test-key", 1);
497 counters.add("test-key", 1);
498 }
499
500 DB* reopen_db;
501 ASSERT_TRUE(DB::Open(Options(), dbname, &reopen_db).IsInvalidArgument());
502 }
503 */
504}
505} // namespace
506
507int main(int argc, char *argv[]) {
508 //TODO: Make this test like a general rocksdb unit-test
509 rocksdb::port::InstallStackTraceHandler();
510 runTest(argc, test::TmpDir() + "/merge_testdb");
511// DBWithTTL is not supported in ROCKSDB_LITE
512#ifndef ROCKSDB_LITE
513 runTest(argc, test::TmpDir() + "/merge_testdbttl", true); // Run test on TTL database
514#endif // !ROCKSDB_LITE
515 printf("Passed all tests!\n");
516 return 0;
517}