]>
Commit | Line | Data |
---|---|---|
7c673cae | 1 | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
11fdf7f2 TL |
2 | // This source code is licensed under both the GPLv2 (found in the |
3 | // COPYING file in the root directory) and Apache 2.0 License | |
4 | // (found in the LICENSE.Apache file in the root directory). | |
7c673cae FG |
5 | |
6 | #include <memory> | |
7 | #include <string> | |
8 | ||
9 | #include "db/db_test_util.h" | |
10 | #include "db/memtable.h" | |
494da23a | 11 | #include "db/range_del_aggregator.h" |
7c673cae FG |
12 | #include "port/stack_trace.h" |
13 | #include "rocksdb/memtablerep.h" | |
14 | #include "rocksdb/slice_transform.h" | |
15 | ||
f67539c2 | 16 | namespace ROCKSDB_NAMESPACE { |
7c673cae FG |
17 | |
18 | class DBMemTableTest : public DBTestBase { | |
19 | public: | |
1e59de90 | 20 | DBMemTableTest() : DBTestBase("db_memtable_test", /*env_do_fsync=*/true) {} |
7c673cae FG |
21 | }; |
22 | ||
23 | class MockMemTableRep : public MemTableRep { | |
24 | public: | |
11fdf7f2 | 25 | explicit MockMemTableRep(Allocator* allocator, MemTableRep* rep) |
7c673cae FG |
26 | : MemTableRep(allocator), rep_(rep), num_insert_with_hint_(0) {} |
27 | ||
494da23a | 28 | KeyHandle Allocate(const size_t len, char** buf) override { |
7c673cae FG |
29 | return rep_->Allocate(len, buf); |
30 | } | |
31 | ||
494da23a | 32 | void Insert(KeyHandle handle) override { rep_->Insert(handle); } |
7c673cae | 33 | |
494da23a | 34 | void InsertWithHint(KeyHandle handle, void** hint) override { |
7c673cae | 35 | num_insert_with_hint_++; |
11fdf7f2 | 36 | EXPECT_NE(nullptr, hint); |
7c673cae FG |
37 | last_hint_in_ = *hint; |
38 | rep_->InsertWithHint(handle, hint); | |
39 | last_hint_out_ = *hint; | |
40 | } | |
41 | ||
494da23a | 42 | bool Contains(const char* key) const override { return rep_->Contains(key); } |
7c673cae | 43 | |
494da23a TL |
44 | void Get(const LookupKey& k, void* callback_args, |
45 | bool (*callback_func)(void* arg, const char* entry)) override { | |
7c673cae FG |
46 | rep_->Get(k, callback_args, callback_func); |
47 | } | |
48 | ||
494da23a | 49 | size_t ApproximateMemoryUsage() override { |
7c673cae FG |
50 | return rep_->ApproximateMemoryUsage(); |
51 | } | |
52 | ||
494da23a | 53 | Iterator* GetIterator(Arena* arena) override { |
7c673cae FG |
54 | return rep_->GetIterator(arena); |
55 | } | |
56 | ||
57 | void* last_hint_in() { return last_hint_in_; } | |
58 | void* last_hint_out() { return last_hint_out_; } | |
59 | int num_insert_with_hint() { return num_insert_with_hint_; } | |
60 | ||
61 | private: | |
62 | std::unique_ptr<MemTableRep> rep_; | |
63 | void* last_hint_in_; | |
64 | void* last_hint_out_; | |
65 | int num_insert_with_hint_; | |
66 | }; | |
67 | ||
68 | class MockMemTableRepFactory : public MemTableRepFactory { | |
69 | public: | |
494da23a TL |
70 | MemTableRep* CreateMemTableRep(const MemTableRep::KeyComparator& cmp, |
71 | Allocator* allocator, | |
72 | const SliceTransform* transform, | |
73 | Logger* logger) override { | |
7c673cae FG |
74 | SkipListFactory factory; |
75 | MemTableRep* skiplist_rep = | |
76 | factory.CreateMemTableRep(cmp, allocator, transform, logger); | |
77 | mock_rep_ = new MockMemTableRep(allocator, skiplist_rep); | |
78 | return mock_rep_; | |
79 | } | |
80 | ||
494da23a TL |
81 | MemTableRep* CreateMemTableRep(const MemTableRep::KeyComparator& cmp, |
82 | Allocator* allocator, | |
83 | const SliceTransform* transform, | |
84 | Logger* logger, | |
85 | uint32_t column_family_id) override { | |
11fdf7f2 TL |
86 | last_column_family_id_ = column_family_id; |
87 | return CreateMemTableRep(cmp, allocator, transform, logger); | |
88 | } | |
89 | ||
494da23a | 90 | const char* Name() const override { return "MockMemTableRepFactory"; } |
7c673cae FG |
91 | |
92 | MockMemTableRep* rep() { return mock_rep_; } | |
93 | ||
94 | bool IsInsertConcurrentlySupported() const override { return false; } | |
95 | ||
11fdf7f2 TL |
96 | uint32_t GetLastColumnFamilyId() { return last_column_family_id_; } |
97 | ||
7c673cae FG |
98 | private: |
99 | MockMemTableRep* mock_rep_; | |
1e59de90 | 100 | // workaround since there's no std::numeric_limits<uint32_t>::max() yet. |
11fdf7f2 | 101 | uint32_t last_column_family_id_ = static_cast<uint32_t>(-1); |
7c673cae FG |
102 | }; |
103 | ||
104 | class TestPrefixExtractor : public SliceTransform { | |
105 | public: | |
494da23a | 106 | const char* Name() const override { return "TestPrefixExtractor"; } |
7c673cae | 107 | |
494da23a | 108 | Slice Transform(const Slice& key) const override { |
7c673cae FG |
109 | const char* p = separator(key); |
110 | if (p == nullptr) { | |
111 | return Slice(); | |
112 | } | |
113 | return Slice(key.data(), p - key.data() + 1); | |
114 | } | |
115 | ||
494da23a | 116 | bool InDomain(const Slice& key) const override { |
7c673cae FG |
117 | return separator(key) != nullptr; |
118 | } | |
119 | ||
494da23a | 120 | bool InRange(const Slice& /*key*/) const override { return false; } |
7c673cae FG |
121 | |
122 | private: | |
123 | const char* separator(const Slice& key) const { | |
124 | return reinterpret_cast<const char*>(memchr(key.data(), '_', key.size())); | |
125 | } | |
126 | }; | |
127 | ||
11fdf7f2 TL |
128 | // Test that ::Add properly returns false when inserting duplicate keys |
129 | TEST_F(DBMemTableTest, DuplicateSeq) { | |
130 | SequenceNumber seq = 123; | |
131 | std::string value; | |
11fdf7f2 TL |
132 | MergeContext merge_context; |
133 | Options options; | |
134 | InternalKeyComparator ikey_cmp(options.comparator); | |
494da23a TL |
135 | ReadRangeDelAggregator range_del_agg(&ikey_cmp, |
136 | kMaxSequenceNumber /* upper_bound */); | |
11fdf7f2 TL |
137 | |
138 | // Create a MemTable | |
139 | InternalKeyComparator cmp(BytewiseComparator()); | |
140 | auto factory = std::make_shared<SkipListFactory>(); | |
141 | options.memtable_factory = factory; | |
1e59de90 | 142 | ImmutableOptions ioptions(options); |
11fdf7f2 TL |
143 | WriteBufferManager wb(options.db_write_buffer_size); |
144 | MemTable* mem = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb, | |
145 | kMaxSequenceNumber, 0 /* column_family_id */); | |
146 | ||
147 | // Write some keys and make sure it returns false on duplicates | |
1e59de90 TL |
148 | ASSERT_OK( |
149 | mem->Add(seq, kTypeValue, "key", "value2", nullptr /* kv_prot_info */)); | |
150 | ASSERT_TRUE( | |
151 | mem->Add(seq, kTypeValue, "key", "value2", nullptr /* kv_prot_info */) | |
152 | .IsTryAgain()); | |
11fdf7f2 | 153 | // Changing the type should still cause the duplicatae key |
1e59de90 TL |
154 | ASSERT_TRUE( |
155 | mem->Add(seq, kTypeMerge, "key", "value2", nullptr /* kv_prot_info */) | |
156 | .IsTryAgain()); | |
11fdf7f2 | 157 | // Changing the seq number will make the key fresh |
1e59de90 TL |
158 | ASSERT_OK(mem->Add(seq + 1, kTypeMerge, "key", "value2", |
159 | nullptr /* kv_prot_info */)); | |
11fdf7f2 | 160 | // Test with different types for duplicate keys |
1e59de90 TL |
161 | ASSERT_TRUE( |
162 | mem->Add(seq, kTypeDeletion, "key", "", nullptr /* kv_prot_info */) | |
163 | .IsTryAgain()); | |
164 | ASSERT_TRUE( | |
165 | mem->Add(seq, kTypeSingleDeletion, "key", "", nullptr /* kv_prot_info */) | |
166 | .IsTryAgain()); | |
11fdf7f2 TL |
167 | |
168 | // Test the duplicate keys under stress | |
169 | for (int i = 0; i < 10000; i++) { | |
170 | bool insert_dup = i % 10 == 1; | |
171 | if (!insert_dup) { | |
172 | seq++; | |
173 | } | |
1e59de90 TL |
174 | Status s = mem->Add(seq, kTypeValue, "foo", "value" + std::to_string(seq), |
175 | nullptr /* kv_prot_info */); | |
11fdf7f2 | 176 | if (insert_dup) { |
1e59de90 | 177 | ASSERT_TRUE(s.IsTryAgain()); |
11fdf7f2 | 178 | } else { |
1e59de90 | 179 | ASSERT_OK(s); |
11fdf7f2 TL |
180 | } |
181 | } | |
182 | delete mem; | |
183 | ||
184 | // Test with InsertWithHint | |
185 | options.memtable_insert_with_hint_prefix_extractor.reset( | |
186 | new TestPrefixExtractor()); // which uses _ to extract the prefix | |
1e59de90 | 187 | ioptions = ImmutableOptions(options); |
11fdf7f2 TL |
188 | mem = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb, |
189 | kMaxSequenceNumber, 0 /* column_family_id */); | |
190 | // Insert a duplicate key with _ in it | |
1e59de90 TL |
191 | ASSERT_OK( |
192 | mem->Add(seq, kTypeValue, "key_1", "value", nullptr /* kv_prot_info */)); | |
193 | ASSERT_TRUE( | |
194 | mem->Add(seq, kTypeValue, "key_1", "value", nullptr /* kv_prot_info */) | |
195 | .IsTryAgain()); | |
11fdf7f2 TL |
196 | delete mem; |
197 | ||
198 | // Test when InsertConcurrently will be invoked | |
199 | options.allow_concurrent_memtable_write = true; | |
1e59de90 | 200 | ioptions = ImmutableOptions(options); |
11fdf7f2 TL |
201 | mem = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb, |
202 | kMaxSequenceNumber, 0 /* column_family_id */); | |
203 | MemTablePostProcessInfo post_process_info; | |
1e59de90 TL |
204 | ASSERT_OK(mem->Add(seq, kTypeValue, "key", "value", |
205 | nullptr /* kv_prot_info */, true, &post_process_info)); | |
206 | ASSERT_TRUE(mem->Add(seq, kTypeValue, "key", "value", | |
207 | nullptr /* kv_prot_info */, true, &post_process_info) | |
208 | .IsTryAgain()); | |
11fdf7f2 TL |
209 | delete mem; |
210 | } | |
211 | ||
f67539c2 TL |
212 | // A simple test to verify that the concurrent merge writes is functional |
213 | TEST_F(DBMemTableTest, ConcurrentMergeWrite) { | |
214 | int num_ops = 1000; | |
215 | std::string value; | |
f67539c2 TL |
216 | MergeContext merge_context; |
217 | Options options; | |
218 | // A merge operator that is not sensitive to concurrent writes since in this | |
219 | // test we don't order the writes. | |
220 | options.merge_operator = MergeOperators::CreateUInt64AddOperator(); | |
221 | ||
222 | // Create a MemTable | |
223 | InternalKeyComparator cmp(BytewiseComparator()); | |
224 | auto factory = std::make_shared<SkipListFactory>(); | |
225 | options.memtable_factory = factory; | |
226 | options.allow_concurrent_memtable_write = true; | |
1e59de90 | 227 | ImmutableOptions ioptions(options); |
f67539c2 TL |
228 | WriteBufferManager wb(options.db_write_buffer_size); |
229 | MemTable* mem = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb, | |
230 | kMaxSequenceNumber, 0 /* column_family_id */); | |
231 | ||
232 | // Put 0 as the base | |
233 | PutFixed64(&value, static_cast<uint64_t>(0)); | |
1e59de90 | 234 | ASSERT_OK(mem->Add(0, kTypeValue, "key", value, nullptr /* kv_prot_info */)); |
f67539c2 TL |
235 | value.clear(); |
236 | ||
237 | // Write Merge concurrently | |
238 | ROCKSDB_NAMESPACE::port::Thread write_thread1([&]() { | |
239 | MemTablePostProcessInfo post_process_info1; | |
240 | std::string v1; | |
241 | for (int seq = 1; seq < num_ops / 2; seq++) { | |
242 | PutFixed64(&v1, seq); | |
1e59de90 TL |
243 | ASSERT_OK(mem->Add(seq, kTypeMerge, "key", v1, nullptr /* kv_prot_info */, |
244 | true, &post_process_info1)); | |
f67539c2 TL |
245 | v1.clear(); |
246 | } | |
247 | }); | |
248 | ROCKSDB_NAMESPACE::port::Thread write_thread2([&]() { | |
249 | MemTablePostProcessInfo post_process_info2; | |
250 | std::string v2; | |
251 | for (int seq = num_ops / 2; seq < num_ops; seq++) { | |
252 | PutFixed64(&v2, seq); | |
1e59de90 TL |
253 | ASSERT_OK(mem->Add(seq, kTypeMerge, "key", v2, nullptr /* kv_prot_info */, |
254 | true, &post_process_info2)); | |
f67539c2 TL |
255 | v2.clear(); |
256 | } | |
257 | }); | |
258 | write_thread1.join(); | |
259 | write_thread2.join(); | |
260 | ||
261 | Status status; | |
262 | ReadOptions roptions; | |
263 | SequenceNumber max_covering_tombstone_seq = 0; | |
264 | LookupKey lkey("key", kMaxSequenceNumber); | |
1e59de90 TL |
265 | bool res = mem->Get(lkey, &value, /*columns=*/nullptr, /*timestamp=*/nullptr, |
266 | &status, &merge_context, &max_covering_tombstone_seq, | |
267 | roptions, false /* immutable_memtable */); | |
268 | ASSERT_OK(status); | |
f67539c2 TL |
269 | ASSERT_TRUE(res); |
270 | uint64_t ivalue = DecodeFixed64(Slice(value).data()); | |
271 | uint64_t sum = 0; | |
272 | for (int seq = 0; seq < num_ops; seq++) { | |
273 | sum += seq; | |
274 | } | |
275 | ASSERT_EQ(ivalue, sum); | |
276 | ||
277 | delete mem; | |
278 | } | |
279 | ||
7c673cae FG |
280 | TEST_F(DBMemTableTest, InsertWithHint) { |
281 | Options options; | |
282 | options.allow_concurrent_memtable_write = false; | |
283 | options.create_if_missing = true; | |
284 | options.memtable_factory.reset(new MockMemTableRepFactory()); | |
285 | options.memtable_insert_with_hint_prefix_extractor.reset( | |
286 | new TestPrefixExtractor()); | |
287 | options.env = env_; | |
288 | Reopen(options); | |
289 | MockMemTableRep* rep = | |
290 | reinterpret_cast<MockMemTableRepFactory*>(options.memtable_factory.get()) | |
291 | ->rep(); | |
292 | ASSERT_OK(Put("foo_k1", "foo_v1")); | |
293 | ASSERT_EQ(nullptr, rep->last_hint_in()); | |
294 | void* hint_foo = rep->last_hint_out(); | |
295 | ASSERT_OK(Put("foo_k2", "foo_v2")); | |
296 | ASSERT_EQ(hint_foo, rep->last_hint_in()); | |
297 | ASSERT_EQ(hint_foo, rep->last_hint_out()); | |
298 | ASSERT_OK(Put("foo_k3", "foo_v3")); | |
299 | ASSERT_EQ(hint_foo, rep->last_hint_in()); | |
300 | ASSERT_EQ(hint_foo, rep->last_hint_out()); | |
301 | ASSERT_OK(Put("bar_k1", "bar_v1")); | |
302 | ASSERT_EQ(nullptr, rep->last_hint_in()); | |
303 | void* hint_bar = rep->last_hint_out(); | |
304 | ASSERT_NE(hint_foo, hint_bar); | |
305 | ASSERT_OK(Put("bar_k2", "bar_v2")); | |
306 | ASSERT_EQ(hint_bar, rep->last_hint_in()); | |
307 | ASSERT_EQ(hint_bar, rep->last_hint_out()); | |
308 | ASSERT_EQ(5, rep->num_insert_with_hint()); | |
20effc67 | 309 | ASSERT_OK(Put("NotInPrefixDomain", "vvv")); |
7c673cae FG |
310 | ASSERT_EQ(5, rep->num_insert_with_hint()); |
311 | ASSERT_EQ("foo_v1", Get("foo_k1")); | |
312 | ASSERT_EQ("foo_v2", Get("foo_k2")); | |
313 | ASSERT_EQ("foo_v3", Get("foo_k3")); | |
314 | ASSERT_EQ("bar_v1", Get("bar_k1")); | |
315 | ASSERT_EQ("bar_v2", Get("bar_k2")); | |
20effc67 | 316 | ASSERT_EQ("vvv", Get("NotInPrefixDomain")); |
7c673cae FG |
317 | } |
318 | ||
11fdf7f2 TL |
319 | TEST_F(DBMemTableTest, ColumnFamilyId) { |
320 | // Verifies MemTableRepFactory is told the right column family id. | |
321 | Options options; | |
20effc67 | 322 | options.env = CurrentOptions().env; |
11fdf7f2 TL |
323 | options.allow_concurrent_memtable_write = false; |
324 | options.create_if_missing = true; | |
325 | options.memtable_factory.reset(new MockMemTableRepFactory()); | |
326 | DestroyAndReopen(options); | |
327 | CreateAndReopenWithCF({"pikachu"}, options); | |
328 | ||
f67539c2 | 329 | for (uint32_t cf = 0; cf < 2; ++cf) { |
11fdf7f2 TL |
330 | ASSERT_OK(Put(cf, "key", "val")); |
331 | ASSERT_OK(Flush(cf)); | |
332 | ASSERT_EQ( | |
333 | cf, static_cast<MockMemTableRepFactory*>(options.memtable_factory.get()) | |
334 | ->GetLastColumnFamilyId()); | |
335 | } | |
336 | } | |
337 | ||
f67539c2 | 338 | } // namespace ROCKSDB_NAMESPACE |
7c673cae FG |
339 | |
340 | int main(int argc, char** argv) { | |
f67539c2 | 341 | ROCKSDB_NAMESPACE::port::InstallStackTraceHandler(); |
7c673cae FG |
342 | ::testing::InitGoogleTest(&argc, argv); |
343 | return RUN_ALL_TESTS(); | |
344 | } |