]> git.proxmox.com Git - ceph.git/blame - ceph/src/rocksdb/db/flush_job_test.cc
buildsys: switch source download to quincy
[ceph.git] / ceph / src / rocksdb / db / flush_job_test.cc
CommitLineData
7c673cae 1// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
11fdf7f2
TL
2// This source code is licensed under both the GPLv2 (found in the
3// COPYING file in the root directory) and Apache 2.0 License
4// (found in the LICENSE.Apache file in the root directory).
7c673cae
FG
5
6#include <algorithm>
f67539c2 7#include <array>
7c673cae
FG
8#include <map>
9#include <string>
10
f67539c2 11#include "db/blob_index.h"
7c673cae 12#include "db/column_family.h"
f67539c2 13#include "db/db_impl/db_impl.h"
7c673cae
FG
14#include "db/flush_job.h"
15#include "db/version_set.h"
f67539c2 16#include "file/writable_file_writer.h"
7c673cae
FG
17#include "rocksdb/cache.h"
18#include "rocksdb/write_buffer_manager.h"
19#include "table/mock_table.h"
f67539c2
TL
20#include "test_util/testharness.h"
21#include "test_util/testutil.h"
7c673cae 22#include "util/string_util.h"
7c673cae 23
f67539c2 24namespace ROCKSDB_NAMESPACE {
7c673cae
FG
25
26// TODO(icanadi) Mock out everything else:
27// 1. VersionSet
28// 2. Memtable
29class FlushJobTest : public testing::Test {
30 public:
31 FlushJobTest()
32 : env_(Env::Default()),
f67539c2 33 fs_(std::make_shared<LegacyFileSystemWrapper>(env_)),
11fdf7f2 34 dbname_(test::PerThreadDBPath("flush_job_test")),
7c673cae
FG
35 options_(),
36 db_options_(options_),
494da23a 37 column_family_names_({kDefaultColumnFamilyName, "foo", "bar"}),
7c673cae
FG
38 table_cache_(NewLRUCache(50000, 16)),
39 write_buffer_manager_(db_options_.db_write_buffer_size),
7c673cae
FG
40 shutting_down_(false),
41 mock_table_factory_(new mock::MockTableFactory()) {
42 EXPECT_OK(env_->CreateDirIfMissing(dbname_));
43 db_options_.db_paths.emplace_back(dbname_,
44 std::numeric_limits<uint64_t>::max());
f67539c2 45 db_options_.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
7c673cae
FG
46 // TODO(icanadi) Remove this once we mock out VersionSet
47 NewDB();
48 std::vector<ColumnFamilyDescriptor> column_families;
49 cf_options_.table_factory = mock_table_factory_;
494da23a
TL
50 for (const auto& cf_name : column_family_names_) {
51 column_families.emplace_back(cf_name, cf_options_);
52 }
7c673cae 53
f67539c2
TL
54 db_options_.env = env_;
55 db_options_.fs = fs_;
56 versions_.reset(new VersionSet(dbname_, &db_options_, env_options_,
57 table_cache_.get(), &write_buffer_manager_,
58 &write_controller_,
59 /*block_cache_tracer=*/nullptr));
7c673cae
FG
60 EXPECT_OK(versions_->Recover(column_families, false));
61 }
62
63 void NewDB() {
f67539c2 64 SetIdentityFile(env_, dbname_);
7c673cae 65 VersionEdit new_db;
f67539c2
TL
66 if (db_options_.write_dbid_to_manifest) {
67 DBImpl* impl = new DBImpl(DBOptions(), dbname_);
68 std::string db_id;
69 impl->GetDbIdentityFromIdentityFile(&db_id);
70 new_db.SetDBId(db_id);
71 }
7c673cae
FG
72 new_db.SetLogNumber(0);
73 new_db.SetNextFile(2);
74 new_db.SetLastSequence(0);
75
494da23a
TL
76 autovector<VersionEdit> new_cfs;
77 SequenceNumber last_seq = 1;
78 uint32_t cf_id = 1;
79 for (size_t i = 1; i != column_family_names_.size(); ++i) {
80 VersionEdit new_cf;
81 new_cf.AddColumnFamily(column_family_names_[i]);
82 new_cf.SetColumnFamily(cf_id++);
83 new_cf.SetLogNumber(0);
84 new_cf.SetNextFile(2);
85 new_cf.SetLastSequence(last_seq++);
86 new_cfs.emplace_back(new_cf);
87 }
88
7c673cae 89 const std::string manifest = DescriptorFileName(dbname_, 1);
494da23a 90 std::unique_ptr<WritableFile> file;
7c673cae
FG
91 Status s = env_->NewWritableFile(
92 manifest, &file, env_->OptimizeForManifestWrite(env_options_));
93 ASSERT_OK(s);
f67539c2
TL
94 std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
95 NewLegacyWritableFileWrapper(std::move(file)), manifest, EnvOptions()));
7c673cae
FG
96 {
97 log::Writer log(std::move(file_writer), 0, false);
98 std::string record;
99 new_db.EncodeTo(&record);
100 s = log.AddRecord(record);
494da23a
TL
101
102 for (const auto& e : new_cfs) {
103 record.clear();
104 e.EncodeTo(&record);
105 s = log.AddRecord(record);
106 ASSERT_OK(s);
107 }
7c673cae
FG
108 }
109 ASSERT_OK(s);
110 // Make "CURRENT" file that points to the new manifest file.
111 s = SetCurrentFile(env_, dbname_, 1, nullptr);
112 }
113
114 Env* env_;
f67539c2 115 std::shared_ptr<FileSystem> fs_;
7c673cae
FG
116 std::string dbname_;
117 EnvOptions env_options_;
118 Options options_;
119 ImmutableDBOptions db_options_;
494da23a 120 const std::vector<std::string> column_family_names_;
7c673cae
FG
121 std::shared_ptr<Cache> table_cache_;
122 WriteController write_controller_;
123 WriteBufferManager write_buffer_manager_;
124 ColumnFamilyOptions cf_options_;
125 std::unique_ptr<VersionSet> versions_;
126 InstrumentedMutex mutex_;
127 std::atomic<bool> shutting_down_;
128 std::shared_ptr<mock::MockTableFactory> mock_table_factory_;
129};
130
131TEST_F(FlushJobTest, Empty) {
132 JobContext job_context(0);
133 auto cfd = versions_->GetColumnFamilySet()->GetDefault();
134 EventLogger event_logger(db_options_.info_log.get());
11fdf7f2 135 SnapshotChecker* snapshot_checker = nullptr; // not relavant
494da23a
TL
136 FlushJob flush_job(dbname_, versions_->GetColumnFamilySet()->GetDefault(),
137 db_options_, *cfd->GetLatestMutableCFOptions(),
138 nullptr /* memtable_id */, env_options_, versions_.get(),
139 &mutex_, &shutting_down_, {}, kMaxSequenceNumber,
140 snapshot_checker, &job_context, nullptr, nullptr, nullptr,
141 kNoCompression, nullptr, &event_logger, false,
142 true /* sync_output_directory */,
143 true /* write_manifest */, Env::Priority::USER);
7c673cae
FG
144 {
145 InstrumentedMutexLock l(&mutex_);
146 flush_job.PickMemTable();
147 ASSERT_OK(flush_job.Run());
148 }
149 job_context.Clean();
150}
151
152TEST_F(FlushJobTest, NonEmpty) {
153 JobContext job_context(0);
154 auto cfd = versions_->GetColumnFamilySet()->GetDefault();
155 auto new_mem = cfd->ConstructNewMemtable(*cfd->GetLatestMutableCFOptions(),
156 kMaxSequenceNumber);
157 new_mem->Ref();
158 auto inserted_keys = mock::MakeMockFile();
159 // Test data:
160 // seqno [ 1, 2 ... 8998, 8999, 9000, 9001, 9002 ... 9999 ]
161 // key [ 1001, 1002 ... 9998, 9999, 0, 1, 2 ... 999 ]
162 // range-delete "9995" -> "9999" at seqno 10000
f67539c2 163 // blob references with seqnos 10001..10006
7c673cae
FG
164 for (int i = 1; i < 10000; ++i) {
165 std::string key(ToString((i + 1000) % 10000));
166 std::string value("value" + key);
167 new_mem->Add(SequenceNumber(i), kTypeValue, key, value);
168 if ((i + 1000) % 10000 < 9995) {
169 InternalKey internal_key(key, SequenceNumber(i), kTypeValue);
170 inserted_keys.insert({internal_key.Encode().ToString(), value});
171 }
172 }
f67539c2
TL
173
174 {
175 new_mem->Add(SequenceNumber(10000), kTypeRangeDeletion, "9995", "9999a");
176 InternalKey internal_key("9995", SequenceNumber(10000), kTypeRangeDeletion);
177 inserted_keys.insert({internal_key.Encode().ToString(), "9999a"});
178 }
179
180#ifndef ROCKSDB_LITE
181 // Note: the first two blob references will not be considered when resolving
182 // the oldest blob file referenced (the first one is inlined TTL, while the
183 // second one is TTL and thus points to a TTL blob file).
184 constexpr std::array<uint64_t, 6> blob_file_numbers{
185 kInvalidBlobFileNumber, 5, 103, 17, 102, 101};
186 for (size_t i = 0; i < blob_file_numbers.size(); ++i) {
187 std::string key(ToString(i + 10001));
188 std::string blob_index;
189 if (i == 0) {
190 BlobIndex::EncodeInlinedTTL(&blob_index, /* expiration */ 1234567890ULL,
191 "foo");
192 } else if (i == 1) {
193 BlobIndex::EncodeBlobTTL(&blob_index, /* expiration */ 1234567890ULL,
194 blob_file_numbers[i], /* offset */ i << 10,
195 /* size */ i << 20, kNoCompression);
196 } else {
197 BlobIndex::EncodeBlob(&blob_index, blob_file_numbers[i],
198 /* offset */ i << 10, /* size */ i << 20,
199 kNoCompression);
200 }
201
202 const SequenceNumber seq(i + 10001);
203 new_mem->Add(seq, kTypeBlobIndex, key, blob_index);
204
205 InternalKey internal_key(key, seq, kTypeBlobIndex);
206 inserted_keys.emplace_hint(inserted_keys.end(),
207 internal_key.Encode().ToString(), blob_index);
208 }
209#endif
7c673cae
FG
210
211 autovector<MemTable*> to_delete;
212 cfd->imm()->Add(new_mem, &to_delete);
213 for (auto& m : to_delete) {
214 delete m;
215 }
216
217 EventLogger event_logger(db_options_.info_log.get());
11fdf7f2 218 SnapshotChecker* snapshot_checker = nullptr; // not relavant
7c673cae
FG
219 FlushJob flush_job(dbname_, versions_->GetColumnFamilySet()->GetDefault(),
220 db_options_, *cfd->GetLatestMutableCFOptions(),
494da23a
TL
221 nullptr /* memtable_id */, env_options_, versions_.get(),
222 &mutex_, &shutting_down_, {}, kMaxSequenceNumber,
223 snapshot_checker, &job_context, nullptr, nullptr, nullptr,
224 kNoCompression, db_options_.statistics.get(),
225 &event_logger, true, true /* sync_output_directory */,
226 true /* write_manifest */, Env::Priority::USER);
11fdf7f2
TL
227
228 HistogramData hist;
229 FileMetaData file_meta;
7c673cae
FG
230 mutex_.Lock();
231 flush_job.PickMemTable();
11fdf7f2 232 ASSERT_OK(flush_job.Run(nullptr, &file_meta));
7c673cae 233 mutex_.Unlock();
11fdf7f2
TL
234 db_options_.statistics->histogramData(FLUSH_TIME, &hist);
235 ASSERT_GT(hist.average, 0.0);
236
237 ASSERT_EQ(ToString(0), file_meta.smallest.user_key().ToString());
f67539c2 238 ASSERT_EQ("9999a", file_meta.largest.user_key().ToString());
11fdf7f2 239 ASSERT_EQ(1, file_meta.fd.smallest_seqno);
f67539c2
TL
240#ifndef ROCKSDB_LITE
241 ASSERT_EQ(10006, file_meta.fd.largest_seqno);
242 ASSERT_EQ(17, file_meta.oldest_blob_file_number);
243#else
244 ASSERT_EQ(10000, file_meta.fd.largest_seqno);
245#endif
7c673cae
FG
246 mock_table_factory_->AssertSingleFile(inserted_keys);
247 job_context.Clean();
248}
249
494da23a
TL
250TEST_F(FlushJobTest, FlushMemTablesSingleColumnFamily) {
251 const size_t num_mems = 2;
252 const size_t num_mems_to_flush = 1;
253 const size_t num_keys_per_table = 100;
254 JobContext job_context(0);
255 ColumnFamilyData* cfd = versions_->GetColumnFamilySet()->GetDefault();
256 std::vector<uint64_t> memtable_ids;
257 std::vector<MemTable*> new_mems;
258 for (size_t i = 0; i != num_mems; ++i) {
259 MemTable* mem = cfd->ConstructNewMemtable(*cfd->GetLatestMutableCFOptions(),
260 kMaxSequenceNumber);
261 mem->SetID(i);
262 mem->Ref();
263 new_mems.emplace_back(mem);
264 memtable_ids.push_back(mem->GetID());
265
266 for (size_t j = 0; j < num_keys_per_table; ++j) {
267 std::string key(ToString(j + i * num_keys_per_table));
268 std::string value("value" + key);
269 mem->Add(SequenceNumber(j + i * num_keys_per_table), kTypeValue, key,
270 value);
271 }
272 }
273
274 autovector<MemTable*> to_delete;
275 for (auto mem : new_mems) {
276 cfd->imm()->Add(mem, &to_delete);
277 }
278
279 EventLogger event_logger(db_options_.info_log.get());
280 SnapshotChecker* snapshot_checker = nullptr; // not relavant
281
282 assert(memtable_ids.size() == num_mems);
283 uint64_t smallest_memtable_id = memtable_ids.front();
284 uint64_t flush_memtable_id = smallest_memtable_id + num_mems_to_flush - 1;
285
286 FlushJob flush_job(dbname_, versions_->GetColumnFamilySet()->GetDefault(),
287 db_options_, *cfd->GetLatestMutableCFOptions(),
288 &flush_memtable_id, env_options_, versions_.get(), &mutex_,
289 &shutting_down_, {}, kMaxSequenceNumber, snapshot_checker,
290 &job_context, nullptr, nullptr, nullptr, kNoCompression,
291 db_options_.statistics.get(), &event_logger, true,
292 true /* sync_output_directory */,
293 true /* write_manifest */, Env::Priority::USER);
294 HistogramData hist;
295 FileMetaData file_meta;
296 mutex_.Lock();
297 flush_job.PickMemTable();
298 ASSERT_OK(flush_job.Run(nullptr /* prep_tracker */, &file_meta));
299 mutex_.Unlock();
300 db_options_.statistics->histogramData(FLUSH_TIME, &hist);
301 ASSERT_GT(hist.average, 0.0);
302
303 ASSERT_EQ(ToString(0), file_meta.smallest.user_key().ToString());
304 ASSERT_EQ("99", file_meta.largest.user_key().ToString());
305 ASSERT_EQ(0, file_meta.fd.smallest_seqno);
306 ASSERT_EQ(SequenceNumber(num_mems_to_flush * num_keys_per_table - 1),
307 file_meta.fd.largest_seqno);
f67539c2 308 ASSERT_EQ(kInvalidBlobFileNumber, file_meta.oldest_blob_file_number);
494da23a
TL
309
310 for (auto m : to_delete) {
311 delete m;
312 }
313 to_delete.clear();
314 job_context.Clean();
315}
316
317TEST_F(FlushJobTest, FlushMemtablesMultipleColumnFamilies) {
318 autovector<ColumnFamilyData*> all_cfds;
319 for (auto cfd : *versions_->GetColumnFamilySet()) {
320 all_cfds.push_back(cfd);
321 }
322 const std::vector<size_t> num_memtables = {2, 1, 3};
323 assert(num_memtables.size() == column_family_names_.size());
324 const size_t num_keys_per_memtable = 1000;
325 JobContext job_context(0);
326 std::vector<uint64_t> memtable_ids;
327 std::vector<SequenceNumber> smallest_seqs;
328 std::vector<SequenceNumber> largest_seqs;
329 autovector<MemTable*> to_delete;
330 SequenceNumber curr_seqno = 0;
331 size_t k = 0;
332 for (auto cfd : all_cfds) {
333 smallest_seqs.push_back(curr_seqno);
334 for (size_t i = 0; i != num_memtables[k]; ++i) {
335 MemTable* mem = cfd->ConstructNewMemtable(
336 *cfd->GetLatestMutableCFOptions(), kMaxSequenceNumber);
337 mem->SetID(i);
338 mem->Ref();
339
340 for (size_t j = 0; j != num_keys_per_memtable; ++j) {
341 std::string key(ToString(j + i * num_keys_per_memtable));
342 std::string value("value" + key);
343 mem->Add(curr_seqno++, kTypeValue, key, value);
344 }
345
346 cfd->imm()->Add(mem, &to_delete);
347 }
348 largest_seqs.push_back(curr_seqno - 1);
349 memtable_ids.push_back(num_memtables[k++] - 1);
350 }
351
352 EventLogger event_logger(db_options_.info_log.get());
353 SnapshotChecker* snapshot_checker = nullptr; // not relevant
f67539c2 354 std::vector<std::unique_ptr<FlushJob>> flush_jobs;
494da23a
TL
355 k = 0;
356 for (auto cfd : all_cfds) {
357 std::vector<SequenceNumber> snapshot_seqs;
f67539c2 358 flush_jobs.emplace_back(new FlushJob(
494da23a
TL
359 dbname_, cfd, db_options_, *cfd->GetLatestMutableCFOptions(),
360 &memtable_ids[k], env_options_, versions_.get(), &mutex_,
361 &shutting_down_, snapshot_seqs, kMaxSequenceNumber, snapshot_checker,
362 &job_context, nullptr, nullptr, nullptr, kNoCompression,
363 db_options_.statistics.get(), &event_logger, true,
364 false /* sync_output_directory */, false /* write_manifest */,
f67539c2 365 Env::Priority::USER));
494da23a
TL
366 k++;
367 }
368 HistogramData hist;
369 std::vector<FileMetaData> file_metas;
370 // Call reserve to avoid auto-resizing
371 file_metas.reserve(flush_jobs.size());
372 mutex_.Lock();
373 for (auto& job : flush_jobs) {
f67539c2 374 job->PickMemTable();
494da23a
TL
375 }
376 for (auto& job : flush_jobs) {
377 FileMetaData meta;
378 // Run will release and re-acquire mutex
f67539c2 379 ASSERT_OK(job->Run(nullptr /**/, &meta));
494da23a
TL
380 file_metas.emplace_back(meta);
381 }
382 autovector<FileMetaData*> file_meta_ptrs;
383 for (auto& meta : file_metas) {
384 file_meta_ptrs.push_back(&meta);
385 }
386 autovector<const autovector<MemTable*>*> mems_list;
387 for (size_t i = 0; i != all_cfds.size(); ++i) {
f67539c2 388 const auto& mems = flush_jobs[i]->GetMemTables();
494da23a
TL
389 mems_list.push_back(&mems);
390 }
391 autovector<const MutableCFOptions*> mutable_cf_options_list;
392 for (auto cfd : all_cfds) {
393 mutable_cf_options_list.push_back(cfd->GetLatestMutableCFOptions());
394 }
395
396 Status s = InstallMemtableAtomicFlushResults(
397 nullptr /* imm_lists */, all_cfds, mutable_cf_options_list, mems_list,
398 versions_.get(), &mutex_, file_meta_ptrs, &job_context.memtables_to_free,
399 nullptr /* db_directory */, nullptr /* log_buffer */);
400 ASSERT_OK(s);
401
402 mutex_.Unlock();
403 db_options_.statistics->histogramData(FLUSH_TIME, &hist);
404 ASSERT_GT(hist.average, 0.0);
405 k = 0;
406 for (const auto& file_meta : file_metas) {
407 ASSERT_EQ(ToString(0), file_meta.smallest.user_key().ToString());
408 ASSERT_EQ("999", file_meta.largest.user_key()
409 .ToString()); // max key by bytewise comparator
410 ASSERT_EQ(smallest_seqs[k], file_meta.fd.smallest_seqno);
411 ASSERT_EQ(largest_seqs[k], file_meta.fd.largest_seqno);
412 // Verify that imm is empty
413 ASSERT_EQ(std::numeric_limits<uint64_t>::max(),
414 all_cfds[k]->imm()->GetEarliestMemTableID());
415 ASSERT_EQ(0, all_cfds[k]->imm()->GetLatestMemTableID());
416 ++k;
417 }
418
419 for (auto m : to_delete) {
420 delete m;
421 }
422 to_delete.clear();
423 job_context.Clean();
424}
425
7c673cae
FG
426TEST_F(FlushJobTest, Snapshots) {
427 JobContext job_context(0);
428 auto cfd = versions_->GetColumnFamilySet()->GetDefault();
429 auto new_mem = cfd->ConstructNewMemtable(*cfd->GetLatestMutableCFOptions(),
430 kMaxSequenceNumber);
431
7c673cae
FG
432 std::set<SequenceNumber> snapshots_set;
433 int keys = 10000;
434 int max_inserts_per_keys = 8;
435
436 Random rnd(301);
437 for (int i = 0; i < keys / 2; ++i) {
494da23a 438 snapshots_set.insert(rnd.Uniform(keys * (max_inserts_per_keys / 2)) + 1);
7c673cae 439 }
494da23a
TL
440 // set has already removed the duplicate snapshots
441 std::vector<SequenceNumber> snapshots(snapshots_set.begin(),
442 snapshots_set.end());
7c673cae
FG
443
444 new_mem->Ref();
445 SequenceNumber current_seqno = 0;
446 auto inserted_keys = mock::MakeMockFile();
447 for (int i = 1; i < keys; ++i) {
448 std::string key(ToString(i));
449 int insertions = rnd.Uniform(max_inserts_per_keys);
450 for (int j = 0; j < insertions; ++j) {
451 std::string value(test::RandomHumanReadableString(&rnd, 10));
452 auto seqno = ++current_seqno;
453 new_mem->Add(SequenceNumber(seqno), kTypeValue, key, value);
454 // a key is visible only if:
455 // 1. it's the last one written (j == insertions - 1)
456 // 2. there's a snapshot pointing at it
457 bool visible = (j == insertions - 1) ||
458 (snapshots_set.find(seqno) != snapshots_set.end());
459 if (visible) {
460 InternalKey internal_key(key, seqno, kTypeValue);
461 inserted_keys.insert({internal_key.Encode().ToString(), value});
462 }
463 }
464 }
465
466 autovector<MemTable*> to_delete;
467 cfd->imm()->Add(new_mem, &to_delete);
468 for (auto& m : to_delete) {
469 delete m;
470 }
471
472 EventLogger event_logger(db_options_.info_log.get());
11fdf7f2
TL
473 SnapshotChecker* snapshot_checker = nullptr; // not relavant
474 FlushJob flush_job(dbname_, versions_->GetColumnFamilySet()->GetDefault(),
475 db_options_, *cfd->GetLatestMutableCFOptions(),
494da23a
TL
476 nullptr /* memtable_id */, env_options_, versions_.get(),
477 &mutex_, &shutting_down_, snapshots, kMaxSequenceNumber,
478 snapshot_checker, &job_context, nullptr, nullptr, nullptr,
479 kNoCompression, db_options_.statistics.get(),
480 &event_logger, true, true /* sync_output_directory */,
481 true /* write_manifest */, Env::Priority::USER);
7c673cae
FG
482 mutex_.Lock();
483 flush_job.PickMemTable();
484 ASSERT_OK(flush_job.Run());
485 mutex_.Unlock();
486 mock_table_factory_->AssertSingleFile(inserted_keys);
11fdf7f2
TL
487 HistogramData hist;
488 db_options_.statistics->histogramData(FLUSH_TIME, &hist);
489 ASSERT_GT(hist.average, 0.0);
7c673cae
FG
490 job_context.Clean();
491}
492
f67539c2 493} // namespace ROCKSDB_NAMESPACE
7c673cae
FG
494
495int main(int argc, char** argv) {
496 ::testing::InitGoogleTest(&argc, argv);
497 return RUN_ALL_TESTS();
498}