// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#include "db/flush_job.h"

#include <algorithm>
#include <array>
#include <map>
#include <string>

#include "db/blob/blob_index.h"
#include "db/column_family.h"
#include "db/db_impl/db_impl.h"
#include "db/version_set.h"
#include "file/writable_file_writer.h"
#include "rocksdb/cache.h"
#include "rocksdb/file_system.h"
#include "rocksdb/write_buffer_manager.h"
#include "table/mock_table.h"
#include "test_util/testharness.h"
#include "test_util/testutil.h"
#include "util/random.h"
#include "util/string_util.h"

namespace ROCKSDB_NAMESPACE {

// TODO(icanadi) Mock out everything else:
// 1. VersionSet
// 2. Memtable
class FlushJobTestBase : public testing::Test {
 protected:
  FlushJobTestBase(std::string dbname, const Comparator* ucmp)
      : env_(Env::Default()),
        fs_(env_->GetFileSystem()),
        dbname_(std::move(dbname)),
        ucmp_(ucmp),
        options_(),
        db_options_(options_),
        column_family_names_({kDefaultColumnFamilyName, "foo", "bar"}),
        table_cache_(NewLRUCache(50000, 16)),
        write_buffer_manager_(db_options_.db_write_buffer_size),
        shutting_down_(false),
        mock_table_factory_(new mock::MockTableFactory()) {}

  virtual ~FlushJobTestBase() {
    if (getenv("KEEP_DB")) {
      fprintf(stdout, "db is still in %s\n", dbname_.c_str());
    } else {
      // destroy versions_ to release all file handles
      versions_.reset();
      EXPECT_OK(DestroyDir(env_, dbname_));
    }
  }

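  // NewDB() hand-writes a minimal MANIFEST describing the default column
  // family plus the extra ones in column_family_names_, then points the
  // CURRENT file at it, so that VersionSet::Recover() in SetUp() has a real
  // DB directory to open without going through DB::Open().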
  void NewDB() {
    ASSERT_OK(SetIdentityFile(env_, dbname_));
    VersionEdit new_db;

    new_db.SetLogNumber(0);
    new_db.SetNextFile(2);
    new_db.SetLastSequence(0);

    autovector<VersionEdit> new_cfs;
    SequenceNumber last_seq = 1;
    uint32_t cf_id = 1;
    for (size_t i = 1; i != column_family_names_.size(); ++i) {
      VersionEdit new_cf;
      new_cf.AddColumnFamily(column_family_names_[i]);
      new_cf.SetColumnFamily(cf_id++);
      new_cf.SetComparatorName(ucmp_->Name());
      new_cf.SetLogNumber(0);
      new_cf.SetNextFile(2);
      new_cf.SetLastSequence(last_seq++);
      new_cfs.emplace_back(new_cf);
    }

    const std::string manifest = DescriptorFileName(dbname_, 1);
    const auto& fs = env_->GetFileSystem();
    std::unique_ptr<WritableFileWriter> file_writer;
    Status s = WritableFileWriter::Create(
        fs, manifest, fs->OptimizeForManifestWrite(env_options_), &file_writer,
        nullptr);
    ASSERT_OK(s);

    {
      log::Writer log(std::move(file_writer), 0, false);
      std::string record;
      new_db.EncodeTo(&record);
      s = log.AddRecord(record);
      ASSERT_OK(s);

      for (const auto& e : new_cfs) {
        record.clear();
        e.EncodeTo(&record);
        s = log.AddRecord(record);
        ASSERT_OK(s);
      }
    }
    ASSERT_OK(s);
    // Make "CURRENT" file that points to the new manifest file.
    s = SetCurrentFile(fs_.get(), dbname_, 1, nullptr);
    ASSERT_OK(s);
  }

  void SetUp() override {
    EXPECT_OK(env_->CreateDirIfMissing(dbname_));

    // TODO(icanadi) Remove this once we mock out VersionSet
    NewDB();

    db_options_.env = env_;
    db_options_.fs = fs_;
    db_options_.db_paths.emplace_back(dbname_,
                                      std::numeric_limits<uint64_t>::max());
    db_options_.statistics = CreateDBStatistics();

    cf_options_.comparator = ucmp_;

    std::vector<ColumnFamilyDescriptor> column_families;
    cf_options_.table_factory = mock_table_factory_;
    for (const auto& cf_name : column_family_names_) {
      column_families.emplace_back(cf_name, cf_options_);
    }

    versions_.reset(
        new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(),
                       &write_buffer_manager_, &write_controller_,
                       /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr,
                       /*db_id*/ "", /*db_session_id*/ ""));
    EXPECT_OK(versions_->Recover(column_families, false));
  }

  Env* env_;
  std::shared_ptr<FileSystem> fs_;
  std::string dbname_;
  const Comparator* const ucmp_;
  EnvOptions env_options_;
  Options options_;
  ImmutableDBOptions db_options_;
  const std::vector<std::string> column_family_names_;
  std::shared_ptr<Cache> table_cache_;
  WriteController write_controller_;
  WriteBufferManager write_buffer_manager_;
  ColumnFamilyOptions cf_options_;
  std::unique_ptr<VersionSet> versions_;
  InstrumentedMutex mutex_;
  std::atomic<bool> shutting_down_;
  std::shared_ptr<mock::MockTableFactory> mock_table_factory_;

  SeqnoToTimeMapping empty_seqno_to_time_mapping_;
};

class FlushJobTest : public FlushJobTestBase {
 public:
  FlushJobTest()
      : FlushJobTestBase(test::PerThreadDBPath("flush_job_test"),
                         BytewiseComparator()) {}
};

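// Flushing a column family with no immutable memtable content should be a
// no-op: PickMemTable() finds nothing to flush and Run() is expected to
// return OK without producing an output file.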
TEST_F(FlushJobTest, Empty) {
  JobContext job_context(0);
  auto cfd = versions_->GetColumnFamilySet()->GetDefault();
  EventLogger event_logger(db_options_.info_log.get());
  SnapshotChecker* snapshot_checker = nullptr;  // not relevant
  FlushJob flush_job(dbname_, versions_->GetColumnFamilySet()->GetDefault(),
                     db_options_, *cfd->GetLatestMutableCFOptions(),
                     std::numeric_limits<uint64_t>::max() /* memtable_id */,
                     env_options_, versions_.get(), &mutex_, &shutting_down_,
                     {}, kMaxSequenceNumber, snapshot_checker, &job_context,
                     nullptr, nullptr, nullptr, kNoCompression, nullptr,
                     &event_logger, false, true /* sync_output_directory */,
                     true /* write_manifest */, Env::Priority::USER,
                     nullptr /*IOTracer*/, empty_seqno_to_time_mapping_);
  {
    InstrumentedMutexLock l(&mutex_);
    flush_job.PickMemTable();
    ASSERT_OK(flush_job.Run());
  }
  job_context.Clean();
}

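// NonEmpty covers the main flush path: ordinary values, a range deletion and
// several blob references all end up in a single mock SST whose metadata
// (key range, seqno range, oldest referenced blob file) is then checked.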
TEST_F(FlushJobTest, NonEmpty) {
  JobContext job_context(0);
  auto cfd = versions_->GetColumnFamilySet()->GetDefault();
  auto new_mem = cfd->ConstructNewMemtable(*cfd->GetLatestMutableCFOptions(),
                                           kMaxSequenceNumber);
  new_mem->Ref();
  auto inserted_keys = mock::MakeMockFile();
  // Test data:
  //   seqno [    1,    2 ... 8998, 8999, 9000, 9001, 9002 ... 9999 ]
  //   key   [ 1001, 1002 ... 9998, 9999,    0,    1,    2 ...  999 ]
  //   range-delete "9995" -> "9999" at seqno 10000
  //   blob references with seqnos 10001..10006
  for (int i = 1; i < 10000; ++i) {
    std::string key(std::to_string((i + 1000) % 10000));
    std::string value("value" + key);
    ASSERT_OK(new_mem->Add(SequenceNumber(i), kTypeValue, key, value,
                           nullptr /* kv_prot_info */));
    if ((i + 1000) % 10000 < 9995) {
      InternalKey internal_key(key, SequenceNumber(i), kTypeValue);
      inserted_keys.push_back({internal_key.Encode().ToString(), value});
    }
  }

  {
    ASSERT_OK(new_mem->Add(SequenceNumber(10000), kTypeRangeDeletion, "9995",
                           "9999a", nullptr /* kv_prot_info */));
    InternalKey internal_key("9995", SequenceNumber(10000), kTypeRangeDeletion);
    inserted_keys.push_back({internal_key.Encode().ToString(), "9999a"});
  }

  // Note: the first two blob references will not be considered when resolving
  // the oldest blob file referenced (the first one is inlined TTL, while the
  // second one is TTL and thus points to a TTL blob file). Of the remaining
  // blob files (103, 17, 102, 101), the oldest referenced one is therefore 17.
  constexpr std::array<uint64_t, 6> blob_file_numbers{
      {kInvalidBlobFileNumber, 5, 103, 17, 102, 101}};
  for (size_t i = 0; i < blob_file_numbers.size(); ++i) {
    std::string key(std::to_string(i + 10001));
    std::string blob_index;
    if (i == 0) {
      BlobIndex::EncodeInlinedTTL(&blob_index, /* expiration */ 1234567890ULL,
                                  "foo");
    } else if (i == 1) {
      BlobIndex::EncodeBlobTTL(&blob_index, /* expiration */ 1234567890ULL,
                               blob_file_numbers[i], /* offset */ i << 10,
                               /* size */ i << 20, kNoCompression);
    } else {
      BlobIndex::EncodeBlob(&blob_index, blob_file_numbers[i],
                            /* offset */ i << 10, /* size */ i << 20,
                            kNoCompression);
    }

    const SequenceNumber seq(i + 10001);
    ASSERT_OK(new_mem->Add(seq, kTypeBlobIndex, key, blob_index,
                           nullptr /* kv_prot_info */));

    InternalKey internal_key(key, seq, kTypeBlobIndex);
    inserted_keys.push_back({internal_key.Encode().ToString(), blob_index});
  }
  mock::SortKVVector(&inserted_keys);

  autovector<MemTable*> to_delete;
  new_mem->ConstructFragmentedRangeTombstones();
  cfd->imm()->Add(new_mem, &to_delete);
  for (auto& m : to_delete) {
    delete m;
  }

  EventLogger event_logger(db_options_.info_log.get());
  SnapshotChecker* snapshot_checker = nullptr;  // not relevant
  FlushJob flush_job(
      dbname_, versions_->GetColumnFamilySet()->GetDefault(), db_options_,
      *cfd->GetLatestMutableCFOptions(),
      std::numeric_limits<uint64_t>::max() /* memtable_id */, env_options_,
      versions_.get(), &mutex_, &shutting_down_, {}, kMaxSequenceNumber,
      snapshot_checker, &job_context, nullptr, nullptr, nullptr, kNoCompression,
      db_options_.statistics.get(), &event_logger, true,
      true /* sync_output_directory */, true /* write_manifest */,
      Env::Priority::USER, nullptr /*IOTracer*/, empty_seqno_to_time_mapping_);

  HistogramData hist;
  FileMetaData file_meta;
  mutex_.Lock();
  flush_job.PickMemTable();
  ASSERT_OK(flush_job.Run(nullptr, &file_meta));
  mutex_.Unlock();
  db_options_.statistics->histogramData(FLUSH_TIME, &hist);
  ASSERT_GT(hist.average, 0.0);

  ASSERT_EQ(std::to_string(0), file_meta.smallest.user_key().ToString());
  ASSERT_EQ("9999a", file_meta.largest.user_key().ToString());
  ASSERT_EQ(1, file_meta.fd.smallest_seqno);
  ASSERT_EQ(10006, file_meta.fd.largest_seqno);
  ASSERT_EQ(17, file_meta.oldest_blob_file_number);
  mock_table_factory_->AssertSingleFile(inserted_keys);
  job_context.Clean();
}

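// Only memtables whose ID is <= the flush_memtable_id handed to FlushJob are
// picked for flushing, so with two memtables (IDs 0 and 1) and
// num_mems_to_flush == 1, only the first 100 keys should end up in the output.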
TEST_F(FlushJobTest, FlushMemTablesSingleColumnFamily) {
  const size_t num_mems = 2;
  const size_t num_mems_to_flush = 1;
  const size_t num_keys_per_table = 100;
  JobContext job_context(0);
  ColumnFamilyData* cfd = versions_->GetColumnFamilySet()->GetDefault();
  std::vector<uint64_t> memtable_ids;
  std::vector<MemTable*> new_mems;
  for (size_t i = 0; i != num_mems; ++i) {
    MemTable* mem = cfd->ConstructNewMemtable(*cfd->GetLatestMutableCFOptions(),
                                              kMaxSequenceNumber);
    mem->SetID(i);
    mem->Ref();
    new_mems.emplace_back(mem);
    memtable_ids.push_back(mem->GetID());

    for (size_t j = 0; j < num_keys_per_table; ++j) {
      std::string key(std::to_string(j + i * num_keys_per_table));
      std::string value("value" + key);
      ASSERT_OK(mem->Add(SequenceNumber(j + i * num_keys_per_table), kTypeValue,
                         key, value, nullptr /* kv_prot_info */));
    }
  }

  autovector<MemTable*> to_delete;
  for (auto mem : new_mems) {
    mem->ConstructFragmentedRangeTombstones();
    cfd->imm()->Add(mem, &to_delete);
  }

  EventLogger event_logger(db_options_.info_log.get());
  SnapshotChecker* snapshot_checker = nullptr;  // not relevant

  assert(memtable_ids.size() == num_mems);
  uint64_t smallest_memtable_id = memtable_ids.front();
  uint64_t flush_memtable_id = smallest_memtable_id + num_mems_to_flush - 1;
  FlushJob flush_job(
      dbname_, versions_->GetColumnFamilySet()->GetDefault(), db_options_,
      *cfd->GetLatestMutableCFOptions(), flush_memtable_id, env_options_,
      versions_.get(), &mutex_, &shutting_down_, {}, kMaxSequenceNumber,
      snapshot_checker, &job_context, nullptr, nullptr, nullptr, kNoCompression,
      db_options_.statistics.get(), &event_logger, true,
      true /* sync_output_directory */, true /* write_manifest */,
      Env::Priority::USER, nullptr /*IOTracer*/, empty_seqno_to_time_mapping_);
  HistogramData hist;
  FileMetaData file_meta;
  mutex_.Lock();
  flush_job.PickMemTable();
  ASSERT_OK(flush_job.Run(nullptr /* prep_tracker */, &file_meta));
  mutex_.Unlock();
  db_options_.statistics->histogramData(FLUSH_TIME, &hist);
  ASSERT_GT(hist.average, 0.0);

  ASSERT_EQ(std::to_string(0), file_meta.smallest.user_key().ToString());
  ASSERT_EQ("99", file_meta.largest.user_key().ToString());
  ASSERT_EQ(0, file_meta.fd.smallest_seqno);
  ASSERT_EQ(SequenceNumber(num_mems_to_flush * num_keys_per_table - 1),
            file_meta.fd.largest_seqno);
  ASSERT_EQ(kInvalidBlobFileNumber, file_meta.oldest_blob_file_number);

  for (auto m : to_delete) {
    delete m;
  }
  to_delete.clear();
  job_context.Clean();
}

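// Atomic-flush scenario: each column family gets its own FlushJob configured
// with write_manifest = false, so the jobs only build their SSTs; the results
// for all column families are then installed together via
// InstallMemtableAtomicFlushResults().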
TEST_F(FlushJobTest, FlushMemtablesMultipleColumnFamilies) {
  autovector<ColumnFamilyData*> all_cfds;
  for (auto cfd : *versions_->GetColumnFamilySet()) {
    all_cfds.push_back(cfd);
  }
  const std::vector<size_t> num_memtables = {2, 1, 3};
  assert(num_memtables.size() == column_family_names_.size());
  const size_t num_keys_per_memtable = 1000;
  JobContext job_context(0);
  std::vector<uint64_t> memtable_ids;
  std::vector<SequenceNumber> smallest_seqs;
  std::vector<SequenceNumber> largest_seqs;
  autovector<MemTable*> to_delete;
  SequenceNumber curr_seqno = 0;
  size_t k = 0;
  for (auto cfd : all_cfds) {
    smallest_seqs.push_back(curr_seqno);
    for (size_t i = 0; i != num_memtables[k]; ++i) {
      MemTable* mem = cfd->ConstructNewMemtable(
          *cfd->GetLatestMutableCFOptions(), kMaxSequenceNumber);
      mem->SetID(i);
      mem->Ref();

      for (size_t j = 0; j != num_keys_per_memtable; ++j) {
        std::string key(std::to_string(j + i * num_keys_per_memtable));
        std::string value("value" + key);
        ASSERT_OK(mem->Add(curr_seqno++, kTypeValue, key, value,
                           nullptr /* kv_prot_info */));
      }
      mem->ConstructFragmentedRangeTombstones();
      cfd->imm()->Add(mem, &to_delete);
    }
    largest_seqs.push_back(curr_seqno - 1);
    memtable_ids.push_back(num_memtables[k++] - 1);
  }

  EventLogger event_logger(db_options_.info_log.get());
  SnapshotChecker* snapshot_checker = nullptr;  // not relevant
  std::vector<std::unique_ptr<FlushJob>> flush_jobs;
  k = 0;
  for (auto cfd : all_cfds) {
    std::vector<SequenceNumber> snapshot_seqs;
    flush_jobs.emplace_back(new FlushJob(
        dbname_, cfd, db_options_, *cfd->GetLatestMutableCFOptions(),
        memtable_ids[k], env_options_, versions_.get(), &mutex_,
        &shutting_down_, snapshot_seqs, kMaxSequenceNumber, snapshot_checker,
        &job_context, nullptr, nullptr, nullptr, kNoCompression,
        db_options_.statistics.get(), &event_logger, true,
        false /* sync_output_directory */, false /* write_manifest */,
        Env::Priority::USER, nullptr /*IOTracer*/,
        empty_seqno_to_time_mapping_));
    k++;
  }
  HistogramData hist;
  std::vector<FileMetaData> file_metas;
  // Call reserve to avoid auto-resizing
  file_metas.reserve(flush_jobs.size());
  mutex_.Lock();
  for (auto& job : flush_jobs) {
    job->PickMemTable();
  }
  for (auto& job : flush_jobs) {
    FileMetaData meta;
    // Run will release and re-acquire mutex
    ASSERT_OK(job->Run(nullptr /**/, &meta));
    file_metas.emplace_back(meta);
  }
  autovector<FileMetaData*> file_meta_ptrs;
  for (auto& meta : file_metas) {
    file_meta_ptrs.push_back(&meta);
  }
  autovector<const autovector<MemTable*>*> mems_list;
  for (size_t i = 0; i != all_cfds.size(); ++i) {
    const auto& mems = flush_jobs[i]->GetMemTables();
    mems_list.push_back(&mems);
  }
  autovector<const MutableCFOptions*> mutable_cf_options_list;
  for (auto cfd : all_cfds) {
    mutable_cf_options_list.push_back(cfd->GetLatestMutableCFOptions());
  }
  autovector<std::list<std::unique_ptr<FlushJobInfo>>*>
      committed_flush_jobs_info;
#ifndef ROCKSDB_LITE
  for (auto& job : flush_jobs) {
    committed_flush_jobs_info.push_back(job->GetCommittedFlushJobsInfo());
  }
#endif  //! ROCKSDB_LITE

  Status s = InstallMemtableAtomicFlushResults(
      nullptr /* imm_lists */, all_cfds, mutable_cf_options_list, mems_list,
      versions_.get(), nullptr /* prep_tracker */, &mutex_, file_meta_ptrs,
      committed_flush_jobs_info, &job_context.memtables_to_free,
      nullptr /* db_directory */, nullptr /* log_buffer */);
  ASSERT_OK(s);

  mutex_.Unlock();
  db_options_.statistics->histogramData(FLUSH_TIME, &hist);
  ASSERT_GT(hist.average, 0.0);
  k = 0;
  for (const auto& file_meta : file_metas) {
    ASSERT_EQ(std::to_string(0), file_meta.smallest.user_key().ToString());
    ASSERT_EQ("999", file_meta.largest.user_key()
                         .ToString());  // max key by bytewise comparator
    ASSERT_EQ(smallest_seqs[k], file_meta.fd.smallest_seqno);
    ASSERT_EQ(largest_seqs[k], file_meta.fd.largest_seqno);
    // Verify that imm is empty
    ASSERT_EQ(std::numeric_limits<uint64_t>::max(),
              all_cfds[k]->imm()->GetEarliestMemTableID());
    ASSERT_EQ(0, all_cfds[k]->imm()->GetLatestMemTableID());
    ++k;
  }

  for (auto m : to_delete) {
    delete m;
  }
  to_delete.clear();
  job_context.Clean();
}

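// Snapshots verifies MVCC garbage collection during flush: for each user key,
// only the newest version and any version pinned by one of the supplied
// snapshot sequence numbers should survive in the output file.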
TEST_F(FlushJobTest, Snapshots) {
  JobContext job_context(0);
  auto cfd = versions_->GetColumnFamilySet()->GetDefault();
  auto new_mem = cfd->ConstructNewMemtable(*cfd->GetLatestMutableCFOptions(),
                                           kMaxSequenceNumber);

  std::set<SequenceNumber> snapshots_set;
  int keys = 10000;
  int max_inserts_per_keys = 8;

  Random rnd(301);
  for (int i = 0; i < keys / 2; ++i) {
    snapshots_set.insert(rnd.Uniform(keys * (max_inserts_per_keys / 2)) + 1);
  }
  // set has already removed the duplicate snapshots
  std::vector<SequenceNumber> snapshots(snapshots_set.begin(),
                                        snapshots_set.end());

  new_mem->Ref();
  SequenceNumber current_seqno = 0;
  auto inserted_keys = mock::MakeMockFile();
  for (int i = 1; i < keys; ++i) {
    std::string key(std::to_string(i));
    int insertions = rnd.Uniform(max_inserts_per_keys);
    for (int j = 0; j < insertions; ++j) {
      std::string value(rnd.HumanReadableString(10));
      auto seqno = ++current_seqno;
      ASSERT_OK(new_mem->Add(SequenceNumber(seqno), kTypeValue, key, value,
                             nullptr /* kv_prot_info */));
      // a key is visible only if:
      // 1. it's the last one written (j == insertions - 1)
      // 2. there's a snapshot pointing at it
      bool visible = (j == insertions - 1) ||
                     (snapshots_set.find(seqno) != snapshots_set.end());
      if (visible) {
        InternalKey internal_key(key, seqno, kTypeValue);
        inserted_keys.push_back({internal_key.Encode().ToString(), value});
      }
    }
  }
  mock::SortKVVector(&inserted_keys);

  autovector<MemTable*> to_delete;
  new_mem->ConstructFragmentedRangeTombstones();
  cfd->imm()->Add(new_mem, &to_delete);
  for (auto& m : to_delete) {
    delete m;
  }

  EventLogger event_logger(db_options_.info_log.get());
  SnapshotChecker* snapshot_checker = nullptr;  // not relevant
  FlushJob flush_job(
      dbname_, versions_->GetColumnFamilySet()->GetDefault(), db_options_,
      *cfd->GetLatestMutableCFOptions(),
      std::numeric_limits<uint64_t>::max() /* memtable_id */, env_options_,
      versions_.get(), &mutex_, &shutting_down_, snapshots, kMaxSequenceNumber,
      snapshot_checker, &job_context, nullptr, nullptr, nullptr, kNoCompression,
      db_options_.statistics.get(), &event_logger, true,
      true /* sync_output_directory */, true /* write_manifest */,
      Env::Priority::USER, nullptr /*IOTracer*/, empty_seqno_to_time_mapping_);
  mutex_.Lock();
  flush_job.PickMemTable();
  ASSERT_OK(flush_job.Run());
  mutex_.Unlock();
  mock_table_factory_->AssertSingleFile(inserted_keys);
  HistogramData hist;
  db_options_.statistics->histogramData(FLUSH_TIME, &hist);
  ASSERT_GT(hist.average, 0.0);
  job_context.Clean();
}

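// Flush writes normally run at Env::IO_HIGH, but while the WriteController
// reports a delayed or stopped state the priority is expected to be bumped to
// Env::IO_USER, presumably so the flush completes sooner and relieves the
// write stall.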
TEST_F(FlushJobTest, GetRateLimiterPriorityForWrite) {
  // Prepare a FlushJob that flushes memtables of a single column family.
  const size_t num_mems = 2;
  const size_t num_mems_to_flush = 1;
  const size_t num_keys_per_table = 100;
  JobContext job_context(0);
  ColumnFamilyData* cfd = versions_->GetColumnFamilySet()->GetDefault();
  std::vector<uint64_t> memtable_ids;
  std::vector<MemTable*> new_mems;
  for (size_t i = 0; i != num_mems; ++i) {
    MemTable* mem = cfd->ConstructNewMemtable(*cfd->GetLatestMutableCFOptions(),
                                              kMaxSequenceNumber);
    mem->SetID(i);
    mem->Ref();
    new_mems.emplace_back(mem);
    memtable_ids.push_back(mem->GetID());

    for (size_t j = 0; j < num_keys_per_table; ++j) {
      std::string key(std::to_string(j + i * num_keys_per_table));
      std::string value("value" + key);
      ASSERT_OK(mem->Add(SequenceNumber(j + i * num_keys_per_table), kTypeValue,
                         key, value, nullptr /* kv_prot_info */));
    }
  }

  autovector<MemTable*> to_delete;
  for (auto mem : new_mems) {
    mem->ConstructFragmentedRangeTombstones();
    cfd->imm()->Add(mem, &to_delete);
  }

  EventLogger event_logger(db_options_.info_log.get());
  SnapshotChecker* snapshot_checker = nullptr;  // not relevant

  assert(memtable_ids.size() == num_mems);
  uint64_t smallest_memtable_id = memtable_ids.front();
  uint64_t flush_memtable_id = smallest_memtable_id + num_mems_to_flush - 1;
  FlushJob flush_job(
      dbname_, versions_->GetColumnFamilySet()->GetDefault(), db_options_,
      *cfd->GetLatestMutableCFOptions(), flush_memtable_id, env_options_,
      versions_.get(), &mutex_, &shutting_down_, {}, kMaxSequenceNumber,
      snapshot_checker, &job_context, nullptr, nullptr, nullptr, kNoCompression,
      db_options_.statistics.get(), &event_logger, true,
      true /* sync_output_directory */, true /* write_manifest */,
      Env::Priority::USER, nullptr /*IOTracer*/, empty_seqno_to_time_mapping_);

  // When the state from WriteController is normal.
  ASSERT_EQ(flush_job.GetRateLimiterPriorityForWrite(), Env::IO_HIGH);

  WriteController* write_controller =
      flush_job.versions_->GetColumnFamilySet()->write_controller();

  {
    // When the state from WriteController is Delayed.
    std::unique_ptr<WriteControllerToken> delay_token =
        write_controller->GetDelayToken(1000000);
    ASSERT_EQ(flush_job.GetRateLimiterPriorityForWrite(), Env::IO_USER);
  }

  {
    // When the state from WriteController is Stopped.
    std::unique_ptr<WriteControllerToken> stop_token =
        write_controller->GetStopToken();
    ASSERT_EQ(flush_job.GetRateLimiterPriorityForWrite(), Env::IO_USER);
  }
}

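// FlushJobTimestampTest exercises flushing with user-defined timestamps: the
// wrapped comparator treats the trailing fixed64 appended by
// AddKeyValueToMemtable() as the key's timestamp, and (roughly speaking)
// full_history_ts_low controls how much timestamp history the flush is
// allowed to garbage-collect.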
class FlushJobTimestampTest : public FlushJobTestBase {
 public:
  FlushJobTimestampTest()
      : FlushJobTestBase(test::PerThreadDBPath("flush_job_ts_gc_test"),
                         test::BytewiseComparatorWithU64TsWrapper()) {}

  void AddKeyValueToMemtable(MemTable* memtable, std::string key, uint64_t ts,
                             SequenceNumber seq, ValueType value_type,
                             Slice value) {
    std::string key_str(std::move(key));
    PutFixed64(&key_str, ts);
    ASSERT_OK(memtable->Add(seq, value_type, key_str, value,
                            nullptr /* kv_prot_info */));
  }

 protected:
  static constexpr uint64_t kStartTs = 10;
  static constexpr SequenceNumber kStartSeq = 0;
  SequenceNumber curr_seq_{kStartSeq};
  std::atomic<uint64_t> curr_ts_{kStartTs};
};

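// AllKeysExpired: full_history_ts_low is set to the maximum timestamp, so all
// of the versions written below fall below the cutoff and are eligible for
// collection during the flush; only the final deletion entry is expected to
// remain (the assertions check that smallest == largest == that tombstone).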
TEST_F(FlushJobTimestampTest, AllKeysExpired) {
  ColumnFamilyData* cfd = versions_->GetColumnFamilySet()->GetDefault();
  autovector<MemTable*> to_delete;

  {
    MemTable* new_mem = cfd->ConstructNewMemtable(
        *cfd->GetLatestMutableCFOptions(), kMaxSequenceNumber);
    new_mem->Ref();
    for (int i = 0; i < 100; ++i) {
      uint64_t ts = curr_ts_.fetch_add(1);
      SequenceNumber seq = (curr_seq_++);
      AddKeyValueToMemtable(new_mem, test::EncodeInt(0), ts, seq,
                            ValueType::kTypeValue, "0_value");
    }
    uint64_t ts = curr_ts_.fetch_add(1);
    SequenceNumber seq = (curr_seq_++);
    AddKeyValueToMemtable(new_mem, test::EncodeInt(0), ts, seq,
                          ValueType::kTypeDeletionWithTimestamp, "");
    new_mem->ConstructFragmentedRangeTombstones();
    cfd->imm()->Add(new_mem, &to_delete);
  }

  std::vector<SequenceNumber> snapshots;
  constexpr SnapshotChecker* const snapshot_checker = nullptr;
  JobContext job_context(0);
  EventLogger event_logger(db_options_.info_log.get());
  std::string full_history_ts_low;
  PutFixed64(&full_history_ts_low, std::numeric_limits<uint64_t>::max());
  FlushJob flush_job(
      dbname_, cfd, db_options_, *cfd->GetLatestMutableCFOptions(),
      std::numeric_limits<uint64_t>::max() /* memtable_id */, env_options_,
      versions_.get(), &mutex_, &shutting_down_, snapshots, kMaxSequenceNumber,
      snapshot_checker, &job_context, nullptr, nullptr, nullptr, kNoCompression,
      db_options_.statistics.get(), &event_logger, true,
      true /* sync_output_directory */, true /* write_manifest */,
      Env::Priority::USER, nullptr /*IOTracer*/, empty_seqno_to_time_mapping_,
      /*db_id=*/"",
      /*db_session_id=*/"", full_history_ts_low);

  FileMetaData fmeta;
  mutex_.Lock();
  flush_job.PickMemTable();
  ASSERT_OK(flush_job.Run(/*prep_tracker=*/nullptr, &fmeta));
  mutex_.Unlock();

  {
    std::string key = test::EncodeInt(0);
    key.append(test::EncodeInt(curr_ts_.load(std::memory_order_relaxed) - 1));
    InternalKey ikey(key, curr_seq_ - 1, ValueType::kTypeDeletionWithTimestamp);
    ASSERT_EQ(ikey.Encode(), fmeta.smallest.Encode());
    ASSERT_EQ(ikey.Encode(), fmeta.largest.Encode());
  }

  job_context.Clean();
  ASSERT_TRUE(to_delete.empty());
}

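// NoKeyExpired: with full_history_ts_low set to 0, no timestamp falls below
// the cutoff, so every version written must be preserved; the output file is
// expected to span from the newest timestamp (the smallest key under this
// comparator) down to kStartTs.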
TEST_F(FlushJobTimestampTest, NoKeyExpired) {
  ColumnFamilyData* cfd = versions_->GetColumnFamilySet()->GetDefault();
  autovector<MemTable*> to_delete;

  {
    MemTable* new_mem = cfd->ConstructNewMemtable(
        *cfd->GetLatestMutableCFOptions(), kMaxSequenceNumber);
    new_mem->Ref();
    for (int i = 0; i < 100; ++i) {
      uint64_t ts = curr_ts_.fetch_add(1);
      SequenceNumber seq = (curr_seq_++);
      AddKeyValueToMemtable(new_mem, test::EncodeInt(0), ts, seq,
                            ValueType::kTypeValue, "0_value");
    }
    new_mem->ConstructFragmentedRangeTombstones();
    cfd->imm()->Add(new_mem, &to_delete);
  }

  std::vector<SequenceNumber> snapshots;
  SnapshotChecker* const snapshot_checker = nullptr;
  JobContext job_context(0);
  EventLogger event_logger(db_options_.info_log.get());
  std::string full_history_ts_low;
  PutFixed64(&full_history_ts_low, 0);
  FlushJob flush_job(
      dbname_, cfd, db_options_, *cfd->GetLatestMutableCFOptions(),
      std::numeric_limits<uint64_t>::max() /* memtable_id */, env_options_,
      versions_.get(), &mutex_, &shutting_down_, snapshots, kMaxSequenceNumber,
      snapshot_checker, &job_context, nullptr, nullptr, nullptr, kNoCompression,
      db_options_.statistics.get(), &event_logger, true,
      true /* sync_output_directory */, true /* write_manifest */,
      Env::Priority::USER, nullptr /*IOTracer*/, empty_seqno_to_time_mapping_,
      /*db_id=*/"",
      /*db_session_id=*/"", full_history_ts_low);

  FileMetaData fmeta;
  mutex_.Lock();
  flush_job.PickMemTable();
  ASSERT_OK(flush_job.Run(/*prep_tracker=*/nullptr, &fmeta));
  mutex_.Unlock();

  {
    std::string ukey = test::EncodeInt(0);
    std::string smallest_key =
        ukey + test::EncodeInt(curr_ts_.load(std::memory_order_relaxed) - 1);
    std::string largest_key = ukey + test::EncodeInt(kStartTs);
    InternalKey smallest(smallest_key, curr_seq_ - 1, ValueType::kTypeValue);
    InternalKey largest(largest_key, kStartSeq, ValueType::kTypeValue);
    ASSERT_EQ(smallest.Encode(), fmeta.smallest.Encode());
    ASSERT_EQ(largest.Encode(), fmeta.largest.Encode());
  }
  job_context.Clean();
  ASSERT_TRUE(to_delete.empty());
}

}  // namespace ROCKSDB_NAMESPACE

int main(int argc, char** argv) {
  ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
  ::testing::InitGoogleTest(&argc, argv);
  return RUN_ALL_TESTS();
}