// (found in the LICENSE.Apache file in the root directory).
#include "db/memtable_list.h"
+
#include <algorithm>
#include <string>
#include <vector>
+
#include "db/merge_context.h"
#include "db/version_set.h"
#include "db/write_controller.h"
MemTableListTest() : db(nullptr), file_number(1) {
dbname = test::PerThreadDBPath("memtable_list_test");
options.create_if_missing = true;
- DestroyDB(dbname, options);
+ EXPECT_OK(DestroyDB(dbname, options));
}
// Create a test db if not yet created
void CreateDB() {
if (db == nullptr) {
options.create_if_missing = true;
- DestroyDB(dbname, options);
+ EXPECT_OK(DestroyDB(dbname, options));
// Open DB only with default column family
ColumnFamilyOptions cf_options;
std::vector<ColumnFamilyDescriptor> cf_descs;
handles.clear();
delete db;
db = nullptr;
- DestroyDB(dbname, options, cf_descs);
+ EXPECT_OK(DestroyDB(dbname, options, cf_descs));
}
}
VersionSet versions(dbname, &immutable_db_options, env_options,
table_cache.get(), &write_buffer_manager,
&write_controller, /*block_cache_tracer=*/nullptr,
- /*io_tracer=*/nullptr);
+ /*io_tracer=*/nullptr, /*db_id*/ "",
+ /*db_session_id*/ "");
std::vector<ColumnFamilyDescriptor> cf_descs;
cf_descs.emplace_back(kDefaultColumnFamilyName, ColumnFamilyOptions());
cf_descs.emplace_back("one", ColumnFamilyOptions());
std::list<std::unique_ptr<FlushJobInfo>> flush_jobs_info;
Status s = list->TryInstallMemtableFlushResults(
cfd, mutable_cf_options, m, &dummy_prep_tracker, &versions, &mutex,
- file_num, to_delete, nullptr, &log_buffer, &flush_jobs_info, &io_s);
+ file_num, to_delete, nullptr, &log_buffer, &flush_jobs_info);
EXPECT_OK(io_s);
return s;
}
VersionSet versions(dbname, &immutable_db_options, env_options,
table_cache.get(), &write_buffer_manager,
&write_controller, /*block_cache_tracer=*/nullptr,
- /*io_tracer=*/nullptr);
+ /*io_tracer=*/nullptr, /*db_id*/ "",
+ /*db_session_id*/ "");
std::vector<ColumnFamilyDescriptor> cf_descs;
cf_descs.emplace_back(kDefaultColumnFamilyName, ColumnFamilyOptions());
cf_descs.emplace_back("one", ColumnFamilyOptions());
for (auto& meta : file_metas) {
file_meta_ptrs.push_back(&meta);
}
+ std::vector<std::list<std::unique_ptr<FlushJobInfo>>>
+ committed_flush_jobs_info_storage(cf_ids.size());
+ autovector<std::list<std::unique_ptr<FlushJobInfo>>*>
+ committed_flush_jobs_info;
+ for (int i = 0; i < static_cast<int>(cf_ids.size()); ++i) {
+ committed_flush_jobs_info.push_back(
+ &committed_flush_jobs_info_storage[i]);
+ }
+
InstrumentedMutex mutex;
InstrumentedMutexLock l(&mutex);
return InstallMemtableAtomicFlushResults(
- &lists, cfds, mutable_cf_options_list, mems_list, &versions, &mutex,
- file_meta_ptrs, to_delete, nullptr, &log_buffer);
+ &lists, cfds, mutable_cf_options_list, mems_list, &versions,
+ nullptr /* prep_tracker */, &mutex, file_meta_ptrs,
+ committed_flush_jobs_info, to_delete, nullptr, &log_buffer);
}
};
ASSERT_FALSE(list.IsFlushPending());
autovector<MemTable*> mems;
- list.PickMemtablesToFlush(nullptr /* memtable_id */, &mems);
+ list.PickMemtablesToFlush(
+ std::numeric_limits<uint64_t>::max() /* memtable_id */, &mems);
ASSERT_EQ(0, mems.size());
autovector<MemTable*> to_delete;
autovector<MemTable*> to_delete;
LookupKey lkey("key1", seq);
- bool found = list.current()->Get(
- lkey, &value, /*timestamp*/nullptr, &s, &merge_context,
- &max_covering_tombstone_seq, ReadOptions());
+ bool found = list.current()->Get(lkey, &value, /*columns=*/nullptr,
+ /*timestamp=*/nullptr, &s, &merge_context,
+ &max_covering_tombstone_seq, ReadOptions());
ASSERT_FALSE(found);
// Create a MemTable
InternalKeyComparator cmp(BytewiseComparator());
auto factory = std::make_shared<SkipListFactory>();
options.memtable_factory = factory;
- ImmutableCFOptions ioptions(options);
+ ImmutableOptions ioptions(options);
WriteBufferManager wb(options.db_write_buffer_size);
MemTable* mem = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb,
mem->Ref();
// Write some keys to this memtable.
- mem->Add(++seq, kTypeDeletion, "key1", "");
- mem->Add(++seq, kTypeValue, "key2", "value2");
- mem->Add(++seq, kTypeValue, "key1", "value1");
- mem->Add(++seq, kTypeValue, "key2", "value2.2");
+ ASSERT_OK(
+ mem->Add(++seq, kTypeDeletion, "key1", "", nullptr /* kv_prot_info */));
+ ASSERT_OK(mem->Add(++seq, kTypeValue, "key2", "value2",
+ nullptr /* kv_prot_info */));
+ ASSERT_OK(mem->Add(++seq, kTypeValue, "key1", "value1",
+ nullptr /* kv_prot_info */));
+ ASSERT_OK(mem->Add(++seq, kTypeValue, "key2", "value2.2",
+ nullptr /* kv_prot_info */));
// Fetch the newly written keys
merge_context.Clear();
- found = mem->Get(LookupKey("key1", seq), &value,
- /*timestamp*/nullptr, &s, &merge_context,
- &max_covering_tombstone_seq, ReadOptions());
+ found = mem->Get(LookupKey("key1", seq), &value, /*columns*/ nullptr,
+ /*timestamp*/ nullptr, &s, &merge_context,
+ &max_covering_tombstone_seq, ReadOptions(),
+ false /* immutable_memtable */);
ASSERT_TRUE(s.ok() && found);
ASSERT_EQ(value, "value1");
merge_context.Clear();
- found = mem->Get(LookupKey("key1", 2), &value,
- /*timestamp*/nullptr, &s, &merge_context,
- &max_covering_tombstone_seq, ReadOptions());
+ found = mem->Get(LookupKey("key1", 2), &value, /*columns*/ nullptr,
+ /*timestamp*/ nullptr, &s, &merge_context,
+ &max_covering_tombstone_seq, ReadOptions(),
+ false /* immutable_memtable */);
// MemTable found out that this key is *not* found (at this sequence#)
ASSERT_TRUE(found && s.IsNotFound());
merge_context.Clear();
- found = mem->Get(LookupKey("key2", seq), &value,
- /*timestamp*/nullptr, &s, &merge_context,
- &max_covering_tombstone_seq, ReadOptions());
+ found = mem->Get(LookupKey("key2", seq), &value, /*columns*/ nullptr,
+ /*timestamp*/ nullptr, &s, &merge_context,
+ &max_covering_tombstone_seq, ReadOptions(),
+ false /* immutable_memtable */);
ASSERT_TRUE(s.ok() && found);
ASSERT_EQ(value, "value2.2");
ASSERT_EQ(1, mem->num_deletes());
// Add memtable to list
+ // This is to make assert(memtable->IsFragmentedRangeTombstonesConstructed())
+ // in MemTableListVersion::GetFromList work.
+ mem->ConstructFragmentedRangeTombstones();
list.Add(mem, &to_delete);
SequenceNumber saved_seq = seq;
kMaxSequenceNumber, 0 /* column_family_id */);
mem2->Ref();
- mem2->Add(++seq, kTypeDeletion, "key1", "");
- mem2->Add(++seq, kTypeValue, "key2", "value2.3");
+ ASSERT_OK(
+ mem2->Add(++seq, kTypeDeletion, "key1", "", nullptr /* kv_prot_info */));
+ ASSERT_OK(mem2->Add(++seq, kTypeValue, "key2", "value2.3",
+ nullptr /* kv_prot_info */));
// Add second memtable to list
+ // This is to make assert(memtable->IsFragmentedRangeTombstonesConstructed())
+ // in MemTableListVersion::GetFromList work.
+ mem2->ConstructFragmentedRangeTombstones();
list.Add(mem2, &to_delete);
// Fetch keys via MemTableList
merge_context.Clear();
- found = list.current()->Get(
- LookupKey("key1", seq), &value, /*timestamp*/nullptr, &s,
- &merge_context, &max_covering_tombstone_seq, ReadOptions());
+ found =
+ list.current()->Get(LookupKey("key1", seq), &value, /*columns=*/nullptr,
+ /*timestamp=*/nullptr, &s, &merge_context,
+ &max_covering_tombstone_seq, ReadOptions());
ASSERT_TRUE(found && s.IsNotFound());
merge_context.Clear();
- found = list.current()->Get(
- LookupKey("key1", saved_seq), &value, /*timestamp*/nullptr,
- &s, &merge_context, &max_covering_tombstone_seq, ReadOptions());
+ found = list.current()->Get(LookupKey("key1", saved_seq), &value,
+ /*columns=*/nullptr, /*timestamp=*/nullptr, &s,
+ &merge_context, &max_covering_tombstone_seq,
+ ReadOptions());
ASSERT_TRUE(s.ok() && found);
ASSERT_EQ("value1", value);
merge_context.Clear();
- found = list.current()->Get(
- LookupKey("key2", seq), &value, /*timestamp*/nullptr, &s,
- &merge_context, &max_covering_tombstone_seq, ReadOptions());
+ found =
+ list.current()->Get(LookupKey("key2", seq), &value, /*columns=*/nullptr,
+ /*timestamp=*/nullptr, &s, &merge_context,
+ &max_covering_tombstone_seq, ReadOptions());
ASSERT_TRUE(s.ok() && found);
ASSERT_EQ(value, "value2.3");
merge_context.Clear();
- found = list.current()->Get(
- LookupKey("key2", 1), &value, /*timestamp*/nullptr, &s,
- &merge_context, &max_covering_tombstone_seq, ReadOptions());
+ found = list.current()->Get(LookupKey("key2", 1), &value, /*columns=*/nullptr,
+ /*timestamp=*/nullptr, &s, &merge_context,
+ &max_covering_tombstone_seq, ReadOptions());
ASSERT_FALSE(found);
ASSERT_EQ(2, list.NumNotFlushed());
// Create MemTableList
int min_write_buffer_number_to_merge = 2;
int max_write_buffer_number_to_maintain = 2;
- int64_t max_write_buffer_size_to_maintain = 2000;
+ int64_t max_write_buffer_size_to_maintain = 2 * Arena::kInlineSize;
MemTableList list(min_write_buffer_number_to_merge,
max_write_buffer_number_to_maintain,
max_write_buffer_size_to_maintain);
autovector<MemTable*> to_delete;
LookupKey lkey("key1", seq);
- bool found = list.current()->Get(
- lkey, &value, /*timestamp*/nullptr, &s, &merge_context,
- &max_covering_tombstone_seq, ReadOptions());
+ bool found = list.current()->Get(lkey, &value, /*columns=*/nullptr,
+ /*timestamp=*/nullptr, &s, &merge_context,
+ &max_covering_tombstone_seq, ReadOptions());
ASSERT_FALSE(found);
// Create a MemTable
InternalKeyComparator cmp(BytewiseComparator());
auto factory = std::make_shared<SkipListFactory>();
options.memtable_factory = factory;
- ImmutableCFOptions ioptions(options);
+ ImmutableOptions ioptions(options);
WriteBufferManager wb(options.db_write_buffer_size);
MemTable* mem = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb,
mem->Ref();
// Write some keys to this memtable.
- mem->Add(++seq, kTypeDeletion, "key1", "");
- mem->Add(++seq, kTypeValue, "key2", "value2");
- mem->Add(++seq, kTypeValue, "key2", "value2.2");
+ ASSERT_OK(
+ mem->Add(++seq, kTypeDeletion, "key1", "", nullptr /* kv_prot_info */));
+ ASSERT_OK(mem->Add(++seq, kTypeValue, "key2", "value2",
+ nullptr /* kv_prot_info */));
+ ASSERT_OK(mem->Add(++seq, kTypeValue, "key2", "value2.2",
+ nullptr /* kv_prot_info */));
// Fetch the newly written keys
merge_context.Clear();
- found = mem->Get(LookupKey("key1", seq), &value,
- /*timestamp*/nullptr, &s, &merge_context,
- &max_covering_tombstone_seq, ReadOptions());
+ found = mem->Get(LookupKey("key1", seq), &value, /*columns*/ nullptr,
+ /*timestamp*/ nullptr, &s, &merge_context,
+ &max_covering_tombstone_seq, ReadOptions(),
+ false /* immutable_memtable */);
// MemTable found out that this key is *not* found (at this sequence#)
ASSERT_TRUE(found && s.IsNotFound());
merge_context.Clear();
- found = mem->Get(LookupKey("key2", seq), &value,
- /*timestamp*/nullptr, &s, &merge_context,
- &max_covering_tombstone_seq, ReadOptions());
+ found = mem->Get(LookupKey("key2", seq), &value, /*columns*/ nullptr,
+ /*timestamp*/ nullptr, &s, &merge_context,
+ &max_covering_tombstone_seq, ReadOptions(),
+ false /* immutable_memtable */);
ASSERT_TRUE(s.ok() && found);
ASSERT_EQ(value, "value2.2");
// Add memtable to list
+ // This is to make assert(memtable->IsFragmentedRangeTombstonesConstructed())
+ // in MemTableListVersion::GetFromList work.
+ mem->ConstructFragmentedRangeTombstones();
list.Add(mem, &to_delete);
ASSERT_EQ(0, to_delete.size());
// Fetch keys via MemTableList
merge_context.Clear();
- found = list.current()->Get(LookupKey("key1", seq), &value,
- /*timestamp*/nullptr, &s, &merge_context,
- &max_covering_tombstone_seq, ReadOptions());
+ found =
+ list.current()->Get(LookupKey("key1", seq), &value, /*columns=*/nullptr,
+ /*timestamp=*/nullptr, &s, &merge_context,
+ &max_covering_tombstone_seq, ReadOptions());
ASSERT_TRUE(found && s.IsNotFound());
merge_context.Clear();
- found = list.current()->Get(LookupKey("key2", seq), &value,
- /*timestamp*/nullptr, &s, &merge_context,
- &max_covering_tombstone_seq, ReadOptions());
+ found =
+ list.current()->Get(LookupKey("key2", seq), &value, /*columns=*/nullptr,
+ /*timestamp=*/nullptr, &s, &merge_context,
+ &max_covering_tombstone_seq, ReadOptions());
ASSERT_TRUE(s.ok() && found);
ASSERT_EQ("value2.2", value);
// Flush this memtable from the list.
// (It will then be a part of the memtable history).
autovector<MemTable*> to_flush;
- list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush);
+ list.PickMemtablesToFlush(
+ std::numeric_limits<uint64_t>::max() /* memtable_id */, &to_flush);
ASSERT_EQ(1, to_flush.size());
MutableCFOptions mutable_cf_options(options);
// Verify keys are no longer in MemTableList
merge_context.Clear();
- found = list.current()->Get(LookupKey("key1", seq), &value,
- /*timestamp*/nullptr, &s, &merge_context,
- &max_covering_tombstone_seq, ReadOptions());
+ found =
+ list.current()->Get(LookupKey("key1", seq), &value, /*columns=*/nullptr,
+ /*timestamp=*/nullptr, &s, &merge_context,
+ &max_covering_tombstone_seq, ReadOptions());
ASSERT_FALSE(found);
merge_context.Clear();
- found = list.current()->Get(LookupKey("key2", seq), &value,
- /*timestamp*/nullptr, &s, &merge_context,
- &max_covering_tombstone_seq, ReadOptions());
+ found =
+ list.current()->Get(LookupKey("key2", seq), &value, /*columns=*/nullptr,
+ /*timestamp=*/nullptr, &s, &merge_context,
+ &max_covering_tombstone_seq, ReadOptions());
ASSERT_FALSE(found);
// Verify keys are present in history
merge_context.Clear();
found = list.current()->GetFromHistory(
- LookupKey("key1", seq), &value, /*timestamp*/nullptr, &s, &merge_context,
- &max_covering_tombstone_seq, ReadOptions());
+ LookupKey("key1", seq), &value, /*columns=*/nullptr,
+ /*timestamp=*/nullptr, &s, &merge_context, &max_covering_tombstone_seq,
+ ReadOptions());
ASSERT_TRUE(found && s.IsNotFound());
merge_context.Clear();
found = list.current()->GetFromHistory(
- LookupKey("key2", seq), &value, /*timestamp*/nullptr, &s, &merge_context,
- &max_covering_tombstone_seq, ReadOptions());
+ LookupKey("key2", seq), &value, /*columns=*/nullptr,
+ /*timestamp=*/nullptr, &s, &merge_context, &max_covering_tombstone_seq,
+ ReadOptions());
ASSERT_TRUE(found);
ASSERT_EQ("value2.2", value);
kMaxSequenceNumber, 0 /* column_family_id */);
mem2->Ref();
- mem2->Add(++seq, kTypeDeletion, "key1", "");
- mem2->Add(++seq, kTypeValue, "key3", "value3");
+ ASSERT_OK(
+ mem2->Add(++seq, kTypeDeletion, "key1", "", nullptr /* kv_prot_info */));
+ ASSERT_OK(mem2->Add(++seq, kTypeValue, "key3", "value3",
+ nullptr /* kv_prot_info */));
// Add second memtable to list
+ // This is to make assert(memtable->IsFragmentedRangeTombstonesConstructed())
+ // in MemTableListVersion::GetFromList work.
+ mem2->ConstructFragmentedRangeTombstones();
list.Add(mem2, &to_delete);
ASSERT_EQ(0, to_delete.size());
to_flush.clear();
- list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush);
+ list.PickMemtablesToFlush(
+ std::numeric_limits<uint64_t>::max() /* memtable_id */, &to_flush);
ASSERT_EQ(1, to_flush.size());
// Flush second memtable
MemTable* mem3 = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb3,
kMaxSequenceNumber, 0 /* column_family_id */);
mem3->Ref();
+ // This is to make assert(memtable->IsFragmentedRangeTombstonesConstructed())
+ // in MemTableListVersion::GetFromList work.
+ mem3->ConstructFragmentedRangeTombstones();
list.Add(mem3, &to_delete);
ASSERT_EQ(1, list.NumNotFlushed());
ASSERT_EQ(1, list.NumFlushed());
// Verify keys are no longer in MemTableList
merge_context.Clear();
- found = list.current()->Get(LookupKey("key1", seq), &value,
- /*timestamp*/nullptr, &s, &merge_context,
- &max_covering_tombstone_seq, ReadOptions());
+ found =
+ list.current()->Get(LookupKey("key1", seq), &value, /*columns=*/nullptr,
+ /*timestamp=*/nullptr, &s, &merge_context,
+ &max_covering_tombstone_seq, ReadOptions());
ASSERT_FALSE(found);
merge_context.Clear();
- found = list.current()->Get(LookupKey("key2", seq), &value,
- /*timestamp*/nullptr, &s, &merge_context,
- &max_covering_tombstone_seq, ReadOptions());
+ found =
+ list.current()->Get(LookupKey("key2", seq), &value, /*columns=*/nullptr,
+ /*timestamp=*/nullptr, &s, &merge_context,
+ &max_covering_tombstone_seq, ReadOptions());
ASSERT_FALSE(found);
merge_context.Clear();
- found = list.current()->Get(LookupKey("key3", seq), &value,
- /*timestamp*/nullptr, &s, &merge_context,
- &max_covering_tombstone_seq, ReadOptions());
+ found =
+ list.current()->Get(LookupKey("key3", seq), &value, /*columns=*/nullptr,
+ /*timestamp=*/nullptr, &s, &merge_context,
+ &max_covering_tombstone_seq, ReadOptions());
ASSERT_FALSE(found);
// Verify that the second memtable's keys are in the history
merge_context.Clear();
found = list.current()->GetFromHistory(
- LookupKey("key1", seq), &value, /*timestamp*/nullptr, &s, &merge_context,
- &max_covering_tombstone_seq, ReadOptions());
+ LookupKey("key1", seq), &value, /*columns=*/nullptr,
+ /*timestamp=*/nullptr, &s, &merge_context, &max_covering_tombstone_seq,
+ ReadOptions());
ASSERT_TRUE(found && s.IsNotFound());
merge_context.Clear();
found = list.current()->GetFromHistory(
- LookupKey("key3", seq), &value, /*timestamp*/nullptr, &s, &merge_context,
- &max_covering_tombstone_seq, ReadOptions());
+ LookupKey("key3", seq), &value, /*columns=*/nullptr,
+ /*timestamp=*/nullptr, &s, &merge_context, &max_covering_tombstone_seq,
+ ReadOptions());
ASSERT_TRUE(found);
ASSERT_EQ("value3", value);
// Verify that key2 from the first memtable is no longer in the history
merge_context.Clear();
- found = list.current()->Get(LookupKey("key2", seq), &value,
- /*timestamp*/nullptr, &s, &merge_context,
- &max_covering_tombstone_seq, ReadOptions());
+ found =
+ list.current()->Get(LookupKey("key2", seq), &value, /*columns=*/nullptr,
+ /*timestamp=*/nullptr, &s, &merge_context,
+ &max_covering_tombstone_seq, ReadOptions());
ASSERT_FALSE(found);
// Cleanup
auto factory = std::make_shared<SkipListFactory>();
options.memtable_factory = factory;
- ImmutableCFOptions ioptions(options);
+ ImmutableOptions ioptions(options);
InternalKeyComparator cmp(BytewiseComparator());
WriteBufferManager wb(options.db_write_buffer_size);
autovector<MemTable*> to_delete;
std::string value;
MergeContext merge_context;
- mem->Add(++seq, kTypeValue, "key1", ToString(i));
- mem->Add(++seq, kTypeValue, "keyN" + ToString(i), "valueN");
- mem->Add(++seq, kTypeValue, "keyX" + ToString(i), "value");
- mem->Add(++seq, kTypeValue, "keyM" + ToString(i), "valueM");
- mem->Add(++seq, kTypeDeletion, "keyX" + ToString(i), "");
+ ASSERT_OK(mem->Add(++seq, kTypeValue, "key1", std::to_string(i),
+ nullptr /* kv_prot_info */));
+ ASSERT_OK(mem->Add(++seq, kTypeValue, "keyN" + std::to_string(i), "valueN",
+ nullptr /* kv_prot_info */));
+ ASSERT_OK(mem->Add(++seq, kTypeValue, "keyX" + std::to_string(i), "value",
+ nullptr /* kv_prot_info */));
+ ASSERT_OK(mem->Add(++seq, kTypeValue, "keyM" + std::to_string(i), "valueM",
+ nullptr /* kv_prot_info */));
+ ASSERT_OK(mem->Add(++seq, kTypeDeletion, "keyX" + std::to_string(i), "",
+ nullptr /* kv_prot_info */));
tables.push_back(mem);
}
ASSERT_FALSE(list.IsFlushPending());
ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));
autovector<MemTable*> to_flush;
- list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush);
+ list.PickMemtablesToFlush(
+ std::numeric_limits<uint64_t>::max() /* memtable_id */, &to_flush);
ASSERT_EQ(0, to_flush.size());
// Request a flush even though there is nothing to flush
ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));
// Attempt to 'flush' to clear request for flush
- list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush);
+ list.PickMemtablesToFlush(
+ std::numeric_limits<uint64_t>::max() /* memtable_id */, &to_flush);
ASSERT_EQ(0, to_flush.size());
ASSERT_FALSE(list.IsFlushPending());
ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));
ASSERT_TRUE(list.imm_flush_needed.load(std::memory_order_acquire));
// Pick tables to flush
- list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush);
+ list.PickMemtablesToFlush(
+ std::numeric_limits<uint64_t>::max() /* memtable_id */, &to_flush);
ASSERT_EQ(2, to_flush.size());
ASSERT_EQ(2, list.NumNotFlushed());
ASSERT_FALSE(list.IsFlushPending());
ASSERT_EQ(0, to_delete.size());
// Pick tables to flush
- list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush);
+ list.PickMemtablesToFlush(
+ std::numeric_limits<uint64_t>::max() /* memtable_id */, &to_flush);
ASSERT_EQ(3, to_flush.size());
ASSERT_EQ(3, list.NumNotFlushed());
ASSERT_FALSE(list.IsFlushPending());
// Pick tables to flush again
autovector<MemTable*> to_flush2;
- list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush2);
+ list.PickMemtablesToFlush(
+ std::numeric_limits<uint64_t>::max() /* memtable_id */, &to_flush2);
ASSERT_EQ(0, to_flush2.size());
ASSERT_EQ(3, list.NumNotFlushed());
ASSERT_FALSE(list.IsFlushPending());
ASSERT_TRUE(list.imm_flush_needed.load(std::memory_order_acquire));
// Pick tables to flush again
- list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush2);
+ list.PickMemtablesToFlush(
+ std::numeric_limits<uint64_t>::max() /* memtable_id */, &to_flush2);
ASSERT_EQ(1, to_flush2.size());
ASSERT_EQ(4, list.NumNotFlushed());
ASSERT_FALSE(list.IsFlushPending());
ASSERT_EQ(0, to_delete.size());
// Pick tables to flush
- list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush);
- // Should pick 4 of 5 since 1 table has been picked in to_flush2
- ASSERT_EQ(4, to_flush.size());
+ list.PickMemtablesToFlush(
+ std::numeric_limits<uint64_t>::max() /* memtable_id */, &to_flush);
+ // Picks three oldest memtables. The fourth oldest is picked in `to_flush2` so
+ // must be excluded. The newest (fifth oldest) is non-consecutive with the
+ // three oldest due to omitting the fourth oldest so must not be picked.
+ ASSERT_EQ(3, to_flush.size());
ASSERT_EQ(5, list.NumNotFlushed());
ASSERT_FALSE(list.IsFlushPending());
- ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));
+ ASSERT_TRUE(list.imm_flush_needed.load(std::memory_order_acquire));
// Pick tables to flush again
autovector<MemTable*> to_flush3;
- list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush3);
- ASSERT_EQ(0, to_flush3.size()); // nothing not in progress of being flushed
+ list.PickMemtablesToFlush(
+ std::numeric_limits<uint64_t>::max() /* memtable_id */, &to_flush3);
+ // Picks newest (fifth oldest)
+ ASSERT_EQ(1, to_flush3.size());
+ ASSERT_EQ(5, list.NumNotFlushed());
+ ASSERT_FALSE(list.IsFlushPending());
+ ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));
+
+ // Nothing left to flush
+ autovector<MemTable*> to_flush4;
+ list.PickMemtablesToFlush(
+ std::numeric_limits<uint64_t>::max() /* memtable_id */, &to_flush4);
+ ASSERT_EQ(0, to_flush4.size());
ASSERT_EQ(5, list.NumNotFlushed());
ASSERT_FALSE(list.IsFlushPending());
ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));
- // Flush the 4 memtables that were picked in to_flush
+ // Flush the 3 memtables that were picked in to_flush
s = Mock_InstallMemtableFlushResults(&list, mutable_cf_options, to_flush,
&to_delete);
ASSERT_OK(s);
- // Note: now to_flush contains tables[0,1,2,4]. to_flush2 contains
- // tables[3].
+ // Note: now to_flush contains tables[0,1,2]. to_flush2 contains
+ // tables[3]. to_flush3 contains tables[4].
// Current implementation will only commit memtables in the order they were
// created. So TryInstallMemtableFlushResults will install the first 3 tables
// in to_flush and stop when it encounters a table not yet flushed.
ASSERT_FALSE(list.IsFlushPending());
ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));
- // Flush the 1 memtable that was picked in to_flush2
+ // Flush the 1 memtable (tables[4]) that was picked in to_flush3
+ s = MemTableListTest::Mock_InstallMemtableFlushResults(
+ &list, mutable_cf_options, to_flush3, &to_delete);
+ ASSERT_OK(s);
+
+ // This will install 0 tables since tables[4] flushed while tables[3] has not
+ // yet flushed.
+ ASSERT_EQ(2, list.NumNotFlushed());
+ ASSERT_EQ(0, to_delete.size());
+
+ // Flush the 1 memtable (tables[3]) that was picked in to_flush2
s = MemTableListTest::Mock_InstallMemtableFlushResults(
&list, mutable_cf_options, to_flush2, &to_delete);
ASSERT_OK(s);
memtable_id = 4;
// Pick tables to flush. The tables to pick must have ID smaller than or
// equal to 4. Therefore, no table will be selected in this case.
- autovector<MemTable*> to_flush4;
+ autovector<MemTable*> to_flush5;
list.FlushRequested();
ASSERT_TRUE(list.HasFlushRequested());
- list.PickMemtablesToFlush(&memtable_id, &to_flush4);
- ASSERT_TRUE(to_flush4.empty());
+ list.PickMemtablesToFlush(memtable_id, &to_flush5);
+ ASSERT_TRUE(to_flush5.empty());
ASSERT_EQ(1, list.NumNotFlushed());
ASSERT_TRUE(list.imm_flush_needed.load(std::memory_order_acquire));
ASSERT_FALSE(list.IsFlushPending());
// equal to 5. Therefore, only tables[5] will be selected.
memtable_id = 5;
list.FlushRequested();
- list.PickMemtablesToFlush(&memtable_id, &to_flush4);
- ASSERT_EQ(1, static_cast<int>(to_flush4.size()));
+ list.PickMemtablesToFlush(memtable_id, &to_flush5);
+ ASSERT_EQ(1, static_cast<int>(to_flush5.size()));
ASSERT_EQ(1, list.NumNotFlushed());
ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));
ASSERT_FALSE(list.IsFlushPending());
auto factory = std::make_shared<SkipListFactory>();
options.memtable_factory = factory;
- ImmutableCFOptions ioptions(options);
+ ImmutableOptions ioptions(options);
InternalKeyComparator cmp(BytewiseComparator());
WriteBufferManager wb(options.db_write_buffer_size);
std::string value;
- mem->Add(++seq, kTypeValue, "key1", ToString(i));
- mem->Add(++seq, kTypeValue, "keyN" + ToString(i), "valueN");
- mem->Add(++seq, kTypeValue, "keyX" + ToString(i), "value");
- mem->Add(++seq, kTypeValue, "keyM" + ToString(i), "valueM");
- mem->Add(++seq, kTypeDeletion, "keyX" + ToString(i), "");
+ ASSERT_OK(mem->Add(++seq, kTypeValue, "key1", std::to_string(i),
+ nullptr /* kv_prot_info */));
+ ASSERT_OK(mem->Add(++seq, kTypeValue, "keyN" + std::to_string(i),
+ "valueN", nullptr /* kv_prot_info */));
+ ASSERT_OK(mem->Add(++seq, kTypeValue, "keyX" + std::to_string(i), "value",
+ nullptr /* kv_prot_info */));
+ ASSERT_OK(mem->Add(++seq, kTypeValue, "keyM" + std::to_string(i),
+ "valueM", nullptr /* kv_prot_info */));
+ ASSERT_OK(mem->Add(++seq, kTypeDeletion, "keyX" + std::to_string(i), "",
+ nullptr /* kv_prot_info */));
elem.push_back(mem);
}
auto* list = lists[i];
ASSERT_FALSE(list->IsFlushPending());
ASSERT_FALSE(list->imm_flush_needed.load(std::memory_order_acquire));
- list->PickMemtablesToFlush(nullptr /* memtable_id */, &flush_candidates[i]);
+ list->PickMemtablesToFlush(
+ std::numeric_limits<uint64_t>::max() /* memtable_id */,
+ &flush_candidates[i]);
ASSERT_EQ(0, flush_candidates[i].size());
}
// Request flush even though there is nothing to flush
// Pick memtables to flush
for (auto i = 0; i != num_cfs; ++i) {
flush_candidates[i].clear();
- lists[i]->PickMemtablesToFlush(&flush_memtable_ids[i],
- &flush_candidates[i]);
+ lists[i]->PickMemtablesToFlush(flush_memtable_ids[i], &flush_candidates[i]);
ASSERT_EQ(flush_memtable_ids[i] - 0 + 1,
static_cast<uint64_t>(flush_candidates[i].size()));
}
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {
+ ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}