]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/db/memtable_list_test.cc
import 14.2.4 nautilus point release
[ceph.git] / ceph / src / rocksdb / db / memtable_list_test.cc
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5
6 #include "db/memtable_list.h"
7 #include <algorithm>
8 #include <string>
9 #include <vector>
10 #include "db/merge_context.h"
11 #include "db/version_set.h"
12 #include "db/write_controller.h"
13 #include "rocksdb/db.h"
14 #include "rocksdb/status.h"
15 #include "rocksdb/write_buffer_manager.h"
16 #include "util/string_util.h"
17 #include "util/testharness.h"
18 #include "util/testutil.h"
19
20 namespace rocksdb {
21
// Test fixture for MemTableList unit tests.
//
// Provides an on-disk scratch DB (created lazily by CreateDB()) plus two
// helpers that wrap MemTableList's flush-result installation entry points
// with all of the supporting machinery (VersionSet, InstrumentedMutex,
// LogBuffer, ...) that those functions require.
class MemTableListTest : public testing::Test {
 public:
  // Path of the scratch database (unique per test thread).
  std::string dbname;
  // Lazily opened by CreateDB(); nullptr until then.
  DB* db;
  Options options;
  // Column family handles of the opened DB: [0]=default, [1]="one", [2]="two".
  std::vector<ColumnFamilyHandle*> handles;
  // Monotonic counter used to hand out fake SST file numbers to the
  // mock flush-result installation below.
  std::atomic<uint64_t> file_number;

  MemTableListTest() : db(nullptr), file_number(1) {
    dbname = test::PerThreadDBPath("memtable_list_test");
    options.create_if_missing = true;
    DestroyDB(dbname, options);  // start from a clean slate
  }

  // Create a test db if not yet created
  void CreateDB() {
    if (db == nullptr) {
      options.create_if_missing = true;
      DestroyDB(dbname, options);
      // Open DB only with default column family
      ColumnFamilyOptions cf_options;
      std::vector<ColumnFamilyDescriptor> cf_descs;
      cf_descs.emplace_back(kDefaultColumnFamilyName, cf_options);
      Status s = DB::Open(options, dbname, cf_descs, &handles, &db);
      EXPECT_OK(s);

      // Add two more column families, each with its own cf_path.
      ColumnFamilyOptions cf_opt1, cf_opt2;
      cf_opt1.cf_paths.emplace_back(dbname + "_one_1",
                                    std::numeric_limits<uint64_t>::max());
      cf_opt2.cf_paths.emplace_back(dbname + "_two_1",
                                    std::numeric_limits<uint64_t>::max());
      int sz = static_cast<int>(handles.size());
      handles.resize(sz + 2);
      // NOTE(review): assumes only the default CF was opened above, so the
      // new handles land at indices 1 and 2.
      s = db->CreateColumnFamily(cf_opt1, "one", &handles[1]);
      EXPECT_OK(s);
      s = db->CreateColumnFamily(cf_opt2, "two", &handles[2]);
      EXPECT_OK(s);

      cf_descs.emplace_back("one", cf_options);
      cf_descs.emplace_back("two", cf_options);
    }
  }

  // Tear down: snapshot the CF descriptors (needed by DestroyDB for the
  // per-CF cf_paths), release the handles, close the DB, and wipe its files.
  ~MemTableListTest() override {
    if (db) {
      std::vector<ColumnFamilyDescriptor> cf_descs(handles.size());
      for (int i = 0; i != static_cast<int>(handles.size()); ++i) {
        handles[i]->GetDescriptor(&cf_descs[i]);
      }
      for (auto h : handles) {
        if (h) {
          db->DestroyColumnFamilyHandle(h);
        }
      }
      handles.clear();
      delete db;
      db = nullptr;
      DestroyDB(dbname, options, cf_descs);
    }
  }

  // Calls MemTableList::TryInstallMemtableFlushResults() and sets up all
  // structures needed to call this function.
  Status Mock_InstallMemtableFlushResults(
      MemTableList* list, const MutableCFOptions& mutable_cf_options,
      const autovector<MemTable*>& m, autovector<MemTable*>* to_delete) {
    // Create a mock Logger
    test::NullLogger logger;
    LogBuffer log_buffer(DEBUG_LEVEL, &logger);

    CreateDB();
    // Create a mock VersionSet
    DBOptions db_options;
    ImmutableDBOptions immutable_db_options(db_options);
    EnvOptions env_options;
    std::shared_ptr<Cache> table_cache(NewLRUCache(50000, 16));
    WriteBufferManager write_buffer_manager(db_options.db_write_buffer_size);
    WriteController write_controller(10000000u);

    VersionSet versions(dbname, &immutable_db_options, env_options,
                        table_cache.get(), &write_buffer_manager,
                        &write_controller);
    std::vector<ColumnFamilyDescriptor> cf_descs;
    cf_descs.emplace_back(kDefaultColumnFamilyName, ColumnFamilyOptions());
    cf_descs.emplace_back("one", ColumnFamilyOptions());
    cf_descs.emplace_back("two", ColumnFamilyOptions());

    EXPECT_OK(versions.Recover(cf_descs, false));

    // Create mock default ColumnFamilyData
    auto column_family_set = versions.GetColumnFamilySet();
    LogsWithPrepTracker dummy_prep_tracker;
    auto cfd = column_family_set->GetDefault();
    EXPECT_TRUE(nullptr != cfd);
    // Hand out a fresh fake file number for the "flushed" SST.
    uint64_t file_num = file_number.fetch_add(1);
    // Create dummy mutex.
    InstrumentedMutex mutex;
    InstrumentedMutexLock l(&mutex);
    return list->TryInstallMemtableFlushResults(
        cfd, mutable_cf_options, m, &dummy_prep_tracker, &versions, &mutex,
        file_num, to_delete, nullptr, &log_buffer);
  }

  // Calls MemTableList::InstallMemtableFlushResults() and sets up all
  // structures needed to call this function.
  Status Mock_InstallMemtableAtomicFlushResults(
      autovector<MemTableList*>& lists, const autovector<uint32_t>& cf_ids,
      const autovector<const MutableCFOptions*>& mutable_cf_options_list,
      const autovector<const autovector<MemTable*>*>& mems_list,
      autovector<MemTable*>* to_delete) {
    // Create a mock Logger
    test::NullLogger logger;
    LogBuffer log_buffer(DEBUG_LEVEL, &logger);

    CreateDB();
    // Create a mock VersionSet
    DBOptions db_options;
    ImmutableDBOptions immutable_db_options(db_options);
    EnvOptions env_options;
    std::shared_ptr<Cache> table_cache(NewLRUCache(50000, 16));
    WriteBufferManager write_buffer_manager(db_options.db_write_buffer_size);
    WriteController write_controller(10000000u);

    VersionSet versions(dbname, &immutable_db_options, env_options,
                        table_cache.get(), &write_buffer_manager,
                        &write_controller);
    std::vector<ColumnFamilyDescriptor> cf_descs;
    cf_descs.emplace_back(kDefaultColumnFamilyName, ColumnFamilyOptions());
    cf_descs.emplace_back("one", ColumnFamilyOptions());
    cf_descs.emplace_back("two", ColumnFamilyOptions());
    EXPECT_OK(versions.Recover(cf_descs, false));

    // Create mock default ColumnFamilyData

    auto column_family_set = versions.GetColumnFamilySet();

    LogsWithPrepTracker dummy_prep_tracker;
    // Resolve each requested column family id to its ColumnFamilyData.
    autovector<ColumnFamilyData*> cfds;
    for (int i = 0; i != static_cast<int>(cf_ids.size()); ++i) {
      cfds.emplace_back(column_family_set->GetColumnFamily(cf_ids[i]));
      EXPECT_NE(nullptr, cfds[i]);
    }
    // One fake flushed-file metadata entry per column family. reserve()
    // guarantees no reallocation, so pointers taken below stay valid.
    std::vector<FileMetaData> file_metas;
    file_metas.reserve(cf_ids.size());
    for (size_t i = 0; i != cf_ids.size(); ++i) {
      FileMetaData meta;
      uint64_t file_num = file_number.fetch_add(1);
      meta.fd = FileDescriptor(file_num, 0, 0);
      file_metas.emplace_back(meta);
    }
    autovector<FileMetaData*> file_meta_ptrs;
    for (auto& meta : file_metas) {
      file_meta_ptrs.push_back(&meta);
    }
    InstrumentedMutex mutex;
    InstrumentedMutexLock l(&mutex);
    return InstallMemtableAtomicFlushResults(
        &lists, cfds, mutable_cf_options_list, mems_list, &versions, &mutex,
        file_meta_ptrs, to_delete, nullptr, &log_buffer);
  }
};
183
184 TEST_F(MemTableListTest, Empty) {
185 // Create an empty MemTableList and validate basic functions.
186 MemTableList list(1, 0);
187
188 ASSERT_EQ(0, list.NumNotFlushed());
189 ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));
190 ASSERT_FALSE(list.IsFlushPending());
191
192 autovector<MemTable*> mems;
193 list.PickMemtablesToFlush(nullptr /* memtable_id */, &mems);
194 ASSERT_EQ(0, mems.size());
195
196 autovector<MemTable*> to_delete;
197 list.current()->Unref(&to_delete);
198 ASSERT_EQ(0, to_delete.size());
199 }
200
TEST_F(MemTableListTest, GetTest) {
  // Exercises point lookups through MemTableList across multiple immutable
  // memtables: newest memtable wins, sequence-number visibility is honored,
  // and deletions surface as found-with-NotFound status.

  // Create MemTableList
  int min_write_buffer_number_to_merge = 2;
  int max_write_buffer_number_to_maintain = 0;
  MemTableList list(min_write_buffer_number_to_merge,
                    max_write_buffer_number_to_maintain);

  SequenceNumber seq = 1;
  std::string value;
  Status s;
  MergeContext merge_context;
  InternalKeyComparator ikey_cmp(options.comparator);
  SequenceNumber max_covering_tombstone_seq = 0;
  autovector<MemTable*> to_delete;

  // Lookup in an empty list finds nothing.
  LookupKey lkey("key1", seq);
  bool found = list.current()->Get(lkey, &value, &s, &merge_context,
                                   &max_covering_tombstone_seq, ReadOptions());
  ASSERT_FALSE(found);

  // Create a MemTable
  InternalKeyComparator cmp(BytewiseComparator());
  auto factory = std::make_shared<SkipListFactory>();
  options.memtable_factory = factory;
  ImmutableCFOptions ioptions(options);

  WriteBufferManager wb(options.db_write_buffer_size);
  MemTable* mem = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb,
                               kMaxSequenceNumber, 0 /* column_family_id */);
  mem->Ref();

  // Write some keys to this memtable.
  mem->Add(++seq, kTypeDeletion, "key1", "");
  mem->Add(++seq, kTypeValue, "key2", "value2");
  mem->Add(++seq, kTypeValue, "key1", "value1");
  mem->Add(++seq, kTypeValue, "key2", "value2.2");

  // Fetch the newly written keys
  merge_context.Clear();
  found = mem->Get(LookupKey("key1", seq), &value, &s, &merge_context,
                   &max_covering_tombstone_seq, ReadOptions());
  ASSERT_TRUE(s.ok() && found);
  ASSERT_EQ(value, "value1");

  // Read key1 at an old sequence number: at seq 2 only the deletion
  // is visible.
  merge_context.Clear();
  found = mem->Get(LookupKey("key1", 2), &value, &s, &merge_context,
                   &max_covering_tombstone_seq, ReadOptions());
  // MemTable found out that this key is *not* found (at this sequence#)
  ASSERT_TRUE(found && s.IsNotFound());

  merge_context.Clear();
  found = mem->Get(LookupKey("key2", seq), &value, &s, &merge_context,
                   &max_covering_tombstone_seq, ReadOptions());
  ASSERT_TRUE(s.ok() && found);
  ASSERT_EQ(value, "value2.2");

  ASSERT_EQ(4, mem->num_entries());
  ASSERT_EQ(1, mem->num_deletes());

  // Add memtable to list
  list.Add(mem, &to_delete);

  // Remember the last sequence number written to the first memtable so we
  // can read "as of" this point later.
  SequenceNumber saved_seq = seq;

  // Create another memtable and write some keys to it
  WriteBufferManager wb2(options.db_write_buffer_size);
  MemTable* mem2 = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb2,
                                kMaxSequenceNumber, 0 /* column_family_id */);
  mem2->Ref();

  mem2->Add(++seq, kTypeDeletion, "key1", "");
  mem2->Add(++seq, kTypeValue, "key2", "value2.3");

  // Add second memtable to list
  list.Add(mem2, &to_delete);

  // Fetch keys via MemTableList: the newer memtable's deletion of key1
  // shadows the older memtable's value.
  merge_context.Clear();
  found =
      list.current()->Get(LookupKey("key1", seq), &value, &s, &merge_context,
                          &max_covering_tombstone_seq, ReadOptions());
  ASSERT_TRUE(found && s.IsNotFound());

  // At the saved (older) sequence number, key1's value from the first
  // memtable is still visible.
  merge_context.Clear();
  found = list.current()->Get(LookupKey("key1", saved_seq), &value, &s,
                              &merge_context, &max_covering_tombstone_seq,
                              ReadOptions());
  ASSERT_TRUE(s.ok() && found);
  ASSERT_EQ("value1", value);

  merge_context.Clear();
  found =
      list.current()->Get(LookupKey("key2", seq), &value, &s, &merge_context,
                          &max_covering_tombstone_seq, ReadOptions());
  ASSERT_TRUE(s.ok() && found);
  ASSERT_EQ(value, "value2.3");

  // At sequence 1 nothing had been written yet.
  merge_context.Clear();
  found = list.current()->Get(LookupKey("key2", 1), &value, &s, &merge_context,
                              &max_covering_tombstone_seq, ReadOptions());
  ASSERT_FALSE(found);

  ASSERT_EQ(2, list.NumNotFlushed());

  // Release the list's version; both memtables become deletable.
  list.current()->Unref(&to_delete);
  for (MemTable* m : to_delete) {
    delete m;
  }
}
310
311 TEST_F(MemTableListTest, GetFromHistoryTest) {
312 // Create MemTableList
313 int min_write_buffer_number_to_merge = 2;
314 int max_write_buffer_number_to_maintain = 2;
315 MemTableList list(min_write_buffer_number_to_merge,
316 max_write_buffer_number_to_maintain);
317
318 SequenceNumber seq = 1;
319 std::string value;
320 Status s;
321 MergeContext merge_context;
322 InternalKeyComparator ikey_cmp(options.comparator);
323 SequenceNumber max_covering_tombstone_seq = 0;
324 autovector<MemTable*> to_delete;
325
326 LookupKey lkey("key1", seq);
327 bool found = list.current()->Get(lkey, &value, &s, &merge_context,
328 &max_covering_tombstone_seq, ReadOptions());
329 ASSERT_FALSE(found);
330
331 // Create a MemTable
332 InternalKeyComparator cmp(BytewiseComparator());
333 auto factory = std::make_shared<SkipListFactory>();
334 options.memtable_factory = factory;
335 ImmutableCFOptions ioptions(options);
336
337 WriteBufferManager wb(options.db_write_buffer_size);
338 MemTable* mem = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb,
339 kMaxSequenceNumber, 0 /* column_family_id */);
340 mem->Ref();
341
342 // Write some keys to this memtable.
343 mem->Add(++seq, kTypeDeletion, "key1", "");
344 mem->Add(++seq, kTypeValue, "key2", "value2");
345 mem->Add(++seq, kTypeValue, "key2", "value2.2");
346
347 // Fetch the newly written keys
348 merge_context.Clear();
349 found = mem->Get(LookupKey("key1", seq), &value, &s, &merge_context,
350 &max_covering_tombstone_seq, ReadOptions());
351 // MemTable found out that this key is *not* found (at this sequence#)
352 ASSERT_TRUE(found && s.IsNotFound());
353
354 merge_context.Clear();
355 found = mem->Get(LookupKey("key2", seq), &value, &s, &merge_context,
356 &max_covering_tombstone_seq, ReadOptions());
357 ASSERT_TRUE(s.ok() && found);
358 ASSERT_EQ(value, "value2.2");
359
360 // Add memtable to list
361 list.Add(mem, &to_delete);
362 ASSERT_EQ(0, to_delete.size());
363
364 // Fetch keys via MemTableList
365 merge_context.Clear();
366 found =
367 list.current()->Get(LookupKey("key1", seq), &value, &s, &merge_context,
368 &max_covering_tombstone_seq, ReadOptions());
369 ASSERT_TRUE(found && s.IsNotFound());
370
371 merge_context.Clear();
372 found =
373 list.current()->Get(LookupKey("key2", seq), &value, &s, &merge_context,
374 &max_covering_tombstone_seq, ReadOptions());
375 ASSERT_TRUE(s.ok() && found);
376 ASSERT_EQ("value2.2", value);
377
378 // Flush this memtable from the list.
379 // (It will then be a part of the memtable history).
380 autovector<MemTable*> to_flush;
381 list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush);
382 ASSERT_EQ(1, to_flush.size());
383
384 MutableCFOptions mutable_cf_options(options);
385 s = Mock_InstallMemtableFlushResults(&list, mutable_cf_options, to_flush,
386 &to_delete);
387 ASSERT_OK(s);
388 ASSERT_EQ(0, list.NumNotFlushed());
389 ASSERT_EQ(1, list.NumFlushed());
390 ASSERT_EQ(0, to_delete.size());
391
392 // Verify keys are no longer in MemTableList
393 merge_context.Clear();
394 found =
395 list.current()->Get(LookupKey("key1", seq), &value, &s, &merge_context,
396 &max_covering_tombstone_seq, ReadOptions());
397 ASSERT_FALSE(found);
398
399 merge_context.Clear();
400 found =
401 list.current()->Get(LookupKey("key2", seq), &value, &s, &merge_context,
402 &max_covering_tombstone_seq, ReadOptions());
403 ASSERT_FALSE(found);
404
405 // Verify keys are present in history
406 merge_context.Clear();
407 found = list.current()->GetFromHistory(
408 LookupKey("key1", seq), &value, &s, &merge_context,
409 &max_covering_tombstone_seq, ReadOptions());
410 ASSERT_TRUE(found && s.IsNotFound());
411
412 merge_context.Clear();
413 found = list.current()->GetFromHistory(
414 LookupKey("key2", seq), &value, &s, &merge_context,
415 &max_covering_tombstone_seq, ReadOptions());
416 ASSERT_TRUE(found);
417 ASSERT_EQ("value2.2", value);
418
419 // Create another memtable and write some keys to it
420 WriteBufferManager wb2(options.db_write_buffer_size);
421 MemTable* mem2 = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb2,
422 kMaxSequenceNumber, 0 /* column_family_id */);
423 mem2->Ref();
424
425 mem2->Add(++seq, kTypeDeletion, "key1", "");
426 mem2->Add(++seq, kTypeValue, "key3", "value3");
427
428 // Add second memtable to list
429 list.Add(mem2, &to_delete);
430 ASSERT_EQ(0, to_delete.size());
431
432 to_flush.clear();
433 list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush);
434 ASSERT_EQ(1, to_flush.size());
435
436 // Flush second memtable
437 s = Mock_InstallMemtableFlushResults(&list, mutable_cf_options, to_flush,
438 &to_delete);
439 ASSERT_OK(s);
440 ASSERT_EQ(0, list.NumNotFlushed());
441 ASSERT_EQ(2, list.NumFlushed());
442 ASSERT_EQ(0, to_delete.size());
443
444 // Add a third memtable to push the first memtable out of the history
445 WriteBufferManager wb3(options.db_write_buffer_size);
446 MemTable* mem3 = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb3,
447 kMaxSequenceNumber, 0 /* column_family_id */);
448 mem3->Ref();
449 list.Add(mem3, &to_delete);
450 ASSERT_EQ(1, list.NumNotFlushed());
451 ASSERT_EQ(1, list.NumFlushed());
452 ASSERT_EQ(1, to_delete.size());
453
454 // Verify keys are no longer in MemTableList
455 merge_context.Clear();
456 found =
457 list.current()->Get(LookupKey("key1", seq), &value, &s, &merge_context,
458 &max_covering_tombstone_seq, ReadOptions());
459 ASSERT_FALSE(found);
460
461 merge_context.Clear();
462 found =
463 list.current()->Get(LookupKey("key2", seq), &value, &s, &merge_context,
464 &max_covering_tombstone_seq, ReadOptions());
465 ASSERT_FALSE(found);
466
467 merge_context.Clear();
468 found =
469 list.current()->Get(LookupKey("key3", seq), &value, &s, &merge_context,
470 &max_covering_tombstone_seq, ReadOptions());
471 ASSERT_FALSE(found);
472
473 // Verify that the second memtable's keys are in the history
474 merge_context.Clear();
475 found = list.current()->GetFromHistory(
476 LookupKey("key1", seq), &value, &s, &merge_context,
477 &max_covering_tombstone_seq, ReadOptions());
478 ASSERT_TRUE(found && s.IsNotFound());
479
480 merge_context.Clear();
481 found = list.current()->GetFromHistory(
482 LookupKey("key3", seq), &value, &s, &merge_context,
483 &max_covering_tombstone_seq, ReadOptions());
484 ASSERT_TRUE(found);
485 ASSERT_EQ("value3", value);
486
487 // Verify that key2 from the first memtable is no longer in the history
488 merge_context.Clear();
489 found =
490 list.current()->Get(LookupKey("key2", seq), &value, &s, &merge_context,
491 &max_covering_tombstone_seq, ReadOptions());
492 ASSERT_FALSE(found);
493
494 // Cleanup
495 list.current()->Unref(&to_delete);
496 ASSERT_EQ(3, to_delete.size());
497 for (MemTable* m : to_delete) {
498 delete m;
499 }
500 }
501
TEST_F(MemTableListTest, FlushPendingTest) {
  // Exercises the flush-pending state machine of MemTableList:
  // FlushRequested(), PickMemtablesToFlush(), RollbackMemtableFlush(),
  // and (Try)InstallMemtableFlushResults ordering constraints.
  const int num_tables = 6;
  SequenceNumber seq = 1;
  Status s;

  auto factory = std::make_shared<SkipListFactory>();
  options.memtable_factory = factory;
  ImmutableCFOptions ioptions(options);
  InternalKeyComparator cmp(BytewiseComparator());
  WriteBufferManager wb(options.db_write_buffer_size);
  autovector<MemTable*> to_delete;

  // Create MemTableList
  int min_write_buffer_number_to_merge = 3;
  int max_write_buffer_number_to_maintain = 7;
  MemTableList list(min_write_buffer_number_to_merge,
                    max_write_buffer_number_to_maintain);

  // Create some MemTables, each with a few keys and ids 0..5.
  uint64_t memtable_id = 0;
  std::vector<MemTable*> tables;
  MutableCFOptions mutable_cf_options(options);
  for (int i = 0; i < num_tables; i++) {
    MemTable* mem = new MemTable(cmp, ioptions, mutable_cf_options, &wb,
                                 kMaxSequenceNumber, 0 /* column_family_id */);
    mem->SetID(memtable_id++);
    mem->Ref();

    std::string value;
    MergeContext merge_context;

    mem->Add(++seq, kTypeValue, "key1", ToString(i));
    mem->Add(++seq, kTypeValue, "keyN" + ToString(i), "valueN");
    mem->Add(++seq, kTypeValue, "keyX" + ToString(i), "value");
    mem->Add(++seq, kTypeValue, "keyM" + ToString(i), "valueM");
    mem->Add(++seq, kTypeDeletion, "keyX" + ToString(i), "");

    tables.push_back(mem);
  }

  // Nothing to flush
  ASSERT_FALSE(list.IsFlushPending());
  ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));
  autovector<MemTable*> to_flush;
  list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush);
  ASSERT_EQ(0, to_flush.size());

  // Request a flush even though there is nothing to flush
  list.FlushRequested();
  ASSERT_FALSE(list.IsFlushPending());
  ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));

  // Attempt to 'flush' to clear request for flush
  list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush);
  ASSERT_EQ(0, to_flush.size());
  ASSERT_FALSE(list.IsFlushPending());
  ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));

  // Request a flush again
  list.FlushRequested();
  // No flush pending since the list is empty.
  ASSERT_FALSE(list.IsFlushPending());
  ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));

  // Add 2 tables
  list.Add(tables[0], &to_delete);
  list.Add(tables[1], &to_delete);
  ASSERT_EQ(2, list.NumNotFlushed());
  ASSERT_EQ(0, to_delete.size());

  // Even though we have less than the minimum to flush, a flush is
  // pending since we had previously requested a flush and never called
  // PickMemtablesToFlush() to clear the flush.
  ASSERT_TRUE(list.IsFlushPending());
  ASSERT_TRUE(list.imm_flush_needed.load(std::memory_order_acquire));

  // Pick tables to flush
  list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush);
  ASSERT_EQ(2, to_flush.size());
  ASSERT_EQ(2, list.NumNotFlushed());
  ASSERT_FALSE(list.IsFlushPending());
  ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));

  // Revert flush: the picked tables go back to flush-needed state.
  list.RollbackMemtableFlush(to_flush, 0);
  ASSERT_FALSE(list.IsFlushPending());
  ASSERT_TRUE(list.imm_flush_needed.load(std::memory_order_acquire));
  to_flush.clear();

  // Add another table
  list.Add(tables[2], &to_delete);
  // We now have the minimum to flush regardless of whether FlushRequested()
  // was called.
  ASSERT_TRUE(list.IsFlushPending());
  ASSERT_TRUE(list.imm_flush_needed.load(std::memory_order_acquire));
  ASSERT_EQ(0, to_delete.size());

  // Pick tables to flush
  list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush);
  ASSERT_EQ(3, to_flush.size());
  ASSERT_EQ(3, list.NumNotFlushed());
  ASSERT_FALSE(list.IsFlushPending());
  ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));

  // Pick tables to flush again: everything is already flush-in-progress.
  autovector<MemTable*> to_flush2;
  list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush2);
  ASSERT_EQ(0, to_flush2.size());
  ASSERT_EQ(3, list.NumNotFlushed());
  ASSERT_FALSE(list.IsFlushPending());
  ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));

  // Add another table
  list.Add(tables[3], &to_delete);
  ASSERT_FALSE(list.IsFlushPending());
  ASSERT_TRUE(list.imm_flush_needed.load(std::memory_order_acquire));
  ASSERT_EQ(0, to_delete.size());

  // Request a flush again
  list.FlushRequested();
  ASSERT_TRUE(list.IsFlushPending());
  ASSERT_TRUE(list.imm_flush_needed.load(std::memory_order_acquire));

  // Pick tables to flush again: only tables[3] is available.
  list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush2);
  ASSERT_EQ(1, to_flush2.size());
  ASSERT_EQ(4, list.NumNotFlushed());
  ASSERT_FALSE(list.IsFlushPending());
  ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));

  // Rollback first pick of tables
  list.RollbackMemtableFlush(to_flush, 0);
  ASSERT_TRUE(list.IsFlushPending());
  ASSERT_TRUE(list.imm_flush_needed.load(std::memory_order_acquire));
  to_flush.clear();

  // Add another tables
  list.Add(tables[4], &to_delete);
  ASSERT_EQ(5, list.NumNotFlushed());
  // We now have the minimum to flush regardless of whether FlushRequested()
  ASSERT_TRUE(list.IsFlushPending());
  ASSERT_TRUE(list.imm_flush_needed.load(std::memory_order_acquire));
  ASSERT_EQ(0, to_delete.size());

  // Pick tables to flush
  list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush);
  // Should pick 4 of 5 since 1 table has been picked in to_flush2
  ASSERT_EQ(4, to_flush.size());
  ASSERT_EQ(5, list.NumNotFlushed());
  ASSERT_FALSE(list.IsFlushPending());
  ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));

  // Pick tables to flush again
  autovector<MemTable*> to_flush3;
  list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush3);
  ASSERT_EQ(0, to_flush3.size());  // nothing not in progress of being flushed
  ASSERT_EQ(5, list.NumNotFlushed());
  ASSERT_FALSE(list.IsFlushPending());
  ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));

  // Flush the 4 memtables that were picked in to_flush
  s = Mock_InstallMemtableFlushResults(&list, mutable_cf_options, to_flush,
                                       &to_delete);
  ASSERT_OK(s);

  // Note: now to_flush contains tables[0,1,2,4]. to_flush2 contains
  // tables[3].
  // Current implementation will only commit memtables in the order they were
  // created. So TryInstallMemtableFlushResults will install the first 3 tables
  // in to_flush and stop when it encounters a table not yet flushed.
  ASSERT_EQ(2, list.NumNotFlushed());
  int num_in_history = std::min(3, max_write_buffer_number_to_maintain);
  ASSERT_EQ(num_in_history, list.NumFlushed());
  ASSERT_EQ(5 - list.NumNotFlushed() - num_in_history, to_delete.size());

  // Request a flush again. Should be nothing to flush
  list.FlushRequested();
  ASSERT_FALSE(list.IsFlushPending());
  ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));

  // Flush the 1 memtable that was picked in to_flush2
  s = MemTableListTest::Mock_InstallMemtableFlushResults(
      &list, mutable_cf_options, to_flush2, &to_delete);
  ASSERT_OK(s);

  // This will actually install 2 tables. The 1 we told it to flush, and also
  // tables[4] which has been waiting for tables[3] to commit.
  ASSERT_EQ(0, list.NumNotFlushed());
  num_in_history = std::min(5, max_write_buffer_number_to_maintain);
  ASSERT_EQ(num_in_history, list.NumFlushed());
  ASSERT_EQ(5 - list.NumNotFlushed() - num_in_history, to_delete.size());

  for (const auto& m : to_delete) {
    // Refcount should be 0 after calling TryInstallMemtableFlushResults.
    // Verify this, by Ref'ing then UnRef'ing:
    m->Ref();
    ASSERT_EQ(m, m->Unref());
    delete m;
  }
  to_delete.clear();

  // Add another table
  list.Add(tables[5], &to_delete);
  ASSERT_EQ(1, list.NumNotFlushed());
  ASSERT_EQ(5, list.GetLatestMemTableID());
  memtable_id = 4;
  // Pick tables to flush. The tables to pick must have ID smaller than or
  // equal to 4. Therefore, no table will be selected in this case.
  autovector<MemTable*> to_flush4;
  list.FlushRequested();
  ASSERT_TRUE(list.HasFlushRequested());
  list.PickMemtablesToFlush(&memtable_id, &to_flush4);
  ASSERT_TRUE(to_flush4.empty());
  ASSERT_EQ(1, list.NumNotFlushed());
  ASSERT_TRUE(list.imm_flush_needed.load(std::memory_order_acquire));
  ASSERT_FALSE(list.IsFlushPending());
  ASSERT_FALSE(list.HasFlushRequested());

  // Pick tables to flush. The tables to pick must have ID smaller than or
  // equal to 5. Therefore, only tables[5] will be selected.
  memtable_id = 5;
  list.FlushRequested();
  list.PickMemtablesToFlush(&memtable_id, &to_flush4);
  ASSERT_EQ(1, static_cast<int>(to_flush4.size()));
  ASSERT_EQ(1, list.NumNotFlushed());
  ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));
  ASSERT_FALSE(list.IsFlushPending());
  to_delete.clear();

  // Final cleanup: releasing the current version frees what the bounded
  // history still holds.
  list.current()->Unref(&to_delete);
  int to_delete_size =
      std::min(num_tables, max_write_buffer_number_to_maintain);
  ASSERT_EQ(to_delete_size, to_delete.size());

  for (const auto& m : to_delete) {
    // Refcount should be 0 after calling TryInstallMemtableFlushResults.
    // Verify this, by Ref'ing then UnRef'ing:
    m->Ref();
    ASSERT_EQ(m, m->Unref());
    delete m;
  }
  to_delete.clear();
}
745
746 TEST_F(MemTableListTest, EmptyAtomicFlusTest) {
747 autovector<MemTableList*> lists;
748 autovector<uint32_t> cf_ids;
749 autovector<const MutableCFOptions*> options_list;
750 autovector<const autovector<MemTable*>*> to_flush;
751 autovector<MemTable*> to_delete;
752 Status s = Mock_InstallMemtableAtomicFlushResults(lists, cf_ids, options_list,
753 to_flush, &to_delete);
754 ASSERT_OK(s);
755 ASSERT_TRUE(to_delete.empty());
756 }
757
TEST_F(MemTableListTest, AtomicFlusTest) {
  // Exercises InstallMemtableAtomicFlushResults() across several column
  // families, flushing a different prefix of each CF's memtables in one
  // atomic installation.
  const int num_cfs = 3;
  const int num_tables_per_cf = 2;
  SequenceNumber seq = 1;

  auto factory = std::make_shared<SkipListFactory>();
  options.memtable_factory = factory;
  ImmutableCFOptions ioptions(options);
  InternalKeyComparator cmp(BytewiseComparator());
  WriteBufferManager wb(options.db_write_buffer_size);

  // Create MemTableLists
  int min_write_buffer_number_to_merge = 3;
  int max_write_buffer_number_to_maintain = 7;
  autovector<MemTableList*> lists;
  for (int i = 0; i != num_cfs; ++i) {
    lists.emplace_back(new MemTableList(min_write_buffer_number_to_merge,
                                        max_write_buffer_number_to_maintain));
  }

  // Populate num_tables_per_cf memtables (ids 0..n-1) per column family.
  autovector<uint32_t> cf_ids;
  std::vector<std::vector<MemTable*>> tables(num_cfs);
  autovector<const MutableCFOptions*> mutable_cf_options_list;
  uint32_t cf_id = 0;
  for (auto& elem : tables) {
    mutable_cf_options_list.emplace_back(new MutableCFOptions(options));
    uint64_t memtable_id = 0;
    for (int i = 0; i != num_tables_per_cf; ++i) {
      MemTable* mem =
          new MemTable(cmp, ioptions, *(mutable_cf_options_list.back()), &wb,
                       kMaxSequenceNumber, cf_id);
      mem->SetID(memtable_id++);
      mem->Ref();

      std::string value;

      mem->Add(++seq, kTypeValue, "key1", ToString(i));
      mem->Add(++seq, kTypeValue, "keyN" + ToString(i), "valueN");
      mem->Add(++seq, kTypeValue, "keyX" + ToString(i), "value");
      mem->Add(++seq, kTypeValue, "keyM" + ToString(i), "valueM");
      mem->Add(++seq, kTypeDeletion, "keyX" + ToString(i), "");

      elem.push_back(mem);
    }
    cf_ids.push_back(cf_id++);
  }

  std::vector<autovector<MemTable*>> flush_candidates(num_cfs);

  // Nothing to flush
  for (auto i = 0; i != num_cfs; ++i) {
    auto* list = lists[i];
    ASSERT_FALSE(list->IsFlushPending());
    ASSERT_FALSE(list->imm_flush_needed.load(std::memory_order_acquire));
    list->PickMemtablesToFlush(nullptr /* memtable_id */, &flush_candidates[i]);
    ASSERT_EQ(0, flush_candidates[i].size());
  }
  // Request flush even though there is nothing to flush
  for (auto i = 0; i != num_cfs; ++i) {
    auto* list = lists[i];
    list->FlushRequested();
    ASSERT_FALSE(list->IsFlushPending());
    ASSERT_FALSE(list->imm_flush_needed.load(std::memory_order_acquire));
  }
  autovector<MemTable*> to_delete;
  // Add tables to the immutable memtable lists associated with column families
  for (auto i = 0; i != num_cfs; ++i) {
    for (auto j = 0; j != num_tables_per_cf; ++j) {
      lists[i]->Add(tables[i][j], &to_delete);
    }
    ASSERT_EQ(num_tables_per_cf, lists[i]->NumNotFlushed());
    ASSERT_TRUE(lists[i]->IsFlushPending());
    ASSERT_TRUE(lists[i]->imm_flush_needed.load(std::memory_order_acquire));
  }
  // Per-CF upper bound on the memtable ids to pick for this atomic flush.
  std::vector<uint64_t> flush_memtable_ids = {1, 1, 0};
  //          +----+
  // list[0]: |0 1|
  // list[1]: |0 1|
  //          | +--+
  // list[2]: |0| 1
  //          +-+
  // Pick memtables to flush
  for (auto i = 0; i != num_cfs; ++i) {
    flush_candidates[i].clear();
    lists[i]->PickMemtablesToFlush(&flush_memtable_ids[i],
                                   &flush_candidates[i]);
    ASSERT_EQ(flush_memtable_ids[i] - 0 + 1,
              static_cast<uint64_t>(flush_candidates[i].size()));
  }
  // Gather only the CFs that actually have candidates and install all of
  // their flush results atomically.
  autovector<MemTableList*> tmp_lists;
  autovector<uint32_t> tmp_cf_ids;
  autovector<const MutableCFOptions*> tmp_options_list;
  autovector<const autovector<MemTable*>*> to_flush;
  for (auto i = 0; i != num_cfs; ++i) {
    if (!flush_candidates[i].empty()) {
      to_flush.push_back(&flush_candidates[i]);
      tmp_lists.push_back(lists[i]);
      tmp_cf_ids.push_back(i);
      tmp_options_list.push_back(mutable_cf_options_list[i]);
    }
  }
  Status s = Mock_InstallMemtableAtomicFlushResults(
      tmp_lists, tmp_cf_ids, tmp_options_list, to_flush, &to_delete);
  ASSERT_OK(s);

  // Every picked memtable must have been assigned a real file number;
  // the rest remain unflushed.
  for (auto i = 0; i != num_cfs; ++i) {
    for (auto j = 0; j != num_tables_per_cf; ++j) {
      if (static_cast<uint64_t>(j) <= flush_memtable_ids[i]) {
        ASSERT_LT(0, tables[i][j]->GetFileNumber());
      }
    }
    ASSERT_EQ(
        static_cast<size_t>(num_tables_per_cf) - flush_candidates[i].size(),
        lists[i]->NumNotFlushed());
  }

  // Cleanup: release all list versions and per-CF options.
  to_delete.clear();
  for (auto list : lists) {
    list->current()->Unref(&to_delete);
    delete list;
  }
  for (auto& mutable_cf_options : mutable_cf_options_list) {
    if (mutable_cf_options != nullptr) {
      delete mutable_cf_options;
      mutable_cf_options = nullptr;
    }
  }
  // All memtables in tables array must have been flushed, thus ready to be
  // deleted.
  ASSERT_EQ(to_delete.size(), tables.size() * tables.front().size());
  for (const auto& m : to_delete) {
    // Refcount should be 0 after calling InstallMemtableFlushResults.
    // Verify this by Ref'ing and then Unref'ing.
    m->Ref();
    ASSERT_EQ(m, m->Unref());
    delete m;
  }
}
896
897 } // namespace rocksdb
898
// Test driver: registers and runs every MemTableList test defined above.
int main(int argc, char** argv) {
  ::testing::InitGoogleTest(&argc, argv);
  return RUN_ALL_TESTS();
}