return true;
}
- return mergeOperator_->PartialMerge(
- key,
- *existing_value,
- value,
- new_value,
- logger);
+ return mergeOperator_->PartialMerge(key, *existing_value, value, new_value,
+ logger);
}
bool PartialMergeMulti(const Slice& key,
std::shared_ptr<MergeOperator> mergeOperator_;
};
+class EnvMergeTest : public EnvWrapper {
+ public:
+ EnvMergeTest() : EnvWrapper(Env::Default()) {}
+ static const char* kClassName() { return "MergeEnv"; }
+ const char* Name() const override { return kClassName(); }
+ // ~EnvMergeTest() override {}
+
+ uint64_t NowNanos() override {
+ ++now_nanos_count_;
+ return target()->NowNanos();
+ }
+
+ static uint64_t now_nanos_count_;
+
+ static std::unique_ptr<EnvMergeTest> singleton_;
+
+ static EnvMergeTest* GetInstance() {
+ if (nullptr == singleton_) singleton_.reset(new EnvMergeTest);
+ return singleton_.get();
+ }
+};
+
+uint64_t EnvMergeTest::now_nanos_count_{0};
+std::unique_ptr<EnvMergeTest> EnvMergeTest::singleton_;
+
std::shared_ptr<DB> OpenDb(const std::string& dbname, const bool ttl = false,
const size_t max_successive_merges = 0) {
DB* db;
options.create_if_missing = true;
options.merge_operator = std::make_shared<CountMergeOperator>();
options.max_successive_merges = max_successive_merges;
+ options.env = EnvMergeTest::GetInstance();
+ EXPECT_OK(DestroyDB(dbname, Options()));
Status s;
- DestroyDB(dbname, Options());
// DBWithTTL is not supported in ROCKSDB_LITE
#ifndef ROCKSDB_LITE
if (ttl) {
assert(!ttl);
s = DB::Open(options, dbname, &db);
#endif // !ROCKSDB_LITE
- if (!s.ok()) {
- std::cerr << s.ToString() << std::endl;
- assert(false);
- }
+ EXPECT_OK(s);
+ assert(s.ok());
+ // Allowed to call NowNanos during DB creation (in GenerateRawUniqueId() for
+ // session ID)
+ EnvMergeTest::now_nanos_count_ = 0;
return std::shared_ptr<DB>(db);
}
// set, add, get and remove
// This is a quick implementation without a Merge operation.
class Counters {
-
protected:
std::shared_ptr<DB> db_;
return get(key, &base) && set(key, base + value);
}
-
// convenience functions for testing
void assert_set(const std::string& key, uint64_t value) {
assert(set(key, value));
uint64_t value = default_;
int result = get(key, &value);
assert(result);
- if (result == 0) exit(1); // Disable unused variable warning.
+ if (result == 0) exit(1); // Disable unused variable warning.
return value;
}
void assert_add(const std::string& key, uint64_t value) {
int result = add(key, value);
assert(result);
- if (result == 0) exit(1); // Disable unused variable warning.
+ if (result == 0) exit(1); // Disable unused variable warning.
}
};
// Implement 'add' directly with the new Merge operation
class MergeBasedCounters : public Counters {
private:
- WriteOptions merge_option_; // for merge
+ WriteOptions merge_option_; // for merge
public:
explicit MergeBasedCounters(std::shared_ptr<DB> db, uint64_t defaultCount = 0)
- : Counters(db, defaultCount),
- merge_option_() {
- }
+ : Counters(db, defaultCount), merge_option_() {}
// mapped to a rocksdb Merge operation
bool add(const std::string& key, uint64_t value) override {
void dumpDb(DB* db) {
auto it = std::unique_ptr<Iterator>(db->NewIterator(ReadOptions()));
for (it->SeekToFirst(); it->Valid(); it->Next()) {
- //uint64_t value = DecodeFixed64(it->value().data());
- //std::cout << it->key().ToString() << ": " << value << std::endl;
+ // uint64_t value = DecodeFixed64(it->value().data());
+ // std::cout << it->key().ToString() << ": " << value << std::endl;
}
assert(it->status().ok()); // Check for any errors found during the scan
}
void testCounters(Counters& counters, DB* db, bool test_compaction) {
-
FlushOptions o;
o.wait = true;
counters.assert_set("a", 1);
- if (test_compaction) db->Flush(o);
+ if (test_compaction) {
+ ASSERT_OK(db->Flush(o));
+ }
- assert(counters.assert_get("a") == 1);
+ ASSERT_EQ(counters.assert_get("a"), 1);
counters.assert_remove("b");
// defaut value is 0 if non-existent
- assert(counters.assert_get("b") == 0);
+ ASSERT_EQ(counters.assert_get("b"), 0);
counters.assert_add("a", 2);
- if (test_compaction) db->Flush(o);
+ if (test_compaction) {
+ ASSERT_OK(db->Flush(o));
+ }
// 1+2 = 3
- assert(counters.assert_get("a")== 3);
+ ASSERT_EQ(counters.assert_get("a"), 3);
dumpDb(db);
counters.assert_add("b", i);
sum += i;
}
- assert(counters.assert_get("b") == sum);
+ ASSERT_EQ(counters.assert_get("b"), sum);
dumpDb(db);
if (test_compaction) {
- db->Flush(o);
+ ASSERT_OK(db->Flush(o));
- db->CompactRange(CompactRangeOptions(), nullptr, nullptr);
+ ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr));
dumpDb(db);
- assert(counters.assert_get("a")== 3);
- assert(counters.assert_get("b") == sum);
+ ASSERT_EQ(counters.assert_get("a"), 3);
+ ASSERT_EQ(counters.assert_get("b"), sum);
}
}
void testSuccessiveMerge(Counters& counters, size_t max_num_merges,
size_t num_merges) {
-
counters.assert_remove("z");
uint64_t sum = 0;
sum += i;
if (i % (max_num_merges + 1) == 0) {
- assert(num_merge_operator_calls == max_num_merges + 1);
+ ASSERT_EQ(num_merge_operator_calls, max_num_merges + 1);
} else {
- assert(num_merge_operator_calls == 0);
+ ASSERT_EQ(num_merge_operator_calls, 0);
}
resetNumMergeOperatorCalls();
- assert(counters.assert_get("z") == sum);
- assert(num_merge_operator_calls == i % (max_num_merges + 1));
+ ASSERT_EQ(counters.assert_get("z"), sum);
+ ASSERT_EQ(num_merge_operator_calls, i % (max_num_merges + 1));
}
}
counters->assert_add("b", i);
tmp_sum += i;
}
- db->Flush(o);
- db->CompactRange(CompactRangeOptions(), nullptr, nullptr);
+ ASSERT_OK(db->Flush(o));
+ ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr));
ASSERT_EQ(tmp_sum, counters->assert_get("b"));
if (count > max_merge) {
// in this case, FullMerge should be called instead.
// Test case 2: partial merge should not be called when a put is found.
resetNumPartialMergeCalls();
tmp_sum = 0;
- db->Put(ROCKSDB_NAMESPACE::WriteOptions(), "c", "10");
+ ASSERT_OK(db->Put(ROCKSDB_NAMESPACE::WriteOptions(), "c", "10"));
for (size_t i = 1; i <= count; i++) {
counters->assert_add("c", i);
tmp_sum += i;
}
- db->Flush(o);
- db->CompactRange(CompactRangeOptions(), nullptr, nullptr);
+ ASSERT_OK(db->Flush(o));
+ ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr));
ASSERT_EQ(tmp_sum, counters->assert_get("c"));
ASSERT_EQ(num_partial_merge_calls, 0U);
+ // NowNanos was previously called in MergeHelper::FilterMerge(), which
+ // harmed performance.
+ ASSERT_EQ(EnvMergeTest::now_nanos_count_, 0U);
}
void testSingleBatchSuccessiveMerge(DB* db, size_t max_num_merges,
size_t num_merges) {
- assert(num_merges > max_num_merges);
+ ASSERT_GT(num_merges, max_num_merges);
Slice key("BatchSuccessiveMerge");
uint64_t merge_value = 1;
// Create the batch
WriteBatch batch;
for (size_t i = 0; i < num_merges; ++i) {
- batch.Merge(key, merge_value_slice);
+ ASSERT_OK(batch.Merge(key, merge_value_slice));
}
// Apply to memtable and count the number of merges
resetNumMergeOperatorCalls();
- {
- Status s = db->Write(WriteOptions(), &batch);
- assert(s.ok());
- }
+ ASSERT_OK(db->Write(WriteOptions(), &batch));
ASSERT_EQ(
num_merge_operator_calls,
static_cast<size_t>(num_merges - (num_merges % (max_num_merges + 1))));
// Get the value
resetNumMergeOperatorCalls();
std::string get_value_str;
- {
- Status s = db->Get(ReadOptions(), key, &get_value_str);
- assert(s.ok());
- }
+ ASSERT_OK(db->Get(ReadOptions(), key, &get_value_str));
assert(get_value_str.size() == sizeof(uint64_t));
uint64_t get_value = DecodeFixed64(&get_value_str[0]);
ASSERT_EQ(get_value, num_merges * merge_value);
}
void runTest(const std::string& dbname, const bool use_ttl = false) {
-
{
auto db = OpenDb(dbname, use_ttl);
}
}
- DestroyDB(dbname, Options());
+ ASSERT_OK(DestroyDB(dbname, Options()));
{
size_t max_merge = 5;
testCounters(counters, db.get(), use_compression);
testSuccessiveMerge(counters, max_merge, max_merge * 2);
testSingleBatchSuccessiveMerge(db.get(), 5, 7);
- DestroyDB(dbname, Options());
+ ASSERT_OK(db->Close());
+ ASSERT_OK(DestroyDB(dbname, Options()));
}
{
auto db = OpenDb(dbname, use_ttl, max_merge);
MergeBasedCounters counters(db, 0);
testPartialMerge(&counters, db.get(), max_merge, min_merge, count);
- DestroyDB(dbname, Options());
+ ASSERT_OK(db->Close());
+ ASSERT_OK(DestroyDB(dbname, Options()));
}
{
auto db = OpenDb(dbname, use_ttl, max_merge);
MergeBasedCounters counters(db, 0);
testPartialMerge(&counters, db.get(), max_merge, min_merge,
min_merge * 10);
- DestroyDB(dbname, Options());
+ ASSERT_OK(db->Close());
+ ASSERT_OK(DestroyDB(dbname, Options()));
}
}
counters.add("test-key", 1);
counters.add("test-key", 1);
counters.add("test-key", 1);
- db->CompactRange(CompactRangeOptions(), nullptr, nullptr);
+ ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr));
}
DB* reopen_db;
ASSERT_OK(DB::Open(Options(), dbname, &reopen_db));
std::string value;
- ASSERT_TRUE(!(reopen_db->Get(ReadOptions(), "test-key", &value).ok()));
+ ASSERT_NOK(reopen_db->Get(ReadOptions(), "test-key", &value));
delete reopen_db;
- DestroyDB(dbname, Options());
+ ASSERT_OK(DestroyDB(dbname, Options()));
}
/* Temporary remove this test
testCountersWithFlushAndCompaction(counters, db.get());
}
}
- DestroyDB(dbname, Options());
+ ASSERT_OK(DestroyDB(dbname, Options()));
}
#endif // !ROCKSDB_LITE