]>
Commit | Line | Data |
---|---|---|
7c673cae | 1 | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
11fdf7f2 TL |
2 | // This source code is licensed under both the GPLv2 (found in the |
3 | // COPYING file in the root directory) and Apache 2.0 License | |
4 | // (found in the LICENSE.Apache file in the root directory). | |
7c673cae | 5 | |
20effc67 TL |
6 | #include "db/flush_job.h" |
7 | ||
7c673cae | 8 | #include <algorithm> |
f67539c2 | 9 | #include <array> |
7c673cae FG |
10 | #include <map> |
11 | #include <string> | |
12 | ||
20effc67 | 13 | #include "db/blob/blob_index.h" |
7c673cae | 14 | #include "db/column_family.h" |
f67539c2 | 15 | #include "db/db_impl/db_impl.h" |
7c673cae | 16 | #include "db/version_set.h" |
f67539c2 | 17 | #include "file/writable_file_writer.h" |
7c673cae | 18 | #include "rocksdb/cache.h" |
1e59de90 | 19 | #include "rocksdb/file_system.h" |
7c673cae FG |
20 | #include "rocksdb/write_buffer_manager.h" |
21 | #include "table/mock_table.h" | |
f67539c2 TL |
22 | #include "test_util/testharness.h" |
23 | #include "test_util/testutil.h" | |
20effc67 | 24 | #include "util/random.h" |
7c673cae | 25 | #include "util/string_util.h" |
7c673cae | 26 | |
f67539c2 | 27 | namespace ROCKSDB_NAMESPACE { |
7c673cae FG |
28 | |
29 | // TODO(icanadi) Mock out everything else: | |
30 | // 1. VersionSet | |
31 | // 2. Memtable | |
20effc67 TL |
32 | class FlushJobTestBase : public testing::Test { |
33 | protected: | |
34 | FlushJobTestBase(std::string dbname, const Comparator* ucmp) | |
7c673cae | 35 | : env_(Env::Default()), |
1e59de90 | 36 | fs_(env_->GetFileSystem()), |
20effc67 TL |
37 | dbname_(std::move(dbname)), |
38 | ucmp_(ucmp), | |
7c673cae FG |
39 | options_(), |
40 | db_options_(options_), | |
494da23a | 41 | column_family_names_({kDefaultColumnFamilyName, "foo", "bar"}), |
7c673cae FG |
42 | table_cache_(NewLRUCache(50000, 16)), |
43 | write_buffer_manager_(db_options_.db_write_buffer_size), | |
7c673cae | 44 | shutting_down_(false), |
20effc67 | 45 | mock_table_factory_(new mock::MockTableFactory()) {} |
7c673cae | 46 | |
20effc67 TL |
47 | virtual ~FlushJobTestBase() { |
48 | if (getenv("KEEP_DB")) { | |
49 | fprintf(stdout, "db is still in %s\n", dbname_.c_str()); | |
50 | } else { | |
1e59de90 TL |
51 | // destroy versions_ to release all file handles |
52 | versions_.reset(); | |
20effc67 TL |
53 | EXPECT_OK(DestroyDir(env_, dbname_)); |
54 | } | |
7c673cae FG |
55 | } |
56 | ||
57 | void NewDB() { | |
1e59de90 | 58 | ASSERT_OK(SetIdentityFile(env_, dbname_)); |
7c673cae | 59 | VersionEdit new_db; |
20effc67 | 60 | |
7c673cae FG |
61 | new_db.SetLogNumber(0); |
62 | new_db.SetNextFile(2); | |
63 | new_db.SetLastSequence(0); | |
64 | ||
494da23a TL |
65 | autovector<VersionEdit> new_cfs; |
66 | SequenceNumber last_seq = 1; | |
67 | uint32_t cf_id = 1; | |
68 | for (size_t i = 1; i != column_family_names_.size(); ++i) { | |
69 | VersionEdit new_cf; | |
70 | new_cf.AddColumnFamily(column_family_names_[i]); | |
71 | new_cf.SetColumnFamily(cf_id++); | |
20effc67 | 72 | new_cf.SetComparatorName(ucmp_->Name()); |
494da23a TL |
73 | new_cf.SetLogNumber(0); |
74 | new_cf.SetNextFile(2); | |
75 | new_cf.SetLastSequence(last_seq++); | |
76 | new_cfs.emplace_back(new_cf); | |
77 | } | |
78 | ||
7c673cae | 79 | const std::string manifest = DescriptorFileName(dbname_, 1); |
1e59de90 TL |
80 | const auto& fs = env_->GetFileSystem(); |
81 | std::unique_ptr<WritableFileWriter> file_writer; | |
82 | Status s = WritableFileWriter::Create( | |
83 | fs, manifest, fs->OptimizeForManifestWrite(env_options_), &file_writer, | |
84 | nullptr); | |
7c673cae | 85 | ASSERT_OK(s); |
1e59de90 | 86 | |
7c673cae FG |
87 | { |
88 | log::Writer log(std::move(file_writer), 0, false); | |
89 | std::string record; | |
90 | new_db.EncodeTo(&record); | |
91 | s = log.AddRecord(record); | |
1e59de90 | 92 | ASSERT_OK(s); |
494da23a TL |
93 | |
94 | for (const auto& e : new_cfs) { | |
95 | record.clear(); | |
96 | e.EncodeTo(&record); | |
97 | s = log.AddRecord(record); | |
98 | ASSERT_OK(s); | |
99 | } | |
7c673cae FG |
100 | } |
101 | ASSERT_OK(s); | |
102 | // Make "CURRENT" file that points to the new manifest file. | |
20effc67 TL |
103 | s = SetCurrentFile(fs_.get(), dbname_, 1, nullptr); |
104 | ASSERT_OK(s); | |
105 | } | |
106 | ||
107 | void SetUp() override { | |
108 | EXPECT_OK(env_->CreateDirIfMissing(dbname_)); | |
109 | ||
110 | // TODO(icanadi) Remove this once we mock out VersionSet | |
111 | NewDB(); | |
112 | ||
113 | db_options_.env = env_; | |
114 | db_options_.fs = fs_; | |
115 | db_options_.db_paths.emplace_back(dbname_, | |
116 | std::numeric_limits<uint64_t>::max()); | |
117 | db_options_.statistics = CreateDBStatistics(); | |
118 | ||
119 | cf_options_.comparator = ucmp_; | |
120 | ||
121 | std::vector<ColumnFamilyDescriptor> column_families; | |
122 | cf_options_.table_factory = mock_table_factory_; | |
123 | for (const auto& cf_name : column_family_names_) { | |
124 | column_families.emplace_back(cf_name, cf_options_); | |
125 | } | |
126 | ||
127 | versions_.reset( | |
128 | new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(), | |
129 | &write_buffer_manager_, &write_controller_, | |
1e59de90 TL |
130 | /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr, |
131 | /*db_id*/ "", /*db_session_id*/ "")); | |
20effc67 | 132 | EXPECT_OK(versions_->Recover(column_families, false)); |
7c673cae FG |
133 | } |
134 | ||
135 | Env* env_; | |
f67539c2 | 136 | std::shared_ptr<FileSystem> fs_; |
7c673cae | 137 | std::string dbname_; |
20effc67 | 138 | const Comparator* const ucmp_; |
7c673cae FG |
139 | EnvOptions env_options_; |
140 | Options options_; | |
141 | ImmutableDBOptions db_options_; | |
494da23a | 142 | const std::vector<std::string> column_family_names_; |
7c673cae FG |
143 | std::shared_ptr<Cache> table_cache_; |
144 | WriteController write_controller_; | |
145 | WriteBufferManager write_buffer_manager_; | |
146 | ColumnFamilyOptions cf_options_; | |
147 | std::unique_ptr<VersionSet> versions_; | |
148 | InstrumentedMutex mutex_; | |
149 | std::atomic<bool> shutting_down_; | |
150 | std::shared_ptr<mock::MockTableFactory> mock_table_factory_; | |
1e59de90 TL |
151 | |
152 | SeqnoToTimeMapping empty_seqno_to_time_mapping_; | |
7c673cae FG |
153 | }; |
154 | ||
20effc67 TL |
155 | class FlushJobTest : public FlushJobTestBase { |
156 | public: | |
157 | FlushJobTest() | |
158 | : FlushJobTestBase(test::PerThreadDBPath("flush_job_test"), | |
159 | BytewiseComparator()) {} | |
160 | }; | |
161 | ||
7c673cae FG |
162 | TEST_F(FlushJobTest, Empty) { |
163 | JobContext job_context(0); | |
164 | auto cfd = versions_->GetColumnFamilySet()->GetDefault(); | |
165 | EventLogger event_logger(db_options_.info_log.get()); | |
11fdf7f2 | 166 | SnapshotChecker* snapshot_checker = nullptr; // not relavant |
1e59de90 TL |
167 | FlushJob flush_job(dbname_, versions_->GetColumnFamilySet()->GetDefault(), |
168 | db_options_, *cfd->GetLatestMutableCFOptions(), | |
169 | std::numeric_limits<uint64_t>::max() /* memtable_id */, | |
170 | env_options_, versions_.get(), &mutex_, &shutting_down_, | |
171 | {}, kMaxSequenceNumber, snapshot_checker, &job_context, | |
172 | nullptr, nullptr, nullptr, kNoCompression, nullptr, | |
173 | &event_logger, false, true /* sync_output_directory */, | |
174 | true /* write_manifest */, Env::Priority::USER, | |
175 | nullptr /*IOTracer*/, empty_seqno_to_time_mapping_); | |
7c673cae FG |
176 | { |
177 | InstrumentedMutexLock l(&mutex_); | |
178 | flush_job.PickMemTable(); | |
179 | ASSERT_OK(flush_job.Run()); | |
180 | } | |
181 | job_context.Clean(); | |
182 | } | |
183 | ||
184 | TEST_F(FlushJobTest, NonEmpty) { | |
185 | JobContext job_context(0); | |
186 | auto cfd = versions_->GetColumnFamilySet()->GetDefault(); | |
187 | auto new_mem = cfd->ConstructNewMemtable(*cfd->GetLatestMutableCFOptions(), | |
188 | kMaxSequenceNumber); | |
189 | new_mem->Ref(); | |
190 | auto inserted_keys = mock::MakeMockFile(); | |
191 | // Test data: | |
192 | // seqno [ 1, 2 ... 8998, 8999, 9000, 9001, 9002 ... 9999 ] | |
193 | // key [ 1001, 1002 ... 9998, 9999, 0, 1, 2 ... 999 ] | |
194 | // range-delete "9995" -> "9999" at seqno 10000 | |
f67539c2 | 195 | // blob references with seqnos 10001..10006 |
7c673cae | 196 | for (int i = 1; i < 10000; ++i) { |
1e59de90 | 197 | std::string key(std::to_string((i + 1000) % 10000)); |
7c673cae | 198 | std::string value("value" + key); |
1e59de90 TL |
199 | ASSERT_OK(new_mem->Add(SequenceNumber(i), kTypeValue, key, value, |
200 | nullptr /* kv_prot_info */)); | |
7c673cae FG |
201 | if ((i + 1000) % 10000 < 9995) { |
202 | InternalKey internal_key(key, SequenceNumber(i), kTypeValue); | |
20effc67 | 203 | inserted_keys.push_back({internal_key.Encode().ToString(), value}); |
7c673cae FG |
204 | } |
205 | } | |
f67539c2 TL |
206 | |
207 | { | |
1e59de90 TL |
208 | ASSERT_OK(new_mem->Add(SequenceNumber(10000), kTypeRangeDeletion, "9995", |
209 | "9999a", nullptr /* kv_prot_info */)); | |
f67539c2 | 210 | InternalKey internal_key("9995", SequenceNumber(10000), kTypeRangeDeletion); |
20effc67 | 211 | inserted_keys.push_back({internal_key.Encode().ToString(), "9999a"}); |
f67539c2 TL |
212 | } |
213 | ||
f67539c2 TL |
214 | // Note: the first two blob references will not be considered when resolving |
215 | // the oldest blob file referenced (the first one is inlined TTL, while the | |
216 | // second one is TTL and thus points to a TTL blob file). | |
1e59de90 TL |
217 | constexpr std::array<uint64_t, 6> blob_file_numbers{ |
218 | {kInvalidBlobFileNumber, 5, 103, 17, 102, 101}}; | |
f67539c2 | 219 | for (size_t i = 0; i < blob_file_numbers.size(); ++i) { |
1e59de90 | 220 | std::string key(std::to_string(i + 10001)); |
f67539c2 TL |
221 | std::string blob_index; |
222 | if (i == 0) { | |
223 | BlobIndex::EncodeInlinedTTL(&blob_index, /* expiration */ 1234567890ULL, | |
224 | "foo"); | |
225 | } else if (i == 1) { | |
226 | BlobIndex::EncodeBlobTTL(&blob_index, /* expiration */ 1234567890ULL, | |
227 | blob_file_numbers[i], /* offset */ i << 10, | |
228 | /* size */ i << 20, kNoCompression); | |
229 | } else { | |
230 | BlobIndex::EncodeBlob(&blob_index, blob_file_numbers[i], | |
231 | /* offset */ i << 10, /* size */ i << 20, | |
232 | kNoCompression); | |
233 | } | |
234 | ||
235 | const SequenceNumber seq(i + 10001); | |
1e59de90 TL |
236 | ASSERT_OK(new_mem->Add(seq, kTypeBlobIndex, key, blob_index, |
237 | nullptr /* kv_prot_info */)); | |
f67539c2 TL |
238 | |
239 | InternalKey internal_key(key, seq, kTypeBlobIndex); | |
20effc67 | 240 | inserted_keys.push_back({internal_key.Encode().ToString(), blob_index}); |
f67539c2 | 241 | } |
20effc67 | 242 | mock::SortKVVector(&inserted_keys); |
7c673cae FG |
243 | |
244 | autovector<MemTable*> to_delete; | |
1e59de90 | 245 | new_mem->ConstructFragmentedRangeTombstones(); |
7c673cae FG |
246 | cfd->imm()->Add(new_mem, &to_delete); |
247 | for (auto& m : to_delete) { | |
248 | delete m; | |
249 | } | |
250 | ||
251 | EventLogger event_logger(db_options_.info_log.get()); | |
11fdf7f2 | 252 | SnapshotChecker* snapshot_checker = nullptr; // not relavant |
20effc67 TL |
253 | FlushJob flush_job( |
254 | dbname_, versions_->GetColumnFamilySet()->GetDefault(), db_options_, | |
1e59de90 TL |
255 | *cfd->GetLatestMutableCFOptions(), |
256 | std::numeric_limits<uint64_t>::max() /* memtable_id */, env_options_, | |
257 | versions_.get(), &mutex_, &shutting_down_, {}, kMaxSequenceNumber, | |
258 | snapshot_checker, &job_context, nullptr, nullptr, nullptr, kNoCompression, | |
259 | db_options_.statistics.get(), &event_logger, true, | |
260 | true /* sync_output_directory */, true /* write_manifest */, | |
261 | Env::Priority::USER, nullptr /*IOTracer*/, empty_seqno_to_time_mapping_); | |
11fdf7f2 TL |
262 | |
263 | HistogramData hist; | |
264 | FileMetaData file_meta; | |
7c673cae FG |
265 | mutex_.Lock(); |
266 | flush_job.PickMemTable(); | |
11fdf7f2 | 267 | ASSERT_OK(flush_job.Run(nullptr, &file_meta)); |
7c673cae | 268 | mutex_.Unlock(); |
11fdf7f2 TL |
269 | db_options_.statistics->histogramData(FLUSH_TIME, &hist); |
270 | ASSERT_GT(hist.average, 0.0); | |
271 | ||
1e59de90 | 272 | ASSERT_EQ(std::to_string(0), file_meta.smallest.user_key().ToString()); |
f67539c2 | 273 | ASSERT_EQ("9999a", file_meta.largest.user_key().ToString()); |
11fdf7f2 | 274 | ASSERT_EQ(1, file_meta.fd.smallest_seqno); |
f67539c2 TL |
275 | ASSERT_EQ(10006, file_meta.fd.largest_seqno); |
276 | ASSERT_EQ(17, file_meta.oldest_blob_file_number); | |
7c673cae FG |
277 | mock_table_factory_->AssertSingleFile(inserted_keys); |
278 | job_context.Clean(); | |
279 | } | |
280 | ||
494da23a TL |
281 | TEST_F(FlushJobTest, FlushMemTablesSingleColumnFamily) { |
282 | const size_t num_mems = 2; | |
283 | const size_t num_mems_to_flush = 1; | |
284 | const size_t num_keys_per_table = 100; | |
285 | JobContext job_context(0); | |
286 | ColumnFamilyData* cfd = versions_->GetColumnFamilySet()->GetDefault(); | |
287 | std::vector<uint64_t> memtable_ids; | |
288 | std::vector<MemTable*> new_mems; | |
289 | for (size_t i = 0; i != num_mems; ++i) { | |
290 | MemTable* mem = cfd->ConstructNewMemtable(*cfd->GetLatestMutableCFOptions(), | |
291 | kMaxSequenceNumber); | |
292 | mem->SetID(i); | |
293 | mem->Ref(); | |
294 | new_mems.emplace_back(mem); | |
295 | memtable_ids.push_back(mem->GetID()); | |
296 | ||
297 | for (size_t j = 0; j < num_keys_per_table; ++j) { | |
1e59de90 | 298 | std::string key(std::to_string(j + i * num_keys_per_table)); |
494da23a | 299 | std::string value("value" + key); |
1e59de90 TL |
300 | ASSERT_OK(mem->Add(SequenceNumber(j + i * num_keys_per_table), kTypeValue, |
301 | key, value, nullptr /* kv_prot_info */)); | |
494da23a TL |
302 | } |
303 | } | |
304 | ||
305 | autovector<MemTable*> to_delete; | |
306 | for (auto mem : new_mems) { | |
1e59de90 | 307 | mem->ConstructFragmentedRangeTombstones(); |
494da23a TL |
308 | cfd->imm()->Add(mem, &to_delete); |
309 | } | |
310 | ||
311 | EventLogger event_logger(db_options_.info_log.get()); | |
312 | SnapshotChecker* snapshot_checker = nullptr; // not relavant | |
313 | ||
314 | assert(memtable_ids.size() == num_mems); | |
315 | uint64_t smallest_memtable_id = memtable_ids.front(); | |
316 | uint64_t flush_memtable_id = smallest_memtable_id + num_mems_to_flush - 1; | |
20effc67 TL |
317 | FlushJob flush_job( |
318 | dbname_, versions_->GetColumnFamilySet()->GetDefault(), db_options_, | |
1e59de90 | 319 | *cfd->GetLatestMutableCFOptions(), flush_memtable_id, env_options_, |
20effc67 TL |
320 | versions_.get(), &mutex_, &shutting_down_, {}, kMaxSequenceNumber, |
321 | snapshot_checker, &job_context, nullptr, nullptr, nullptr, kNoCompression, | |
322 | db_options_.statistics.get(), &event_logger, true, | |
323 | true /* sync_output_directory */, true /* write_manifest */, | |
1e59de90 | 324 | Env::Priority::USER, nullptr /*IOTracer*/, empty_seqno_to_time_mapping_); |
494da23a TL |
325 | HistogramData hist; |
326 | FileMetaData file_meta; | |
327 | mutex_.Lock(); | |
328 | flush_job.PickMemTable(); | |
329 | ASSERT_OK(flush_job.Run(nullptr /* prep_tracker */, &file_meta)); | |
330 | mutex_.Unlock(); | |
331 | db_options_.statistics->histogramData(FLUSH_TIME, &hist); | |
332 | ASSERT_GT(hist.average, 0.0); | |
333 | ||
1e59de90 | 334 | ASSERT_EQ(std::to_string(0), file_meta.smallest.user_key().ToString()); |
494da23a TL |
335 | ASSERT_EQ("99", file_meta.largest.user_key().ToString()); |
336 | ASSERT_EQ(0, file_meta.fd.smallest_seqno); | |
337 | ASSERT_EQ(SequenceNumber(num_mems_to_flush * num_keys_per_table - 1), | |
338 | file_meta.fd.largest_seqno); | |
f67539c2 | 339 | ASSERT_EQ(kInvalidBlobFileNumber, file_meta.oldest_blob_file_number); |
494da23a TL |
340 | |
341 | for (auto m : to_delete) { | |
342 | delete m; | |
343 | } | |
344 | to_delete.clear(); | |
345 | job_context.Clean(); | |
346 | } | |
347 | ||
348 | TEST_F(FlushJobTest, FlushMemtablesMultipleColumnFamilies) { | |
349 | autovector<ColumnFamilyData*> all_cfds; | |
350 | for (auto cfd : *versions_->GetColumnFamilySet()) { | |
351 | all_cfds.push_back(cfd); | |
352 | } | |
353 | const std::vector<size_t> num_memtables = {2, 1, 3}; | |
354 | assert(num_memtables.size() == column_family_names_.size()); | |
355 | const size_t num_keys_per_memtable = 1000; | |
356 | JobContext job_context(0); | |
357 | std::vector<uint64_t> memtable_ids; | |
358 | std::vector<SequenceNumber> smallest_seqs; | |
359 | std::vector<SequenceNumber> largest_seqs; | |
360 | autovector<MemTable*> to_delete; | |
361 | SequenceNumber curr_seqno = 0; | |
362 | size_t k = 0; | |
363 | for (auto cfd : all_cfds) { | |
364 | smallest_seqs.push_back(curr_seqno); | |
365 | for (size_t i = 0; i != num_memtables[k]; ++i) { | |
366 | MemTable* mem = cfd->ConstructNewMemtable( | |
367 | *cfd->GetLatestMutableCFOptions(), kMaxSequenceNumber); | |
368 | mem->SetID(i); | |
369 | mem->Ref(); | |
370 | ||
371 | for (size_t j = 0; j != num_keys_per_memtable; ++j) { | |
1e59de90 | 372 | std::string key(std::to_string(j + i * num_keys_per_memtable)); |
494da23a | 373 | std::string value("value" + key); |
1e59de90 TL |
374 | ASSERT_OK(mem->Add(curr_seqno++, kTypeValue, key, value, |
375 | nullptr /* kv_prot_info */)); | |
494da23a | 376 | } |
1e59de90 | 377 | mem->ConstructFragmentedRangeTombstones(); |
494da23a TL |
378 | cfd->imm()->Add(mem, &to_delete); |
379 | } | |
380 | largest_seqs.push_back(curr_seqno - 1); | |
381 | memtable_ids.push_back(num_memtables[k++] - 1); | |
382 | } | |
383 | ||
384 | EventLogger event_logger(db_options_.info_log.get()); | |
385 | SnapshotChecker* snapshot_checker = nullptr; // not relevant | |
f67539c2 | 386 | std::vector<std::unique_ptr<FlushJob>> flush_jobs; |
494da23a TL |
387 | k = 0; |
388 | for (auto cfd : all_cfds) { | |
389 | std::vector<SequenceNumber> snapshot_seqs; | |
f67539c2 | 390 | flush_jobs.emplace_back(new FlushJob( |
494da23a | 391 | dbname_, cfd, db_options_, *cfd->GetLatestMutableCFOptions(), |
1e59de90 | 392 | memtable_ids[k], env_options_, versions_.get(), &mutex_, |
494da23a TL |
393 | &shutting_down_, snapshot_seqs, kMaxSequenceNumber, snapshot_checker, |
394 | &job_context, nullptr, nullptr, nullptr, kNoCompression, | |
395 | db_options_.statistics.get(), &event_logger, true, | |
396 | false /* sync_output_directory */, false /* write_manifest */, | |
1e59de90 TL |
397 | Env::Priority::USER, nullptr /*IOTracer*/, |
398 | empty_seqno_to_time_mapping_)); | |
494da23a TL |
399 | k++; |
400 | } | |
401 | HistogramData hist; | |
402 | std::vector<FileMetaData> file_metas; | |
403 | // Call reserve to avoid auto-resizing | |
404 | file_metas.reserve(flush_jobs.size()); | |
405 | mutex_.Lock(); | |
406 | for (auto& job : flush_jobs) { | |
f67539c2 | 407 | job->PickMemTable(); |
494da23a TL |
408 | } |
409 | for (auto& job : flush_jobs) { | |
410 | FileMetaData meta; | |
411 | // Run will release and re-acquire mutex | |
f67539c2 | 412 | ASSERT_OK(job->Run(nullptr /**/, &meta)); |
494da23a TL |
413 | file_metas.emplace_back(meta); |
414 | } | |
415 | autovector<FileMetaData*> file_meta_ptrs; | |
416 | for (auto& meta : file_metas) { | |
417 | file_meta_ptrs.push_back(&meta); | |
418 | } | |
419 | autovector<const autovector<MemTable*>*> mems_list; | |
420 | for (size_t i = 0; i != all_cfds.size(); ++i) { | |
f67539c2 | 421 | const auto& mems = flush_jobs[i]->GetMemTables(); |
494da23a TL |
422 | mems_list.push_back(&mems); |
423 | } | |
424 | autovector<const MutableCFOptions*> mutable_cf_options_list; | |
425 | for (auto cfd : all_cfds) { | |
426 | mutable_cf_options_list.push_back(cfd->GetLatestMutableCFOptions()); | |
427 | } | |
1e59de90 TL |
428 | autovector<std::list<std::unique_ptr<FlushJobInfo>>*> |
429 | committed_flush_jobs_info; | |
430 | #ifndef ROCKSDB_LITE | |
431 | for (auto& job : flush_jobs) { | |
432 | committed_flush_jobs_info.push_back(job->GetCommittedFlushJobsInfo()); | |
433 | } | |
434 | #endif //! ROCKSDB_LITE | |
494da23a TL |
435 | |
436 | Status s = InstallMemtableAtomicFlushResults( | |
437 | nullptr /* imm_lists */, all_cfds, mutable_cf_options_list, mems_list, | |
1e59de90 TL |
438 | versions_.get(), nullptr /* prep_tracker */, &mutex_, file_meta_ptrs, |
439 | committed_flush_jobs_info, &job_context.memtables_to_free, | |
494da23a TL |
440 | nullptr /* db_directory */, nullptr /* log_buffer */); |
441 | ASSERT_OK(s); | |
442 | ||
443 | mutex_.Unlock(); | |
444 | db_options_.statistics->histogramData(FLUSH_TIME, &hist); | |
445 | ASSERT_GT(hist.average, 0.0); | |
446 | k = 0; | |
447 | for (const auto& file_meta : file_metas) { | |
1e59de90 | 448 | ASSERT_EQ(std::to_string(0), file_meta.smallest.user_key().ToString()); |
494da23a TL |
449 | ASSERT_EQ("999", file_meta.largest.user_key() |
450 | .ToString()); // max key by bytewise comparator | |
451 | ASSERT_EQ(smallest_seqs[k], file_meta.fd.smallest_seqno); | |
452 | ASSERT_EQ(largest_seqs[k], file_meta.fd.largest_seqno); | |
453 | // Verify that imm is empty | |
454 | ASSERT_EQ(std::numeric_limits<uint64_t>::max(), | |
455 | all_cfds[k]->imm()->GetEarliestMemTableID()); | |
456 | ASSERT_EQ(0, all_cfds[k]->imm()->GetLatestMemTableID()); | |
457 | ++k; | |
458 | } | |
459 | ||
460 | for (auto m : to_delete) { | |
461 | delete m; | |
462 | } | |
463 | to_delete.clear(); | |
464 | job_context.Clean(); | |
465 | } | |
466 | ||
7c673cae FG |
467 | TEST_F(FlushJobTest, Snapshots) { |
468 | JobContext job_context(0); | |
469 | auto cfd = versions_->GetColumnFamilySet()->GetDefault(); | |
470 | auto new_mem = cfd->ConstructNewMemtable(*cfd->GetLatestMutableCFOptions(), | |
471 | kMaxSequenceNumber); | |
472 | ||
7c673cae FG |
473 | std::set<SequenceNumber> snapshots_set; |
474 | int keys = 10000; | |
475 | int max_inserts_per_keys = 8; | |
476 | ||
477 | Random rnd(301); | |
478 | for (int i = 0; i < keys / 2; ++i) { | |
494da23a | 479 | snapshots_set.insert(rnd.Uniform(keys * (max_inserts_per_keys / 2)) + 1); |
7c673cae | 480 | } |
494da23a TL |
481 | // set has already removed the duplicate snapshots |
482 | std::vector<SequenceNumber> snapshots(snapshots_set.begin(), | |
483 | snapshots_set.end()); | |
7c673cae FG |
484 | |
485 | new_mem->Ref(); | |
486 | SequenceNumber current_seqno = 0; | |
487 | auto inserted_keys = mock::MakeMockFile(); | |
488 | for (int i = 1; i < keys; ++i) { | |
1e59de90 | 489 | std::string key(std::to_string(i)); |
7c673cae FG |
490 | int insertions = rnd.Uniform(max_inserts_per_keys); |
491 | for (int j = 0; j < insertions; ++j) { | |
20effc67 | 492 | std::string value(rnd.HumanReadableString(10)); |
7c673cae | 493 | auto seqno = ++current_seqno; |
1e59de90 TL |
494 | ASSERT_OK(new_mem->Add(SequenceNumber(seqno), kTypeValue, key, value, |
495 | nullptr /* kv_prot_info */)); | |
7c673cae FG |
496 | // a key is visible only if: |
497 | // 1. it's the last one written (j == insertions - 1) | |
498 | // 2. there's a snapshot pointing at it | |
499 | bool visible = (j == insertions - 1) || | |
500 | (snapshots_set.find(seqno) != snapshots_set.end()); | |
501 | if (visible) { | |
502 | InternalKey internal_key(key, seqno, kTypeValue); | |
20effc67 | 503 | inserted_keys.push_back({internal_key.Encode().ToString(), value}); |
7c673cae FG |
504 | } |
505 | } | |
506 | } | |
20effc67 | 507 | mock::SortKVVector(&inserted_keys); |
7c673cae FG |
508 | |
509 | autovector<MemTable*> to_delete; | |
1e59de90 | 510 | new_mem->ConstructFragmentedRangeTombstones(); |
7c673cae FG |
511 | cfd->imm()->Add(new_mem, &to_delete); |
512 | for (auto& m : to_delete) { | |
513 | delete m; | |
514 | } | |
515 | ||
516 | EventLogger event_logger(db_options_.info_log.get()); | |
11fdf7f2 | 517 | SnapshotChecker* snapshot_checker = nullptr; // not relavant |
20effc67 TL |
518 | FlushJob flush_job( |
519 | dbname_, versions_->GetColumnFamilySet()->GetDefault(), db_options_, | |
1e59de90 TL |
520 | *cfd->GetLatestMutableCFOptions(), |
521 | std::numeric_limits<uint64_t>::max() /* memtable_id */, env_options_, | |
522 | versions_.get(), &mutex_, &shutting_down_, snapshots, kMaxSequenceNumber, | |
523 | snapshot_checker, &job_context, nullptr, nullptr, nullptr, kNoCompression, | |
524 | db_options_.statistics.get(), &event_logger, true, | |
525 | true /* sync_output_directory */, true /* write_manifest */, | |
526 | Env::Priority::USER, nullptr /*IOTracer*/, empty_seqno_to_time_mapping_); | |
7c673cae FG |
527 | mutex_.Lock(); |
528 | flush_job.PickMemTable(); | |
529 | ASSERT_OK(flush_job.Run()); | |
530 | mutex_.Unlock(); | |
531 | mock_table_factory_->AssertSingleFile(inserted_keys); | |
11fdf7f2 TL |
532 | HistogramData hist; |
533 | db_options_.statistics->histogramData(FLUSH_TIME, &hist); | |
534 | ASSERT_GT(hist.average, 0.0); | |
7c673cae FG |
535 | job_context.Clean(); |
536 | } | |
537 | ||
1e59de90 TL |
538 | TEST_F(FlushJobTest, GetRateLimiterPriorityForWrite) { |
539 | // Prepare a FlushJob that flush MemTables of Single Column Family. | |
540 | const size_t num_mems = 2; | |
541 | const size_t num_mems_to_flush = 1; | |
542 | const size_t num_keys_per_table = 100; | |
543 | JobContext job_context(0); | |
544 | ColumnFamilyData* cfd = versions_->GetColumnFamilySet()->GetDefault(); | |
545 | std::vector<uint64_t> memtable_ids; | |
546 | std::vector<MemTable*> new_mems; | |
547 | for (size_t i = 0; i != num_mems; ++i) { | |
548 | MemTable* mem = cfd->ConstructNewMemtable(*cfd->GetLatestMutableCFOptions(), | |
549 | kMaxSequenceNumber); | |
550 | mem->SetID(i); | |
551 | mem->Ref(); | |
552 | new_mems.emplace_back(mem); | |
553 | memtable_ids.push_back(mem->GetID()); | |
554 | ||
555 | for (size_t j = 0; j < num_keys_per_table; ++j) { | |
556 | std::string key(std::to_string(j + i * num_keys_per_table)); | |
557 | std::string value("value" + key); | |
558 | ASSERT_OK(mem->Add(SequenceNumber(j + i * num_keys_per_table), kTypeValue, | |
559 | key, value, nullptr /* kv_prot_info */)); | |
560 | } | |
561 | } | |
562 | ||
563 | autovector<MemTable*> to_delete; | |
564 | for (auto mem : new_mems) { | |
565 | mem->ConstructFragmentedRangeTombstones(); | |
566 | cfd->imm()->Add(mem, &to_delete); | |
567 | } | |
568 | ||
569 | EventLogger event_logger(db_options_.info_log.get()); | |
570 | SnapshotChecker* snapshot_checker = nullptr; // not relavant | |
571 | ||
572 | assert(memtable_ids.size() == num_mems); | |
573 | uint64_t smallest_memtable_id = memtable_ids.front(); | |
574 | uint64_t flush_memtable_id = smallest_memtable_id + num_mems_to_flush - 1; | |
575 | FlushJob flush_job( | |
576 | dbname_, versions_->GetColumnFamilySet()->GetDefault(), db_options_, | |
577 | *cfd->GetLatestMutableCFOptions(), flush_memtable_id, env_options_, | |
578 | versions_.get(), &mutex_, &shutting_down_, {}, kMaxSequenceNumber, | |
579 | snapshot_checker, &job_context, nullptr, nullptr, nullptr, kNoCompression, | |
580 | db_options_.statistics.get(), &event_logger, true, | |
581 | true /* sync_output_directory */, true /* write_manifest */, | |
582 | Env::Priority::USER, nullptr /*IOTracer*/, empty_seqno_to_time_mapping_); | |
583 | ||
584 | // When the state from WriteController is normal. | |
585 | ASSERT_EQ(flush_job.GetRateLimiterPriorityForWrite(), Env::IO_HIGH); | |
586 | ||
587 | WriteController* write_controller = | |
588 | flush_job.versions_->GetColumnFamilySet()->write_controller(); | |
589 | ||
590 | { | |
591 | // When the state from WriteController is Delayed. | |
592 | std::unique_ptr<WriteControllerToken> delay_token = | |
593 | write_controller->GetDelayToken(1000000); | |
594 | ASSERT_EQ(flush_job.GetRateLimiterPriorityForWrite(), Env::IO_USER); | |
595 | } | |
596 | ||
597 | { | |
598 | // When the state from WriteController is Stopped. | |
599 | std::unique_ptr<WriteControllerToken> stop_token = | |
600 | write_controller->GetStopToken(); | |
601 | ASSERT_EQ(flush_job.GetRateLimiterPriorityForWrite(), Env::IO_USER); | |
602 | } | |
603 | } | |
604 | ||
20effc67 TL |
605 | class FlushJobTimestampTest : public FlushJobTestBase { |
606 | public: | |
607 | FlushJobTimestampTest() | |
608 | : FlushJobTestBase(test::PerThreadDBPath("flush_job_ts_gc_test"), | |
1e59de90 | 609 | test::BytewiseComparatorWithU64TsWrapper()) {} |
20effc67 TL |
610 | |
611 | void AddKeyValueToMemtable(MemTable* memtable, std::string key, uint64_t ts, | |
612 | SequenceNumber seq, ValueType value_type, | |
613 | Slice value) { | |
614 | std::string key_str(std::move(key)); | |
615 | PutFixed64(&key_str, ts); | |
1e59de90 TL |
616 | ASSERT_OK(memtable->Add(seq, value_type, key_str, value, |
617 | nullptr /* kv_prot_info */)); | |
20effc67 TL |
618 | } |
619 | ||
620 | protected: | |
621 | static constexpr uint64_t kStartTs = 10; | |
622 | static constexpr SequenceNumber kStartSeq = 0; | |
623 | SequenceNumber curr_seq_{kStartSeq}; | |
624 | std::atomic<uint64_t> curr_ts_{kStartTs}; | |
625 | }; | |
626 | ||
627 | TEST_F(FlushJobTimestampTest, AllKeysExpired) { | |
628 | ColumnFamilyData* cfd = versions_->GetColumnFamilySet()->GetDefault(); | |
629 | autovector<MemTable*> to_delete; | |
630 | ||
631 | { | |
632 | MemTable* new_mem = cfd->ConstructNewMemtable( | |
633 | *cfd->GetLatestMutableCFOptions(), kMaxSequenceNumber); | |
634 | new_mem->Ref(); | |
635 | for (int i = 0; i < 100; ++i) { | |
636 | uint64_t ts = curr_ts_.fetch_add(1); | |
637 | SequenceNumber seq = (curr_seq_++); | |
638 | AddKeyValueToMemtable(new_mem, test::EncodeInt(0), ts, seq, | |
639 | ValueType::kTypeValue, "0_value"); | |
640 | } | |
641 | uint64_t ts = curr_ts_.fetch_add(1); | |
642 | SequenceNumber seq = (curr_seq_++); | |
643 | AddKeyValueToMemtable(new_mem, test::EncodeInt(0), ts, seq, | |
644 | ValueType::kTypeDeletionWithTimestamp, ""); | |
1e59de90 | 645 | new_mem->ConstructFragmentedRangeTombstones(); |
20effc67 TL |
646 | cfd->imm()->Add(new_mem, &to_delete); |
647 | } | |
648 | ||
649 | std::vector<SequenceNumber> snapshots; | |
650 | constexpr SnapshotChecker* const snapshot_checker = nullptr; | |
651 | JobContext job_context(0); | |
652 | EventLogger event_logger(db_options_.info_log.get()); | |
653 | std::string full_history_ts_low; | |
654 | PutFixed64(&full_history_ts_low, std::numeric_limits<uint64_t>::max()); | |
655 | FlushJob flush_job( | |
656 | dbname_, cfd, db_options_, *cfd->GetLatestMutableCFOptions(), | |
1e59de90 TL |
657 | std::numeric_limits<uint64_t>::max() /* memtable_id */, env_options_, |
658 | versions_.get(), &mutex_, &shutting_down_, snapshots, kMaxSequenceNumber, | |
659 | snapshot_checker, &job_context, nullptr, nullptr, nullptr, kNoCompression, | |
20effc67 TL |
660 | db_options_.statistics.get(), &event_logger, true, |
661 | true /* sync_output_directory */, true /* write_manifest */, | |
1e59de90 TL |
662 | Env::Priority::USER, nullptr /*IOTracer*/, empty_seqno_to_time_mapping_, |
663 | /*db_id=*/"", | |
20effc67 TL |
664 | /*db_session_id=*/"", full_history_ts_low); |
665 | ||
666 | FileMetaData fmeta; | |
667 | mutex_.Lock(); | |
668 | flush_job.PickMemTable(); | |
669 | ASSERT_OK(flush_job.Run(/*prep_tracker=*/nullptr, &fmeta)); | |
670 | mutex_.Unlock(); | |
671 | ||
672 | { | |
673 | std::string key = test::EncodeInt(0); | |
674 | key.append(test::EncodeInt(curr_ts_.load(std::memory_order_relaxed) - 1)); | |
675 | InternalKey ikey(key, curr_seq_ - 1, ValueType::kTypeDeletionWithTimestamp); | |
676 | ASSERT_EQ(ikey.Encode(), fmeta.smallest.Encode()); | |
677 | ASSERT_EQ(ikey.Encode(), fmeta.largest.Encode()); | |
678 | } | |
679 | ||
680 | job_context.Clean(); | |
681 | ASSERT_TRUE(to_delete.empty()); | |
682 | } | |
683 | ||
684 | TEST_F(FlushJobTimestampTest, NoKeyExpired) { | |
685 | ColumnFamilyData* cfd = versions_->GetColumnFamilySet()->GetDefault(); | |
686 | autovector<MemTable*> to_delete; | |
687 | ||
688 | { | |
689 | MemTable* new_mem = cfd->ConstructNewMemtable( | |
690 | *cfd->GetLatestMutableCFOptions(), kMaxSequenceNumber); | |
691 | new_mem->Ref(); | |
692 | for (int i = 0; i < 100; ++i) { | |
693 | uint64_t ts = curr_ts_.fetch_add(1); | |
694 | SequenceNumber seq = (curr_seq_++); | |
695 | AddKeyValueToMemtable(new_mem, test::EncodeInt(0), ts, seq, | |
696 | ValueType::kTypeValue, "0_value"); | |
697 | } | |
1e59de90 | 698 | new_mem->ConstructFragmentedRangeTombstones(); |
20effc67 TL |
699 | cfd->imm()->Add(new_mem, &to_delete); |
700 | } | |
701 | ||
702 | std::vector<SequenceNumber> snapshots; | |
703 | SnapshotChecker* const snapshot_checker = nullptr; | |
704 | JobContext job_context(0); | |
705 | EventLogger event_logger(db_options_.info_log.get()); | |
706 | std::string full_history_ts_low; | |
707 | PutFixed64(&full_history_ts_low, 0); | |
708 | FlushJob flush_job( | |
709 | dbname_, cfd, db_options_, *cfd->GetLatestMutableCFOptions(), | |
1e59de90 TL |
710 | std::numeric_limits<uint64_t>::max() /* memtable_id */, env_options_, |
711 | versions_.get(), &mutex_, &shutting_down_, snapshots, kMaxSequenceNumber, | |
712 | snapshot_checker, &job_context, nullptr, nullptr, nullptr, kNoCompression, | |
20effc67 TL |
713 | db_options_.statistics.get(), &event_logger, true, |
714 | true /* sync_output_directory */, true /* write_manifest */, | |
1e59de90 TL |
715 | Env::Priority::USER, nullptr /*IOTracer*/, empty_seqno_to_time_mapping_, |
716 | /*db_id=*/"", | |
20effc67 TL |
717 | /*db_session_id=*/"", full_history_ts_low); |
718 | ||
719 | FileMetaData fmeta; | |
720 | mutex_.Lock(); | |
721 | flush_job.PickMemTable(); | |
722 | ASSERT_OK(flush_job.Run(/*prep_tracker=*/nullptr, &fmeta)); | |
723 | mutex_.Unlock(); | |
724 | ||
725 | { | |
726 | std::string ukey = test::EncodeInt(0); | |
727 | std::string smallest_key = | |
728 | ukey + test::EncodeInt(curr_ts_.load(std::memory_order_relaxed) - 1); | |
729 | std::string largest_key = ukey + test::EncodeInt(kStartTs); | |
730 | InternalKey smallest(smallest_key, curr_seq_ - 1, ValueType::kTypeValue); | |
731 | InternalKey largest(largest_key, kStartSeq, ValueType::kTypeValue); | |
732 | ASSERT_EQ(smallest.Encode(), fmeta.smallest.Encode()); | |
733 | ASSERT_EQ(largest.Encode(), fmeta.largest.Encode()); | |
734 | } | |
735 | job_context.Clean(); | |
736 | ASSERT_TRUE(to_delete.empty()); | |
737 | } | |
738 | ||
f67539c2 | 739 | } // namespace ROCKSDB_NAMESPACE |
7c673cae FG |
740 | |
741 | int main(int argc, char** argv) { | |
1e59de90 | 742 | ROCKSDB_NAMESPACE::port::InstallStackTraceHandler(); |
7c673cae FG |
743 | ::testing::InitGoogleTest(&argc, argv); |
744 | return RUN_ALL_TESTS(); | |
745 | } |