]> git.proxmox.com Git - ceph.git/blame - ceph/src/rocksdb/db/flush_job_test.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / rocksdb / db / flush_job_test.cc
CommitLineData
7c673cae 1// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
11fdf7f2
TL
2// This source code is licensed under both the GPLv2 (found in the
3// COPYING file in the root directory) and Apache 2.0 License
4// (found in the LICENSE.Apache file in the root directory).
7c673cae 5
20effc67
TL
6#include "db/flush_job.h"
7
7c673cae 8#include <algorithm>
f67539c2 9#include <array>
7c673cae
FG
10#include <map>
11#include <string>
12
20effc67 13#include "db/blob/blob_index.h"
7c673cae 14#include "db/column_family.h"
f67539c2 15#include "db/db_impl/db_impl.h"
7c673cae 16#include "db/version_set.h"
f67539c2 17#include "file/writable_file_writer.h"
7c673cae
FG
18#include "rocksdb/cache.h"
19#include "rocksdb/write_buffer_manager.h"
20#include "table/mock_table.h"
f67539c2
TL
21#include "test_util/testharness.h"
22#include "test_util/testutil.h"
20effc67 23#include "util/random.h"
7c673cae 24#include "util/string_util.h"
7c673cae 25
f67539c2 26namespace ROCKSDB_NAMESPACE {
7c673cae
FG
27
28// TODO(icanadi) Mock out everything else:
29// 1. VersionSet
30// 2. Memtable
20effc67
TL
31class FlushJobTestBase : public testing::Test {
32 protected:
33 FlushJobTestBase(std::string dbname, const Comparator* ucmp)
7c673cae 34 : env_(Env::Default()),
f67539c2 35 fs_(std::make_shared<LegacyFileSystemWrapper>(env_)),
20effc67
TL
36 dbname_(std::move(dbname)),
37 ucmp_(ucmp),
7c673cae
FG
38 options_(),
39 db_options_(options_),
494da23a 40 column_family_names_({kDefaultColumnFamilyName, "foo", "bar"}),
7c673cae
FG
41 table_cache_(NewLRUCache(50000, 16)),
42 write_buffer_manager_(db_options_.db_write_buffer_size),
7c673cae 43 shutting_down_(false),
20effc67 44 mock_table_factory_(new mock::MockTableFactory()) {}
7c673cae 45
20effc67
TL
46 virtual ~FlushJobTestBase() {
47 if (getenv("KEEP_DB")) {
48 fprintf(stdout, "db is still in %s\n", dbname_.c_str());
49 } else {
50 EXPECT_OK(DestroyDir(env_, dbname_));
51 }
7c673cae
FG
52 }
53
54 void NewDB() {
f67539c2 55 SetIdentityFile(env_, dbname_);
7c673cae 56 VersionEdit new_db;
20effc67 57
7c673cae
FG
58 new_db.SetLogNumber(0);
59 new_db.SetNextFile(2);
60 new_db.SetLastSequence(0);
61
494da23a
TL
62 autovector<VersionEdit> new_cfs;
63 SequenceNumber last_seq = 1;
64 uint32_t cf_id = 1;
65 for (size_t i = 1; i != column_family_names_.size(); ++i) {
66 VersionEdit new_cf;
67 new_cf.AddColumnFamily(column_family_names_[i]);
68 new_cf.SetColumnFamily(cf_id++);
20effc67 69 new_cf.SetComparatorName(ucmp_->Name());
494da23a
TL
70 new_cf.SetLogNumber(0);
71 new_cf.SetNextFile(2);
72 new_cf.SetLastSequence(last_seq++);
73 new_cfs.emplace_back(new_cf);
74 }
75
7c673cae 76 const std::string manifest = DescriptorFileName(dbname_, 1);
494da23a 77 std::unique_ptr<WritableFile> file;
7c673cae
FG
78 Status s = env_->NewWritableFile(
79 manifest, &file, env_->OptimizeForManifestWrite(env_options_));
80 ASSERT_OK(s);
f67539c2
TL
81 std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
82 NewLegacyWritableFileWrapper(std::move(file)), manifest, EnvOptions()));
7c673cae
FG
83 {
84 log::Writer log(std::move(file_writer), 0, false);
85 std::string record;
86 new_db.EncodeTo(&record);
87 s = log.AddRecord(record);
494da23a
TL
88
89 for (const auto& e : new_cfs) {
90 record.clear();
91 e.EncodeTo(&record);
92 s = log.AddRecord(record);
93 ASSERT_OK(s);
94 }
7c673cae
FG
95 }
96 ASSERT_OK(s);
97 // Make "CURRENT" file that points to the new manifest file.
20effc67
TL
98 s = SetCurrentFile(fs_.get(), dbname_, 1, nullptr);
99 ASSERT_OK(s);
100 }
101
102 void SetUp() override {
103 EXPECT_OK(env_->CreateDirIfMissing(dbname_));
104
105 // TODO(icanadi) Remove this once we mock out VersionSet
106 NewDB();
107
108 db_options_.env = env_;
109 db_options_.fs = fs_;
110 db_options_.db_paths.emplace_back(dbname_,
111 std::numeric_limits<uint64_t>::max());
112 db_options_.statistics = CreateDBStatistics();
113
114 cf_options_.comparator = ucmp_;
115
116 std::vector<ColumnFamilyDescriptor> column_families;
117 cf_options_.table_factory = mock_table_factory_;
118 for (const auto& cf_name : column_family_names_) {
119 column_families.emplace_back(cf_name, cf_options_);
120 }
121
122 versions_.reset(
123 new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(),
124 &write_buffer_manager_, &write_controller_,
125 /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr));
126 EXPECT_OK(versions_->Recover(column_families, false));
7c673cae
FG
127 }
128
129 Env* env_;
f67539c2 130 std::shared_ptr<FileSystem> fs_;
7c673cae 131 std::string dbname_;
20effc67 132 const Comparator* const ucmp_;
7c673cae
FG
133 EnvOptions env_options_;
134 Options options_;
135 ImmutableDBOptions db_options_;
494da23a 136 const std::vector<std::string> column_family_names_;
7c673cae
FG
137 std::shared_ptr<Cache> table_cache_;
138 WriteController write_controller_;
139 WriteBufferManager write_buffer_manager_;
140 ColumnFamilyOptions cf_options_;
141 std::unique_ptr<VersionSet> versions_;
142 InstrumentedMutex mutex_;
143 std::atomic<bool> shutting_down_;
144 std::shared_ptr<mock::MockTableFactory> mock_table_factory_;
145};
146
20effc67
TL
147class FlushJobTest : public FlushJobTestBase {
148 public:
149 FlushJobTest()
150 : FlushJobTestBase(test::PerThreadDBPath("flush_job_test"),
151 BytewiseComparator()) {}
152};
153
7c673cae
FG
154TEST_F(FlushJobTest, Empty) {
155 JobContext job_context(0);
156 auto cfd = versions_->GetColumnFamilySet()->GetDefault();
157 EventLogger event_logger(db_options_.info_log.get());
11fdf7f2 158 SnapshotChecker* snapshot_checker = nullptr; // not relavant
20effc67
TL
159 FlushJob flush_job(
160 dbname_, versions_->GetColumnFamilySet()->GetDefault(), db_options_,
161 *cfd->GetLatestMutableCFOptions(), nullptr /* memtable_id */,
162 env_options_, versions_.get(), &mutex_, &shutting_down_, {},
163 kMaxSequenceNumber, snapshot_checker, &job_context, nullptr, nullptr,
164 nullptr, kNoCompression, nullptr, &event_logger, false,
165 true /* sync_output_directory */, true /* write_manifest */,
166 Env::Priority::USER, nullptr /*IOTracer*/);
7c673cae
FG
167 {
168 InstrumentedMutexLock l(&mutex_);
169 flush_job.PickMemTable();
170 ASSERT_OK(flush_job.Run());
171 }
172 job_context.Clean();
173}
174
175TEST_F(FlushJobTest, NonEmpty) {
176 JobContext job_context(0);
177 auto cfd = versions_->GetColumnFamilySet()->GetDefault();
178 auto new_mem = cfd->ConstructNewMemtable(*cfd->GetLatestMutableCFOptions(),
179 kMaxSequenceNumber);
180 new_mem->Ref();
181 auto inserted_keys = mock::MakeMockFile();
182 // Test data:
183 // seqno [ 1, 2 ... 8998, 8999, 9000, 9001, 9002 ... 9999 ]
184 // key [ 1001, 1002 ... 9998, 9999, 0, 1, 2 ... 999 ]
185 // range-delete "9995" -> "9999" at seqno 10000
f67539c2 186 // blob references with seqnos 10001..10006
7c673cae
FG
187 for (int i = 1; i < 10000; ++i) {
188 std::string key(ToString((i + 1000) % 10000));
189 std::string value("value" + key);
190 new_mem->Add(SequenceNumber(i), kTypeValue, key, value);
191 if ((i + 1000) % 10000 < 9995) {
192 InternalKey internal_key(key, SequenceNumber(i), kTypeValue);
20effc67 193 inserted_keys.push_back({internal_key.Encode().ToString(), value});
7c673cae
FG
194 }
195 }
f67539c2
TL
196
197 {
198 new_mem->Add(SequenceNumber(10000), kTypeRangeDeletion, "9995", "9999a");
199 InternalKey internal_key("9995", SequenceNumber(10000), kTypeRangeDeletion);
20effc67 200 inserted_keys.push_back({internal_key.Encode().ToString(), "9999a"});
f67539c2
TL
201 }
202
f67539c2
TL
203 // Note: the first two blob references will not be considered when resolving
204 // the oldest blob file referenced (the first one is inlined TTL, while the
205 // second one is TTL and thus points to a TTL blob file).
20effc67
TL
206 constexpr std::array<uint64_t, 6> blob_file_numbers{{
207 kInvalidBlobFileNumber, 5, 103, 17, 102, 101}};
f67539c2
TL
208 for (size_t i = 0; i < blob_file_numbers.size(); ++i) {
209 std::string key(ToString(i + 10001));
210 std::string blob_index;
211 if (i == 0) {
212 BlobIndex::EncodeInlinedTTL(&blob_index, /* expiration */ 1234567890ULL,
213 "foo");
214 } else if (i == 1) {
215 BlobIndex::EncodeBlobTTL(&blob_index, /* expiration */ 1234567890ULL,
216 blob_file_numbers[i], /* offset */ i << 10,
217 /* size */ i << 20, kNoCompression);
218 } else {
219 BlobIndex::EncodeBlob(&blob_index, blob_file_numbers[i],
220 /* offset */ i << 10, /* size */ i << 20,
221 kNoCompression);
222 }
223
224 const SequenceNumber seq(i + 10001);
225 new_mem->Add(seq, kTypeBlobIndex, key, blob_index);
226
227 InternalKey internal_key(key, seq, kTypeBlobIndex);
20effc67 228 inserted_keys.push_back({internal_key.Encode().ToString(), blob_index});
f67539c2 229 }
20effc67 230 mock::SortKVVector(&inserted_keys);
7c673cae
FG
231
232 autovector<MemTable*> to_delete;
233 cfd->imm()->Add(new_mem, &to_delete);
234 for (auto& m : to_delete) {
235 delete m;
236 }
237
238 EventLogger event_logger(db_options_.info_log.get());
11fdf7f2 239 SnapshotChecker* snapshot_checker = nullptr; // not relavant
20effc67
TL
240 FlushJob flush_job(
241 dbname_, versions_->GetColumnFamilySet()->GetDefault(), db_options_,
242 *cfd->GetLatestMutableCFOptions(), nullptr /* memtable_id */,
243 env_options_, versions_.get(), &mutex_, &shutting_down_, {},
244 kMaxSequenceNumber, snapshot_checker, &job_context, nullptr, nullptr,
245 nullptr, kNoCompression, db_options_.statistics.get(), &event_logger,
246 true, true /* sync_output_directory */, true /* write_manifest */,
247 Env::Priority::USER, nullptr /*IOTracer*/);
11fdf7f2
TL
248
249 HistogramData hist;
250 FileMetaData file_meta;
7c673cae
FG
251 mutex_.Lock();
252 flush_job.PickMemTable();
11fdf7f2 253 ASSERT_OK(flush_job.Run(nullptr, &file_meta));
7c673cae 254 mutex_.Unlock();
11fdf7f2
TL
255 db_options_.statistics->histogramData(FLUSH_TIME, &hist);
256 ASSERT_GT(hist.average, 0.0);
257
258 ASSERT_EQ(ToString(0), file_meta.smallest.user_key().ToString());
f67539c2 259 ASSERT_EQ("9999a", file_meta.largest.user_key().ToString());
11fdf7f2 260 ASSERT_EQ(1, file_meta.fd.smallest_seqno);
f67539c2
TL
261 ASSERT_EQ(10006, file_meta.fd.largest_seqno);
262 ASSERT_EQ(17, file_meta.oldest_blob_file_number);
7c673cae
FG
263 mock_table_factory_->AssertSingleFile(inserted_keys);
264 job_context.Clean();
265}
266
494da23a
TL
267TEST_F(FlushJobTest, FlushMemTablesSingleColumnFamily) {
268 const size_t num_mems = 2;
269 const size_t num_mems_to_flush = 1;
270 const size_t num_keys_per_table = 100;
271 JobContext job_context(0);
272 ColumnFamilyData* cfd = versions_->GetColumnFamilySet()->GetDefault();
273 std::vector<uint64_t> memtable_ids;
274 std::vector<MemTable*> new_mems;
275 for (size_t i = 0; i != num_mems; ++i) {
276 MemTable* mem = cfd->ConstructNewMemtable(*cfd->GetLatestMutableCFOptions(),
277 kMaxSequenceNumber);
278 mem->SetID(i);
279 mem->Ref();
280 new_mems.emplace_back(mem);
281 memtable_ids.push_back(mem->GetID());
282
283 for (size_t j = 0; j < num_keys_per_table; ++j) {
284 std::string key(ToString(j + i * num_keys_per_table));
285 std::string value("value" + key);
286 mem->Add(SequenceNumber(j + i * num_keys_per_table), kTypeValue, key,
287 value);
288 }
289 }
290
291 autovector<MemTable*> to_delete;
292 for (auto mem : new_mems) {
293 cfd->imm()->Add(mem, &to_delete);
294 }
295
296 EventLogger event_logger(db_options_.info_log.get());
297 SnapshotChecker* snapshot_checker = nullptr; // not relavant
298
299 assert(memtable_ids.size() == num_mems);
300 uint64_t smallest_memtable_id = memtable_ids.front();
301 uint64_t flush_memtable_id = smallest_memtable_id + num_mems_to_flush - 1;
20effc67
TL
302 FlushJob flush_job(
303 dbname_, versions_->GetColumnFamilySet()->GetDefault(), db_options_,
304 *cfd->GetLatestMutableCFOptions(), &flush_memtable_id, env_options_,
305 versions_.get(), &mutex_, &shutting_down_, {}, kMaxSequenceNumber,
306 snapshot_checker, &job_context, nullptr, nullptr, nullptr, kNoCompression,
307 db_options_.statistics.get(), &event_logger, true,
308 true /* sync_output_directory */, true /* write_manifest */,
309 Env::Priority::USER, nullptr /*IOTracer*/);
494da23a
TL
310 HistogramData hist;
311 FileMetaData file_meta;
312 mutex_.Lock();
313 flush_job.PickMemTable();
314 ASSERT_OK(flush_job.Run(nullptr /* prep_tracker */, &file_meta));
315 mutex_.Unlock();
316 db_options_.statistics->histogramData(FLUSH_TIME, &hist);
317 ASSERT_GT(hist.average, 0.0);
318
319 ASSERT_EQ(ToString(0), file_meta.smallest.user_key().ToString());
320 ASSERT_EQ("99", file_meta.largest.user_key().ToString());
321 ASSERT_EQ(0, file_meta.fd.smallest_seqno);
322 ASSERT_EQ(SequenceNumber(num_mems_to_flush * num_keys_per_table - 1),
323 file_meta.fd.largest_seqno);
f67539c2 324 ASSERT_EQ(kInvalidBlobFileNumber, file_meta.oldest_blob_file_number);
494da23a
TL
325
326 for (auto m : to_delete) {
327 delete m;
328 }
329 to_delete.clear();
330 job_context.Clean();
331}
332
333TEST_F(FlushJobTest, FlushMemtablesMultipleColumnFamilies) {
334 autovector<ColumnFamilyData*> all_cfds;
335 for (auto cfd : *versions_->GetColumnFamilySet()) {
336 all_cfds.push_back(cfd);
337 }
338 const std::vector<size_t> num_memtables = {2, 1, 3};
339 assert(num_memtables.size() == column_family_names_.size());
340 const size_t num_keys_per_memtable = 1000;
341 JobContext job_context(0);
342 std::vector<uint64_t> memtable_ids;
343 std::vector<SequenceNumber> smallest_seqs;
344 std::vector<SequenceNumber> largest_seqs;
345 autovector<MemTable*> to_delete;
346 SequenceNumber curr_seqno = 0;
347 size_t k = 0;
348 for (auto cfd : all_cfds) {
349 smallest_seqs.push_back(curr_seqno);
350 for (size_t i = 0; i != num_memtables[k]; ++i) {
351 MemTable* mem = cfd->ConstructNewMemtable(
352 *cfd->GetLatestMutableCFOptions(), kMaxSequenceNumber);
353 mem->SetID(i);
354 mem->Ref();
355
356 for (size_t j = 0; j != num_keys_per_memtable; ++j) {
357 std::string key(ToString(j + i * num_keys_per_memtable));
358 std::string value("value" + key);
359 mem->Add(curr_seqno++, kTypeValue, key, value);
360 }
361
362 cfd->imm()->Add(mem, &to_delete);
363 }
364 largest_seqs.push_back(curr_seqno - 1);
365 memtable_ids.push_back(num_memtables[k++] - 1);
366 }
367
368 EventLogger event_logger(db_options_.info_log.get());
369 SnapshotChecker* snapshot_checker = nullptr; // not relevant
f67539c2 370 std::vector<std::unique_ptr<FlushJob>> flush_jobs;
494da23a
TL
371 k = 0;
372 for (auto cfd : all_cfds) {
373 std::vector<SequenceNumber> snapshot_seqs;
f67539c2 374 flush_jobs.emplace_back(new FlushJob(
494da23a
TL
375 dbname_, cfd, db_options_, *cfd->GetLatestMutableCFOptions(),
376 &memtable_ids[k], env_options_, versions_.get(), &mutex_,
377 &shutting_down_, snapshot_seqs, kMaxSequenceNumber, snapshot_checker,
378 &job_context, nullptr, nullptr, nullptr, kNoCompression,
379 db_options_.statistics.get(), &event_logger, true,
380 false /* sync_output_directory */, false /* write_manifest */,
20effc67 381 Env::Priority::USER, nullptr /*IOTracer*/));
494da23a
TL
382 k++;
383 }
384 HistogramData hist;
385 std::vector<FileMetaData> file_metas;
386 // Call reserve to avoid auto-resizing
387 file_metas.reserve(flush_jobs.size());
388 mutex_.Lock();
389 for (auto& job : flush_jobs) {
f67539c2 390 job->PickMemTable();
494da23a
TL
391 }
392 for (auto& job : flush_jobs) {
393 FileMetaData meta;
394 // Run will release and re-acquire mutex
f67539c2 395 ASSERT_OK(job->Run(nullptr /**/, &meta));
494da23a
TL
396 file_metas.emplace_back(meta);
397 }
398 autovector<FileMetaData*> file_meta_ptrs;
399 for (auto& meta : file_metas) {
400 file_meta_ptrs.push_back(&meta);
401 }
402 autovector<const autovector<MemTable*>*> mems_list;
403 for (size_t i = 0; i != all_cfds.size(); ++i) {
f67539c2 404 const auto& mems = flush_jobs[i]->GetMemTables();
494da23a
TL
405 mems_list.push_back(&mems);
406 }
407 autovector<const MutableCFOptions*> mutable_cf_options_list;
408 for (auto cfd : all_cfds) {
409 mutable_cf_options_list.push_back(cfd->GetLatestMutableCFOptions());
410 }
411
412 Status s = InstallMemtableAtomicFlushResults(
413 nullptr /* imm_lists */, all_cfds, mutable_cf_options_list, mems_list,
414 versions_.get(), &mutex_, file_meta_ptrs, &job_context.memtables_to_free,
415 nullptr /* db_directory */, nullptr /* log_buffer */);
416 ASSERT_OK(s);
417
418 mutex_.Unlock();
419 db_options_.statistics->histogramData(FLUSH_TIME, &hist);
420 ASSERT_GT(hist.average, 0.0);
421 k = 0;
422 for (const auto& file_meta : file_metas) {
423 ASSERT_EQ(ToString(0), file_meta.smallest.user_key().ToString());
424 ASSERT_EQ("999", file_meta.largest.user_key()
425 .ToString()); // max key by bytewise comparator
426 ASSERT_EQ(smallest_seqs[k], file_meta.fd.smallest_seqno);
427 ASSERT_EQ(largest_seqs[k], file_meta.fd.largest_seqno);
428 // Verify that imm is empty
429 ASSERT_EQ(std::numeric_limits<uint64_t>::max(),
430 all_cfds[k]->imm()->GetEarliestMemTableID());
431 ASSERT_EQ(0, all_cfds[k]->imm()->GetLatestMemTableID());
432 ++k;
433 }
434
435 for (auto m : to_delete) {
436 delete m;
437 }
438 to_delete.clear();
439 job_context.Clean();
440}
441
7c673cae
FG
442TEST_F(FlushJobTest, Snapshots) {
443 JobContext job_context(0);
444 auto cfd = versions_->GetColumnFamilySet()->GetDefault();
445 auto new_mem = cfd->ConstructNewMemtable(*cfd->GetLatestMutableCFOptions(),
446 kMaxSequenceNumber);
447
7c673cae
FG
448 std::set<SequenceNumber> snapshots_set;
449 int keys = 10000;
450 int max_inserts_per_keys = 8;
451
452 Random rnd(301);
453 for (int i = 0; i < keys / 2; ++i) {
494da23a 454 snapshots_set.insert(rnd.Uniform(keys * (max_inserts_per_keys / 2)) + 1);
7c673cae 455 }
494da23a
TL
456 // set has already removed the duplicate snapshots
457 std::vector<SequenceNumber> snapshots(snapshots_set.begin(),
458 snapshots_set.end());
7c673cae
FG
459
460 new_mem->Ref();
461 SequenceNumber current_seqno = 0;
462 auto inserted_keys = mock::MakeMockFile();
463 for (int i = 1; i < keys; ++i) {
464 std::string key(ToString(i));
465 int insertions = rnd.Uniform(max_inserts_per_keys);
466 for (int j = 0; j < insertions; ++j) {
20effc67 467 std::string value(rnd.HumanReadableString(10));
7c673cae
FG
468 auto seqno = ++current_seqno;
469 new_mem->Add(SequenceNumber(seqno), kTypeValue, key, value);
470 // a key is visible only if:
471 // 1. it's the last one written (j == insertions - 1)
472 // 2. there's a snapshot pointing at it
473 bool visible = (j == insertions - 1) ||
474 (snapshots_set.find(seqno) != snapshots_set.end());
475 if (visible) {
476 InternalKey internal_key(key, seqno, kTypeValue);
20effc67 477 inserted_keys.push_back({internal_key.Encode().ToString(), value});
7c673cae
FG
478 }
479 }
480 }
20effc67 481 mock::SortKVVector(&inserted_keys);
7c673cae
FG
482
483 autovector<MemTable*> to_delete;
484 cfd->imm()->Add(new_mem, &to_delete);
485 for (auto& m : to_delete) {
486 delete m;
487 }
488
489 EventLogger event_logger(db_options_.info_log.get());
11fdf7f2 490 SnapshotChecker* snapshot_checker = nullptr; // not relavant
20effc67
TL
491 FlushJob flush_job(
492 dbname_, versions_->GetColumnFamilySet()->GetDefault(), db_options_,
493 *cfd->GetLatestMutableCFOptions(), nullptr /* memtable_id */,
494 env_options_, versions_.get(), &mutex_, &shutting_down_, snapshots,
495 kMaxSequenceNumber, snapshot_checker, &job_context, nullptr, nullptr,
496 nullptr, kNoCompression, db_options_.statistics.get(), &event_logger,
497 true, true /* sync_output_directory */, true /* write_manifest */,
498 Env::Priority::USER, nullptr /*IOTracer*/);
7c673cae
FG
499 mutex_.Lock();
500 flush_job.PickMemTable();
501 ASSERT_OK(flush_job.Run());
502 mutex_.Unlock();
503 mock_table_factory_->AssertSingleFile(inserted_keys);
11fdf7f2
TL
504 HistogramData hist;
505 db_options_.statistics->histogramData(FLUSH_TIME, &hist);
506 ASSERT_GT(hist.average, 0.0);
7c673cae
FG
507 job_context.Clean();
508}
509
20effc67
TL
510class FlushJobTimestampTest : public FlushJobTestBase {
511 public:
512 FlushJobTimestampTest()
513 : FlushJobTestBase(test::PerThreadDBPath("flush_job_ts_gc_test"),
514 test::ComparatorWithU64Ts()) {}
515
516 void AddKeyValueToMemtable(MemTable* memtable, std::string key, uint64_t ts,
517 SequenceNumber seq, ValueType value_type,
518 Slice value) {
519 std::string key_str(std::move(key));
520 PutFixed64(&key_str, ts);
521 memtable->Add(seq, value_type, key_str, value);
522 }
523
524 protected:
525 static constexpr uint64_t kStartTs = 10;
526 static constexpr SequenceNumber kStartSeq = 0;
527 SequenceNumber curr_seq_{kStartSeq};
528 std::atomic<uint64_t> curr_ts_{kStartTs};
529};
530
531TEST_F(FlushJobTimestampTest, AllKeysExpired) {
532 ColumnFamilyData* cfd = versions_->GetColumnFamilySet()->GetDefault();
533 autovector<MemTable*> to_delete;
534
535 {
536 MemTable* new_mem = cfd->ConstructNewMemtable(
537 *cfd->GetLatestMutableCFOptions(), kMaxSequenceNumber);
538 new_mem->Ref();
539 for (int i = 0; i < 100; ++i) {
540 uint64_t ts = curr_ts_.fetch_add(1);
541 SequenceNumber seq = (curr_seq_++);
542 AddKeyValueToMemtable(new_mem, test::EncodeInt(0), ts, seq,
543 ValueType::kTypeValue, "0_value");
544 }
545 uint64_t ts = curr_ts_.fetch_add(1);
546 SequenceNumber seq = (curr_seq_++);
547 AddKeyValueToMemtable(new_mem, test::EncodeInt(0), ts, seq,
548 ValueType::kTypeDeletionWithTimestamp, "");
549 cfd->imm()->Add(new_mem, &to_delete);
550 }
551
552 std::vector<SequenceNumber> snapshots;
553 constexpr SnapshotChecker* const snapshot_checker = nullptr;
554 JobContext job_context(0);
555 EventLogger event_logger(db_options_.info_log.get());
556 std::string full_history_ts_low;
557 PutFixed64(&full_history_ts_low, std::numeric_limits<uint64_t>::max());
558 FlushJob flush_job(
559 dbname_, cfd, db_options_, *cfd->GetLatestMutableCFOptions(),
560 nullptr /* memtable_id */, env_options_, versions_.get(), &mutex_,
561 &shutting_down_, snapshots, kMaxSequenceNumber, snapshot_checker,
562 &job_context, nullptr, nullptr, nullptr, kNoCompression,
563 db_options_.statistics.get(), &event_logger, true,
564 true /* sync_output_directory */, true /* write_manifest */,
565 Env::Priority::USER, nullptr /*IOTracer*/, /*db_id=*/"",
566 /*db_session_id=*/"", full_history_ts_low);
567
568 FileMetaData fmeta;
569 mutex_.Lock();
570 flush_job.PickMemTable();
571 ASSERT_OK(flush_job.Run(/*prep_tracker=*/nullptr, &fmeta));
572 mutex_.Unlock();
573
574 {
575 std::string key = test::EncodeInt(0);
576 key.append(test::EncodeInt(curr_ts_.load(std::memory_order_relaxed) - 1));
577 InternalKey ikey(key, curr_seq_ - 1, ValueType::kTypeDeletionWithTimestamp);
578 ASSERT_EQ(ikey.Encode(), fmeta.smallest.Encode());
579 ASSERT_EQ(ikey.Encode(), fmeta.largest.Encode());
580 }
581
582 job_context.Clean();
583 ASSERT_TRUE(to_delete.empty());
584}
585
586TEST_F(FlushJobTimestampTest, NoKeyExpired) {
587 ColumnFamilyData* cfd = versions_->GetColumnFamilySet()->GetDefault();
588 autovector<MemTable*> to_delete;
589
590 {
591 MemTable* new_mem = cfd->ConstructNewMemtable(
592 *cfd->GetLatestMutableCFOptions(), kMaxSequenceNumber);
593 new_mem->Ref();
594 for (int i = 0; i < 100; ++i) {
595 uint64_t ts = curr_ts_.fetch_add(1);
596 SequenceNumber seq = (curr_seq_++);
597 AddKeyValueToMemtable(new_mem, test::EncodeInt(0), ts, seq,
598 ValueType::kTypeValue, "0_value");
599 }
600 cfd->imm()->Add(new_mem, &to_delete);
601 }
602
603 std::vector<SequenceNumber> snapshots;
604 SnapshotChecker* const snapshot_checker = nullptr;
605 JobContext job_context(0);
606 EventLogger event_logger(db_options_.info_log.get());
607 std::string full_history_ts_low;
608 PutFixed64(&full_history_ts_low, 0);
609 FlushJob flush_job(
610 dbname_, cfd, db_options_, *cfd->GetLatestMutableCFOptions(),
611 nullptr /* memtable_id */, env_options_, versions_.get(), &mutex_,
612 &shutting_down_, snapshots, kMaxSequenceNumber, snapshot_checker,
613 &job_context, nullptr, nullptr, nullptr, kNoCompression,
614 db_options_.statistics.get(), &event_logger, true,
615 true /* sync_output_directory */, true /* write_manifest */,
616 Env::Priority::USER, nullptr /*IOTracer*/, /*db_id=*/"",
617 /*db_session_id=*/"", full_history_ts_low);
618
619 FileMetaData fmeta;
620 mutex_.Lock();
621 flush_job.PickMemTable();
622 ASSERT_OK(flush_job.Run(/*prep_tracker=*/nullptr, &fmeta));
623 mutex_.Unlock();
624
625 {
626 std::string ukey = test::EncodeInt(0);
627 std::string smallest_key =
628 ukey + test::EncodeInt(curr_ts_.load(std::memory_order_relaxed) - 1);
629 std::string largest_key = ukey + test::EncodeInt(kStartTs);
630 InternalKey smallest(smallest_key, curr_seq_ - 1, ValueType::kTypeValue);
631 InternalKey largest(largest_key, kStartSeq, ValueType::kTypeValue);
632 ASSERT_EQ(smallest.Encode(), fmeta.smallest.Encode());
633 ASSERT_EQ(largest.Encode(), fmeta.largest.Encode());
634 }
635 job_context.Clean();
636 ASSERT_TRUE(to_delete.empty());
637}
638
f67539c2 639} // namespace ROCKSDB_NAMESPACE
7c673cae
FG
640
641int main(int argc, char** argv) {
642 ::testing::InitGoogleTest(&argc, argv);
643 return RUN_ALL_TESTS();
644}