1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
#include "db/memtable_list.h"

#include <algorithm>
#include <atomic>
#include <limits>
#include <list>
#include <memory>
#include <string>
#include <vector>

#include "db/merge_context.h"
#include "db/version_set.h"
#include "db/write_controller.h"
#include "rocksdb/db.h"
#include "rocksdb/status.h"
#include "rocksdb/write_buffer_manager.h"
#include "test_util/testharness.h"
#include "test_util/testutil.h"
#include "util/string_util.h"
20 namespace ROCKSDB_NAMESPACE
{
22 class MemTableListTest
: public testing::Test
{
27 std::vector
<ColumnFamilyHandle
*> handles
;
28 std::atomic
<uint64_t> file_number
;
30 MemTableListTest() : db(nullptr), file_number(1) {
31 dbname
= test::PerThreadDBPath("memtable_list_test");
32 options
.create_if_missing
= true;
33 DestroyDB(dbname
, options
);
36 // Create a test db if not yet created
39 options
.create_if_missing
= true;
40 DestroyDB(dbname
, options
);
41 // Open DB only with default column family
42 ColumnFamilyOptions cf_options
;
43 std::vector
<ColumnFamilyDescriptor
> cf_descs
;
44 cf_descs
.emplace_back(kDefaultColumnFamilyName
, cf_options
);
45 Status s
= DB::Open(options
, dbname
, cf_descs
, &handles
, &db
);
48 ColumnFamilyOptions cf_opt1
, cf_opt2
;
49 cf_opt1
.cf_paths
.emplace_back(dbname
+ "_one_1",
50 std::numeric_limits
<uint64_t>::max());
51 cf_opt2
.cf_paths
.emplace_back(dbname
+ "_two_1",
52 std::numeric_limits
<uint64_t>::max());
53 int sz
= static_cast<int>(handles
.size());
54 handles
.resize(sz
+ 2);
55 s
= db
->CreateColumnFamily(cf_opt1
, "one", &handles
[1]);
57 s
= db
->CreateColumnFamily(cf_opt2
, "two", &handles
[2]);
60 cf_descs
.emplace_back("one", cf_options
);
61 cf_descs
.emplace_back("two", cf_options
);
65 ~MemTableListTest() override
{
67 std::vector
<ColumnFamilyDescriptor
> cf_descs(handles
.size());
68 for (int i
= 0; i
!= static_cast<int>(handles
.size()); ++i
) {
69 handles
[i
]->GetDescriptor(&cf_descs
[i
]);
71 for (auto h
: handles
) {
73 db
->DestroyColumnFamilyHandle(h
);
79 DestroyDB(dbname
, options
, cf_descs
);
83 // Calls MemTableList::TryInstallMemtableFlushResults() and sets up all
84 // structures needed to call this function.
85 Status
Mock_InstallMemtableFlushResults(
86 MemTableList
* list
, const MutableCFOptions
& mutable_cf_options
,
87 const autovector
<MemTable
*>& m
, autovector
<MemTable
*>* to_delete
) {
88 // Create a mock Logger
89 test::NullLogger logger
;
90 LogBuffer
log_buffer(DEBUG_LEVEL
, &logger
);
93 // Create a mock VersionSet
95 db_options
.file_system
= FileSystem::Default();
96 ImmutableDBOptions
immutable_db_options(db_options
);
97 EnvOptions env_options
;
98 std::shared_ptr
<Cache
> table_cache(NewLRUCache(50000, 16));
99 WriteBufferManager
write_buffer_manager(db_options
.db_write_buffer_size
);
100 WriteController
write_controller(10000000u);
102 VersionSet
versions(dbname
, &immutable_db_options
, env_options
,
103 table_cache
.get(), &write_buffer_manager
,
104 &write_controller
, /*block_cache_tracer=*/nullptr);
105 std::vector
<ColumnFamilyDescriptor
> cf_descs
;
106 cf_descs
.emplace_back(kDefaultColumnFamilyName
, ColumnFamilyOptions());
107 cf_descs
.emplace_back("one", ColumnFamilyOptions());
108 cf_descs
.emplace_back("two", ColumnFamilyOptions());
110 EXPECT_OK(versions
.Recover(cf_descs
, false));
112 // Create mock default ColumnFamilyData
113 auto column_family_set
= versions
.GetColumnFamilySet();
114 LogsWithPrepTracker dummy_prep_tracker
;
115 auto cfd
= column_family_set
->GetDefault();
116 EXPECT_TRUE(nullptr != cfd
);
117 uint64_t file_num
= file_number
.fetch_add(1);
118 // Create dummy mutex.
119 InstrumentedMutex mutex
;
120 InstrumentedMutexLock
l(&mutex
);
121 std::list
<std::unique_ptr
<FlushJobInfo
>> flush_jobs_info
;
122 Status s
= list
->TryInstallMemtableFlushResults(
123 cfd
, mutable_cf_options
, m
, &dummy_prep_tracker
, &versions
, &mutex
,
124 file_num
, to_delete
, nullptr, &log_buffer
, &flush_jobs_info
);
128 // Calls MemTableList::InstallMemtableFlushResults() and sets up all
129 // structures needed to call this function.
130 Status
Mock_InstallMemtableAtomicFlushResults(
131 autovector
<MemTableList
*>& lists
, const autovector
<uint32_t>& cf_ids
,
132 const autovector
<const MutableCFOptions
*>& mutable_cf_options_list
,
133 const autovector
<const autovector
<MemTable
*>*>& mems_list
,
134 autovector
<MemTable
*>* to_delete
) {
135 // Create a mock Logger
136 test::NullLogger logger
;
137 LogBuffer
log_buffer(DEBUG_LEVEL
, &logger
);
140 // Create a mock VersionSet
141 DBOptions db_options
;
142 db_options
.file_system
.reset(new LegacyFileSystemWrapper(db_options
.env
));
144 ImmutableDBOptions
immutable_db_options(db_options
);
145 EnvOptions env_options
;
146 std::shared_ptr
<Cache
> table_cache(NewLRUCache(50000, 16));
147 WriteBufferManager
write_buffer_manager(db_options
.db_write_buffer_size
);
148 WriteController
write_controller(10000000u);
150 VersionSet
versions(dbname
, &immutable_db_options
, env_options
,
151 table_cache
.get(), &write_buffer_manager
,
152 &write_controller
, /*block_cache_tracer=*/nullptr);
153 std::vector
<ColumnFamilyDescriptor
> cf_descs
;
154 cf_descs
.emplace_back(kDefaultColumnFamilyName
, ColumnFamilyOptions());
155 cf_descs
.emplace_back("one", ColumnFamilyOptions());
156 cf_descs
.emplace_back("two", ColumnFamilyOptions());
157 EXPECT_OK(versions
.Recover(cf_descs
, false));
159 // Create mock default ColumnFamilyData
161 auto column_family_set
= versions
.GetColumnFamilySet();
163 LogsWithPrepTracker dummy_prep_tracker
;
164 autovector
<ColumnFamilyData
*> cfds
;
165 for (int i
= 0; i
!= static_cast<int>(cf_ids
.size()); ++i
) {
166 cfds
.emplace_back(column_family_set
->GetColumnFamily(cf_ids
[i
]));
167 EXPECT_NE(nullptr, cfds
[i
]);
169 std::vector
<FileMetaData
> file_metas
;
170 file_metas
.reserve(cf_ids
.size());
171 for (size_t i
= 0; i
!= cf_ids
.size(); ++i
) {
173 uint64_t file_num
= file_number
.fetch_add(1);
174 meta
.fd
= FileDescriptor(file_num
, 0, 0);
175 file_metas
.emplace_back(meta
);
177 autovector
<FileMetaData
*> file_meta_ptrs
;
178 for (auto& meta
: file_metas
) {
179 file_meta_ptrs
.push_back(&meta
);
181 InstrumentedMutex mutex
;
182 InstrumentedMutexLock
l(&mutex
);
183 return InstallMemtableAtomicFlushResults(
184 &lists
, cfds
, mutable_cf_options_list
, mems_list
, &versions
, &mutex
,
185 file_meta_ptrs
, to_delete
, nullptr, &log_buffer
);
189 TEST_F(MemTableListTest
, Empty
) {
190 // Create an empty MemTableList and validate basic functions.
191 MemTableList
list(1, 0, 0);
193 ASSERT_EQ(0, list
.NumNotFlushed());
194 ASSERT_FALSE(list
.imm_flush_needed
.load(std::memory_order_acquire
));
195 ASSERT_FALSE(list
.IsFlushPending());
197 autovector
<MemTable
*> mems
;
198 list
.PickMemtablesToFlush(nullptr /* memtable_id */, &mems
);
199 ASSERT_EQ(0, mems
.size());
201 autovector
<MemTable
*> to_delete
;
202 list
.current()->Unref(&to_delete
);
203 ASSERT_EQ(0, to_delete
.size());
206 TEST_F(MemTableListTest
, GetTest
) {
207 // Create MemTableList
208 int min_write_buffer_number_to_merge
= 2;
209 int max_write_buffer_number_to_maintain
= 0;
210 int64_t max_write_buffer_size_to_maintain
= 0;
211 MemTableList
list(min_write_buffer_number_to_merge
,
212 max_write_buffer_number_to_maintain
,
213 max_write_buffer_size_to_maintain
);
215 SequenceNumber seq
= 1;
218 MergeContext merge_context
;
219 InternalKeyComparator
ikey_cmp(options
.comparator
);
220 SequenceNumber max_covering_tombstone_seq
= 0;
221 autovector
<MemTable
*> to_delete
;
223 LookupKey
lkey("key1", seq
);
224 bool found
= list
.current()->Get(lkey
, &value
, &s
, &merge_context
,
225 &max_covering_tombstone_seq
, ReadOptions());
229 InternalKeyComparator
cmp(BytewiseComparator());
230 auto factory
= std::make_shared
<SkipListFactory
>();
231 options
.memtable_factory
= factory
;
232 ImmutableCFOptions
ioptions(options
);
234 WriteBufferManager
wb(options
.db_write_buffer_size
);
235 MemTable
* mem
= new MemTable(cmp
, ioptions
, MutableCFOptions(options
), &wb
,
236 kMaxSequenceNumber
, 0 /* column_family_id */);
239 // Write some keys to this memtable.
240 mem
->Add(++seq
, kTypeDeletion
, "key1", "");
241 mem
->Add(++seq
, kTypeValue
, "key2", "value2");
242 mem
->Add(++seq
, kTypeValue
, "key1", "value1");
243 mem
->Add(++seq
, kTypeValue
, "key2", "value2.2");
245 // Fetch the newly written keys
246 merge_context
.Clear();
247 found
= mem
->Get(LookupKey("key1", seq
), &value
, &s
, &merge_context
,
248 &max_covering_tombstone_seq
, ReadOptions());
249 ASSERT_TRUE(s
.ok() && found
);
250 ASSERT_EQ(value
, "value1");
252 merge_context
.Clear();
253 found
= mem
->Get(LookupKey("key1", 2), &value
, &s
, &merge_context
,
254 &max_covering_tombstone_seq
, ReadOptions());
255 // MemTable found out that this key is *not* found (at this sequence#)
256 ASSERT_TRUE(found
&& s
.IsNotFound());
258 merge_context
.Clear();
259 found
= mem
->Get(LookupKey("key2", seq
), &value
, &s
, &merge_context
,
260 &max_covering_tombstone_seq
, ReadOptions());
261 ASSERT_TRUE(s
.ok() && found
);
262 ASSERT_EQ(value
, "value2.2");
264 ASSERT_EQ(4, mem
->num_entries());
265 ASSERT_EQ(1, mem
->num_deletes());
267 // Add memtable to list
268 list
.Add(mem
, &to_delete
);
270 SequenceNumber saved_seq
= seq
;
272 // Create another memtable and write some keys to it
273 WriteBufferManager
wb2(options
.db_write_buffer_size
);
274 MemTable
* mem2
= new MemTable(cmp
, ioptions
, MutableCFOptions(options
), &wb2
,
275 kMaxSequenceNumber
, 0 /* column_family_id */);
278 mem2
->Add(++seq
, kTypeDeletion
, "key1", "");
279 mem2
->Add(++seq
, kTypeValue
, "key2", "value2.3");
281 // Add second memtable to list
282 list
.Add(mem2
, &to_delete
);
284 // Fetch keys via MemTableList
285 merge_context
.Clear();
287 list
.current()->Get(LookupKey("key1", seq
), &value
, &s
, &merge_context
,
288 &max_covering_tombstone_seq
, ReadOptions());
289 ASSERT_TRUE(found
&& s
.IsNotFound());
291 merge_context
.Clear();
292 found
= list
.current()->Get(LookupKey("key1", saved_seq
), &value
, &s
,
293 &merge_context
, &max_covering_tombstone_seq
,
295 ASSERT_TRUE(s
.ok() && found
);
296 ASSERT_EQ("value1", value
);
298 merge_context
.Clear();
300 list
.current()->Get(LookupKey("key2", seq
), &value
, &s
, &merge_context
,
301 &max_covering_tombstone_seq
, ReadOptions());
302 ASSERT_TRUE(s
.ok() && found
);
303 ASSERT_EQ(value
, "value2.3");
305 merge_context
.Clear();
306 found
= list
.current()->Get(LookupKey("key2", 1), &value
, &s
, &merge_context
,
307 &max_covering_tombstone_seq
, ReadOptions());
310 ASSERT_EQ(2, list
.NumNotFlushed());
312 list
.current()->Unref(&to_delete
);
313 for (MemTable
* m
: to_delete
) {
318 TEST_F(MemTableListTest
, GetFromHistoryTest
) {
319 // Create MemTableList
320 int min_write_buffer_number_to_merge
= 2;
321 int max_write_buffer_number_to_maintain
= 2;
322 int64_t max_write_buffer_size_to_maintain
= 2000;
323 MemTableList
list(min_write_buffer_number_to_merge
,
324 max_write_buffer_number_to_maintain
,
325 max_write_buffer_size_to_maintain
);
327 SequenceNumber seq
= 1;
330 MergeContext merge_context
;
331 InternalKeyComparator
ikey_cmp(options
.comparator
);
332 SequenceNumber max_covering_tombstone_seq
= 0;
333 autovector
<MemTable
*> to_delete
;
335 LookupKey
lkey("key1", seq
);
336 bool found
= list
.current()->Get(lkey
, &value
, &s
, &merge_context
,
337 &max_covering_tombstone_seq
, ReadOptions());
341 InternalKeyComparator
cmp(BytewiseComparator());
342 auto factory
= std::make_shared
<SkipListFactory
>();
343 options
.memtable_factory
= factory
;
344 ImmutableCFOptions
ioptions(options
);
346 WriteBufferManager
wb(options
.db_write_buffer_size
);
347 MemTable
* mem
= new MemTable(cmp
, ioptions
, MutableCFOptions(options
), &wb
,
348 kMaxSequenceNumber
, 0 /* column_family_id */);
351 // Write some keys to this memtable.
352 mem
->Add(++seq
, kTypeDeletion
, "key1", "");
353 mem
->Add(++seq
, kTypeValue
, "key2", "value2");
354 mem
->Add(++seq
, kTypeValue
, "key2", "value2.2");
356 // Fetch the newly written keys
357 merge_context
.Clear();
358 found
= mem
->Get(LookupKey("key1", seq
), &value
, &s
, &merge_context
,
359 &max_covering_tombstone_seq
, ReadOptions());
360 // MemTable found out that this key is *not* found (at this sequence#)
361 ASSERT_TRUE(found
&& s
.IsNotFound());
363 merge_context
.Clear();
364 found
= mem
->Get(LookupKey("key2", seq
), &value
, &s
, &merge_context
,
365 &max_covering_tombstone_seq
, ReadOptions());
366 ASSERT_TRUE(s
.ok() && found
);
367 ASSERT_EQ(value
, "value2.2");
369 // Add memtable to list
370 list
.Add(mem
, &to_delete
);
371 ASSERT_EQ(0, to_delete
.size());
373 // Fetch keys via MemTableList
374 merge_context
.Clear();
376 list
.current()->Get(LookupKey("key1", seq
), &value
, &s
, &merge_context
,
377 &max_covering_tombstone_seq
, ReadOptions());
378 ASSERT_TRUE(found
&& s
.IsNotFound());
380 merge_context
.Clear();
382 list
.current()->Get(LookupKey("key2", seq
), &value
, &s
, &merge_context
,
383 &max_covering_tombstone_seq
, ReadOptions());
384 ASSERT_TRUE(s
.ok() && found
);
385 ASSERT_EQ("value2.2", value
);
387 // Flush this memtable from the list.
388 // (It will then be a part of the memtable history).
389 autovector
<MemTable
*> to_flush
;
390 list
.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush
);
391 ASSERT_EQ(1, to_flush
.size());
393 MutableCFOptions
mutable_cf_options(options
);
394 s
= Mock_InstallMemtableFlushResults(&list
, mutable_cf_options
, to_flush
,
397 ASSERT_EQ(0, list
.NumNotFlushed());
398 ASSERT_EQ(1, list
.NumFlushed());
399 ASSERT_EQ(0, to_delete
.size());
401 // Verify keys are no longer in MemTableList
402 merge_context
.Clear();
404 list
.current()->Get(LookupKey("key1", seq
), &value
, &s
, &merge_context
,
405 &max_covering_tombstone_seq
, ReadOptions());
408 merge_context
.Clear();
410 list
.current()->Get(LookupKey("key2", seq
), &value
, &s
, &merge_context
,
411 &max_covering_tombstone_seq
, ReadOptions());
414 // Verify keys are present in history
415 merge_context
.Clear();
416 found
= list
.current()->GetFromHistory(
417 LookupKey("key1", seq
), &value
, &s
, &merge_context
,
418 &max_covering_tombstone_seq
, ReadOptions());
419 ASSERT_TRUE(found
&& s
.IsNotFound());
421 merge_context
.Clear();
422 found
= list
.current()->GetFromHistory(
423 LookupKey("key2", seq
), &value
, &s
, &merge_context
,
424 &max_covering_tombstone_seq
, ReadOptions());
426 ASSERT_EQ("value2.2", value
);
428 // Create another memtable and write some keys to it
429 WriteBufferManager
wb2(options
.db_write_buffer_size
);
430 MemTable
* mem2
= new MemTable(cmp
, ioptions
, MutableCFOptions(options
), &wb2
,
431 kMaxSequenceNumber
, 0 /* column_family_id */);
434 mem2
->Add(++seq
, kTypeDeletion
, "key1", "");
435 mem2
->Add(++seq
, kTypeValue
, "key3", "value3");
437 // Add second memtable to list
438 list
.Add(mem2
, &to_delete
);
439 ASSERT_EQ(0, to_delete
.size());
442 list
.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush
);
443 ASSERT_EQ(1, to_flush
.size());
445 // Flush second memtable
446 s
= Mock_InstallMemtableFlushResults(&list
, mutable_cf_options
, to_flush
,
449 ASSERT_EQ(0, list
.NumNotFlushed());
450 ASSERT_EQ(2, list
.NumFlushed());
451 ASSERT_EQ(0, to_delete
.size());
453 // Add a third memtable to push the first memtable out of the history
454 WriteBufferManager
wb3(options
.db_write_buffer_size
);
455 MemTable
* mem3
= new MemTable(cmp
, ioptions
, MutableCFOptions(options
), &wb3
,
456 kMaxSequenceNumber
, 0 /* column_family_id */);
458 list
.Add(mem3
, &to_delete
);
459 ASSERT_EQ(1, list
.NumNotFlushed());
460 ASSERT_EQ(1, list
.NumFlushed());
461 ASSERT_EQ(1, to_delete
.size());
463 // Verify keys are no longer in MemTableList
464 merge_context
.Clear();
466 list
.current()->Get(LookupKey("key1", seq
), &value
, &s
, &merge_context
,
467 &max_covering_tombstone_seq
, ReadOptions());
470 merge_context
.Clear();
472 list
.current()->Get(LookupKey("key2", seq
), &value
, &s
, &merge_context
,
473 &max_covering_tombstone_seq
, ReadOptions());
476 merge_context
.Clear();
478 list
.current()->Get(LookupKey("key3", seq
), &value
, &s
, &merge_context
,
479 &max_covering_tombstone_seq
, ReadOptions());
482 // Verify that the second memtable's keys are in the history
483 merge_context
.Clear();
484 found
= list
.current()->GetFromHistory(
485 LookupKey("key1", seq
), &value
, &s
, &merge_context
,
486 &max_covering_tombstone_seq
, ReadOptions());
487 ASSERT_TRUE(found
&& s
.IsNotFound());
489 merge_context
.Clear();
490 found
= list
.current()->GetFromHistory(
491 LookupKey("key3", seq
), &value
, &s
, &merge_context
,
492 &max_covering_tombstone_seq
, ReadOptions());
494 ASSERT_EQ("value3", value
);
496 // Verify that key2 from the first memtable is no longer in the history
497 merge_context
.Clear();
499 list
.current()->Get(LookupKey("key2", seq
), &value
, &s
, &merge_context
,
500 &max_covering_tombstone_seq
, ReadOptions());
504 list
.current()->Unref(&to_delete
);
505 ASSERT_EQ(3, to_delete
.size());
506 for (MemTable
* m
: to_delete
) {
511 TEST_F(MemTableListTest
, FlushPendingTest
) {
512 const int num_tables
= 6;
513 SequenceNumber seq
= 1;
516 auto factory
= std::make_shared
<SkipListFactory
>();
517 options
.memtable_factory
= factory
;
518 ImmutableCFOptions
ioptions(options
);
519 InternalKeyComparator
cmp(BytewiseComparator());
520 WriteBufferManager
wb(options
.db_write_buffer_size
);
521 autovector
<MemTable
*> to_delete
;
523 // Create MemTableList
524 int min_write_buffer_number_to_merge
= 3;
525 int max_write_buffer_number_to_maintain
= 7;
526 int64_t max_write_buffer_size_to_maintain
=
527 7 * static_cast<int>(options
.write_buffer_size
);
528 MemTableList
list(min_write_buffer_number_to_merge
,
529 max_write_buffer_number_to_maintain
,
530 max_write_buffer_size_to_maintain
);
532 // Create some MemTables
533 uint64_t memtable_id
= 0;
534 std::vector
<MemTable
*> tables
;
535 MutableCFOptions
mutable_cf_options(options
);
536 for (int i
= 0; i
< num_tables
; i
++) {
537 MemTable
* mem
= new MemTable(cmp
, ioptions
, mutable_cf_options
, &wb
,
538 kMaxSequenceNumber
, 0 /* column_family_id */);
539 mem
->SetID(memtable_id
++);
543 MergeContext merge_context
;
545 mem
->Add(++seq
, kTypeValue
, "key1", ToString(i
));
546 mem
->Add(++seq
, kTypeValue
, "keyN" + ToString(i
), "valueN");
547 mem
->Add(++seq
, kTypeValue
, "keyX" + ToString(i
), "value");
548 mem
->Add(++seq
, kTypeValue
, "keyM" + ToString(i
), "valueM");
549 mem
->Add(++seq
, kTypeDeletion
, "keyX" + ToString(i
), "");
551 tables
.push_back(mem
);
555 ASSERT_FALSE(list
.IsFlushPending());
556 ASSERT_FALSE(list
.imm_flush_needed
.load(std::memory_order_acquire
));
557 autovector
<MemTable
*> to_flush
;
558 list
.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush
);
559 ASSERT_EQ(0, to_flush
.size());
561 // Request a flush even though there is nothing to flush
562 list
.FlushRequested();
563 ASSERT_FALSE(list
.IsFlushPending());
564 ASSERT_FALSE(list
.imm_flush_needed
.load(std::memory_order_acquire
));
566 // Attempt to 'flush' to clear request for flush
567 list
.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush
);
568 ASSERT_EQ(0, to_flush
.size());
569 ASSERT_FALSE(list
.IsFlushPending());
570 ASSERT_FALSE(list
.imm_flush_needed
.load(std::memory_order_acquire
));
572 // Request a flush again
573 list
.FlushRequested();
574 // No flush pending since the list is empty.
575 ASSERT_FALSE(list
.IsFlushPending());
576 ASSERT_FALSE(list
.imm_flush_needed
.load(std::memory_order_acquire
));
579 list
.Add(tables
[0], &to_delete
);
580 list
.Add(tables
[1], &to_delete
);
581 ASSERT_EQ(2, list
.NumNotFlushed());
582 ASSERT_EQ(0, to_delete
.size());
584 // Even though we have less than the minimum to flush, a flush is
585 // pending since we had previously requested a flush and never called
586 // PickMemtablesToFlush() to clear the flush.
587 ASSERT_TRUE(list
.IsFlushPending());
588 ASSERT_TRUE(list
.imm_flush_needed
.load(std::memory_order_acquire
));
590 // Pick tables to flush
591 list
.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush
);
592 ASSERT_EQ(2, to_flush
.size());
593 ASSERT_EQ(2, list
.NumNotFlushed());
594 ASSERT_FALSE(list
.IsFlushPending());
595 ASSERT_FALSE(list
.imm_flush_needed
.load(std::memory_order_acquire
));
598 list
.RollbackMemtableFlush(to_flush
, 0);
599 ASSERT_FALSE(list
.IsFlushPending());
600 ASSERT_TRUE(list
.imm_flush_needed
.load(std::memory_order_acquire
));
604 list
.Add(tables
[2], &to_delete
);
605 // We now have the minimum to flush regardles of whether FlushRequested()
607 ASSERT_TRUE(list
.IsFlushPending());
608 ASSERT_TRUE(list
.imm_flush_needed
.load(std::memory_order_acquire
));
609 ASSERT_EQ(0, to_delete
.size());
611 // Pick tables to flush
612 list
.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush
);
613 ASSERT_EQ(3, to_flush
.size());
614 ASSERT_EQ(3, list
.NumNotFlushed());
615 ASSERT_FALSE(list
.IsFlushPending());
616 ASSERT_FALSE(list
.imm_flush_needed
.load(std::memory_order_acquire
));
618 // Pick tables to flush again
619 autovector
<MemTable
*> to_flush2
;
620 list
.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush2
);
621 ASSERT_EQ(0, to_flush2
.size());
622 ASSERT_EQ(3, list
.NumNotFlushed());
623 ASSERT_FALSE(list
.IsFlushPending());
624 ASSERT_FALSE(list
.imm_flush_needed
.load(std::memory_order_acquire
));
627 list
.Add(tables
[3], &to_delete
);
628 ASSERT_FALSE(list
.IsFlushPending());
629 ASSERT_TRUE(list
.imm_flush_needed
.load(std::memory_order_acquire
));
630 ASSERT_EQ(0, to_delete
.size());
632 // Request a flush again
633 list
.FlushRequested();
634 ASSERT_TRUE(list
.IsFlushPending());
635 ASSERT_TRUE(list
.imm_flush_needed
.load(std::memory_order_acquire
));
637 // Pick tables to flush again
638 list
.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush2
);
639 ASSERT_EQ(1, to_flush2
.size());
640 ASSERT_EQ(4, list
.NumNotFlushed());
641 ASSERT_FALSE(list
.IsFlushPending());
642 ASSERT_FALSE(list
.imm_flush_needed
.load(std::memory_order_acquire
));
644 // Rollback first pick of tables
645 list
.RollbackMemtableFlush(to_flush
, 0);
646 ASSERT_TRUE(list
.IsFlushPending());
647 ASSERT_TRUE(list
.imm_flush_needed
.load(std::memory_order_acquire
));
650 // Add another tables
651 list
.Add(tables
[4], &to_delete
);
652 ASSERT_EQ(5, list
.NumNotFlushed());
653 // We now have the minimum to flush regardles of whether FlushRequested()
654 ASSERT_TRUE(list
.IsFlushPending());
655 ASSERT_TRUE(list
.imm_flush_needed
.load(std::memory_order_acquire
));
656 ASSERT_EQ(0, to_delete
.size());
658 // Pick tables to flush
659 list
.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush
);
660 // Should pick 4 of 5 since 1 table has been picked in to_flush2
661 ASSERT_EQ(4, to_flush
.size());
662 ASSERT_EQ(5, list
.NumNotFlushed());
663 ASSERT_FALSE(list
.IsFlushPending());
664 ASSERT_FALSE(list
.imm_flush_needed
.load(std::memory_order_acquire
));
666 // Pick tables to flush again
667 autovector
<MemTable
*> to_flush3
;
668 list
.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush3
);
669 ASSERT_EQ(0, to_flush3
.size()); // nothing not in progress of being flushed
670 ASSERT_EQ(5, list
.NumNotFlushed());
671 ASSERT_FALSE(list
.IsFlushPending());
672 ASSERT_FALSE(list
.imm_flush_needed
.load(std::memory_order_acquire
));
674 // Flush the 4 memtables that were picked in to_flush
675 s
= Mock_InstallMemtableFlushResults(&list
, mutable_cf_options
, to_flush
,
679 // Note: now to_flush contains tables[0,1,2,4]. to_flush2 contains
681 // Current implementation will only commit memtables in the order they were
682 // created. So TryInstallMemtableFlushResults will install the first 3 tables
683 // in to_flush and stop when it encounters a table not yet flushed.
684 ASSERT_EQ(2, list
.NumNotFlushed());
686 std::min(3, static_cast<int>(max_write_buffer_size_to_maintain
) /
687 static_cast<int>(options
.write_buffer_size
));
688 ASSERT_EQ(num_in_history
, list
.NumFlushed());
689 ASSERT_EQ(5 - list
.NumNotFlushed() - num_in_history
, to_delete
.size());
691 // Request a flush again. Should be nothing to flush
692 list
.FlushRequested();
693 ASSERT_FALSE(list
.IsFlushPending());
694 ASSERT_FALSE(list
.imm_flush_needed
.load(std::memory_order_acquire
));
696 // Flush the 1 memtable that was picked in to_flush2
697 s
= MemTableListTest::Mock_InstallMemtableFlushResults(
698 &list
, mutable_cf_options
, to_flush2
, &to_delete
);
701 // This will actually install 2 tables. The 1 we told it to flush, and also
702 // tables[4] which has been waiting for tables[3] to commit.
703 ASSERT_EQ(0, list
.NumNotFlushed());
705 std::min(5, static_cast<int>(max_write_buffer_size_to_maintain
) /
706 static_cast<int>(options
.write_buffer_size
));
707 ASSERT_EQ(num_in_history
, list
.NumFlushed());
708 ASSERT_EQ(5 - list
.NumNotFlushed() - num_in_history
, to_delete
.size());
710 for (const auto& m
: to_delete
) {
711 // Refcount should be 0 after calling TryInstallMemtableFlushResults.
712 // Verify this, by Ref'ing then UnRef'ing:
714 ASSERT_EQ(m
, m
->Unref());
720 list
.Add(tables
[5], &to_delete
);
721 ASSERT_EQ(1, list
.NumNotFlushed());
722 ASSERT_EQ(5, list
.GetLatestMemTableID());
724 // Pick tables to flush. The tables to pick must have ID smaller than or
725 // equal to 4. Therefore, no table will be selected in this case.
726 autovector
<MemTable
*> to_flush4
;
727 list
.FlushRequested();
728 ASSERT_TRUE(list
.HasFlushRequested());
729 list
.PickMemtablesToFlush(&memtable_id
, &to_flush4
);
730 ASSERT_TRUE(to_flush4
.empty());
731 ASSERT_EQ(1, list
.NumNotFlushed());
732 ASSERT_TRUE(list
.imm_flush_needed
.load(std::memory_order_acquire
));
733 ASSERT_FALSE(list
.IsFlushPending());
734 ASSERT_FALSE(list
.HasFlushRequested());
736 // Pick tables to flush. The tables to pick must have ID smaller than or
737 // equal to 5. Therefore, only tables[5] will be selected.
739 list
.FlushRequested();
740 list
.PickMemtablesToFlush(&memtable_id
, &to_flush4
);
741 ASSERT_EQ(1, static_cast<int>(to_flush4
.size()));
742 ASSERT_EQ(1, list
.NumNotFlushed());
743 ASSERT_FALSE(list
.imm_flush_needed
.load(std::memory_order_acquire
));
744 ASSERT_FALSE(list
.IsFlushPending());
747 list
.current()->Unref(&to_delete
);
749 std::min(num_tables
, static_cast<int>(max_write_buffer_size_to_maintain
) /
750 static_cast<int>(options
.write_buffer_size
));
751 ASSERT_EQ(to_delete_size
, to_delete
.size());
753 for (const auto& m
: to_delete
) {
754 // Refcount should be 0 after calling TryInstallMemtableFlushResults.
755 // Verify this, by Ref'ing then UnRef'ing:
757 ASSERT_EQ(m
, m
->Unref());
763 TEST_F(MemTableListTest
, EmptyAtomicFlusTest
) {
764 autovector
<MemTableList
*> lists
;
765 autovector
<uint32_t> cf_ids
;
766 autovector
<const MutableCFOptions
*> options_list
;
767 autovector
<const autovector
<MemTable
*>*> to_flush
;
768 autovector
<MemTable
*> to_delete
;
769 Status s
= Mock_InstallMemtableAtomicFlushResults(lists
, cf_ids
, options_list
,
770 to_flush
, &to_delete
);
772 ASSERT_TRUE(to_delete
.empty());
775 TEST_F(MemTableListTest
, AtomicFlusTest
) {
776 const int num_cfs
= 3;
777 const int num_tables_per_cf
= 2;
778 SequenceNumber seq
= 1;
780 auto factory
= std::make_shared
<SkipListFactory
>();
781 options
.memtable_factory
= factory
;
782 ImmutableCFOptions
ioptions(options
);
783 InternalKeyComparator
cmp(BytewiseComparator());
784 WriteBufferManager
wb(options
.db_write_buffer_size
);
786 // Create MemTableLists
787 int min_write_buffer_number_to_merge
= 3;
788 int max_write_buffer_number_to_maintain
= 7;
789 int64_t max_write_buffer_size_to_maintain
=
790 7 * static_cast<int64_t>(options
.write_buffer_size
);
791 autovector
<MemTableList
*> lists
;
792 for (int i
= 0; i
!= num_cfs
; ++i
) {
793 lists
.emplace_back(new MemTableList(min_write_buffer_number_to_merge
,
794 max_write_buffer_number_to_maintain
,
795 max_write_buffer_size_to_maintain
));
798 autovector
<uint32_t> cf_ids
;
799 std::vector
<std::vector
<MemTable
*>> tables(num_cfs
);
800 autovector
<const MutableCFOptions
*> mutable_cf_options_list
;
802 for (auto& elem
: tables
) {
803 mutable_cf_options_list
.emplace_back(new MutableCFOptions(options
));
804 uint64_t memtable_id
= 0;
805 for (int i
= 0; i
!= num_tables_per_cf
; ++i
) {
807 new MemTable(cmp
, ioptions
, *(mutable_cf_options_list
.back()), &wb
,
808 kMaxSequenceNumber
, cf_id
);
809 mem
->SetID(memtable_id
++);
814 mem
->Add(++seq
, kTypeValue
, "key1", ToString(i
));
815 mem
->Add(++seq
, kTypeValue
, "keyN" + ToString(i
), "valueN");
816 mem
->Add(++seq
, kTypeValue
, "keyX" + ToString(i
), "value");
817 mem
->Add(++seq
, kTypeValue
, "keyM" + ToString(i
), "valueM");
818 mem
->Add(++seq
, kTypeDeletion
, "keyX" + ToString(i
), "");
822 cf_ids
.push_back(cf_id
++);
825 std::vector
<autovector
<MemTable
*>> flush_candidates(num_cfs
);
828 for (auto i
= 0; i
!= num_cfs
; ++i
) {
829 auto* list
= lists
[i
];
830 ASSERT_FALSE(list
->IsFlushPending());
831 ASSERT_FALSE(list
->imm_flush_needed
.load(std::memory_order_acquire
));
832 list
->PickMemtablesToFlush(nullptr /* memtable_id */, &flush_candidates
[i
]);
833 ASSERT_EQ(0, flush_candidates
[i
].size());
835 // Request flush even though there is nothing to flush
836 for (auto i
= 0; i
!= num_cfs
; ++i
) {
837 auto* list
= lists
[i
];
838 list
->FlushRequested();
839 ASSERT_FALSE(list
->IsFlushPending());
840 ASSERT_FALSE(list
->imm_flush_needed
.load(std::memory_order_acquire
));
842 autovector
<MemTable
*> to_delete
;
843 // Add tables to the immutable memtalbe lists associated with column families
844 for (auto i
= 0; i
!= num_cfs
; ++i
) {
845 for (auto j
= 0; j
!= num_tables_per_cf
; ++j
) {
846 lists
[i
]->Add(tables
[i
][j
], &to_delete
);
848 ASSERT_EQ(num_tables_per_cf
, lists
[i
]->NumNotFlushed());
849 ASSERT_TRUE(lists
[i
]->IsFlushPending());
850 ASSERT_TRUE(lists
[i
]->imm_flush_needed
.load(std::memory_order_acquire
));
852 std::vector
<uint64_t> flush_memtable_ids
= {1, 1, 0};
859 // Pick memtables to flush
860 for (auto i
= 0; i
!= num_cfs
; ++i
) {
861 flush_candidates
[i
].clear();
862 lists
[i
]->PickMemtablesToFlush(&flush_memtable_ids
[i
],
863 &flush_candidates
[i
]);
864 ASSERT_EQ(flush_memtable_ids
[i
] - 0 + 1,
865 static_cast<uint64_t>(flush_candidates
[i
].size()));
867 autovector
<MemTableList
*> tmp_lists
;
868 autovector
<uint32_t> tmp_cf_ids
;
869 autovector
<const MutableCFOptions
*> tmp_options_list
;
870 autovector
<const autovector
<MemTable
*>*> to_flush
;
871 for (auto i
= 0; i
!= num_cfs
; ++i
) {
872 if (!flush_candidates
[i
].empty()) {
873 to_flush
.push_back(&flush_candidates
[i
]);
874 tmp_lists
.push_back(lists
[i
]);
875 tmp_cf_ids
.push_back(i
);
876 tmp_options_list
.push_back(mutable_cf_options_list
[i
]);
879 Status s
= Mock_InstallMemtableAtomicFlushResults(
880 tmp_lists
, tmp_cf_ids
, tmp_options_list
, to_flush
, &to_delete
);
883 for (auto i
= 0; i
!= num_cfs
; ++i
) {
884 for (auto j
= 0; j
!= num_tables_per_cf
; ++j
) {
885 if (static_cast<uint64_t>(j
) <= flush_memtable_ids
[i
]) {
886 ASSERT_LT(0, tables
[i
][j
]->GetFileNumber());
890 static_cast<size_t>(num_tables_per_cf
) - flush_candidates
[i
].size(),
891 lists
[i
]->NumNotFlushed());
895 for (auto list
: lists
) {
896 list
->current()->Unref(&to_delete
);
899 for (auto& mutable_cf_options
: mutable_cf_options_list
) {
900 if (mutable_cf_options
!= nullptr) {
901 delete mutable_cf_options
;
902 mutable_cf_options
= nullptr;
905 // All memtables in tables array must have been flushed, thus ready to be
907 ASSERT_EQ(to_delete
.size(), tables
.size() * tables
.front().size());
908 for (const auto& m
: to_delete
) {
909 // Refcount should be 0 after calling InstallMemtableFlushResults.
910 // Verify this by Ref'ing and then Unref'ing.
912 ASSERT_EQ(m
, m
->Unref());
917 } // namespace ROCKSDB_NAMESPACE
919 int main(int argc
, char** argv
) {
920 ::testing::InitGoogleTest(&argc
, argv
);
921 return RUN_ALL_TESTS();