//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
#include "db/memtable_list.h"

#include <algorithm>
#include <atomic>
#include <limits>
#include <memory>
#include <string>
#include <vector>

#include "db/merge_context.h"
#include "db/version_set.h"
#include "db/write_controller.h"
#include "rocksdb/db.h"
#include "rocksdb/status.h"
#include "rocksdb/write_buffer_manager.h"
#include "util/string_util.h"
#include "util/testharness.h"
#include "util/testutil.h"
22 class MemTableListTest
: public testing::Test
{
27 std::vector
<ColumnFamilyHandle
*> handles
;
28 std::atomic
<uint64_t> file_number
;
30 MemTableListTest() : db(nullptr), file_number(1) {
31 dbname
= test::PerThreadDBPath("memtable_list_test");
32 options
.create_if_missing
= true;
33 DestroyDB(dbname
, options
);
36 // Create a test db if not yet created
39 options
.create_if_missing
= true;
40 DestroyDB(dbname
, options
);
41 // Open DB only with default column family
42 ColumnFamilyOptions cf_options
;
43 std::vector
<ColumnFamilyDescriptor
> cf_descs
;
44 cf_descs
.emplace_back(kDefaultColumnFamilyName
, cf_options
);
45 Status s
= DB::Open(options
, dbname
, cf_descs
, &handles
, &db
);
48 ColumnFamilyOptions cf_opt1
, cf_opt2
;
49 cf_opt1
.cf_paths
.emplace_back(dbname
+ "_one_1",
50 std::numeric_limits
<uint64_t>::max());
51 cf_opt2
.cf_paths
.emplace_back(dbname
+ "_two_1",
52 std::numeric_limits
<uint64_t>::max());
53 int sz
= static_cast<int>(handles
.size());
54 handles
.resize(sz
+ 2);
55 s
= db
->CreateColumnFamily(cf_opt1
, "one", &handles
[1]);
57 s
= db
->CreateColumnFamily(cf_opt2
, "two", &handles
[2]);
60 cf_descs
.emplace_back("one", cf_options
);
61 cf_descs
.emplace_back("two", cf_options
);
65 ~MemTableListTest() override
{
67 std::vector
<ColumnFamilyDescriptor
> cf_descs(handles
.size());
68 for (int i
= 0; i
!= static_cast<int>(handles
.size()); ++i
) {
69 handles
[i
]->GetDescriptor(&cf_descs
[i
]);
71 for (auto h
: handles
) {
73 db
->DestroyColumnFamilyHandle(h
);
79 DestroyDB(dbname
, options
, cf_descs
);
83 // Calls MemTableList::TryInstallMemtableFlushResults() and sets up all
84 // structures needed to call this function.
85 Status
Mock_InstallMemtableFlushResults(
86 MemTableList
* list
, const MutableCFOptions
& mutable_cf_options
,
87 const autovector
<MemTable
*>& m
, autovector
<MemTable
*>* to_delete
) {
88 // Create a mock Logger
89 test::NullLogger logger
;
90 LogBuffer
log_buffer(DEBUG_LEVEL
, &logger
);
93 // Create a mock VersionSet
95 ImmutableDBOptions
immutable_db_options(db_options
);
96 EnvOptions env_options
;
97 std::shared_ptr
<Cache
> table_cache(NewLRUCache(50000, 16));
98 WriteBufferManager
write_buffer_manager(db_options
.db_write_buffer_size
);
99 WriteController
write_controller(10000000u);
101 VersionSet
versions(dbname
, &immutable_db_options
, env_options
,
102 table_cache
.get(), &write_buffer_manager
,
104 std::vector
<ColumnFamilyDescriptor
> cf_descs
;
105 cf_descs
.emplace_back(kDefaultColumnFamilyName
, ColumnFamilyOptions());
106 cf_descs
.emplace_back("one", ColumnFamilyOptions());
107 cf_descs
.emplace_back("two", ColumnFamilyOptions());
109 EXPECT_OK(versions
.Recover(cf_descs
, false));
111 // Create mock default ColumnFamilyData
112 auto column_family_set
= versions
.GetColumnFamilySet();
113 LogsWithPrepTracker dummy_prep_tracker
;
114 auto cfd
= column_family_set
->GetDefault();
115 EXPECT_TRUE(nullptr != cfd
);
116 uint64_t file_num
= file_number
.fetch_add(1);
117 // Create dummy mutex.
118 InstrumentedMutex mutex
;
119 InstrumentedMutexLock
l(&mutex
);
120 return list
->TryInstallMemtableFlushResults(
121 cfd
, mutable_cf_options
, m
, &dummy_prep_tracker
, &versions
, &mutex
,
122 file_num
, to_delete
, nullptr, &log_buffer
);
125 // Calls MemTableList::InstallMemtableFlushResults() and sets up all
126 // structures needed to call this function.
127 Status
Mock_InstallMemtableAtomicFlushResults(
128 autovector
<MemTableList
*>& lists
, const autovector
<uint32_t>& cf_ids
,
129 const autovector
<const MutableCFOptions
*>& mutable_cf_options_list
,
130 const autovector
<const autovector
<MemTable
*>*>& mems_list
,
131 autovector
<MemTable
*>* to_delete
) {
132 // Create a mock Logger
133 test::NullLogger logger
;
134 LogBuffer
log_buffer(DEBUG_LEVEL
, &logger
);
137 // Create a mock VersionSet
138 DBOptions db_options
;
139 ImmutableDBOptions
immutable_db_options(db_options
);
140 EnvOptions env_options
;
141 std::shared_ptr
<Cache
> table_cache(NewLRUCache(50000, 16));
142 WriteBufferManager
write_buffer_manager(db_options
.db_write_buffer_size
);
143 WriteController
write_controller(10000000u);
145 VersionSet
versions(dbname
, &immutable_db_options
, env_options
,
146 table_cache
.get(), &write_buffer_manager
,
148 std::vector
<ColumnFamilyDescriptor
> cf_descs
;
149 cf_descs
.emplace_back(kDefaultColumnFamilyName
, ColumnFamilyOptions());
150 cf_descs
.emplace_back("one", ColumnFamilyOptions());
151 cf_descs
.emplace_back("two", ColumnFamilyOptions());
152 EXPECT_OK(versions
.Recover(cf_descs
, false));
154 // Create mock default ColumnFamilyData
156 auto column_family_set
= versions
.GetColumnFamilySet();
158 LogsWithPrepTracker dummy_prep_tracker
;
159 autovector
<ColumnFamilyData
*> cfds
;
160 for (int i
= 0; i
!= static_cast<int>(cf_ids
.size()); ++i
) {
161 cfds
.emplace_back(column_family_set
->GetColumnFamily(cf_ids
[i
]));
162 EXPECT_NE(nullptr, cfds
[i
]);
164 std::vector
<FileMetaData
> file_metas
;
165 file_metas
.reserve(cf_ids
.size());
166 for (size_t i
= 0; i
!= cf_ids
.size(); ++i
) {
168 uint64_t file_num
= file_number
.fetch_add(1);
169 meta
.fd
= FileDescriptor(file_num
, 0, 0);
170 file_metas
.emplace_back(meta
);
172 autovector
<FileMetaData
*> file_meta_ptrs
;
173 for (auto& meta
: file_metas
) {
174 file_meta_ptrs
.push_back(&meta
);
176 InstrumentedMutex mutex
;
177 InstrumentedMutexLock
l(&mutex
);
178 return InstallMemtableAtomicFlushResults(
179 &lists
, cfds
, mutable_cf_options_list
, mems_list
, &versions
, &mutex
,
180 file_meta_ptrs
, to_delete
, nullptr, &log_buffer
);
184 TEST_F(MemTableListTest
, Empty
) {
185 // Create an empty MemTableList and validate basic functions.
186 MemTableList
list(1, 0);
188 ASSERT_EQ(0, list
.NumNotFlushed());
189 ASSERT_FALSE(list
.imm_flush_needed
.load(std::memory_order_acquire
));
190 ASSERT_FALSE(list
.IsFlushPending());
192 autovector
<MemTable
*> mems
;
193 list
.PickMemtablesToFlush(nullptr /* memtable_id */, &mems
);
194 ASSERT_EQ(0, mems
.size());
196 autovector
<MemTable
*> to_delete
;
197 list
.current()->Unref(&to_delete
);
198 ASSERT_EQ(0, to_delete
.size());
201 TEST_F(MemTableListTest
, GetTest
) {
202 // Create MemTableList
203 int min_write_buffer_number_to_merge
= 2;
204 int max_write_buffer_number_to_maintain
= 0;
205 MemTableList
list(min_write_buffer_number_to_merge
,
206 max_write_buffer_number_to_maintain
);
208 SequenceNumber seq
= 1;
211 MergeContext merge_context
;
212 InternalKeyComparator
ikey_cmp(options
.comparator
);
213 SequenceNumber max_covering_tombstone_seq
= 0;
214 autovector
<MemTable
*> to_delete
;
216 LookupKey
lkey("key1", seq
);
217 bool found
= list
.current()->Get(lkey
, &value
, &s
, &merge_context
,
218 &max_covering_tombstone_seq
, ReadOptions());
222 InternalKeyComparator
cmp(BytewiseComparator());
223 auto factory
= std::make_shared
<SkipListFactory
>();
224 options
.memtable_factory
= factory
;
225 ImmutableCFOptions
ioptions(options
);
227 WriteBufferManager
wb(options
.db_write_buffer_size
);
228 MemTable
* mem
= new MemTable(cmp
, ioptions
, MutableCFOptions(options
), &wb
,
229 kMaxSequenceNumber
, 0 /* column_family_id */);
232 // Write some keys to this memtable.
233 mem
->Add(++seq
, kTypeDeletion
, "key1", "");
234 mem
->Add(++seq
, kTypeValue
, "key2", "value2");
235 mem
->Add(++seq
, kTypeValue
, "key1", "value1");
236 mem
->Add(++seq
, kTypeValue
, "key2", "value2.2");
238 // Fetch the newly written keys
239 merge_context
.Clear();
240 found
= mem
->Get(LookupKey("key1", seq
), &value
, &s
, &merge_context
,
241 &max_covering_tombstone_seq
, ReadOptions());
242 ASSERT_TRUE(s
.ok() && found
);
243 ASSERT_EQ(value
, "value1");
245 merge_context
.Clear();
246 found
= mem
->Get(LookupKey("key1", 2), &value
, &s
, &merge_context
,
247 &max_covering_tombstone_seq
, ReadOptions());
248 // MemTable found out that this key is *not* found (at this sequence#)
249 ASSERT_TRUE(found
&& s
.IsNotFound());
251 merge_context
.Clear();
252 found
= mem
->Get(LookupKey("key2", seq
), &value
, &s
, &merge_context
,
253 &max_covering_tombstone_seq
, ReadOptions());
254 ASSERT_TRUE(s
.ok() && found
);
255 ASSERT_EQ(value
, "value2.2");
257 ASSERT_EQ(4, mem
->num_entries());
258 ASSERT_EQ(1, mem
->num_deletes());
260 // Add memtable to list
261 list
.Add(mem
, &to_delete
);
263 SequenceNumber saved_seq
= seq
;
265 // Create another memtable and write some keys to it
266 WriteBufferManager
wb2(options
.db_write_buffer_size
);
267 MemTable
* mem2
= new MemTable(cmp
, ioptions
, MutableCFOptions(options
), &wb2
,
268 kMaxSequenceNumber
, 0 /* column_family_id */);
271 mem2
->Add(++seq
, kTypeDeletion
, "key1", "");
272 mem2
->Add(++seq
, kTypeValue
, "key2", "value2.3");
274 // Add second memtable to list
275 list
.Add(mem2
, &to_delete
);
277 // Fetch keys via MemTableList
278 merge_context
.Clear();
280 list
.current()->Get(LookupKey("key1", seq
), &value
, &s
, &merge_context
,
281 &max_covering_tombstone_seq
, ReadOptions());
282 ASSERT_TRUE(found
&& s
.IsNotFound());
284 merge_context
.Clear();
285 found
= list
.current()->Get(LookupKey("key1", saved_seq
), &value
, &s
,
286 &merge_context
, &max_covering_tombstone_seq
,
288 ASSERT_TRUE(s
.ok() && found
);
289 ASSERT_EQ("value1", value
);
291 merge_context
.Clear();
293 list
.current()->Get(LookupKey("key2", seq
), &value
, &s
, &merge_context
,
294 &max_covering_tombstone_seq
, ReadOptions());
295 ASSERT_TRUE(s
.ok() && found
);
296 ASSERT_EQ(value
, "value2.3");
298 merge_context
.Clear();
299 found
= list
.current()->Get(LookupKey("key2", 1), &value
, &s
, &merge_context
,
300 &max_covering_tombstone_seq
, ReadOptions());
303 ASSERT_EQ(2, list
.NumNotFlushed());
305 list
.current()->Unref(&to_delete
);
306 for (MemTable
* m
: to_delete
) {
311 TEST_F(MemTableListTest
, GetFromHistoryTest
) {
312 // Create MemTableList
313 int min_write_buffer_number_to_merge
= 2;
314 int max_write_buffer_number_to_maintain
= 2;
315 MemTableList
list(min_write_buffer_number_to_merge
,
316 max_write_buffer_number_to_maintain
);
318 SequenceNumber seq
= 1;
321 MergeContext merge_context
;
322 InternalKeyComparator
ikey_cmp(options
.comparator
);
323 SequenceNumber max_covering_tombstone_seq
= 0;
324 autovector
<MemTable
*> to_delete
;
326 LookupKey
lkey("key1", seq
);
327 bool found
= list
.current()->Get(lkey
, &value
, &s
, &merge_context
,
328 &max_covering_tombstone_seq
, ReadOptions());
332 InternalKeyComparator
cmp(BytewiseComparator());
333 auto factory
= std::make_shared
<SkipListFactory
>();
334 options
.memtable_factory
= factory
;
335 ImmutableCFOptions
ioptions(options
);
337 WriteBufferManager
wb(options
.db_write_buffer_size
);
338 MemTable
* mem
= new MemTable(cmp
, ioptions
, MutableCFOptions(options
), &wb
,
339 kMaxSequenceNumber
, 0 /* column_family_id */);
342 // Write some keys to this memtable.
343 mem
->Add(++seq
, kTypeDeletion
, "key1", "");
344 mem
->Add(++seq
, kTypeValue
, "key2", "value2");
345 mem
->Add(++seq
, kTypeValue
, "key2", "value2.2");
347 // Fetch the newly written keys
348 merge_context
.Clear();
349 found
= mem
->Get(LookupKey("key1", seq
), &value
, &s
, &merge_context
,
350 &max_covering_tombstone_seq
, ReadOptions());
351 // MemTable found out that this key is *not* found (at this sequence#)
352 ASSERT_TRUE(found
&& s
.IsNotFound());
354 merge_context
.Clear();
355 found
= mem
->Get(LookupKey("key2", seq
), &value
, &s
, &merge_context
,
356 &max_covering_tombstone_seq
, ReadOptions());
357 ASSERT_TRUE(s
.ok() && found
);
358 ASSERT_EQ(value
, "value2.2");
360 // Add memtable to list
361 list
.Add(mem
, &to_delete
);
362 ASSERT_EQ(0, to_delete
.size());
364 // Fetch keys via MemTableList
365 merge_context
.Clear();
367 list
.current()->Get(LookupKey("key1", seq
), &value
, &s
, &merge_context
,
368 &max_covering_tombstone_seq
, ReadOptions());
369 ASSERT_TRUE(found
&& s
.IsNotFound());
371 merge_context
.Clear();
373 list
.current()->Get(LookupKey("key2", seq
), &value
, &s
, &merge_context
,
374 &max_covering_tombstone_seq
, ReadOptions());
375 ASSERT_TRUE(s
.ok() && found
);
376 ASSERT_EQ("value2.2", value
);
378 // Flush this memtable from the list.
379 // (It will then be a part of the memtable history).
380 autovector
<MemTable
*> to_flush
;
381 list
.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush
);
382 ASSERT_EQ(1, to_flush
.size());
384 MutableCFOptions
mutable_cf_options(options
);
385 s
= Mock_InstallMemtableFlushResults(&list
, mutable_cf_options
, to_flush
,
388 ASSERT_EQ(0, list
.NumNotFlushed());
389 ASSERT_EQ(1, list
.NumFlushed());
390 ASSERT_EQ(0, to_delete
.size());
392 // Verify keys are no longer in MemTableList
393 merge_context
.Clear();
395 list
.current()->Get(LookupKey("key1", seq
), &value
, &s
, &merge_context
,
396 &max_covering_tombstone_seq
, ReadOptions());
399 merge_context
.Clear();
401 list
.current()->Get(LookupKey("key2", seq
), &value
, &s
, &merge_context
,
402 &max_covering_tombstone_seq
, ReadOptions());
405 // Verify keys are present in history
406 merge_context
.Clear();
407 found
= list
.current()->GetFromHistory(
408 LookupKey("key1", seq
), &value
, &s
, &merge_context
,
409 &max_covering_tombstone_seq
, ReadOptions());
410 ASSERT_TRUE(found
&& s
.IsNotFound());
412 merge_context
.Clear();
413 found
= list
.current()->GetFromHistory(
414 LookupKey("key2", seq
), &value
, &s
, &merge_context
,
415 &max_covering_tombstone_seq
, ReadOptions());
417 ASSERT_EQ("value2.2", value
);
419 // Create another memtable and write some keys to it
420 WriteBufferManager
wb2(options
.db_write_buffer_size
);
421 MemTable
* mem2
= new MemTable(cmp
, ioptions
, MutableCFOptions(options
), &wb2
,
422 kMaxSequenceNumber
, 0 /* column_family_id */);
425 mem2
->Add(++seq
, kTypeDeletion
, "key1", "");
426 mem2
->Add(++seq
, kTypeValue
, "key3", "value3");
428 // Add second memtable to list
429 list
.Add(mem2
, &to_delete
);
430 ASSERT_EQ(0, to_delete
.size());
433 list
.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush
);
434 ASSERT_EQ(1, to_flush
.size());
436 // Flush second memtable
437 s
= Mock_InstallMemtableFlushResults(&list
, mutable_cf_options
, to_flush
,
440 ASSERT_EQ(0, list
.NumNotFlushed());
441 ASSERT_EQ(2, list
.NumFlushed());
442 ASSERT_EQ(0, to_delete
.size());
444 // Add a third memtable to push the first memtable out of the history
445 WriteBufferManager
wb3(options
.db_write_buffer_size
);
446 MemTable
* mem3
= new MemTable(cmp
, ioptions
, MutableCFOptions(options
), &wb3
,
447 kMaxSequenceNumber
, 0 /* column_family_id */);
449 list
.Add(mem3
, &to_delete
);
450 ASSERT_EQ(1, list
.NumNotFlushed());
451 ASSERT_EQ(1, list
.NumFlushed());
452 ASSERT_EQ(1, to_delete
.size());
454 // Verify keys are no longer in MemTableList
455 merge_context
.Clear();
457 list
.current()->Get(LookupKey("key1", seq
), &value
, &s
, &merge_context
,
458 &max_covering_tombstone_seq
, ReadOptions());
461 merge_context
.Clear();
463 list
.current()->Get(LookupKey("key2", seq
), &value
, &s
, &merge_context
,
464 &max_covering_tombstone_seq
, ReadOptions());
467 merge_context
.Clear();
469 list
.current()->Get(LookupKey("key3", seq
), &value
, &s
, &merge_context
,
470 &max_covering_tombstone_seq
, ReadOptions());
473 // Verify that the second memtable's keys are in the history
474 merge_context
.Clear();
475 found
= list
.current()->GetFromHistory(
476 LookupKey("key1", seq
), &value
, &s
, &merge_context
,
477 &max_covering_tombstone_seq
, ReadOptions());
478 ASSERT_TRUE(found
&& s
.IsNotFound());
480 merge_context
.Clear();
481 found
= list
.current()->GetFromHistory(
482 LookupKey("key3", seq
), &value
, &s
, &merge_context
,
483 &max_covering_tombstone_seq
, ReadOptions());
485 ASSERT_EQ("value3", value
);
487 // Verify that key2 from the first memtable is no longer in the history
488 merge_context
.Clear();
490 list
.current()->Get(LookupKey("key2", seq
), &value
, &s
, &merge_context
,
491 &max_covering_tombstone_seq
, ReadOptions());
495 list
.current()->Unref(&to_delete
);
496 ASSERT_EQ(3, to_delete
.size());
497 for (MemTable
* m
: to_delete
) {
502 TEST_F(MemTableListTest
, FlushPendingTest
) {
503 const int num_tables
= 6;
504 SequenceNumber seq
= 1;
507 auto factory
= std::make_shared
<SkipListFactory
>();
508 options
.memtable_factory
= factory
;
509 ImmutableCFOptions
ioptions(options
);
510 InternalKeyComparator
cmp(BytewiseComparator());
511 WriteBufferManager
wb(options
.db_write_buffer_size
);
512 autovector
<MemTable
*> to_delete
;
514 // Create MemTableList
515 int min_write_buffer_number_to_merge
= 3;
516 int max_write_buffer_number_to_maintain
= 7;
517 MemTableList
list(min_write_buffer_number_to_merge
,
518 max_write_buffer_number_to_maintain
);
520 // Create some MemTables
521 uint64_t memtable_id
= 0;
522 std::vector
<MemTable
*> tables
;
523 MutableCFOptions
mutable_cf_options(options
);
524 for (int i
= 0; i
< num_tables
; i
++) {
525 MemTable
* mem
= new MemTable(cmp
, ioptions
, mutable_cf_options
, &wb
,
526 kMaxSequenceNumber
, 0 /* column_family_id */);
527 mem
->SetID(memtable_id
++);
531 MergeContext merge_context
;
533 mem
->Add(++seq
, kTypeValue
, "key1", ToString(i
));
534 mem
->Add(++seq
, kTypeValue
, "keyN" + ToString(i
), "valueN");
535 mem
->Add(++seq
, kTypeValue
, "keyX" + ToString(i
), "value");
536 mem
->Add(++seq
, kTypeValue
, "keyM" + ToString(i
), "valueM");
537 mem
->Add(++seq
, kTypeDeletion
, "keyX" + ToString(i
), "");
539 tables
.push_back(mem
);
543 ASSERT_FALSE(list
.IsFlushPending());
544 ASSERT_FALSE(list
.imm_flush_needed
.load(std::memory_order_acquire
));
545 autovector
<MemTable
*> to_flush
;
546 list
.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush
);
547 ASSERT_EQ(0, to_flush
.size());
549 // Request a flush even though there is nothing to flush
550 list
.FlushRequested();
551 ASSERT_FALSE(list
.IsFlushPending());
552 ASSERT_FALSE(list
.imm_flush_needed
.load(std::memory_order_acquire
));
554 // Attempt to 'flush' to clear request for flush
555 list
.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush
);
556 ASSERT_EQ(0, to_flush
.size());
557 ASSERT_FALSE(list
.IsFlushPending());
558 ASSERT_FALSE(list
.imm_flush_needed
.load(std::memory_order_acquire
));
560 // Request a flush again
561 list
.FlushRequested();
562 // No flush pending since the list is empty.
563 ASSERT_FALSE(list
.IsFlushPending());
564 ASSERT_FALSE(list
.imm_flush_needed
.load(std::memory_order_acquire
));
567 list
.Add(tables
[0], &to_delete
);
568 list
.Add(tables
[1], &to_delete
);
569 ASSERT_EQ(2, list
.NumNotFlushed());
570 ASSERT_EQ(0, to_delete
.size());
572 // Even though we have less than the minimum to flush, a flush is
573 // pending since we had previously requested a flush and never called
574 // PickMemtablesToFlush() to clear the flush.
575 ASSERT_TRUE(list
.IsFlushPending());
576 ASSERT_TRUE(list
.imm_flush_needed
.load(std::memory_order_acquire
));
578 // Pick tables to flush
579 list
.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush
);
580 ASSERT_EQ(2, to_flush
.size());
581 ASSERT_EQ(2, list
.NumNotFlushed());
582 ASSERT_FALSE(list
.IsFlushPending());
583 ASSERT_FALSE(list
.imm_flush_needed
.load(std::memory_order_acquire
));
586 list
.RollbackMemtableFlush(to_flush
, 0);
587 ASSERT_FALSE(list
.IsFlushPending());
588 ASSERT_TRUE(list
.imm_flush_needed
.load(std::memory_order_acquire
));
592 list
.Add(tables
[2], &to_delete
);
593 // We now have the minimum to flush regardles of whether FlushRequested()
595 ASSERT_TRUE(list
.IsFlushPending());
596 ASSERT_TRUE(list
.imm_flush_needed
.load(std::memory_order_acquire
));
597 ASSERT_EQ(0, to_delete
.size());
599 // Pick tables to flush
600 list
.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush
);
601 ASSERT_EQ(3, to_flush
.size());
602 ASSERT_EQ(3, list
.NumNotFlushed());
603 ASSERT_FALSE(list
.IsFlushPending());
604 ASSERT_FALSE(list
.imm_flush_needed
.load(std::memory_order_acquire
));
606 // Pick tables to flush again
607 autovector
<MemTable
*> to_flush2
;
608 list
.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush2
);
609 ASSERT_EQ(0, to_flush2
.size());
610 ASSERT_EQ(3, list
.NumNotFlushed());
611 ASSERT_FALSE(list
.IsFlushPending());
612 ASSERT_FALSE(list
.imm_flush_needed
.load(std::memory_order_acquire
));
615 list
.Add(tables
[3], &to_delete
);
616 ASSERT_FALSE(list
.IsFlushPending());
617 ASSERT_TRUE(list
.imm_flush_needed
.load(std::memory_order_acquire
));
618 ASSERT_EQ(0, to_delete
.size());
620 // Request a flush again
621 list
.FlushRequested();
622 ASSERT_TRUE(list
.IsFlushPending());
623 ASSERT_TRUE(list
.imm_flush_needed
.load(std::memory_order_acquire
));
625 // Pick tables to flush again
626 list
.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush2
);
627 ASSERT_EQ(1, to_flush2
.size());
628 ASSERT_EQ(4, list
.NumNotFlushed());
629 ASSERT_FALSE(list
.IsFlushPending());
630 ASSERT_FALSE(list
.imm_flush_needed
.load(std::memory_order_acquire
));
632 // Rollback first pick of tables
633 list
.RollbackMemtableFlush(to_flush
, 0);
634 ASSERT_TRUE(list
.IsFlushPending());
635 ASSERT_TRUE(list
.imm_flush_needed
.load(std::memory_order_acquire
));
638 // Add another tables
639 list
.Add(tables
[4], &to_delete
);
640 ASSERT_EQ(5, list
.NumNotFlushed());
641 // We now have the minimum to flush regardles of whether FlushRequested()
642 ASSERT_TRUE(list
.IsFlushPending());
643 ASSERT_TRUE(list
.imm_flush_needed
.load(std::memory_order_acquire
));
644 ASSERT_EQ(0, to_delete
.size());
646 // Pick tables to flush
647 list
.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush
);
648 // Should pick 4 of 5 since 1 table has been picked in to_flush2
649 ASSERT_EQ(4, to_flush
.size());
650 ASSERT_EQ(5, list
.NumNotFlushed());
651 ASSERT_FALSE(list
.IsFlushPending());
652 ASSERT_FALSE(list
.imm_flush_needed
.load(std::memory_order_acquire
));
654 // Pick tables to flush again
655 autovector
<MemTable
*> to_flush3
;
656 list
.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush3
);
657 ASSERT_EQ(0, to_flush3
.size()); // nothing not in progress of being flushed
658 ASSERT_EQ(5, list
.NumNotFlushed());
659 ASSERT_FALSE(list
.IsFlushPending());
660 ASSERT_FALSE(list
.imm_flush_needed
.load(std::memory_order_acquire
));
662 // Flush the 4 memtables that were picked in to_flush
663 s
= Mock_InstallMemtableFlushResults(&list
, mutable_cf_options
, to_flush
,
667 // Note: now to_flush contains tables[0,1,2,4]. to_flush2 contains
669 // Current implementation will only commit memtables in the order they were
670 // created. So TryInstallMemtableFlushResults will install the first 3 tables
671 // in to_flush and stop when it encounters a table not yet flushed.
672 ASSERT_EQ(2, list
.NumNotFlushed());
673 int num_in_history
= std::min(3, max_write_buffer_number_to_maintain
);
674 ASSERT_EQ(num_in_history
, list
.NumFlushed());
675 ASSERT_EQ(5 - list
.NumNotFlushed() - num_in_history
, to_delete
.size());
677 // Request a flush again. Should be nothing to flush
678 list
.FlushRequested();
679 ASSERT_FALSE(list
.IsFlushPending());
680 ASSERT_FALSE(list
.imm_flush_needed
.load(std::memory_order_acquire
));
682 // Flush the 1 memtable that was picked in to_flush2
683 s
= MemTableListTest::Mock_InstallMemtableFlushResults(
684 &list
, mutable_cf_options
, to_flush2
, &to_delete
);
687 // This will actually install 2 tables. The 1 we told it to flush, and also
688 // tables[4] which has been waiting for tables[3] to commit.
689 ASSERT_EQ(0, list
.NumNotFlushed());
690 num_in_history
= std::min(5, max_write_buffer_number_to_maintain
);
691 ASSERT_EQ(num_in_history
, list
.NumFlushed());
692 ASSERT_EQ(5 - list
.NumNotFlushed() - num_in_history
, to_delete
.size());
694 for (const auto& m
: to_delete
) {
695 // Refcount should be 0 after calling TryInstallMemtableFlushResults.
696 // Verify this, by Ref'ing then UnRef'ing:
698 ASSERT_EQ(m
, m
->Unref());
704 list
.Add(tables
[5], &to_delete
);
705 ASSERT_EQ(1, list
.NumNotFlushed());
706 ASSERT_EQ(5, list
.GetLatestMemTableID());
708 // Pick tables to flush. The tables to pick must have ID smaller than or
709 // equal to 4. Therefore, no table will be selected in this case.
710 autovector
<MemTable
*> to_flush4
;
711 list
.FlushRequested();
712 ASSERT_TRUE(list
.HasFlushRequested());
713 list
.PickMemtablesToFlush(&memtable_id
, &to_flush4
);
714 ASSERT_TRUE(to_flush4
.empty());
715 ASSERT_EQ(1, list
.NumNotFlushed());
716 ASSERT_TRUE(list
.imm_flush_needed
.load(std::memory_order_acquire
));
717 ASSERT_FALSE(list
.IsFlushPending());
718 ASSERT_FALSE(list
.HasFlushRequested());
720 // Pick tables to flush. The tables to pick must have ID smaller than or
721 // equal to 5. Therefore, only tables[5] will be selected.
723 list
.FlushRequested();
724 list
.PickMemtablesToFlush(&memtable_id
, &to_flush4
);
725 ASSERT_EQ(1, static_cast<int>(to_flush4
.size()));
726 ASSERT_EQ(1, list
.NumNotFlushed());
727 ASSERT_FALSE(list
.imm_flush_needed
.load(std::memory_order_acquire
));
728 ASSERT_FALSE(list
.IsFlushPending());
731 list
.current()->Unref(&to_delete
);
733 std::min(num_tables
, max_write_buffer_number_to_maintain
);
734 ASSERT_EQ(to_delete_size
, to_delete
.size());
736 for (const auto& m
: to_delete
) {
737 // Refcount should be 0 after calling TryInstallMemtableFlushResults.
738 // Verify this, by Ref'ing then UnRef'ing:
740 ASSERT_EQ(m
, m
->Unref());
746 TEST_F(MemTableListTest
, EmptyAtomicFlusTest
) {
747 autovector
<MemTableList
*> lists
;
748 autovector
<uint32_t> cf_ids
;
749 autovector
<const MutableCFOptions
*> options_list
;
750 autovector
<const autovector
<MemTable
*>*> to_flush
;
751 autovector
<MemTable
*> to_delete
;
752 Status s
= Mock_InstallMemtableAtomicFlushResults(lists
, cf_ids
, options_list
,
753 to_flush
, &to_delete
);
755 ASSERT_TRUE(to_delete
.empty());
758 TEST_F(MemTableListTest
, AtomicFlusTest
) {
759 const int num_cfs
= 3;
760 const int num_tables_per_cf
= 2;
761 SequenceNumber seq
= 1;
763 auto factory
= std::make_shared
<SkipListFactory
>();
764 options
.memtable_factory
= factory
;
765 ImmutableCFOptions
ioptions(options
);
766 InternalKeyComparator
cmp(BytewiseComparator());
767 WriteBufferManager
wb(options
.db_write_buffer_size
);
769 // Create MemTableLists
770 int min_write_buffer_number_to_merge
= 3;
771 int max_write_buffer_number_to_maintain
= 7;
772 autovector
<MemTableList
*> lists
;
773 for (int i
= 0; i
!= num_cfs
; ++i
) {
774 lists
.emplace_back(new MemTableList(min_write_buffer_number_to_merge
,
775 max_write_buffer_number_to_maintain
));
778 autovector
<uint32_t> cf_ids
;
779 std::vector
<std::vector
<MemTable
*>> tables(num_cfs
);
780 autovector
<const MutableCFOptions
*> mutable_cf_options_list
;
782 for (auto& elem
: tables
) {
783 mutable_cf_options_list
.emplace_back(new MutableCFOptions(options
));
784 uint64_t memtable_id
= 0;
785 for (int i
= 0; i
!= num_tables_per_cf
; ++i
) {
787 new MemTable(cmp
, ioptions
, *(mutable_cf_options_list
.back()), &wb
,
788 kMaxSequenceNumber
, cf_id
);
789 mem
->SetID(memtable_id
++);
794 mem
->Add(++seq
, kTypeValue
, "key1", ToString(i
));
795 mem
->Add(++seq
, kTypeValue
, "keyN" + ToString(i
), "valueN");
796 mem
->Add(++seq
, kTypeValue
, "keyX" + ToString(i
), "value");
797 mem
->Add(++seq
, kTypeValue
, "keyM" + ToString(i
), "valueM");
798 mem
->Add(++seq
, kTypeDeletion
, "keyX" + ToString(i
), "");
802 cf_ids
.push_back(cf_id
++);
805 std::vector
<autovector
<MemTable
*>> flush_candidates(num_cfs
);
808 for (auto i
= 0; i
!= num_cfs
; ++i
) {
809 auto* list
= lists
[i
];
810 ASSERT_FALSE(list
->IsFlushPending());
811 ASSERT_FALSE(list
->imm_flush_needed
.load(std::memory_order_acquire
));
812 list
->PickMemtablesToFlush(nullptr /* memtable_id */, &flush_candidates
[i
]);
813 ASSERT_EQ(0, flush_candidates
[i
].size());
815 // Request flush even though there is nothing to flush
816 for (auto i
= 0; i
!= num_cfs
; ++i
) {
817 auto* list
= lists
[i
];
818 list
->FlushRequested();
819 ASSERT_FALSE(list
->IsFlushPending());
820 ASSERT_FALSE(list
->imm_flush_needed
.load(std::memory_order_acquire
));
822 autovector
<MemTable
*> to_delete
;
823 // Add tables to the immutable memtalbe lists associated with column families
824 for (auto i
= 0; i
!= num_cfs
; ++i
) {
825 for (auto j
= 0; j
!= num_tables_per_cf
; ++j
) {
826 lists
[i
]->Add(tables
[i
][j
], &to_delete
);
828 ASSERT_EQ(num_tables_per_cf
, lists
[i
]->NumNotFlushed());
829 ASSERT_TRUE(lists
[i
]->IsFlushPending());
830 ASSERT_TRUE(lists
[i
]->imm_flush_needed
.load(std::memory_order_acquire
));
832 std::vector
<uint64_t> flush_memtable_ids
= {1, 1, 0};
839 // Pick memtables to flush
840 for (auto i
= 0; i
!= num_cfs
; ++i
) {
841 flush_candidates
[i
].clear();
842 lists
[i
]->PickMemtablesToFlush(&flush_memtable_ids
[i
],
843 &flush_candidates
[i
]);
844 ASSERT_EQ(flush_memtable_ids
[i
] - 0 + 1,
845 static_cast<uint64_t>(flush_candidates
[i
].size()));
847 autovector
<MemTableList
*> tmp_lists
;
848 autovector
<uint32_t> tmp_cf_ids
;
849 autovector
<const MutableCFOptions
*> tmp_options_list
;
850 autovector
<const autovector
<MemTable
*>*> to_flush
;
851 for (auto i
= 0; i
!= num_cfs
; ++i
) {
852 if (!flush_candidates
[i
].empty()) {
853 to_flush
.push_back(&flush_candidates
[i
]);
854 tmp_lists
.push_back(lists
[i
]);
855 tmp_cf_ids
.push_back(i
);
856 tmp_options_list
.push_back(mutable_cf_options_list
[i
]);
859 Status s
= Mock_InstallMemtableAtomicFlushResults(
860 tmp_lists
, tmp_cf_ids
, tmp_options_list
, to_flush
, &to_delete
);
863 for (auto i
= 0; i
!= num_cfs
; ++i
) {
864 for (auto j
= 0; j
!= num_tables_per_cf
; ++j
) {
865 if (static_cast<uint64_t>(j
) <= flush_memtable_ids
[i
]) {
866 ASSERT_LT(0, tables
[i
][j
]->GetFileNumber());
870 static_cast<size_t>(num_tables_per_cf
) - flush_candidates
[i
].size(),
871 lists
[i
]->NumNotFlushed());
875 for (auto list
: lists
) {
876 list
->current()->Unref(&to_delete
);
879 for (auto& mutable_cf_options
: mutable_cf_options_list
) {
880 if (mutable_cf_options
!= nullptr) {
881 delete mutable_cf_options
;
882 mutable_cf_options
= nullptr;
885 // All memtables in tables array must have been flushed, thus ready to be
887 ASSERT_EQ(to_delete
.size(), tables
.size() * tables
.front().size());
888 for (const auto& m
: to_delete
) {
889 // Refcount should be 0 after calling InstallMemtableFlushResults.
890 // Verify this by Ref'ing and then Unref'ing.
892 ASSERT_EQ(m
, m
->Unref());
897 } // namespace rocksdb
899 int main(int argc
, char** argv
) {
900 ::testing::InitGoogleTest(&argc
, argv
);
901 return RUN_ALL_TESTS();