]> git.proxmox.com Git - ceph.git/blame - ceph/src/rocksdb/db/db_memtable_test.cc
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / rocksdb / db / db_memtable_test.cc
CommitLineData
7c673cae 1// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
11fdf7f2
TL
2// This source code is licensed under both the GPLv2 (found in the
3// COPYING file in the root directory) and Apache 2.0 License
4// (found in the LICENSE.Apache file in the root directory).
7c673cae
FG
5
6#include <memory>
7#include <string>
8
9#include "db/db_test_util.h"
10#include "db/memtable.h"
494da23a 11#include "db/range_del_aggregator.h"
7c673cae
FG
12#include "port/stack_trace.h"
13#include "rocksdb/memtablerep.h"
14#include "rocksdb/slice_transform.h"
15
f67539c2 16namespace ROCKSDB_NAMESPACE {
7c673cae
FG
17
18class DBMemTableTest : public DBTestBase {
19 public:
1e59de90 20 DBMemTableTest() : DBTestBase("db_memtable_test", /*env_do_fsync=*/true) {}
7c673cae
FG
21};
22
23class MockMemTableRep : public MemTableRep {
24 public:
11fdf7f2 25 explicit MockMemTableRep(Allocator* allocator, MemTableRep* rep)
7c673cae
FG
26 : MemTableRep(allocator), rep_(rep), num_insert_with_hint_(0) {}
27
494da23a 28 KeyHandle Allocate(const size_t len, char** buf) override {
7c673cae
FG
29 return rep_->Allocate(len, buf);
30 }
31
494da23a 32 void Insert(KeyHandle handle) override { rep_->Insert(handle); }
7c673cae 33
494da23a 34 void InsertWithHint(KeyHandle handle, void** hint) override {
7c673cae 35 num_insert_with_hint_++;
11fdf7f2 36 EXPECT_NE(nullptr, hint);
7c673cae
FG
37 last_hint_in_ = *hint;
38 rep_->InsertWithHint(handle, hint);
39 last_hint_out_ = *hint;
40 }
41
494da23a 42 bool Contains(const char* key) const override { return rep_->Contains(key); }
7c673cae 43
494da23a
TL
44 void Get(const LookupKey& k, void* callback_args,
45 bool (*callback_func)(void* arg, const char* entry)) override {
7c673cae
FG
46 rep_->Get(k, callback_args, callback_func);
47 }
48
494da23a 49 size_t ApproximateMemoryUsage() override {
7c673cae
FG
50 return rep_->ApproximateMemoryUsage();
51 }
52
494da23a 53 Iterator* GetIterator(Arena* arena) override {
7c673cae
FG
54 return rep_->GetIterator(arena);
55 }
56
57 void* last_hint_in() { return last_hint_in_; }
58 void* last_hint_out() { return last_hint_out_; }
59 int num_insert_with_hint() { return num_insert_with_hint_; }
60
61 private:
62 std::unique_ptr<MemTableRep> rep_;
63 void* last_hint_in_;
64 void* last_hint_out_;
65 int num_insert_with_hint_;
66};
67
68class MockMemTableRepFactory : public MemTableRepFactory {
69 public:
494da23a
TL
70 MemTableRep* CreateMemTableRep(const MemTableRep::KeyComparator& cmp,
71 Allocator* allocator,
72 const SliceTransform* transform,
73 Logger* logger) override {
7c673cae
FG
74 SkipListFactory factory;
75 MemTableRep* skiplist_rep =
76 factory.CreateMemTableRep(cmp, allocator, transform, logger);
77 mock_rep_ = new MockMemTableRep(allocator, skiplist_rep);
78 return mock_rep_;
79 }
80
494da23a
TL
81 MemTableRep* CreateMemTableRep(const MemTableRep::KeyComparator& cmp,
82 Allocator* allocator,
83 const SliceTransform* transform,
84 Logger* logger,
85 uint32_t column_family_id) override {
11fdf7f2
TL
86 last_column_family_id_ = column_family_id;
87 return CreateMemTableRep(cmp, allocator, transform, logger);
88 }
89
494da23a 90 const char* Name() const override { return "MockMemTableRepFactory"; }
7c673cae
FG
91
92 MockMemTableRep* rep() { return mock_rep_; }
93
94 bool IsInsertConcurrentlySupported() const override { return false; }
95
11fdf7f2
TL
96 uint32_t GetLastColumnFamilyId() { return last_column_family_id_; }
97
7c673cae
FG
98 private:
99 MockMemTableRep* mock_rep_;
1e59de90 100 // workaround since there's no std::numeric_limits<uint32_t>::max() yet.
11fdf7f2 101 uint32_t last_column_family_id_ = static_cast<uint32_t>(-1);
7c673cae
FG
102};
103
104class TestPrefixExtractor : public SliceTransform {
105 public:
494da23a 106 const char* Name() const override { return "TestPrefixExtractor"; }
7c673cae 107
494da23a 108 Slice Transform(const Slice& key) const override {
7c673cae
FG
109 const char* p = separator(key);
110 if (p == nullptr) {
111 return Slice();
112 }
113 return Slice(key.data(), p - key.data() + 1);
114 }
115
494da23a 116 bool InDomain(const Slice& key) const override {
7c673cae
FG
117 return separator(key) != nullptr;
118 }
119
494da23a 120 bool InRange(const Slice& /*key*/) const override { return false; }
7c673cae
FG
121
122 private:
123 const char* separator(const Slice& key) const {
124 return reinterpret_cast<const char*>(memchr(key.data(), '_', key.size()));
125 }
126};
127
11fdf7f2
TL
128// Test that ::Add properly returns false when inserting duplicate keys
129TEST_F(DBMemTableTest, DuplicateSeq) {
130 SequenceNumber seq = 123;
131 std::string value;
11fdf7f2
TL
132 MergeContext merge_context;
133 Options options;
134 InternalKeyComparator ikey_cmp(options.comparator);
494da23a
TL
135 ReadRangeDelAggregator range_del_agg(&ikey_cmp,
136 kMaxSequenceNumber /* upper_bound */);
11fdf7f2
TL
137
138 // Create a MemTable
139 InternalKeyComparator cmp(BytewiseComparator());
140 auto factory = std::make_shared<SkipListFactory>();
141 options.memtable_factory = factory;
1e59de90 142 ImmutableOptions ioptions(options);
11fdf7f2
TL
143 WriteBufferManager wb(options.db_write_buffer_size);
144 MemTable* mem = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb,
145 kMaxSequenceNumber, 0 /* column_family_id */);
146
147 // Write some keys and make sure it returns false on duplicates
1e59de90
TL
148 ASSERT_OK(
149 mem->Add(seq, kTypeValue, "key", "value2", nullptr /* kv_prot_info */));
150 ASSERT_TRUE(
151 mem->Add(seq, kTypeValue, "key", "value2", nullptr /* kv_prot_info */)
152 .IsTryAgain());
11fdf7f2 153 // Changing the type should still cause the duplicatae key
1e59de90
TL
154 ASSERT_TRUE(
155 mem->Add(seq, kTypeMerge, "key", "value2", nullptr /* kv_prot_info */)
156 .IsTryAgain());
11fdf7f2 157 // Changing the seq number will make the key fresh
1e59de90
TL
158 ASSERT_OK(mem->Add(seq + 1, kTypeMerge, "key", "value2",
159 nullptr /* kv_prot_info */));
11fdf7f2 160 // Test with different types for duplicate keys
1e59de90
TL
161 ASSERT_TRUE(
162 mem->Add(seq, kTypeDeletion, "key", "", nullptr /* kv_prot_info */)
163 .IsTryAgain());
164 ASSERT_TRUE(
165 mem->Add(seq, kTypeSingleDeletion, "key", "", nullptr /* kv_prot_info */)
166 .IsTryAgain());
11fdf7f2
TL
167
168 // Test the duplicate keys under stress
169 for (int i = 0; i < 10000; i++) {
170 bool insert_dup = i % 10 == 1;
171 if (!insert_dup) {
172 seq++;
173 }
1e59de90
TL
174 Status s = mem->Add(seq, kTypeValue, "foo", "value" + std::to_string(seq),
175 nullptr /* kv_prot_info */);
11fdf7f2 176 if (insert_dup) {
1e59de90 177 ASSERT_TRUE(s.IsTryAgain());
11fdf7f2 178 } else {
1e59de90 179 ASSERT_OK(s);
11fdf7f2
TL
180 }
181 }
182 delete mem;
183
184 // Test with InsertWithHint
185 options.memtable_insert_with_hint_prefix_extractor.reset(
186 new TestPrefixExtractor()); // which uses _ to extract the prefix
1e59de90 187 ioptions = ImmutableOptions(options);
11fdf7f2
TL
188 mem = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb,
189 kMaxSequenceNumber, 0 /* column_family_id */);
190 // Insert a duplicate key with _ in it
1e59de90
TL
191 ASSERT_OK(
192 mem->Add(seq, kTypeValue, "key_1", "value", nullptr /* kv_prot_info */));
193 ASSERT_TRUE(
194 mem->Add(seq, kTypeValue, "key_1", "value", nullptr /* kv_prot_info */)
195 .IsTryAgain());
11fdf7f2
TL
196 delete mem;
197
198 // Test when InsertConcurrently will be invoked
199 options.allow_concurrent_memtable_write = true;
1e59de90 200 ioptions = ImmutableOptions(options);
11fdf7f2
TL
201 mem = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb,
202 kMaxSequenceNumber, 0 /* column_family_id */);
203 MemTablePostProcessInfo post_process_info;
1e59de90
TL
204 ASSERT_OK(mem->Add(seq, kTypeValue, "key", "value",
205 nullptr /* kv_prot_info */, true, &post_process_info));
206 ASSERT_TRUE(mem->Add(seq, kTypeValue, "key", "value",
207 nullptr /* kv_prot_info */, true, &post_process_info)
208 .IsTryAgain());
11fdf7f2
TL
209 delete mem;
210}
211
f67539c2
TL
212// A simple test to verify that the concurrent merge writes is functional
213TEST_F(DBMemTableTest, ConcurrentMergeWrite) {
214 int num_ops = 1000;
215 std::string value;
f67539c2
TL
216 MergeContext merge_context;
217 Options options;
218 // A merge operator that is not sensitive to concurrent writes since in this
219 // test we don't order the writes.
220 options.merge_operator = MergeOperators::CreateUInt64AddOperator();
221
222 // Create a MemTable
223 InternalKeyComparator cmp(BytewiseComparator());
224 auto factory = std::make_shared<SkipListFactory>();
225 options.memtable_factory = factory;
226 options.allow_concurrent_memtable_write = true;
1e59de90 227 ImmutableOptions ioptions(options);
f67539c2
TL
228 WriteBufferManager wb(options.db_write_buffer_size);
229 MemTable* mem = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb,
230 kMaxSequenceNumber, 0 /* column_family_id */);
231
232 // Put 0 as the base
233 PutFixed64(&value, static_cast<uint64_t>(0));
1e59de90 234 ASSERT_OK(mem->Add(0, kTypeValue, "key", value, nullptr /* kv_prot_info */));
f67539c2
TL
235 value.clear();
236
237 // Write Merge concurrently
238 ROCKSDB_NAMESPACE::port::Thread write_thread1([&]() {
239 MemTablePostProcessInfo post_process_info1;
240 std::string v1;
241 for (int seq = 1; seq < num_ops / 2; seq++) {
242 PutFixed64(&v1, seq);
1e59de90
TL
243 ASSERT_OK(mem->Add(seq, kTypeMerge, "key", v1, nullptr /* kv_prot_info */,
244 true, &post_process_info1));
f67539c2
TL
245 v1.clear();
246 }
247 });
248 ROCKSDB_NAMESPACE::port::Thread write_thread2([&]() {
249 MemTablePostProcessInfo post_process_info2;
250 std::string v2;
251 for (int seq = num_ops / 2; seq < num_ops; seq++) {
252 PutFixed64(&v2, seq);
1e59de90
TL
253 ASSERT_OK(mem->Add(seq, kTypeMerge, "key", v2, nullptr /* kv_prot_info */,
254 true, &post_process_info2));
f67539c2
TL
255 v2.clear();
256 }
257 });
258 write_thread1.join();
259 write_thread2.join();
260
261 Status status;
262 ReadOptions roptions;
263 SequenceNumber max_covering_tombstone_seq = 0;
264 LookupKey lkey("key", kMaxSequenceNumber);
1e59de90
TL
265 bool res = mem->Get(lkey, &value, /*columns=*/nullptr, /*timestamp=*/nullptr,
266 &status, &merge_context, &max_covering_tombstone_seq,
267 roptions, false /* immutable_memtable */);
268 ASSERT_OK(status);
f67539c2
TL
269 ASSERT_TRUE(res);
270 uint64_t ivalue = DecodeFixed64(Slice(value).data());
271 uint64_t sum = 0;
272 for (int seq = 0; seq < num_ops; seq++) {
273 sum += seq;
274 }
275 ASSERT_EQ(ivalue, sum);
276
277 delete mem;
278}
279
7c673cae
FG
280TEST_F(DBMemTableTest, InsertWithHint) {
281 Options options;
282 options.allow_concurrent_memtable_write = false;
283 options.create_if_missing = true;
284 options.memtable_factory.reset(new MockMemTableRepFactory());
285 options.memtable_insert_with_hint_prefix_extractor.reset(
286 new TestPrefixExtractor());
287 options.env = env_;
288 Reopen(options);
289 MockMemTableRep* rep =
290 reinterpret_cast<MockMemTableRepFactory*>(options.memtable_factory.get())
291 ->rep();
292 ASSERT_OK(Put("foo_k1", "foo_v1"));
293 ASSERT_EQ(nullptr, rep->last_hint_in());
294 void* hint_foo = rep->last_hint_out();
295 ASSERT_OK(Put("foo_k2", "foo_v2"));
296 ASSERT_EQ(hint_foo, rep->last_hint_in());
297 ASSERT_EQ(hint_foo, rep->last_hint_out());
298 ASSERT_OK(Put("foo_k3", "foo_v3"));
299 ASSERT_EQ(hint_foo, rep->last_hint_in());
300 ASSERT_EQ(hint_foo, rep->last_hint_out());
301 ASSERT_OK(Put("bar_k1", "bar_v1"));
302 ASSERT_EQ(nullptr, rep->last_hint_in());
303 void* hint_bar = rep->last_hint_out();
304 ASSERT_NE(hint_foo, hint_bar);
305 ASSERT_OK(Put("bar_k2", "bar_v2"));
306 ASSERT_EQ(hint_bar, rep->last_hint_in());
307 ASSERT_EQ(hint_bar, rep->last_hint_out());
308 ASSERT_EQ(5, rep->num_insert_with_hint());
20effc67 309 ASSERT_OK(Put("NotInPrefixDomain", "vvv"));
7c673cae
FG
310 ASSERT_EQ(5, rep->num_insert_with_hint());
311 ASSERT_EQ("foo_v1", Get("foo_k1"));
312 ASSERT_EQ("foo_v2", Get("foo_k2"));
313 ASSERT_EQ("foo_v3", Get("foo_k3"));
314 ASSERT_EQ("bar_v1", Get("bar_k1"));
315 ASSERT_EQ("bar_v2", Get("bar_k2"));
20effc67 316 ASSERT_EQ("vvv", Get("NotInPrefixDomain"));
7c673cae
FG
317}
318
11fdf7f2
TL
319TEST_F(DBMemTableTest, ColumnFamilyId) {
320 // Verifies MemTableRepFactory is told the right column family id.
321 Options options;
20effc67 322 options.env = CurrentOptions().env;
11fdf7f2
TL
323 options.allow_concurrent_memtable_write = false;
324 options.create_if_missing = true;
325 options.memtable_factory.reset(new MockMemTableRepFactory());
326 DestroyAndReopen(options);
327 CreateAndReopenWithCF({"pikachu"}, options);
328
f67539c2 329 for (uint32_t cf = 0; cf < 2; ++cf) {
11fdf7f2
TL
330 ASSERT_OK(Put(cf, "key", "val"));
331 ASSERT_OK(Flush(cf));
332 ASSERT_EQ(
333 cf, static_cast<MockMemTableRepFactory*>(options.memtable_factory.get())
334 ->GetLastColumnFamilyId());
335 }
336}
337
f67539c2 338} // namespace ROCKSDB_NAMESPACE
7c673cae
FG
339
340int main(int argc, char** argv) {
f67539c2 341 ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
7c673cae
FG
342 ::testing::InitGoogleTest(&argc, argv);
343 return RUN_ALL_TESTS();
344}