1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
6 #include "db/flush_job.h"
13 #include "db/blob/blob_index.h"
14 #include "db/column_family.h"
15 #include "db/db_impl/db_impl.h"
16 #include "db/version_set.h"
17 #include "file/writable_file_writer.h"
18 #include "rocksdb/cache.h"
19 #include "rocksdb/file_system.h"
20 #include "rocksdb/write_buffer_manager.h"
21 #include "table/mock_table.h"
22 #include "test_util/testharness.h"
23 #include "test_util/testutil.h"
24 #include "util/random.h"
25 #include "util/string_util.h"
27 namespace ROCKSDB_NAMESPACE
{
29 // TODO(icanadi) Mock out everything else:
// Test fixture base: builds a real on-disk dummy DB (IDENTITY, MANIFEST,
// CURRENT) plus a VersionSet over three column families ("default", "foo",
// "bar") so FlushJob can be exercised against mock::MockTableFactory tables.
// NOTE(review): this chunk is a line-mangled paste with interior source lines
// missing; all original tokens are preserved verbatim below.
32 class FlushJobTestBase
: public testing::Test
{
// Ctor: takes the db path and the user comparator to install for every CF.
34 FlushJobTestBase(std::string dbname
, const Comparator
* ucmp
)
35 : env_(Env::Default()),
36 fs_(env_
->GetFileSystem()),
37 dbname_(std::move(dbname
)),
40 db_options_(options_
),
41 column_family_names_({kDefaultColumnFamilyName
, "foo", "bar"}),
42 table_cache_(NewLRUCache(50000, 16)),
43 write_buffer_manager_(db_options_
.db_write_buffer_size
),
44 shutting_down_(false),
45 mock_table_factory_(new mock::MockTableFactory()) {}
// Dtor: keeps the db directory around when KEEP_DB is set (for debugging),
// otherwise destroys it.
47 virtual ~FlushJobTestBase() {
48 if (getenv("KEEP_DB")) {
49 fprintf(stdout
, "db is still in %s\n", dbname_
.c_str());
51 // destroy versions_ to release all file handles
53 EXPECT_OK(DestroyDir(env_
, dbname_
));
// Creates a fresh dummy DB on disk: identity file, one VersionEdit for the
// default CF plus one per extra CF, all written into MANIFEST-000001.
58 ASSERT_OK(SetIdentityFile(env_
, dbname_
))
61 new_db
.SetLogNumber(0);
62 new_db
.SetNextFile(2);
63 new_db
.SetLastSequence(0);
// One VersionEdit per non-default column family.
65 autovector
<VersionEdit
> new_cfs
;
66 SequenceNumber last_seq
= 1;
// i starts at 1: index 0 is the default CF, already covered by new_db.
68 for (size_t i
= 1; i
!= column_family_names_
.size(); ++i
) {
70 new_cf
.AddColumnFamily(column_family_names_
[i
]);
71 new_cf
.SetColumnFamily(cf_id
++);
72 new_cf
.SetComparatorName(ucmp_
->Name());
73 new_cf
.SetLogNumber(0);
74 new_cf
.SetNextFile(2);
75 new_cf
.SetLastSequence(last_seq
++);
76 new_cfs
.emplace_back(new_cf
);
// Write the edits as log records into the descriptor (MANIFEST) file.
79 const std::string manifest
= DescriptorFileName(dbname_
, 1);
80 const auto& fs
= env_
->GetFileSystem();
81 std::unique_ptr
<WritableFileWriter
> file_writer
;
82 Status s
= WritableFileWriter::Create(
83 fs
, manifest
, fs
->OptimizeForManifestWrite(env_options_
), &file_writer
,
88 log::Writer
log(std::move(file_writer
), 0, false);
90 new_db
.EncodeTo(&record
);
91 s
= log
.AddRecord(record
);
94 for (const auto& e
: new_cfs
) {
97 s
= log
.AddRecord(record
);
102 // Make "CURRENT" file that points to the new manifest file.
103 s
= SetCurrentFile(fs_
.get(), dbname_
, 1, nullptr);
// gtest SetUp: creates the dummy DB, wires options, then recovers a
// VersionSet over all configured column families.
107 void SetUp() override
{
108 EXPECT_OK(env_
->CreateDirIfMissing(dbname_
));
110 // TODO(icanadi) Remove this once we mock out VersionSet
113 db_options_
.env
= env_
;
114 db_options_
.fs
= fs_
;
115 db_options_
.db_paths
.emplace_back(dbname_
,
116 std::numeric_limits
<uint64_t>::max());
// Statistics enabled so tests can assert on FLUSH_TIME histograms.
117 db_options_
.statistics
= CreateDBStatistics();
119 cf_options_
.comparator
= ucmp_
;
121 std::vector
<ColumnFamilyDescriptor
> column_families
;
// All CFs share the mock table factory so flushed files stay in memory.
122 cf_options_
.table_factory
= mock_table_factory_
;
123 for (const auto& cf_name
: column_family_names_
) {
124 column_families
.emplace_back(cf_name
, cf_options_
);
128 new VersionSet(dbname_
, &db_options_
, env_options_
, table_cache_
.get(),
129 &write_buffer_manager_
, &write_controller_
,
130 /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr,
131 /*db_id*/ "", /*db_session_id*/ ""));
132 EXPECT_OK(versions_
->Recover(column_families
, false));
// ---- Fixture state shared by all tests ----
136 std::shared_ptr
<FileSystem
> fs_
;
138 const Comparator
* const ucmp_
;
139 EnvOptions env_options_
;
141 ImmutableDBOptions db_options_
;
142 const std::vector
<std::string
> column_family_names_
;
143 std::shared_ptr
<Cache
> table_cache_
;
144 WriteController write_controller_
;
145 WriteBufferManager write_buffer_manager_
;
146 ColumnFamilyOptions cf_options_
;
147 std::unique_ptr
<VersionSet
> versions_
;
148 InstrumentedMutex mutex_
;
149 std::atomic
<bool> shutting_down_
;
150 std::shared_ptr
<mock::MockTableFactory
> mock_table_factory_
;
// Empty mapping passed to FlushJob where seqno->time info is irrelevant.
152 SeqnoToTimeMapping empty_seqno_to_time_mapping_
;
// Concrete fixture using the bytewise comparator and a per-thread db path.
155 class FlushJobTest
: public FlushJobTestBase
{
158 : FlushJobTestBase(test::PerThreadDBPath("flush_job_test"),
159 BytewiseComparator()) {}
// Flushing with no immutable memtables must succeed and be a no-op.
162 TEST_F(FlushJobTest
, Empty
) {
163 JobContext
job_context(0);
164 auto cfd
= versions_
->GetColumnFamilySet()->GetDefault();
165 EventLogger
event_logger(db_options_
.info_log
.get());
166 SnapshotChecker
* snapshot_checker
= nullptr; // not relevant
// FlushJob over the default CF with no memtable picked (memtable_id = max).
167 FlushJob
flush_job(dbname_
, versions_
->GetColumnFamilySet()->GetDefault(),
168 db_options_
, *cfd
->GetLatestMutableCFOptions(),
169 std::numeric_limits
<uint64_t>::max() /* memtable_id */,
170 env_options_
, versions_
.get(), &mutex_
, &shutting_down_
,
171 {}, kMaxSequenceNumber
, snapshot_checker
, &job_context
,
172 nullptr, nullptr, nullptr, kNoCompression
, nullptr,
173 &event_logger
, false, true /* sync_output_directory */,
174 true /* write_manifest */, Env::Priority::USER
,
175 nullptr /*IOTracer*/, empty_seqno_to_time_mapping_
);
// PickMemTable/Run must be called with the DB mutex held.
177 InstrumentedMutexLock
l(&mutex_
);
178 flush_job
.PickMemTable();
179 ASSERT_OK(flush_job
.Run());
// Flush a populated memtable (values + a range tombstone + blob references)
// and verify the resulting file metadata and mock-file contents.
184 TEST_F(FlushJobTest
, NonEmpty
) {
185 JobContext
job_context(0);
186 auto cfd
= versions_
->GetColumnFamilySet()->GetDefault();
187 auto new_mem
= cfd
->ConstructNewMemtable(*cfd
->GetLatestMutableCFOptions(),
190 auto inserted_keys
= mock::MakeMockFile();
192 // seqno [ 1, 2 ... 8998, 8999, 9000, 9001, 9002 ... 9999 ]
193 // key [ 1001, 1002 ... 9998, 9999, 0, 1, 2 ... 999 ]
194 // range-delete "9995" -> "9999" at seqno 10000
195 // blob references with seqnos 10001..10006
196 for (int i
= 1; i
< 10000; ++i
) {
197 std::string
key(std::to_string((i
+ 1000) % 10000));
198 std::string
value("value" + key
);
199 ASSERT_OK(new_mem
->Add(SequenceNumber(i
), kTypeValue
, key
, value
,
200 nullptr /* kv_prot_info */));
// Keys >= "9995" are covered by the range tombstone below, so only the
// others are expected to appear in the flushed output.
201 if ((i
+ 1000) % 10000 < 9995) {
202 InternalKey
internal_key(key
, SequenceNumber(i
), kTypeValue
);
203 inserted_keys
.push_back({internal_key
.Encode().ToString(), value
});
// Range tombstone ["9995", "9999a") at seqno 10000.
208 ASSERT_OK(new_mem
->Add(SequenceNumber(10000), kTypeRangeDeletion
, "9995",
209 "9999a", nullptr /* kv_prot_info */));
210 InternalKey
internal_key("9995", SequenceNumber(10000), kTypeRangeDeletion
);
211 inserted_keys
.push_back({internal_key
.Encode().ToString(), "9999a"});
214 // Note: the first two blob references will not be considered when resolving
215 // the oldest blob file referenced (the first one is inlined TTL, while the
216 // second one is TTL and thus points to a TTL blob file).
217 constexpr std::array
<uint64_t, 6> blob_file_numbers
{
218 {kInvalidBlobFileNumber
, 5, 103, 17, 102, 101}};
219 for (size_t i
= 0; i
< blob_file_numbers
.size(); ++i
) {
220 std::string
key(std::to_string(i
+ 10001));
221 std::string blob_index
;
223 BlobIndex::EncodeInlinedTTL(&blob_index
, /* expiration */ 1234567890ULL,
226 BlobIndex::EncodeBlobTTL(&blob_index
, /* expiration */ 1234567890ULL,
227 blob_file_numbers
[i
], /* offset */ i
<< 10,
228 /* size */ i
<< 20, kNoCompression
);
230 BlobIndex::EncodeBlob(&blob_index
, blob_file_numbers
[i
],
231 /* offset */ i
<< 10, /* size */ i
<< 20,
235 const SequenceNumber
seq(i
+ 10001);
236 ASSERT_OK(new_mem
->Add(seq
, kTypeBlobIndex
, key
, blob_index
,
237 nullptr /* kv_prot_info */));
239 InternalKey
internal_key(key
, seq
, kTypeBlobIndex
);
240 inserted_keys
.push_back({internal_key
.Encode().ToString(), blob_index
});
// Mock files are compared sorted; sort the expectation to match.
242 mock::SortKVVector(&inserted_keys
);
244 autovector
<MemTable
*> to_delete
;
245 new_mem
->ConstructFragmentedRangeTombstones();
// Move the memtable to the immutable list so FlushJob can pick it.
246 cfd
->imm()->Add(new_mem
, &to_delete
);
247 for (auto& m
: to_delete
) {
251 EventLogger
event_logger(db_options_
.info_log
.get());
252 SnapshotChecker
* snapshot_checker
= nullptr; // not relevant
254 dbname_
, versions_
->GetColumnFamilySet()->GetDefault(), db_options_
,
255 *cfd
->GetLatestMutableCFOptions(),
256 std::numeric_limits
<uint64_t>::max() /* memtable_id */, env_options_
,
257 versions_
.get(), &mutex_
, &shutting_down_
, {}, kMaxSequenceNumber
,
258 snapshot_checker
, &job_context
, nullptr, nullptr, nullptr, kNoCompression
,
259 db_options_
.statistics
.get(), &event_logger
, true,
260 true /* sync_output_directory */, true /* write_manifest */,
261 Env::Priority::USER
, nullptr /*IOTracer*/, empty_seqno_to_time_mapping_
);
264 FileMetaData file_meta
;
266 flush_job
.PickMemTable();
267 ASSERT_OK(flush_job
.Run(nullptr, &file_meta
));
// The flush must have recorded a nonzero FLUSH_TIME sample.
269 db_options_
.statistics
->histogramData(FLUSH_TIME
, &hist
);
270 ASSERT_GT(hist
.average
, 0.0);
// Key range: "0" .. "9999a" (range tombstone end), seqnos 1..10006.
272 ASSERT_EQ(std::to_string(0), file_meta
.smallest
.user_key().ToString());
273 ASSERT_EQ("9999a", file_meta
.largest
.user_key().ToString());
274 ASSERT_EQ(1, file_meta
.fd
.smallest_seqno
);
275 ASSERT_EQ(10006, file_meta
.fd
.largest_seqno
);
// 17 is the smallest non-TTL blob file number referenced (see note above).
276 ASSERT_EQ(17, file_meta
.oldest_blob_file_number
);
277 mock_table_factory_
->AssertSingleFile(inserted_keys
);
// With two immutable memtables, a FlushJob bounded to the first memtable id
// must flush only that one, producing a file covering its keys/seqnos.
281 TEST_F(FlushJobTest
, FlushMemTablesSingleColumnFamily
) {
282 const size_t num_mems
= 2;
283 const size_t num_mems_to_flush
= 1;
284 const size_t num_keys_per_table
= 100;
285 JobContext
job_context(0);
286 ColumnFamilyData
* cfd
= versions_
->GetColumnFamilySet()->GetDefault();
287 std::vector
<uint64_t> memtable_ids
;
288 std::vector
<MemTable
*> new_mems
;
// Build num_mems memtables, each with num_keys_per_table distinct keys.
289 for (size_t i
= 0; i
!= num_mems
; ++i
) {
290 MemTable
* mem
= cfd
->ConstructNewMemtable(*cfd
->GetLatestMutableCFOptions(),
294 new_mems
.emplace_back(mem
);
295 memtable_ids
.push_back(mem
->GetID());
297 for (size_t j
= 0; j
< num_keys_per_table
; ++j
) {
298 std::string
key(std::to_string(j
+ i
* num_keys_per_table
));
299 std::string
value("value" + key
);
300 ASSERT_OK(mem
->Add(SequenceNumber(j
+ i
* num_keys_per_table
), kTypeValue
,
301 key
, value
, nullptr /* kv_prot_info */));
305 autovector
<MemTable
*> to_delete
;
306 for (auto mem
: new_mems
) {
307 mem
->ConstructFragmentedRangeTombstones();
308 cfd
->imm()->Add(mem
, &to_delete
);
311 EventLogger
event_logger(db_options_
.info_log
.get());
312 SnapshotChecker
* snapshot_checker
= nullptr; // not relevant
314 assert(memtable_ids
.size() == num_mems
);
315 uint64_t smallest_memtable_id
= memtable_ids
.front();
// Only memtables with id <= flush_memtable_id are eligible for this flush.
316 uint64_t flush_memtable_id
= smallest_memtable_id
+ num_mems_to_flush
- 1;
318 dbname_
, versions_
->GetColumnFamilySet()->GetDefault(), db_options_
,
319 *cfd
->GetLatestMutableCFOptions(), flush_memtable_id
, env_options_
,
320 versions_
.get(), &mutex_
, &shutting_down_
, {}, kMaxSequenceNumber
,
321 snapshot_checker
, &job_context
, nullptr, nullptr, nullptr, kNoCompression
,
322 db_options_
.statistics
.get(), &event_logger
, true,
323 true /* sync_output_directory */, true /* write_manifest */,
324 Env::Priority::USER
, nullptr /*IOTracer*/, empty_seqno_to_time_mapping_
);
326 FileMetaData file_meta
;
328 flush_job
.PickMemTable();
329 ASSERT_OK(flush_job
.Run(nullptr /* prep_tracker */, &file_meta
));
331 db_options_
.statistics
->histogramData(FLUSH_TIME
, &hist
);
332 ASSERT_GT(hist
.average
, 0.0);
// Only the first memtable's data (keys "0".."99", seqnos 0..99) is flushed.
334 ASSERT_EQ(std::to_string(0), file_meta
.smallest
.user_key().ToString());
335 ASSERT_EQ("99", file_meta
.largest
.user_key().ToString());
336 ASSERT_EQ(0, file_meta
.fd
.smallest_seqno
);
337 ASSERT_EQ(SequenceNumber(num_mems_to_flush
* num_keys_per_table
- 1),
338 file_meta
.fd
.largest_seqno
);
339 ASSERT_EQ(kInvalidBlobFileNumber
, file_meta
.oldest_blob_file_number
);
341 for (auto m
: to_delete
) {
// Atomic flush across all column families: one FlushJob per CF, results
// installed together via InstallMemtableAtomicFlushResults, then per-CF file
// metadata and empty immutable lists are verified.
348 TEST_F(FlushJobTest
, FlushMemtablesMultipleColumnFamilies
) {
349 autovector
<ColumnFamilyData
*> all_cfds
;
350 for (auto cfd
: *versions_
->GetColumnFamilySet()) {
351 all_cfds
.push_back(cfd
);
// Per-CF memtable counts; must line up with the fixture's CF names.
353 const std::vector
<size_t> num_memtables
= {2, 1, 3};
354 assert(num_memtables
.size() == column_family_names_
.size());
355 const size_t num_keys_per_memtable
= 1000;
356 JobContext
job_context(0);
357 std::vector
<uint64_t> memtable_ids
;
358 std::vector
<SequenceNumber
> smallest_seqs
;
359 std::vector
<SequenceNumber
> largest_seqs
;
360 autovector
<MemTable
*> to_delete
;
// Sequence numbers increase monotonically across all CFs' inserts.
361 SequenceNumber curr_seqno
= 0;
363 for (auto cfd
: all_cfds
) {
364 smallest_seqs
.push_back(curr_seqno
);
365 for (size_t i
= 0; i
!= num_memtables
[k
]; ++i
) {
366 MemTable
* mem
= cfd
->ConstructNewMemtable(
367 *cfd
->GetLatestMutableCFOptions(), kMaxSequenceNumber
);
371 for (size_t j
= 0; j
!= num_keys_per_memtable
; ++j
) {
372 std::string
key(std::to_string(j
+ i
* num_keys_per_memtable
));
373 std::string
value("value" + key
);
374 ASSERT_OK(mem
->Add(curr_seqno
++, kTypeValue
, key
, value
,
375 nullptr /* kv_prot_info */));
377 mem
->ConstructFragmentedRangeTombstones();
378 cfd
->imm()->Add(mem
, &to_delete
);
380 largest_seqs
.push_back(curr_seqno
- 1);
// Record the max memtable id to flush for this CF (ids start at 0 here).
381 memtable_ids
.push_back(num_memtables
[k
++] - 1);
384 EventLogger
event_logger(db_options_
.info_log
.get());
385 SnapshotChecker
* snapshot_checker
= nullptr; // not relevant
386 std::vector
<std::unique_ptr
<FlushJob
>> flush_jobs
;
// One FlushJob per CF; manifest writes and dir syncs are deferred because
// results are installed atomically below.
388 for (auto cfd
: all_cfds
) {
389 std::vector
<SequenceNumber
> snapshot_seqs
;
390 flush_jobs
.emplace_back(new FlushJob(
391 dbname_
, cfd
, db_options_
, *cfd
->GetLatestMutableCFOptions(),
392 memtable_ids
[k
], env_options_
, versions_
.get(), &mutex_
,
393 &shutting_down_
, snapshot_seqs
, kMaxSequenceNumber
, snapshot_checker
,
394 &job_context
, nullptr, nullptr, nullptr, kNoCompression
,
395 db_options_
.statistics
.get(), &event_logger
, true,
396 false /* sync_output_directory */, false /* write_manifest */,
397 Env::Priority::USER
, nullptr /*IOTracer*/,
398 empty_seqno_to_time_mapping_
));
402 std::vector
<FileMetaData
> file_metas
;
403 // Call reserve to avoid auto-resizing
404 file_metas
.reserve(flush_jobs
.size());
406 for (auto& job
: flush_jobs
) {
409 for (auto& job
: flush_jobs
) {
411 // Run will release and re-acquire mutex
412 ASSERT_OK(job
->Run(nullptr /**/, &meta
));
413 file_metas
.emplace_back(meta
);
415 autovector
<FileMetaData
*> file_meta_ptrs
;
416 for (auto& meta
: file_metas
) {
417 file_meta_ptrs
.push_back(&meta
);
419 autovector
<const autovector
<MemTable
*>*> mems_list
;
420 for (size_t i
= 0; i
!= all_cfds
.size(); ++i
) {
421 const auto& mems
= flush_jobs
[i
]->GetMemTables();
422 mems_list
.push_back(&mems
);
424 autovector
<const MutableCFOptions
*> mutable_cf_options_list
;
425 for (auto cfd
: all_cfds
) {
426 mutable_cf_options_list
.push_back(cfd
->GetLatestMutableCFOptions());
428 autovector
<std::list
<std::unique_ptr
<FlushJobInfo
>>*>
429 committed_flush_jobs_info
;
431 for (auto& job
: flush_jobs
) {
432 committed_flush_jobs_info
.push_back(job
->GetCommittedFlushJobsInfo());
434 #endif //! ROCKSDB_LITE
// Install all flush results in a single atomic manifest update.
436 Status s
= InstallMemtableAtomicFlushResults(
437 nullptr /* imm_lists */, all_cfds
, mutable_cf_options_list
, mems_list
,
438 versions_
.get(), nullptr /* prep_tracker */, &mutex_
, file_meta_ptrs
,
439 committed_flush_jobs_info
, &job_context
.memtables_to_free
,
440 nullptr /* db_directory */, nullptr /* log_buffer */);
444 db_options_
.statistics
->histogramData(FLUSH_TIME
, &hist
);
445 ASSERT_GT(hist
.average
, 0.0);
// Per-CF output checks: key range, per-CF seqno range, and that each CF's
// immutable memtable list has been fully consumed.
447 for (const auto& file_meta
: file_metas
) {
448 ASSERT_EQ(std::to_string(0), file_meta
.smallest
.user_key().ToString());
449 ASSERT_EQ("999", file_meta
.largest
.user_key()
450 .ToString()); // max key by bytewise comparator
451 ASSERT_EQ(smallest_seqs
[k
], file_meta
.fd
.smallest_seqno
);
452 ASSERT_EQ(largest_seqs
[k
], file_meta
.fd
.largest_seqno
);
453 // Verify that imm is empty
454 ASSERT_EQ(std::numeric_limits
<uint64_t>::max(),
455 all_cfds
[k
]->imm()->GetEarliestMemTableID());
456 ASSERT_EQ(0, all_cfds
[k
]->imm()->GetLatestMemTableID());
460 for (auto m
: to_delete
) {
// Flush with live snapshots: overwritten versions of a key must be kept in
// the output iff a snapshot pins them (or they are the newest version).
467 TEST_F(FlushJobTest
, Snapshots
) {
468 JobContext
job_context(0);
469 auto cfd
= versions_
->GetColumnFamilySet()->GetDefault();
470 auto new_mem
= cfd
->ConstructNewMemtable(*cfd
->GetLatestMutableCFOptions(),
// Random set of snapshot seqnos (set dedups automatically).
473 std::set
<SequenceNumber
> snapshots_set
;
475 int max_inserts_per_keys
= 8;
478 for (int i
= 0; i
< keys
/ 2; ++i
) {
479 snapshots_set
.insert(rnd
.Uniform(keys
* (max_inserts_per_keys
/ 2)) + 1);
481 // set has already removed the duplicate snapshots
482 std::vector
<SequenceNumber
> snapshots(snapshots_set
.begin(),
483 snapshots_set
.end());
486 SequenceNumber current_seqno
= 0;
487 auto inserted_keys
= mock::MakeMockFile();
// Each key gets a random number of overwrites; record which versions
// should survive the flush.
488 for (int i
= 1; i
< keys
; ++i
) {
489 std::string
key(std::to_string(i
));
490 int insertions
= rnd
.Uniform(max_inserts_per_keys
);
491 for (int j
= 0; j
< insertions
; ++j
) {
492 std::string
value(rnd
.HumanReadableString(10));
493 auto seqno
= ++current_seqno
;
494 ASSERT_OK(new_mem
->Add(SequenceNumber(seqno
), kTypeValue
, key
, value
,
495 nullptr /* kv_prot_info */));
496 // a key is visible only if:
497 // 1. it's the last one written (j == insertions - 1)
498 // 2. there's a snapshot pointing at it
499 bool visible
= (j
== insertions
- 1) ||
500 (snapshots_set
.find(seqno
) != snapshots_set
.end());
502 InternalKey
internal_key(key
, seqno
, kTypeValue
);
503 inserted_keys
.push_back({internal_key
.Encode().ToString(), value
});
507 mock::SortKVVector(&inserted_keys
);
509 autovector
<MemTable
*> to_delete
;
510 new_mem
->ConstructFragmentedRangeTombstones();
511 cfd
->imm()->Add(new_mem
, &to_delete
);
512 for (auto& m
: to_delete
) {
516 EventLogger
event_logger(db_options_
.info_log
.get());
517 SnapshotChecker
* snapshot_checker
= nullptr; // not relevant
// FlushJob receives the snapshot list so it preserves pinned versions.
519 dbname_
, versions_
->GetColumnFamilySet()->GetDefault(), db_options_
,
520 *cfd
->GetLatestMutableCFOptions(),
521 std::numeric_limits
<uint64_t>::max() /* memtable_id */, env_options_
,
522 versions_
.get(), &mutex_
, &shutting_down_
, snapshots
, kMaxSequenceNumber
,
523 snapshot_checker
, &job_context
, nullptr, nullptr, nullptr, kNoCompression
,
524 db_options_
.statistics
.get(), &event_logger
, true,
525 true /* sync_output_directory */, true /* write_manifest */,
526 Env::Priority::USER
, nullptr /*IOTracer*/, empty_seqno_to_time_mapping_
);
528 flush_job
.PickMemTable();
529 ASSERT_OK(flush_job
.Run());
531 mock_table_factory_
->AssertSingleFile(inserted_keys
);
533 db_options_
.statistics
->histogramData(FLUSH_TIME
, &hist
);
534 ASSERT_GT(hist
.average
, 0.0);
// FlushJob write-IO priority: IO_HIGH under normal write-controller state,
// escalated to IO_USER when writes are delayed or stopped.
538 TEST_F(FlushJobTest
, GetRateLimiterPriorityForWrite
) {
539 // Prepare a FlushJob that flush MemTables of Single Column Family.
540 const size_t num_mems
= 2;
541 const size_t num_mems_to_flush
= 1;
542 const size_t num_keys_per_table
= 100;
543 JobContext
job_context(0);
544 ColumnFamilyData
* cfd
= versions_
->GetColumnFamilySet()->GetDefault();
545 std::vector
<uint64_t> memtable_ids
;
546 std::vector
<MemTable
*> new_mems
;
547 for (size_t i
= 0; i
!= num_mems
; ++i
) {
548 MemTable
* mem
= cfd
->ConstructNewMemtable(*cfd
->GetLatestMutableCFOptions(),
552 new_mems
.emplace_back(mem
);
553 memtable_ids
.push_back(mem
->GetID());
555 for (size_t j
= 0; j
< num_keys_per_table
; ++j
) {
556 std::string
key(std::to_string(j
+ i
* num_keys_per_table
));
557 std::string
value("value" + key
);
558 ASSERT_OK(mem
->Add(SequenceNumber(j
+ i
* num_keys_per_table
), kTypeValue
,
559 key
, value
, nullptr /* kv_prot_info */));
563 autovector
<MemTable
*> to_delete
;
564 for (auto mem
: new_mems
) {
565 mem
->ConstructFragmentedRangeTombstones();
566 cfd
->imm()->Add(mem
, &to_delete
);
569 EventLogger
event_logger(db_options_
.info_log
.get());
570 SnapshotChecker
* snapshot_checker
= nullptr; // not relevant
572 assert(memtable_ids
.size() == num_mems
);
573 uint64_t smallest_memtable_id
= memtable_ids
.front();
574 uint64_t flush_memtable_id
= smallest_memtable_id
+ num_mems_to_flush
- 1;
576 dbname_
, versions_
->GetColumnFamilySet()->GetDefault(), db_options_
,
577 *cfd
->GetLatestMutableCFOptions(), flush_memtable_id
, env_options_
,
578 versions_
.get(), &mutex_
, &shutting_down_
, {}, kMaxSequenceNumber
,
579 snapshot_checker
, &job_context
, nullptr, nullptr, nullptr, kNoCompression
,
580 db_options_
.statistics
.get(), &event_logger
, true,
581 true /* sync_output_directory */, true /* write_manifest */,
582 Env::Priority::USER
, nullptr /*IOTracer*/, empty_seqno_to_time_mapping_
);
584 // When the state from WriteController is normal.
585 ASSERT_EQ(flush_job
.GetRateLimiterPriorityForWrite(), Env::IO_HIGH
);
// Reach into the job's VersionSet for the shared write controller.
587 WriteController
* write_controller
=
588 flush_job
.versions_
->GetColumnFamilySet()->write_controller();
591 // When the state from WriteController is Delayed.
592 std::unique_ptr
<WriteControllerToken
> delay_token
=
593 write_controller
->GetDelayToken(1000000);
594 ASSERT_EQ(flush_job
.GetRateLimiterPriorityForWrite(), Env::IO_USER
);
598 // When the state from WriteController is Stopped.
599 std::unique_ptr
<WriteControllerToken
> stop_token
=
600 write_controller
->GetStopToken();
601 ASSERT_EQ(flush_job
.GetRateLimiterPriorityForWrite(), Env::IO_USER
);
// Fixture for user-defined-timestamp tests: uses a u64-timestamp-suffixed
// bytewise comparator and tracks monotonically increasing seqno/timestamp.
605 class FlushJobTimestampTest
: public FlushJobTestBase
{
607 FlushJobTimestampTest()
608 : FlushJobTestBase(test::PerThreadDBPath("flush_job_ts_gc_test"),
609 test::BytewiseComparatorWithU64TsWrapper()) {}
// Helper: appends the fixed64 timestamp to the user key, then inserts the
// entry into the given memtable.
611 void AddKeyValueToMemtable(MemTable
* memtable
, std::string key
, uint64_t ts
,
612 SequenceNumber seq
, ValueType value_type
,
614 std::string
key_str(std::move(key
));
615 PutFixed64(&key_str
, ts
);
616 ASSERT_OK(memtable
->Add(seq
, value_type
, key_str
, value
,
617 nullptr /* kv_prot_info */));
// Starting timestamp/seqno for each test; advanced as entries are added.
621 static constexpr uint64_t kStartTs
= 10;
622 static constexpr SequenceNumber kStartSeq
= 0;
623 SequenceNumber curr_seq_
{kStartSeq
};
624 std::atomic
<uint64_t> curr_ts_
{kStartTs
};
// With full_history_ts_low = max, all older versions are GC-eligible during
// flush: only the newest entry (a deletion) survives in the output file.
627 TEST_F(FlushJobTimestampTest
, AllKeysExpired
) {
628 ColumnFamilyData
* cfd
= versions_
->GetColumnFamilySet()->GetDefault();
629 autovector
<MemTable
*> to_delete
;
632 MemTable
* new_mem
= cfd
->ConstructNewMemtable(
633 *cfd
->GetLatestMutableCFOptions(), kMaxSequenceNumber
);
// 100 versions of the same user key, each with a newer timestamp/seqno.
635 for (int i
= 0; i
< 100; ++i
) {
636 uint64_t ts
= curr_ts_
.fetch_add(1);
637 SequenceNumber seq
= (curr_seq_
++);
638 AddKeyValueToMemtable(new_mem
, test::EncodeInt(0), ts
, seq
,
639 ValueType::kTypeValue
, "0_value");
// Final entry: a timestamped deletion of the same key.
641 uint64_t ts
= curr_ts_
.fetch_add(1);
642 SequenceNumber seq
= (curr_seq_
++);
643 AddKeyValueToMemtable(new_mem
, test::EncodeInt(0), ts
, seq
,
644 ValueType::kTypeDeletionWithTimestamp
, "");
645 new_mem
->ConstructFragmentedRangeTombstones();
646 cfd
->imm()->Add(new_mem
, &to_delete
);
649 std::vector
<SequenceNumber
> snapshots
;
650 constexpr SnapshotChecker
* const snapshot_checker
= nullptr;
651 JobContext
job_context(0);
652 EventLogger
event_logger(db_options_
.info_log
.get());
// full_history_ts_low = uint64 max => every older version may be GC'ed.
653 std::string full_history_ts_low
;
654 PutFixed64(&full_history_ts_low
, std::numeric_limits
<uint64_t>::max());
656 dbname_
, cfd
, db_options_
, *cfd
->GetLatestMutableCFOptions(),
657 std::numeric_limits
<uint64_t>::max() /* memtable_id */, env_options_
,
658 versions_
.get(), &mutex_
, &shutting_down_
, snapshots
, kMaxSequenceNumber
,
659 snapshot_checker
, &job_context
, nullptr, nullptr, nullptr, kNoCompression
,
660 db_options_
.statistics
.get(), &event_logger
, true,
661 true /* sync_output_directory */, true /* write_manifest */,
662 Env::Priority::USER
, nullptr /*IOTracer*/, empty_seqno_to_time_mapping_
,
664 /*db_session_id=*/"", full_history_ts_low
);
668 flush_job
.PickMemTable();
669 ASSERT_OK(flush_job
.Run(/*prep_tracker=*/nullptr, &fmeta
));
// Expect a single surviving entry: the final deletion, so smallest ==
// largest == (key, last ts, last seq, kTypeDeletionWithTimestamp).
673 std::string key
= test::EncodeInt(0);
674 key
.append(test::EncodeInt(curr_ts_
.load(std::memory_order_relaxed
) - 1));
675 InternalKey
ikey(key
, curr_seq_
- 1, ValueType::kTypeDeletionWithTimestamp
);
676 ASSERT_EQ(ikey
.Encode(), fmeta
.smallest
.Encode());
677 ASSERT_EQ(ikey
.Encode(), fmeta
.largest
.Encode());
681 ASSERT_TRUE(to_delete
.empty());
// With full_history_ts_low = 0, no version is GC-eligible: all 100 versions
// of the key must be written out, spanning [last ts, kStartTs].
684 TEST_F(FlushJobTimestampTest
, NoKeyExpired
) {
685 ColumnFamilyData
* cfd
= versions_
->GetColumnFamilySet()->GetDefault();
686 autovector
<MemTable
*> to_delete
;
689 MemTable
* new_mem
= cfd
->ConstructNewMemtable(
690 *cfd
->GetLatestMutableCFOptions(), kMaxSequenceNumber
);
692 for (int i
= 0; i
< 100; ++i
) {
693 uint64_t ts
= curr_ts_
.fetch_add(1);
694 SequenceNumber seq
= (curr_seq_
++);
695 AddKeyValueToMemtable(new_mem
, test::EncodeInt(0), ts
, seq
,
696 ValueType::kTypeValue
, "0_value");
698 new_mem
->ConstructFragmentedRangeTombstones();
699 cfd
->imm()->Add(new_mem
, &to_delete
);
702 std::vector
<SequenceNumber
> snapshots
;
703 SnapshotChecker
* const snapshot_checker
= nullptr;
704 JobContext
job_context(0);
705 EventLogger
event_logger(db_options_
.info_log
.get());
// full_history_ts_low = 0 => full history must be retained.
706 std::string full_history_ts_low
;
707 PutFixed64(&full_history_ts_low
, 0);
709 dbname_
, cfd
, db_options_
, *cfd
->GetLatestMutableCFOptions(),
710 std::numeric_limits
<uint64_t>::max() /* memtable_id */, env_options_
,
711 versions_
.get(), &mutex_
, &shutting_down_
, snapshots
, kMaxSequenceNumber
,
712 snapshot_checker
, &job_context
, nullptr, nullptr, nullptr, kNoCompression
,
713 db_options_
.statistics
.get(), &event_logger
, true,
714 true /* sync_output_directory */, true /* write_manifest */,
715 Env::Priority::USER
, nullptr /*IOTracer*/, empty_seqno_to_time_mapping_
,
717 /*db_session_id=*/"", full_history_ts_low
);
721 flush_job
.PickMemTable();
722 ASSERT_OK(flush_job
.Run(/*prep_tracker=*/nullptr, &fmeta
));
// Newest version (largest ts, largest seq) sorts smallest; oldest version
// (kStartTs, kStartSeq) sorts largest under the ts-suffixed comparator.
726 std::string ukey
= test::EncodeInt(0);
727 std::string smallest_key
=
728 ukey
+ test::EncodeInt(curr_ts_
.load(std::memory_order_relaxed
) - 1);
729 std::string largest_key
= ukey
+ test::EncodeInt(kStartTs
);
730 InternalKey
smallest(smallest_key
, curr_seq_
- 1, ValueType::kTypeValue
);
731 InternalKey
largest(largest_key
, kStartSeq
, ValueType::kTypeValue
);
732 ASSERT_EQ(smallest
.Encode(), fmeta
.smallest
.Encode());
733 ASSERT_EQ(largest
.Encode(), fmeta
.largest
.Encode());
736 ASSERT_TRUE(to_delete
.empty());
739 } // namespace ROCKSDB_NAMESPACE
741 int main(int argc
, char** argv
) {
742 ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
743 ::testing::InitGoogleTest(&argc
, argv
);
744 return RUN_ALL_TESTS();