]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/db/memtable_list_test.cc
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / rocksdb / db / memtable_list_test.cc
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5
6 #include "db/memtable_list.h"
7 #include <algorithm>
8 #include <string>
9 #include <vector>
10 #include "db/merge_context.h"
11 #include "db/version_set.h"
12 #include "db/write_controller.h"
13 #include "rocksdb/db.h"
14 #include "rocksdb/status.h"
15 #include "rocksdb/write_buffer_manager.h"
16 #include "test_util/testharness.h"
17 #include "test_util/testutil.h"
18 #include "util/string_util.h"
19
20 namespace ROCKSDB_NAMESPACE {
21
22 class MemTableListTest : public testing::Test {
23 public:
24 std::string dbname;
25 DB* db;
26 Options options;
27 std::vector<ColumnFamilyHandle*> handles;
28 std::atomic<uint64_t> file_number;
29
30 MemTableListTest() : db(nullptr), file_number(1) {
31 dbname = test::PerThreadDBPath("memtable_list_test");
32 options.create_if_missing = true;
33 DestroyDB(dbname, options);
34 }
35
36 // Create a test db if not yet created
37 void CreateDB() {
38 if (db == nullptr) {
39 options.create_if_missing = true;
40 DestroyDB(dbname, options);
41 // Open DB only with default column family
42 ColumnFamilyOptions cf_options;
43 std::vector<ColumnFamilyDescriptor> cf_descs;
44 cf_descs.emplace_back(kDefaultColumnFamilyName, cf_options);
45 Status s = DB::Open(options, dbname, cf_descs, &handles, &db);
46 EXPECT_OK(s);
47
48 ColumnFamilyOptions cf_opt1, cf_opt2;
49 cf_opt1.cf_paths.emplace_back(dbname + "_one_1",
50 std::numeric_limits<uint64_t>::max());
51 cf_opt2.cf_paths.emplace_back(dbname + "_two_1",
52 std::numeric_limits<uint64_t>::max());
53 int sz = static_cast<int>(handles.size());
54 handles.resize(sz + 2);
55 s = db->CreateColumnFamily(cf_opt1, "one", &handles[1]);
56 EXPECT_OK(s);
57 s = db->CreateColumnFamily(cf_opt2, "two", &handles[2]);
58 EXPECT_OK(s);
59
60 cf_descs.emplace_back("one", cf_options);
61 cf_descs.emplace_back("two", cf_options);
62 }
63 }
64
65 ~MemTableListTest() override {
66 if (db) {
67 std::vector<ColumnFamilyDescriptor> cf_descs(handles.size());
68 for (int i = 0; i != static_cast<int>(handles.size()); ++i) {
69 handles[i]->GetDescriptor(&cf_descs[i]);
70 }
71 for (auto h : handles) {
72 if (h) {
73 db->DestroyColumnFamilyHandle(h);
74 }
75 }
76 handles.clear();
77 delete db;
78 db = nullptr;
79 DestroyDB(dbname, options, cf_descs);
80 }
81 }
82
83 // Calls MemTableList::TryInstallMemtableFlushResults() and sets up all
84 // structures needed to call this function.
85 Status Mock_InstallMemtableFlushResults(
86 MemTableList* list, const MutableCFOptions& mutable_cf_options,
87 const autovector<MemTable*>& m, autovector<MemTable*>* to_delete) {
88 // Create a mock Logger
89 test::NullLogger logger;
90 LogBuffer log_buffer(DEBUG_LEVEL, &logger);
91
92 CreateDB();
93 // Create a mock VersionSet
94 DBOptions db_options;
95 db_options.file_system = FileSystem::Default();
96 ImmutableDBOptions immutable_db_options(db_options);
97 EnvOptions env_options;
98 std::shared_ptr<Cache> table_cache(NewLRUCache(50000, 16));
99 WriteBufferManager write_buffer_manager(db_options.db_write_buffer_size);
100 WriteController write_controller(10000000u);
101
102 VersionSet versions(dbname, &immutable_db_options, env_options,
103 table_cache.get(), &write_buffer_manager,
104 &write_controller, /*block_cache_tracer=*/nullptr);
105 std::vector<ColumnFamilyDescriptor> cf_descs;
106 cf_descs.emplace_back(kDefaultColumnFamilyName, ColumnFamilyOptions());
107 cf_descs.emplace_back("one", ColumnFamilyOptions());
108 cf_descs.emplace_back("two", ColumnFamilyOptions());
109
110 EXPECT_OK(versions.Recover(cf_descs, false));
111
112 // Create mock default ColumnFamilyData
113 auto column_family_set = versions.GetColumnFamilySet();
114 LogsWithPrepTracker dummy_prep_tracker;
115 auto cfd = column_family_set->GetDefault();
116 EXPECT_TRUE(nullptr != cfd);
117 uint64_t file_num = file_number.fetch_add(1);
118 // Create dummy mutex.
119 InstrumentedMutex mutex;
120 InstrumentedMutexLock l(&mutex);
121 std::list<std::unique_ptr<FlushJobInfo>> flush_jobs_info;
122 Status s = list->TryInstallMemtableFlushResults(
123 cfd, mutable_cf_options, m, &dummy_prep_tracker, &versions, &mutex,
124 file_num, to_delete, nullptr, &log_buffer, &flush_jobs_info);
125 return s;
126 }
127
128 // Calls MemTableList::InstallMemtableFlushResults() and sets up all
129 // structures needed to call this function.
130 Status Mock_InstallMemtableAtomicFlushResults(
131 autovector<MemTableList*>& lists, const autovector<uint32_t>& cf_ids,
132 const autovector<const MutableCFOptions*>& mutable_cf_options_list,
133 const autovector<const autovector<MemTable*>*>& mems_list,
134 autovector<MemTable*>* to_delete) {
135 // Create a mock Logger
136 test::NullLogger logger;
137 LogBuffer log_buffer(DEBUG_LEVEL, &logger);
138
139 CreateDB();
140 // Create a mock VersionSet
141 DBOptions db_options;
142 db_options.file_system.reset(new LegacyFileSystemWrapper(db_options.env));
143
144 ImmutableDBOptions immutable_db_options(db_options);
145 EnvOptions env_options;
146 std::shared_ptr<Cache> table_cache(NewLRUCache(50000, 16));
147 WriteBufferManager write_buffer_manager(db_options.db_write_buffer_size);
148 WriteController write_controller(10000000u);
149
150 VersionSet versions(dbname, &immutable_db_options, env_options,
151 table_cache.get(), &write_buffer_manager,
152 &write_controller, /*block_cache_tracer=*/nullptr);
153 std::vector<ColumnFamilyDescriptor> cf_descs;
154 cf_descs.emplace_back(kDefaultColumnFamilyName, ColumnFamilyOptions());
155 cf_descs.emplace_back("one", ColumnFamilyOptions());
156 cf_descs.emplace_back("two", ColumnFamilyOptions());
157 EXPECT_OK(versions.Recover(cf_descs, false));
158
159 // Create mock default ColumnFamilyData
160
161 auto column_family_set = versions.GetColumnFamilySet();
162
163 LogsWithPrepTracker dummy_prep_tracker;
164 autovector<ColumnFamilyData*> cfds;
165 for (int i = 0; i != static_cast<int>(cf_ids.size()); ++i) {
166 cfds.emplace_back(column_family_set->GetColumnFamily(cf_ids[i]));
167 EXPECT_NE(nullptr, cfds[i]);
168 }
169 std::vector<FileMetaData> file_metas;
170 file_metas.reserve(cf_ids.size());
171 for (size_t i = 0; i != cf_ids.size(); ++i) {
172 FileMetaData meta;
173 uint64_t file_num = file_number.fetch_add(1);
174 meta.fd = FileDescriptor(file_num, 0, 0);
175 file_metas.emplace_back(meta);
176 }
177 autovector<FileMetaData*> file_meta_ptrs;
178 for (auto& meta : file_metas) {
179 file_meta_ptrs.push_back(&meta);
180 }
181 InstrumentedMutex mutex;
182 InstrumentedMutexLock l(&mutex);
183 return InstallMemtableAtomicFlushResults(
184 &lists, cfds, mutable_cf_options_list, mems_list, &versions, &mutex,
185 file_meta_ptrs, to_delete, nullptr, &log_buffer);
186 }
187 };
188
189 TEST_F(MemTableListTest, Empty) {
190 // Create an empty MemTableList and validate basic functions.
191 MemTableList list(1, 0, 0);
192
193 ASSERT_EQ(0, list.NumNotFlushed());
194 ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));
195 ASSERT_FALSE(list.IsFlushPending());
196
197 autovector<MemTable*> mems;
198 list.PickMemtablesToFlush(nullptr /* memtable_id */, &mems);
199 ASSERT_EQ(0, mems.size());
200
201 autovector<MemTable*> to_delete;
202 list.current()->Unref(&to_delete);
203 ASSERT_EQ(0, to_delete.size());
204 }
205
// Exercises point lookups through a single MemTable and then through a
// MemTableList holding two memtables, verifying that reads respect both
// key shadowing across memtables and sequence-number visibility.
// NOTE: the test relies on the exact order of the ++seq increments below;
// do not reorder the Add() calls.
TEST_F(MemTableListTest, GetTest) {
  // Create MemTableList
  int min_write_buffer_number_to_merge = 2;
  int max_write_buffer_number_to_maintain = 0;
  int64_t max_write_buffer_size_to_maintain = 0;
  MemTableList list(min_write_buffer_number_to_merge,
                    max_write_buffer_number_to_maintain,
                    max_write_buffer_size_to_maintain);

  SequenceNumber seq = 1;
  std::string value;
  Status s;
  MergeContext merge_context;
  InternalKeyComparator ikey_cmp(options.comparator);
  SequenceNumber max_covering_tombstone_seq = 0;
  autovector<MemTable*> to_delete;

  // Lookup in an empty list finds nothing.
  LookupKey lkey("key1", seq);
  bool found = list.current()->Get(lkey, &value, &s, &merge_context,
                                   &max_covering_tombstone_seq, ReadOptions());
  ASSERT_FALSE(found);

  // Create a MemTable
  InternalKeyComparator cmp(BytewiseComparator());
  auto factory = std::make_shared<SkipListFactory>();
  options.memtable_factory = factory;
  ImmutableCFOptions ioptions(options);

  WriteBufferManager wb(options.db_write_buffer_size);
  MemTable* mem = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb,
                               kMaxSequenceNumber, 0 /* column_family_id */);
  mem->Ref();

  // Write some keys to this memtable. key1 is deleted (seq 2) then
  // re-inserted (seq 4); key2 is written twice so the later value wins.
  mem->Add(++seq, kTypeDeletion, "key1", "");
  mem->Add(++seq, kTypeValue, "key2", "value2");
  mem->Add(++seq, kTypeValue, "key1", "value1");
  mem->Add(++seq, kTypeValue, "key2", "value2.2");

  // Fetch the newly written keys: at the latest sequence number key1
  // resolves to its re-inserted value.
  merge_context.Clear();
  found = mem->Get(LookupKey("key1", seq), &value, &s, &merge_context,
                   &max_covering_tombstone_seq, ReadOptions());
  ASSERT_TRUE(s.ok() && found);
  ASSERT_EQ(value, "value1");

  // At sequence 2 only the deletion of key1 is visible.
  merge_context.Clear();
  found = mem->Get(LookupKey("key1", 2), &value, &s, &merge_context,
                   &max_covering_tombstone_seq, ReadOptions());
  // MemTable found out that this key is *not* found (at this sequence#)
  ASSERT_TRUE(found && s.IsNotFound());

  // Latest write to key2 shadows the earlier one.
  merge_context.Clear();
  found = mem->Get(LookupKey("key2", seq), &value, &s, &merge_context,
                   &max_covering_tombstone_seq, ReadOptions());
  ASSERT_TRUE(s.ok() && found);
  ASSERT_EQ(value, "value2.2");

  // All four Add() calls count as entries; one of them was a delete.
  ASSERT_EQ(4, mem->num_entries());
  ASSERT_EQ(1, mem->num_deletes());

  // Add memtable to list
  list.Add(mem, &to_delete);

  // Remember the last sequence of the first memtable so we can later read
  // "as of" this point, before the second memtable's updates.
  SequenceNumber saved_seq = seq;

  // Create another memtable and write some keys to it
  WriteBufferManager wb2(options.db_write_buffer_size);
  MemTable* mem2 = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb2,
                                kMaxSequenceNumber, 0 /* column_family_id */);
  mem2->Ref();

  // In the newer memtable key1 is deleted again and key2 is overwritten.
  mem2->Add(++seq, kTypeDeletion, "key1", "");
  mem2->Add(++seq, kTypeValue, "key2", "value2.3");

  // Add second memtable to list
  list.Add(mem2, &to_delete);

  // Fetch keys via MemTableList: the newer memtable's deletion of key1 wins
  // at the latest sequence number.
  merge_context.Clear();
  found =
      list.current()->Get(LookupKey("key1", seq), &value, &s, &merge_context,
                          &max_covering_tombstone_seq, ReadOptions());
  ASSERT_TRUE(found && s.IsNotFound());

  // Reading key1 as of saved_seq (before mem2's delete) still sees value1.
  merge_context.Clear();
  found = list.current()->Get(LookupKey("key1", saved_seq), &value, &s,
                              &merge_context, &max_covering_tombstone_seq,
                              ReadOptions());
  ASSERT_TRUE(s.ok() && found);
  ASSERT_EQ("value1", value);

  // key2 resolves to the newest memtable's value.
  merge_context.Clear();
  found =
      list.current()->Get(LookupKey("key2", seq), &value, &s, &merge_context,
                          &max_covering_tombstone_seq, ReadOptions());
  ASSERT_TRUE(s.ok() && found);
  ASSERT_EQ(value, "value2.3");

  // At sequence 1 (before any writes) key2 does not exist at all.
  merge_context.Clear();
  found = list.current()->Get(LookupKey("key2", 1), &value, &s, &merge_context,
                              &max_covering_tombstone_seq, ReadOptions());
  ASSERT_FALSE(found);

  ASSERT_EQ(2, list.NumNotFlushed());

  // Cleanup: drop the list's reference and free the memtables it returns.
  list.current()->Unref(&to_delete);
  for (MemTable* m : to_delete) {
    delete m;
  }
}
317
// Exercises the flushed-memtable "history" kept by MemTableList when
// max_write_buffer_number_to_maintain / max_write_buffer_size_to_maintain
// are non-zero: flushed memtables remain readable via GetFromHistory()
// until newer flushed memtables push them out of the history budget.
// NOTE: the test relies on the exact order of the ++seq increments below.
TEST_F(MemTableListTest, GetFromHistoryTest) {
  // Create MemTableList
  int min_write_buffer_number_to_merge = 2;
  int max_write_buffer_number_to_maintain = 2;
  int64_t max_write_buffer_size_to_maintain = 2000;
  MemTableList list(min_write_buffer_number_to_merge,
                    max_write_buffer_number_to_maintain,
                    max_write_buffer_size_to_maintain);

  SequenceNumber seq = 1;
  std::string value;
  Status s;
  MergeContext merge_context;
  InternalKeyComparator ikey_cmp(options.comparator);
  SequenceNumber max_covering_tombstone_seq = 0;
  autovector<MemTable*> to_delete;

  // Lookup in an empty list finds nothing.
  LookupKey lkey("key1", seq);
  bool found = list.current()->Get(lkey, &value, &s, &merge_context,
                                   &max_covering_tombstone_seq, ReadOptions());
  ASSERT_FALSE(found);

  // Create a MemTable
  InternalKeyComparator cmp(BytewiseComparator());
  auto factory = std::make_shared<SkipListFactory>();
  options.memtable_factory = factory;
  ImmutableCFOptions ioptions(options);

  WriteBufferManager wb(options.db_write_buffer_size);
  MemTable* mem = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb,
                               kMaxSequenceNumber, 0 /* column_family_id */);
  mem->Ref();

  // Write some keys to this memtable: key1 deleted, key2 written twice
  // (later value wins).
  mem->Add(++seq, kTypeDeletion, "key1", "");
  mem->Add(++seq, kTypeValue, "key2", "value2");
  mem->Add(++seq, kTypeValue, "key2", "value2.2");

  // Fetch the newly written keys
  merge_context.Clear();
  found = mem->Get(LookupKey("key1", seq), &value, &s, &merge_context,
                   &max_covering_tombstone_seq, ReadOptions());
  // MemTable found out that this key is *not* found (at this sequence#)
  ASSERT_TRUE(found && s.IsNotFound());

  merge_context.Clear();
  found = mem->Get(LookupKey("key2", seq), &value, &s, &merge_context,
                   &max_covering_tombstone_seq, ReadOptions());
  ASSERT_TRUE(s.ok() && found);
  ASSERT_EQ(value, "value2.2");

  // Add memtable to list
  list.Add(mem, &to_delete);
  ASSERT_EQ(0, to_delete.size());

  // Fetch keys via MemTableList
  merge_context.Clear();
  found =
      list.current()->Get(LookupKey("key1", seq), &value, &s, &merge_context,
                          &max_covering_tombstone_seq, ReadOptions());
  ASSERT_TRUE(found && s.IsNotFound());

  merge_context.Clear();
  found =
      list.current()->Get(LookupKey("key2", seq), &value, &s, &merge_context,
                          &max_covering_tombstone_seq, ReadOptions());
  ASSERT_TRUE(s.ok() && found);
  ASSERT_EQ("value2.2", value);

  // Flush this memtable from the list.
  // (It will then be a part of the memtable history).
  autovector<MemTable*> to_flush;
  list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush);
  ASSERT_EQ(1, to_flush.size());

  MutableCFOptions mutable_cf_options(options);
  s = Mock_InstallMemtableFlushResults(&list, mutable_cf_options, to_flush,
                                       &to_delete);
  ASSERT_OK(s);
  ASSERT_EQ(0, list.NumNotFlushed());
  ASSERT_EQ(1, list.NumFlushed());
  // Nothing is freed yet: the flushed memtable moved into history.
  ASSERT_EQ(0, to_delete.size());

  // Verify keys are no longer in MemTableList (Get only covers the
  // not-yet-flushed memtables).
  merge_context.Clear();
  found =
      list.current()->Get(LookupKey("key1", seq), &value, &s, &merge_context,
                          &max_covering_tombstone_seq, ReadOptions());
  ASSERT_FALSE(found);

  merge_context.Clear();
  found =
      list.current()->Get(LookupKey("key2", seq), &value, &s, &merge_context,
                          &max_covering_tombstone_seq, ReadOptions());
  ASSERT_FALSE(found);

  // Verify keys are present in history
  merge_context.Clear();
  found = list.current()->GetFromHistory(
      LookupKey("key1", seq), &value, &s, &merge_context,
      &max_covering_tombstone_seq, ReadOptions());
  ASSERT_TRUE(found && s.IsNotFound());

  merge_context.Clear();
  found = list.current()->GetFromHistory(
      LookupKey("key2", seq), &value, &s, &merge_context,
      &max_covering_tombstone_seq, ReadOptions());
  ASSERT_TRUE(found);
  ASSERT_EQ("value2.2", value);

  // Create another memtable and write some keys to it
  WriteBufferManager wb2(options.db_write_buffer_size);
  MemTable* mem2 = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb2,
                                kMaxSequenceNumber, 0 /* column_family_id */);
  mem2->Ref();

  mem2->Add(++seq, kTypeDeletion, "key1", "");
  mem2->Add(++seq, kTypeValue, "key3", "value3");

  // Add second memtable to list
  list.Add(mem2, &to_delete);
  ASSERT_EQ(0, to_delete.size());

  to_flush.clear();
  list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush);
  ASSERT_EQ(1, to_flush.size());

  // Flush second memtable
  s = Mock_InstallMemtableFlushResults(&list, mutable_cf_options, to_flush,
                                       &to_delete);
  ASSERT_OK(s);
  ASSERT_EQ(0, list.NumNotFlushed());
  ASSERT_EQ(2, list.NumFlushed());
  ASSERT_EQ(0, to_delete.size());

  // Add a third memtable to push the first memtable out of the history
  // (history budget holds two flushed memtables at most here).
  WriteBufferManager wb3(options.db_write_buffer_size);
  MemTable* mem3 = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb3,
                                kMaxSequenceNumber, 0 /* column_family_id */);
  mem3->Ref();
  list.Add(mem3, &to_delete);
  ASSERT_EQ(1, list.NumNotFlushed());
  ASSERT_EQ(1, list.NumFlushed());
  // The evicted first memtable is now scheduled for deletion.
  ASSERT_EQ(1, to_delete.size());

  // Verify keys are no longer in MemTableList
  merge_context.Clear();
  found =
      list.current()->Get(LookupKey("key1", seq), &value, &s, &merge_context,
                          &max_covering_tombstone_seq, ReadOptions());
  ASSERT_FALSE(found);

  merge_context.Clear();
  found =
      list.current()->Get(LookupKey("key2", seq), &value, &s, &merge_context,
                          &max_covering_tombstone_seq, ReadOptions());
  ASSERT_FALSE(found);

  merge_context.Clear();
  found =
      list.current()->Get(LookupKey("key3", seq), &value, &s, &merge_context,
                          &max_covering_tombstone_seq, ReadOptions());
  ASSERT_FALSE(found);

  // Verify that the second memtable's keys are in the history
  merge_context.Clear();
  found = list.current()->GetFromHistory(
      LookupKey("key1", seq), &value, &s, &merge_context,
      &max_covering_tombstone_seq, ReadOptions());
  ASSERT_TRUE(found && s.IsNotFound());

  merge_context.Clear();
  found = list.current()->GetFromHistory(
      LookupKey("key3", seq), &value, &s, &merge_context,
      &max_covering_tombstone_seq, ReadOptions());
  ASSERT_TRUE(found);
  ASSERT_EQ("value3", value);

  // Verify that key2 from the first memtable is no longer in the history
  // NOTE(review): this calls Get(), which only searches the unflushed
  // memtables — GetFromHistory() would seem the intended call for the
  // comment above; confirm against upstream before changing.
  merge_context.Clear();
  found =
      list.current()->Get(LookupKey("key2", seq), &value, &s, &merge_context,
                          &max_covering_tombstone_seq, ReadOptions());
  ASSERT_FALSE(found);

  // Cleanup: mem2 (history), mem3 (unflushed) plus the already-evicted mem.
  list.current()->Unref(&to_delete);
  ASSERT_EQ(3, to_delete.size());
  for (MemTable* m : to_delete) {
    delete m;
  }
}
510
// Exercises the flush-scheduling state machine of MemTableList:
// FlushRequested() / IsFlushPending() / imm_flush_needed interplay,
// PickMemtablesToFlush() with and without a max memtable id,
// RollbackMemtableFlush(), and the in-order commit behavior of
// TryInstallMemtableFlushResults() when picks are installed out of order.
TEST_F(MemTableListTest, FlushPendingTest) {
  const int num_tables = 6;
  SequenceNumber seq = 1;
  Status s;

  auto factory = std::make_shared<SkipListFactory>();
  options.memtable_factory = factory;
  ImmutableCFOptions ioptions(options);
  InternalKeyComparator cmp(BytewiseComparator());
  WriteBufferManager wb(options.db_write_buffer_size);
  autovector<MemTable*> to_delete;

  // Create MemTableList: flush triggers at 3 memtables; history can hold
  // up to 7 memtables / 7 write buffers worth of data.
  int min_write_buffer_number_to_merge = 3;
  int max_write_buffer_number_to_maintain = 7;
  int64_t max_write_buffer_size_to_maintain =
      7 * static_cast<int>(options.write_buffer_size);
  MemTableList list(min_write_buffer_number_to_merge,
                    max_write_buffer_number_to_maintain,
                    max_write_buffer_size_to_maintain);

  // Create some MemTables with ids 0..5, each pre-populated with a few keys.
  uint64_t memtable_id = 0;
  std::vector<MemTable*> tables;
  MutableCFOptions mutable_cf_options(options);
  for (int i = 0; i < num_tables; i++) {
    MemTable* mem = new MemTable(cmp, ioptions, mutable_cf_options, &wb,
                                 kMaxSequenceNumber, 0 /* column_family_id */);
    mem->SetID(memtable_id++);
    mem->Ref();

    std::string value;
    MergeContext merge_context;

    mem->Add(++seq, kTypeValue, "key1", ToString(i));
    mem->Add(++seq, kTypeValue, "keyN" + ToString(i), "valueN");
    mem->Add(++seq, kTypeValue, "keyX" + ToString(i), "value");
    mem->Add(++seq, kTypeValue, "keyM" + ToString(i), "valueM");
    mem->Add(++seq, kTypeDeletion, "keyX" + ToString(i), "");

    tables.push_back(mem);
  }

  // Nothing to flush
  ASSERT_FALSE(list.IsFlushPending());
  ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));
  autovector<MemTable*> to_flush;
  list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush);
  ASSERT_EQ(0, to_flush.size());

  // Request a flush even though there is nothing to flush
  list.FlushRequested();
  ASSERT_FALSE(list.IsFlushPending());
  ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));

  // Attempt to 'flush' to clear request for flush
  list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush);
  ASSERT_EQ(0, to_flush.size());
  ASSERT_FALSE(list.IsFlushPending());
  ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));

  // Request a flush again
  list.FlushRequested();
  // No flush pending since the list is empty.
  ASSERT_FALSE(list.IsFlushPending());
  ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));

  // Add 2 tables
  list.Add(tables[0], &to_delete);
  list.Add(tables[1], &to_delete);
  ASSERT_EQ(2, list.NumNotFlushed());
  ASSERT_EQ(0, to_delete.size());

  // Even though we have less than the minimum to flush, a flush is
  // pending since we had previously requested a flush and never called
  // PickMemtablesToFlush() to clear the flush.
  ASSERT_TRUE(list.IsFlushPending());
  ASSERT_TRUE(list.imm_flush_needed.load(std::memory_order_acquire));

  // Pick tables to flush
  list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush);
  ASSERT_EQ(2, to_flush.size());
  ASSERT_EQ(2, list.NumNotFlushed());
  ASSERT_FALSE(list.IsFlushPending());
  ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));

  // Revert flush: the two memtables become pickable again and
  // imm_flush_needed is re-raised.
  list.RollbackMemtableFlush(to_flush, 0);
  ASSERT_FALSE(list.IsFlushPending());
  ASSERT_TRUE(list.imm_flush_needed.load(std::memory_order_acquire));
  to_flush.clear();

  // Add another table
  list.Add(tables[2], &to_delete);
  // We now have the minimum to flush regardles of whether FlushRequested()
  // was called.
  ASSERT_TRUE(list.IsFlushPending());
  ASSERT_TRUE(list.imm_flush_needed.load(std::memory_order_acquire));
  ASSERT_EQ(0, to_delete.size());

  // Pick tables to flush
  list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush);
  ASSERT_EQ(3, to_flush.size());
  ASSERT_EQ(3, list.NumNotFlushed());
  ASSERT_FALSE(list.IsFlushPending());
  ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));

  // Pick tables to flush again: everything is already flush-in-progress,
  // so nothing new is picked.
  autovector<MemTable*> to_flush2;
  list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush2);
  ASSERT_EQ(0, to_flush2.size());
  ASSERT_EQ(3, list.NumNotFlushed());
  ASSERT_FALSE(list.IsFlushPending());
  ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));

  // Add another table
  list.Add(tables[3], &to_delete);
  ASSERT_FALSE(list.IsFlushPending());
  ASSERT_TRUE(list.imm_flush_needed.load(std::memory_order_acquire));
  ASSERT_EQ(0, to_delete.size());

  // Request a flush again
  list.FlushRequested();
  ASSERT_TRUE(list.IsFlushPending());
  ASSERT_TRUE(list.imm_flush_needed.load(std::memory_order_acquire));

  // Pick tables to flush again: only tables[3] is not yet in progress.
  list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush2);
  ASSERT_EQ(1, to_flush2.size());
  ASSERT_EQ(4, list.NumNotFlushed());
  ASSERT_FALSE(list.IsFlushPending());
  ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));

  // Rollback first pick of tables
  list.RollbackMemtableFlush(to_flush, 0);
  ASSERT_TRUE(list.IsFlushPending());
  ASSERT_TRUE(list.imm_flush_needed.load(std::memory_order_acquire));
  to_flush.clear();

  // Add another tables
  list.Add(tables[4], &to_delete);
  ASSERT_EQ(5, list.NumNotFlushed());
  // We now have the minimum to flush regardles of whether FlushRequested()
  ASSERT_TRUE(list.IsFlushPending());
  ASSERT_TRUE(list.imm_flush_needed.load(std::memory_order_acquire));
  ASSERT_EQ(0, to_delete.size());

  // Pick tables to flush
  list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush);
  // Should pick 4 of 5 since 1 table has been picked in to_flush2
  ASSERT_EQ(4, to_flush.size());
  ASSERT_EQ(5, list.NumNotFlushed());
  ASSERT_FALSE(list.IsFlushPending());
  ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));

  // Pick tables to flush again
  autovector<MemTable*> to_flush3;
  list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush3);
  ASSERT_EQ(0, to_flush3.size());  // nothing not in progress of being flushed
  ASSERT_EQ(5, list.NumNotFlushed());
  ASSERT_FALSE(list.IsFlushPending());
  ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));

  // Flush the 4 memtables that were picked in to_flush
  s = Mock_InstallMemtableFlushResults(&list, mutable_cf_options, to_flush,
                                       &to_delete);
  ASSERT_OK(s);

  // Note: now to_flush contains tables[0,1,2,4]. to_flush2 contains
  // tables[3].
  // Current implementation will only commit memtables in the order they were
  // created. So TryInstallMemtableFlushResults will install the first 3 tables
  // in to_flush and stop when it encounters a table not yet flushed.
  ASSERT_EQ(2, list.NumNotFlushed());
  int num_in_history =
      std::min(3, static_cast<int>(max_write_buffer_size_to_maintain) /
                      static_cast<int>(options.write_buffer_size));
  ASSERT_EQ(num_in_history, list.NumFlushed());
  ASSERT_EQ(5 - list.NumNotFlushed() - num_in_history, to_delete.size());

  // Request a flush again. Should be nothing to flush
  list.FlushRequested();
  ASSERT_FALSE(list.IsFlushPending());
  ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));

  // Flush the 1 memtable that was picked in to_flush2
  s = MemTableListTest::Mock_InstallMemtableFlushResults(
      &list, mutable_cf_options, to_flush2, &to_delete);
  ASSERT_OK(s);

  // This will actually install 2 tables. The 1 we told it to flush, and also
  // tables[4] which has been waiting for tables[3] to commit.
  ASSERT_EQ(0, list.NumNotFlushed());
  num_in_history =
      std::min(5, static_cast<int>(max_write_buffer_size_to_maintain) /
                      static_cast<int>(options.write_buffer_size));
  ASSERT_EQ(num_in_history, list.NumFlushed());
  ASSERT_EQ(5 - list.NumNotFlushed() - num_in_history, to_delete.size());

  for (const auto& m : to_delete) {
    // Refcount should be 0 after calling TryInstallMemtableFlushResults.
    // Verify this, by Ref'ing then UnRef'ing:
    m->Ref();
    ASSERT_EQ(m, m->Unref());
    delete m;
  }
  to_delete.clear();

  // Add another table
  list.Add(tables[5], &to_delete);
  ASSERT_EQ(1, list.NumNotFlushed());
  ASSERT_EQ(5, list.GetLatestMemTableID());
  memtable_id = 4;
  // Pick tables to flush. The tables to pick must have ID smaller than or
  // equal to 4. Therefore, no table will be selected in this case.
  autovector<MemTable*> to_flush4;
  list.FlushRequested();
  ASSERT_TRUE(list.HasFlushRequested());
  list.PickMemtablesToFlush(&memtable_id, &to_flush4);
  ASSERT_TRUE(to_flush4.empty());
  ASSERT_EQ(1, list.NumNotFlushed());
  ASSERT_TRUE(list.imm_flush_needed.load(std::memory_order_acquire));
  ASSERT_FALSE(list.IsFlushPending());
  ASSERT_FALSE(list.HasFlushRequested());

  // Pick tables to flush. The tables to pick must have ID smaller than or
  // equal to 5. Therefore, only tables[5] will be selected.
  memtable_id = 5;
  list.FlushRequested();
  list.PickMemtablesToFlush(&memtable_id, &to_flush4);
  ASSERT_EQ(1, static_cast<int>(to_flush4.size()));
  ASSERT_EQ(1, list.NumNotFlushed());
  ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));
  ASSERT_FALSE(list.IsFlushPending());
  to_delete.clear();

  // Cleanup: unref the final version; everything retained in memory
  // (bounded by the history budget) comes back for deletion.
  list.current()->Unref(&to_delete);
  int to_delete_size =
      std::min(num_tables, static_cast<int>(max_write_buffer_size_to_maintain) /
                               static_cast<int>(options.write_buffer_size));
  ASSERT_EQ(to_delete_size, to_delete.size());

  for (const auto& m : to_delete) {
    // Refcount should be 0 after calling TryInstallMemtableFlushResults.
    // Verify this, by Ref'ing then UnRef'ing:
    m->Ref();
    ASSERT_EQ(m, m->Unref());
    delete m;
  }
  to_delete.clear();
}
762
763 TEST_F(MemTableListTest, EmptyAtomicFlusTest) {
764 autovector<MemTableList*> lists;
765 autovector<uint32_t> cf_ids;
766 autovector<const MutableCFOptions*> options_list;
767 autovector<const autovector<MemTable*>*> to_flush;
768 autovector<MemTable*> to_delete;
769 Status s = Mock_InstallMemtableAtomicFlushResults(lists, cf_ids, options_list,
770 to_flush, &to_delete);
771 ASSERT_OK(s);
772 ASSERT_TRUE(to_delete.empty());
773 }
774
// Exercises the atomic (multi-column-family) flush installation path:
// three CFs each get two memtables; a subset of memtables per CF is picked
// (bounded by per-CF max memtable ids) and installed atomically, then the
// test verifies file numbers were assigned and the remaining counts match.
TEST_F(MemTableListTest, AtomicFlusTest) {
  const int num_cfs = 3;
  const int num_tables_per_cf = 2;
  SequenceNumber seq = 1;

  auto factory = std::make_shared<SkipListFactory>();
  options.memtable_factory = factory;
  ImmutableCFOptions ioptions(options);
  InternalKeyComparator cmp(BytewiseComparator());
  WriteBufferManager wb(options.db_write_buffer_size);

  // Create MemTableLists, one per column family.
  int min_write_buffer_number_to_merge = 3;
  int max_write_buffer_number_to_maintain = 7;
  int64_t max_write_buffer_size_to_maintain =
      7 * static_cast<int64_t>(options.write_buffer_size);
  autovector<MemTableList*> lists;
  for (int i = 0; i != num_cfs; ++i) {
    lists.emplace_back(new MemTableList(min_write_buffer_number_to_merge,
                                        max_write_buffer_number_to_maintain,
                                        max_write_buffer_size_to_maintain));
  }

  // Build num_tables_per_cf memtables per CF, ids 0..num_tables_per_cf-1
  // within each CF, pre-populated with a few keys.
  autovector<uint32_t> cf_ids;
  std::vector<std::vector<MemTable*>> tables(num_cfs);
  autovector<const MutableCFOptions*> mutable_cf_options_list;
  uint32_t cf_id = 0;
  for (auto& elem : tables) {
    mutable_cf_options_list.emplace_back(new MutableCFOptions(options));
    uint64_t memtable_id = 0;
    for (int i = 0; i != num_tables_per_cf; ++i) {
      MemTable* mem =
          new MemTable(cmp, ioptions, *(mutable_cf_options_list.back()), &wb,
                       kMaxSequenceNumber, cf_id);
      mem->SetID(memtable_id++);
      mem->Ref();

      std::string value;

      mem->Add(++seq, kTypeValue, "key1", ToString(i));
      mem->Add(++seq, kTypeValue, "keyN" + ToString(i), "valueN");
      mem->Add(++seq, kTypeValue, "keyX" + ToString(i), "value");
      mem->Add(++seq, kTypeValue, "keyM" + ToString(i), "valueM");
      mem->Add(++seq, kTypeDeletion, "keyX" + ToString(i), "");

      elem.push_back(mem);
    }
    cf_ids.push_back(cf_id++);
  }

  std::vector<autovector<MemTable*>> flush_candidates(num_cfs);

  // Nothing to flush
  for (auto i = 0; i != num_cfs; ++i) {
    auto* list = lists[i];
    ASSERT_FALSE(list->IsFlushPending());
    ASSERT_FALSE(list->imm_flush_needed.load(std::memory_order_acquire));
    list->PickMemtablesToFlush(nullptr /* memtable_id */, &flush_candidates[i]);
    ASSERT_EQ(0, flush_candidates[i].size());
  }
  // Request flush even though there is nothing to flush
  for (auto i = 0; i != num_cfs; ++i) {
    auto* list = lists[i];
    list->FlushRequested();
    ASSERT_FALSE(list->IsFlushPending());
    ASSERT_FALSE(list->imm_flush_needed.load(std::memory_order_acquire));
  }
  autovector<MemTable*> to_delete;
  // Add tables to the immutable memtalbe lists associated with column families
  for (auto i = 0; i != num_cfs; ++i) {
    for (auto j = 0; j != num_tables_per_cf; ++j) {
      lists[i]->Add(tables[i][j], &to_delete);
    }
    ASSERT_EQ(num_tables_per_cf, lists[i]->NumNotFlushed());
    ASSERT_TRUE(lists[i]->IsFlushPending());
    ASSERT_TRUE(lists[i]->imm_flush_needed.load(std::memory_order_acquire));
  }
  // Per-CF upper bound on memtable ids to pick: CFs 0 and 1 flush both
  // memtables, CF 2 flushes only its first one.
  std::vector<uint64_t> flush_memtable_ids = {1, 1, 0};
  //          +----+
  // list[0]: |0 1|
  // list[1]: |0 1|
  //          | +--+
  // list[2]: |0| 1
  //          +-+
  // Pick memtables to flush
  for (auto i = 0; i != num_cfs; ++i) {
    flush_candidates[i].clear();
    lists[i]->PickMemtablesToFlush(&flush_memtable_ids[i],
                                   &flush_candidates[i]);
    ASSERT_EQ(flush_memtable_ids[i] - 0 + 1,
              static_cast<uint64_t>(flush_candidates[i].size()));
  }
  // Assemble only the CFs that actually have candidates, then install all of
  // them in a single atomic call.
  autovector<MemTableList*> tmp_lists;
  autovector<uint32_t> tmp_cf_ids;
  autovector<const MutableCFOptions*> tmp_options_list;
  autovector<const autovector<MemTable*>*> to_flush;
  for (auto i = 0; i != num_cfs; ++i) {
    if (!flush_candidates[i].empty()) {
      to_flush.push_back(&flush_candidates[i]);
      tmp_lists.push_back(lists[i]);
      tmp_cf_ids.push_back(i);
      tmp_options_list.push_back(mutable_cf_options_list[i]);
    }
  }
  Status s = Mock_InstallMemtableAtomicFlushResults(
      tmp_lists, tmp_cf_ids, tmp_options_list, to_flush, &to_delete);
  ASSERT_OK(s);

  // Every picked memtable got a real (non-zero) file number; each list
  // retains exactly the memtables that were not picked.
  for (auto i = 0; i != num_cfs; ++i) {
    for (auto j = 0; j != num_tables_per_cf; ++j) {
      if (static_cast<uint64_t>(j) <= flush_memtable_ids[i]) {
        ASSERT_LT(0, tables[i][j]->GetFileNumber());
      }
    }
    ASSERT_EQ(
        static_cast<size_t>(num_tables_per_cf) - flush_candidates[i].size(),
        lists[i]->NumNotFlushed());
  }

  // Cleanup: release every list and the per-CF option copies.
  to_delete.clear();
  for (auto list : lists) {
    list->current()->Unref(&to_delete);
    delete list;
  }
  for (auto& mutable_cf_options : mutable_cf_options_list) {
    if (mutable_cf_options != nullptr) {
      delete mutable_cf_options;
      mutable_cf_options = nullptr;
    }
  }
  // All memtables in tables array must have been flushed, thus ready to be
  // deleted.
  ASSERT_EQ(to_delete.size(), tables.size() * tables.front().size());
  for (const auto& m : to_delete) {
    // Refcount should be 0 after calling InstallMemtableFlushResults.
    // Verify this by Ref'ing and then Unref'ing.
    m->Ref();
    ASSERT_EQ(m, m->Unref());
    delete m;
  }
}
916
917 } // namespace ROCKSDB_NAMESPACE
918
// Standard gtest entry point: parse gtest flags, then run every TEST_F
// registered above.
int main(int argc, char** argv) {
  ::testing::InitGoogleTest(&argc, argv);
  return RUN_ALL_TESTS();
}