]>
Commit | Line | Data |
---|---|---|
7c673cae | 1 | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
11fdf7f2 TL |
2 | // This source code is licensed under both the GPLv2 (found in the |
3 | // COPYING file in the root directory) and Apache 2.0 License | |
4 | // (found in the LICENSE.Apache file in the root directory). | |
7c673cae | 5 | |
20effc67 TL |
6 | #include "db/flush_job.h" |
7 | ||
7c673cae | 8 | #include <algorithm> |
f67539c2 | 9 | #include <array> |
7c673cae FG |
10 | #include <map> |
11 | #include <string> | |
12 | ||
20effc67 | 13 | #include "db/blob/blob_index.h" |
7c673cae | 14 | #include "db/column_family.h" |
f67539c2 | 15 | #include "db/db_impl/db_impl.h" |
7c673cae | 16 | #include "db/version_set.h" |
f67539c2 | 17 | #include "file/writable_file_writer.h" |
7c673cae FG |
18 | #include "rocksdb/cache.h" |
19 | #include "rocksdb/write_buffer_manager.h" | |
20 | #include "table/mock_table.h" | |
f67539c2 TL |
21 | #include "test_util/testharness.h" |
22 | #include "test_util/testutil.h" | |
20effc67 | 23 | #include "util/random.h" |
7c673cae | 24 | #include "util/string_util.h" |
7c673cae | 25 | |
f67539c2 | 26 | namespace ROCKSDB_NAMESPACE { |
7c673cae FG |
27 | |
28 | // TODO(icanadi) Mock out everything else: | |
29 | // 1. VersionSet | |
30 | // 2. Memtable | |
20effc67 TL |
31 | class FlushJobTestBase : public testing::Test { |
32 | protected: | |
33 | FlushJobTestBase(std::string dbname, const Comparator* ucmp) | |
7c673cae | 34 | : env_(Env::Default()), |
f67539c2 | 35 | fs_(std::make_shared<LegacyFileSystemWrapper>(env_)), |
20effc67 TL |
36 | dbname_(std::move(dbname)), |
37 | ucmp_(ucmp), | |
7c673cae FG |
38 | options_(), |
39 | db_options_(options_), | |
494da23a | 40 | column_family_names_({kDefaultColumnFamilyName, "foo", "bar"}), |
7c673cae FG |
41 | table_cache_(NewLRUCache(50000, 16)), |
42 | write_buffer_manager_(db_options_.db_write_buffer_size), | |
7c673cae | 43 | shutting_down_(false), |
20effc67 | 44 | mock_table_factory_(new mock::MockTableFactory()) {} |
7c673cae | 45 | |
20effc67 TL |
46 | virtual ~FlushJobTestBase() { |
47 | if (getenv("KEEP_DB")) { | |
48 | fprintf(stdout, "db is still in %s\n", dbname_.c_str()); | |
49 | } else { | |
50 | EXPECT_OK(DestroyDir(env_, dbname_)); | |
51 | } | |
7c673cae FG |
52 | } |
53 | ||
54 | void NewDB() { | |
f67539c2 | 55 | SetIdentityFile(env_, dbname_); |
7c673cae | 56 | VersionEdit new_db; |
20effc67 | 57 | |
7c673cae FG |
58 | new_db.SetLogNumber(0); |
59 | new_db.SetNextFile(2); | |
60 | new_db.SetLastSequence(0); | |
61 | ||
494da23a TL |
62 | autovector<VersionEdit> new_cfs; |
63 | SequenceNumber last_seq = 1; | |
64 | uint32_t cf_id = 1; | |
65 | for (size_t i = 1; i != column_family_names_.size(); ++i) { | |
66 | VersionEdit new_cf; | |
67 | new_cf.AddColumnFamily(column_family_names_[i]); | |
68 | new_cf.SetColumnFamily(cf_id++); | |
20effc67 | 69 | new_cf.SetComparatorName(ucmp_->Name()); |
494da23a TL |
70 | new_cf.SetLogNumber(0); |
71 | new_cf.SetNextFile(2); | |
72 | new_cf.SetLastSequence(last_seq++); | |
73 | new_cfs.emplace_back(new_cf); | |
74 | } | |
75 | ||
7c673cae | 76 | const std::string manifest = DescriptorFileName(dbname_, 1); |
494da23a | 77 | std::unique_ptr<WritableFile> file; |
7c673cae FG |
78 | Status s = env_->NewWritableFile( |
79 | manifest, &file, env_->OptimizeForManifestWrite(env_options_)); | |
80 | ASSERT_OK(s); | |
f67539c2 TL |
81 | std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter( |
82 | NewLegacyWritableFileWrapper(std::move(file)), manifest, EnvOptions())); | |
7c673cae FG |
83 | { |
84 | log::Writer log(std::move(file_writer), 0, false); | |
85 | std::string record; | |
86 | new_db.EncodeTo(&record); | |
87 | s = log.AddRecord(record); | |
494da23a TL |
88 | |
89 | for (const auto& e : new_cfs) { | |
90 | record.clear(); | |
91 | e.EncodeTo(&record); | |
92 | s = log.AddRecord(record); | |
93 | ASSERT_OK(s); | |
94 | } | |
7c673cae FG |
95 | } |
96 | ASSERT_OK(s); | |
97 | // Make "CURRENT" file that points to the new manifest file. | |
20effc67 TL |
98 | s = SetCurrentFile(fs_.get(), dbname_, 1, nullptr); |
99 | ASSERT_OK(s); | |
100 | } | |
101 | ||
102 | void SetUp() override { | |
103 | EXPECT_OK(env_->CreateDirIfMissing(dbname_)); | |
104 | ||
105 | // TODO(icanadi) Remove this once we mock out VersionSet | |
106 | NewDB(); | |
107 | ||
108 | db_options_.env = env_; | |
109 | db_options_.fs = fs_; | |
110 | db_options_.db_paths.emplace_back(dbname_, | |
111 | std::numeric_limits<uint64_t>::max()); | |
112 | db_options_.statistics = CreateDBStatistics(); | |
113 | ||
114 | cf_options_.comparator = ucmp_; | |
115 | ||
116 | std::vector<ColumnFamilyDescriptor> column_families; | |
117 | cf_options_.table_factory = mock_table_factory_; | |
118 | for (const auto& cf_name : column_family_names_) { | |
119 | column_families.emplace_back(cf_name, cf_options_); | |
120 | } | |
121 | ||
122 | versions_.reset( | |
123 | new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(), | |
124 | &write_buffer_manager_, &write_controller_, | |
125 | /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr)); | |
126 | EXPECT_OK(versions_->Recover(column_families, false)); | |
7c673cae FG |
127 | } |
128 | ||
129 | Env* env_; | |
f67539c2 | 130 | std::shared_ptr<FileSystem> fs_; |
7c673cae | 131 | std::string dbname_; |
20effc67 | 132 | const Comparator* const ucmp_; |
7c673cae FG |
133 | EnvOptions env_options_; |
134 | Options options_; | |
135 | ImmutableDBOptions db_options_; | |
494da23a | 136 | const std::vector<std::string> column_family_names_; |
7c673cae FG |
137 | std::shared_ptr<Cache> table_cache_; |
138 | WriteController write_controller_; | |
139 | WriteBufferManager write_buffer_manager_; | |
140 | ColumnFamilyOptions cf_options_; | |
141 | std::unique_ptr<VersionSet> versions_; | |
142 | InstrumentedMutex mutex_; | |
143 | std::atomic<bool> shutting_down_; | |
144 | std::shared_ptr<mock::MockTableFactory> mock_table_factory_; | |
145 | }; | |
146 | ||
20effc67 TL |
147 | class FlushJobTest : public FlushJobTestBase { |
148 | public: | |
149 | FlushJobTest() | |
150 | : FlushJobTestBase(test::PerThreadDBPath("flush_job_test"), | |
151 | BytewiseComparator()) {} | |
152 | }; | |
153 | ||
7c673cae FG |
154 | TEST_F(FlushJobTest, Empty) { |
155 | JobContext job_context(0); | |
156 | auto cfd = versions_->GetColumnFamilySet()->GetDefault(); | |
157 | EventLogger event_logger(db_options_.info_log.get()); | |
11fdf7f2 | 158 | SnapshotChecker* snapshot_checker = nullptr; // not relevant |
20effc67 TL |
159 | FlushJob flush_job( |
160 | dbname_, versions_->GetColumnFamilySet()->GetDefault(), db_options_, | |
161 | *cfd->GetLatestMutableCFOptions(), nullptr /* memtable_id */, | |
162 | env_options_, versions_.get(), &mutex_, &shutting_down_, {}, | |
163 | kMaxSequenceNumber, snapshot_checker, &job_context, nullptr, nullptr, | |
164 | nullptr, kNoCompression, nullptr, &event_logger, false, | |
165 | true /* sync_output_directory */, true /* write_manifest */, | |
166 | Env::Priority::USER, nullptr /*IOTracer*/); | |
7c673cae FG |
167 | { |
168 | InstrumentedMutexLock l(&mutex_); | |
169 | flush_job.PickMemTable(); | |
170 | ASSERT_OK(flush_job.Run()); | |
171 | } | |
172 | job_context.Clean(); | |
173 | } | |
174 | ||
175 | TEST_F(FlushJobTest, NonEmpty) { | |
176 | JobContext job_context(0); | |
177 | auto cfd = versions_->GetColumnFamilySet()->GetDefault(); | |
178 | auto new_mem = cfd->ConstructNewMemtable(*cfd->GetLatestMutableCFOptions(), | |
179 | kMaxSequenceNumber); | |
180 | new_mem->Ref(); | |
181 | auto inserted_keys = mock::MakeMockFile(); | |
182 | // Test data: | |
183 | // seqno [ 1, 2 ... 8998, 8999, 9000, 9001, 9002 ... 9999 ] | |
184 | // key [ 1001, 1002 ... 9998, 9999, 0, 1, 2 ... 999 ] | |
185 | // range-delete "9995" -> "9999a" at seqno 10000 (keys 9995..9999 are excluded from the expected output) | |
f67539c2 | 186 | // blob references with seqnos 10001..10006 |
7c673cae FG |
187 | for (int i = 1; i < 10000; ++i) { |
188 | std::string key(ToString((i + 1000) % 10000)); | |
189 | std::string value("value" + key); | |
190 | new_mem->Add(SequenceNumber(i), kTypeValue, key, value); | |
191 | if ((i + 1000) % 10000 < 9995) { | |
192 | InternalKey internal_key(key, SequenceNumber(i), kTypeValue); | |
20effc67 | 193 | inserted_keys.push_back({internal_key.Encode().ToString(), value}); |
7c673cae FG |
194 | } |
195 | } | |
f67539c2 TL |
196 | |
197 | { | |
198 | new_mem->Add(SequenceNumber(10000), kTypeRangeDeletion, "9995", "9999a"); | |
199 | InternalKey internal_key("9995", SequenceNumber(10000), kTypeRangeDeletion); | |
20effc67 | 200 | inserted_keys.push_back({internal_key.Encode().ToString(), "9999a"}); |
f67539c2 TL |
201 | } |
202 | ||
f67539c2 TL |
203 | // Note: the first two blob references will not be considered when resolving |
204 | // the oldest blob file referenced (the first one is inlined TTL, while the | |
205 | // second one is TTL and thus points to a TTL blob file). | |
20effc67 TL |
206 | constexpr std::array<uint64_t, 6> blob_file_numbers{{ |
207 | kInvalidBlobFileNumber, 5, 103, 17, 102, 101}}; | |
f67539c2 TL |
208 | for (size_t i = 0; i < blob_file_numbers.size(); ++i) { |
209 | std::string key(ToString(i + 10001)); | |
210 | std::string blob_index; | |
211 | if (i == 0) { | |
212 | BlobIndex::EncodeInlinedTTL(&blob_index, /* expiration */ 1234567890ULL, | |
213 | "foo"); | |
214 | } else if (i == 1) { | |
215 | BlobIndex::EncodeBlobTTL(&blob_index, /* expiration */ 1234567890ULL, | |
216 | blob_file_numbers[i], /* offset */ i << 10, | |
217 | /* size */ i << 20, kNoCompression); | |
218 | } else { | |
219 | BlobIndex::EncodeBlob(&blob_index, blob_file_numbers[i], | |
220 | /* offset */ i << 10, /* size */ i << 20, | |
221 | kNoCompression); | |
222 | } | |
223 | ||
224 | const SequenceNumber seq(i + 10001); | |
225 | new_mem->Add(seq, kTypeBlobIndex, key, blob_index); | |
226 | ||
227 | InternalKey internal_key(key, seq, kTypeBlobIndex); | |
20effc67 | 228 | inserted_keys.push_back({internal_key.Encode().ToString(), blob_index}); |
f67539c2 | 229 | } |
20effc67 | 230 | mock::SortKVVector(&inserted_keys); |
7c673cae FG |
231 | |
232 | autovector<MemTable*> to_delete; | |
233 | cfd->imm()->Add(new_mem, &to_delete); | |
234 | for (auto& m : to_delete) { | |
235 | delete m; | |
236 | } | |
237 | ||
238 | EventLogger event_logger(db_options_.info_log.get()); | |
11fdf7f2 | 239 | SnapshotChecker* snapshot_checker = nullptr; // not relevant |
20effc67 TL |
240 | FlushJob flush_job( |
241 | dbname_, versions_->GetColumnFamilySet()->GetDefault(), db_options_, | |
242 | *cfd->GetLatestMutableCFOptions(), nullptr /* memtable_id */, | |
243 | env_options_, versions_.get(), &mutex_, &shutting_down_, {}, | |
244 | kMaxSequenceNumber, snapshot_checker, &job_context, nullptr, nullptr, | |
245 | nullptr, kNoCompression, db_options_.statistics.get(), &event_logger, | |
246 | true, true /* sync_output_directory */, true /* write_manifest */, | |
247 | Env::Priority::USER, nullptr /*IOTracer*/); | |
11fdf7f2 TL |
248 | |
249 | HistogramData hist; | |
250 | FileMetaData file_meta; | |
7c673cae FG |
251 | mutex_.Lock(); |
252 | flush_job.PickMemTable(); | |
11fdf7f2 | 253 | ASSERT_OK(flush_job.Run(nullptr, &file_meta)); |
7c673cae | 254 | mutex_.Unlock(); |
11fdf7f2 TL |
255 | db_options_.statistics->histogramData(FLUSH_TIME, &hist); |
256 | ASSERT_GT(hist.average, 0.0); | |
257 | ||
258 | ASSERT_EQ(ToString(0), file_meta.smallest.user_key().ToString()); | |
f67539c2 | 259 | ASSERT_EQ("9999a", file_meta.largest.user_key().ToString()); |
11fdf7f2 | 260 | ASSERT_EQ(1, file_meta.fd.smallest_seqno); |
f67539c2 TL |
261 | ASSERT_EQ(10006, file_meta.fd.largest_seqno); |
262 | ASSERT_EQ(17, file_meta.oldest_blob_file_number); | |
7c673cae FG |
263 | mock_table_factory_->AssertSingleFile(inserted_keys); |
264 | job_context.Clean(); | |
265 | } | |
266 | ||
494da23a TL |
267 | TEST_F(FlushJobTest, FlushMemTablesSingleColumnFamily) { |
268 | const size_t num_mems = 2; | |
269 | const size_t num_mems_to_flush = 1; | |
270 | const size_t num_keys_per_table = 100; | |
271 | JobContext job_context(0); | |
272 | ColumnFamilyData* cfd = versions_->GetColumnFamilySet()->GetDefault(); | |
273 | std::vector<uint64_t> memtable_ids; | |
274 | std::vector<MemTable*> new_mems; | |
275 | for (size_t i = 0; i != num_mems; ++i) { | |
276 | MemTable* mem = cfd->ConstructNewMemtable(*cfd->GetLatestMutableCFOptions(), | |
277 | kMaxSequenceNumber); | |
278 | mem->SetID(i); | |
279 | mem->Ref(); | |
280 | new_mems.emplace_back(mem); | |
281 | memtable_ids.push_back(mem->GetID()); | |
282 | ||
283 | for (size_t j = 0; j < num_keys_per_table; ++j) { | |
284 | std::string key(ToString(j + i * num_keys_per_table)); | |
285 | std::string value("value" + key); | |
286 | mem->Add(SequenceNumber(j + i * num_keys_per_table), kTypeValue, key, | |
287 | value); | |
288 | } | |
289 | } | |
290 | ||
291 | autovector<MemTable*> to_delete; | |
292 | for (auto mem : new_mems) { | |
293 | cfd->imm()->Add(mem, &to_delete); | |
294 | } | |
295 | ||
296 | EventLogger event_logger(db_options_.info_log.get()); | |
297 | SnapshotChecker* snapshot_checker = nullptr; // not relevant | |
298 | ||
299 | assert(memtable_ids.size() == num_mems); | |
300 | uint64_t smallest_memtable_id = memtable_ids.front(); | |
301 | uint64_t flush_memtable_id = smallest_memtable_id + num_mems_to_flush - 1; | |
20effc67 TL |
302 | FlushJob flush_job( |
303 | dbname_, versions_->GetColumnFamilySet()->GetDefault(), db_options_, | |
304 | *cfd->GetLatestMutableCFOptions(), &flush_memtable_id, env_options_, | |
305 | versions_.get(), &mutex_, &shutting_down_, {}, kMaxSequenceNumber, | |
306 | snapshot_checker, &job_context, nullptr, nullptr, nullptr, kNoCompression, | |
307 | db_options_.statistics.get(), &event_logger, true, | |
308 | true /* sync_output_directory */, true /* write_manifest */, | |
309 | Env::Priority::USER, nullptr /*IOTracer*/); | |
494da23a TL |
310 | HistogramData hist; |
311 | FileMetaData file_meta; | |
312 | mutex_.Lock(); | |
313 | flush_job.PickMemTable(); | |
314 | ASSERT_OK(flush_job.Run(nullptr /* prep_tracker */, &file_meta)); | |
315 | mutex_.Unlock(); | |
316 | db_options_.statistics->histogramData(FLUSH_TIME, &hist); | |
317 | ASSERT_GT(hist.average, 0.0); | |
318 | ||
319 | ASSERT_EQ(ToString(0), file_meta.smallest.user_key().ToString()); | |
320 | ASSERT_EQ("99", file_meta.largest.user_key().ToString()); | |
321 | ASSERT_EQ(0, file_meta.fd.smallest_seqno); | |
322 | ASSERT_EQ(SequenceNumber(num_mems_to_flush * num_keys_per_table - 1), | |
323 | file_meta.fd.largest_seqno); | |
f67539c2 | 324 | ASSERT_EQ(kInvalidBlobFileNumber, file_meta.oldest_blob_file_number); |
494da23a TL |
325 | |
326 | for (auto m : to_delete) { | |
327 | delete m; | |
328 | } | |
329 | to_delete.clear(); | |
330 | job_context.Clean(); | |
331 | } | |
332 | ||
333 | TEST_F(FlushJobTest, FlushMemtablesMultipleColumnFamilies) { | |
334 | autovector<ColumnFamilyData*> all_cfds; | |
335 | for (auto cfd : *versions_->GetColumnFamilySet()) { | |
336 | all_cfds.push_back(cfd); | |
337 | } | |
338 | const std::vector<size_t> num_memtables = {2, 1, 3}; | |
339 | assert(num_memtables.size() == column_family_names_.size()); | |
340 | const size_t num_keys_per_memtable = 1000; | |
341 | JobContext job_context(0); | |
342 | std::vector<uint64_t> memtable_ids; | |
343 | std::vector<SequenceNumber> smallest_seqs; | |
344 | std::vector<SequenceNumber> largest_seqs; | |
345 | autovector<MemTable*> to_delete; | |
346 | SequenceNumber curr_seqno = 0; | |
347 | size_t k = 0; | |
348 | for (auto cfd : all_cfds) { | |
349 | smallest_seqs.push_back(curr_seqno); | |
350 | for (size_t i = 0; i != num_memtables[k]; ++i) { | |
351 | MemTable* mem = cfd->ConstructNewMemtable( | |
352 | *cfd->GetLatestMutableCFOptions(), kMaxSequenceNumber); | |
353 | mem->SetID(i); | |
354 | mem->Ref(); | |
355 | ||
356 | for (size_t j = 0; j != num_keys_per_memtable; ++j) { | |
357 | std::string key(ToString(j + i * num_keys_per_memtable)); | |
358 | std::string value("value" + key); | |
359 | mem->Add(curr_seqno++, kTypeValue, key, value); | |
360 | } | |
361 | ||
362 | cfd->imm()->Add(mem, &to_delete); | |
363 | } | |
364 | largest_seqs.push_back(curr_seqno - 1); | |
365 | memtable_ids.push_back(num_memtables[k++] - 1); | |
366 | } | |
367 | ||
368 | EventLogger event_logger(db_options_.info_log.get()); | |
369 | SnapshotChecker* snapshot_checker = nullptr; // not relevant | |
f67539c2 | 370 | std::vector<std::unique_ptr<FlushJob>> flush_jobs; |
494da23a TL |
371 | k = 0; |
372 | for (auto cfd : all_cfds) { | |
373 | std::vector<SequenceNumber> snapshot_seqs; | |
f67539c2 | 374 | flush_jobs.emplace_back(new FlushJob( |
494da23a TL |
375 | dbname_, cfd, db_options_, *cfd->GetLatestMutableCFOptions(), |
376 | &memtable_ids[k], env_options_, versions_.get(), &mutex_, | |
377 | &shutting_down_, snapshot_seqs, kMaxSequenceNumber, snapshot_checker, | |
378 | &job_context, nullptr, nullptr, nullptr, kNoCompression, | |
379 | db_options_.statistics.get(), &event_logger, true, | |
380 | false /* sync_output_directory */, false /* write_manifest */, | |
20effc67 | 381 | Env::Priority::USER, nullptr /*IOTracer*/)); |
494da23a TL |
382 | k++; |
383 | } | |
384 | HistogramData hist; | |
385 | std::vector<FileMetaData> file_metas; | |
386 | // Call reserve to avoid auto-resizing | |
387 | file_metas.reserve(flush_jobs.size()); | |
388 | mutex_.Lock(); | |
389 | for (auto& job : flush_jobs) { | |
f67539c2 | 390 | job->PickMemTable(); |
494da23a TL |
391 | } |
392 | for (auto& job : flush_jobs) { | |
393 | FileMetaData meta; | |
394 | // Run will release and re-acquire mutex | |
f67539c2 | 395 | ASSERT_OK(job->Run(nullptr /* prep_tracker */, &meta)); |
494da23a TL |
396 | file_metas.emplace_back(meta); |
397 | } | |
398 | autovector<FileMetaData*> file_meta_ptrs; | |
399 | for (auto& meta : file_metas) { | |
400 | file_meta_ptrs.push_back(&meta); | |
401 | } | |
402 | autovector<const autovector<MemTable*>*> mems_list; | |
403 | for (size_t i = 0; i != all_cfds.size(); ++i) { | |
f67539c2 | 404 | const auto& mems = flush_jobs[i]->GetMemTables(); |
494da23a TL |
405 | mems_list.push_back(&mems); |
406 | } | |
407 | autovector<const MutableCFOptions*> mutable_cf_options_list; | |
408 | for (auto cfd : all_cfds) { | |
409 | mutable_cf_options_list.push_back(cfd->GetLatestMutableCFOptions()); | |
410 | } | |
411 | ||
412 | Status s = InstallMemtableAtomicFlushResults( | |
413 | nullptr /* imm_lists */, all_cfds, mutable_cf_options_list, mems_list, | |
414 | versions_.get(), &mutex_, file_meta_ptrs, &job_context.memtables_to_free, | |
415 | nullptr /* db_directory */, nullptr /* log_buffer */); | |
416 | ASSERT_OK(s); | |
417 | ||
418 | mutex_.Unlock(); | |
419 | db_options_.statistics->histogramData(FLUSH_TIME, &hist); | |
420 | ASSERT_GT(hist.average, 0.0); | |
421 | k = 0; | |
422 | for (const auto& file_meta : file_metas) { | |
423 | ASSERT_EQ(ToString(0), file_meta.smallest.user_key().ToString()); | |
424 | ASSERT_EQ("999", file_meta.largest.user_key() | |
425 | .ToString()); // max key by bytewise comparator | |
426 | ASSERT_EQ(smallest_seqs[k], file_meta.fd.smallest_seqno); | |
427 | ASSERT_EQ(largest_seqs[k], file_meta.fd.largest_seqno); | |
428 | // Verify that imm is empty | |
429 | ASSERT_EQ(std::numeric_limits<uint64_t>::max(), | |
430 | all_cfds[k]->imm()->GetEarliestMemTableID()); | |
431 | ASSERT_EQ(0, all_cfds[k]->imm()->GetLatestMemTableID()); | |
432 | ++k; | |
433 | } | |
434 | ||
435 | for (auto m : to_delete) { | |
436 | delete m; | |
437 | } | |
438 | to_delete.clear(); | |
439 | job_context.Clean(); | |
440 | } | |
441 | ||
7c673cae FG |
442 | TEST_F(FlushJobTest, Snapshots) { |
443 | JobContext job_context(0); | |
444 | auto cfd = versions_->GetColumnFamilySet()->GetDefault(); | |
445 | auto new_mem = cfd->ConstructNewMemtable(*cfd->GetLatestMutableCFOptions(), | |
446 | kMaxSequenceNumber); | |
447 | ||
7c673cae FG |
448 | std::set<SequenceNumber> snapshots_set; |
449 | int keys = 10000; | |
450 | int max_inserts_per_keys = 8; | |
451 | ||
452 | Random rnd(301); | |
453 | for (int i = 0; i < keys / 2; ++i) { | |
494da23a | 454 | snapshots_set.insert(rnd.Uniform(keys * (max_inserts_per_keys / 2)) + 1); |
7c673cae | 455 | } |
494da23a TL |
456 | // set has already removed the duplicate snapshots |
457 | std::vector<SequenceNumber> snapshots(snapshots_set.begin(), | |
458 | snapshots_set.end()); | |
7c673cae FG |
459 | |
460 | new_mem->Ref(); | |
461 | SequenceNumber current_seqno = 0; | |
462 | auto inserted_keys = mock::MakeMockFile(); | |
463 | for (int i = 1; i < keys; ++i) { | |
464 | std::string key(ToString(i)); | |
465 | int insertions = rnd.Uniform(max_inserts_per_keys); | |
466 | for (int j = 0; j < insertions; ++j) { | |
20effc67 | 467 | std::string value(rnd.HumanReadableString(10)); |
7c673cae FG |
468 | auto seqno = ++current_seqno; |
469 | new_mem->Add(SequenceNumber(seqno), kTypeValue, key, value); | |
470 | // a key is visible only if: | |
471 | // 1. it's the last one written (j == insertions - 1) | |
472 | // 2. there's a snapshot pointing at it | |
473 | bool visible = (j == insertions - 1) || | |
474 | (snapshots_set.find(seqno) != snapshots_set.end()); | |
475 | if (visible) { | |
476 | InternalKey internal_key(key, seqno, kTypeValue); | |
20effc67 | 477 | inserted_keys.push_back({internal_key.Encode().ToString(), value}); |
7c673cae FG |
478 | } |
479 | } | |
480 | } | |
20effc67 | 481 | mock::SortKVVector(&inserted_keys); |
7c673cae FG |
482 | |
483 | autovector<MemTable*> to_delete; | |
484 | cfd->imm()->Add(new_mem, &to_delete); | |
485 | for (auto& m : to_delete) { | |
486 | delete m; | |
487 | } | |
488 | ||
489 | EventLogger event_logger(db_options_.info_log.get()); | |
11fdf7f2 | 490 | SnapshotChecker* snapshot_checker = nullptr; // not relevant |
20effc67 TL |
491 | FlushJob flush_job( |
492 | dbname_, versions_->GetColumnFamilySet()->GetDefault(), db_options_, | |
493 | *cfd->GetLatestMutableCFOptions(), nullptr /* memtable_id */, | |
494 | env_options_, versions_.get(), &mutex_, &shutting_down_, snapshots, | |
495 | kMaxSequenceNumber, snapshot_checker, &job_context, nullptr, nullptr, | |
496 | nullptr, kNoCompression, db_options_.statistics.get(), &event_logger, | |
497 | true, true /* sync_output_directory */, true /* write_manifest */, | |
498 | Env::Priority::USER, nullptr /*IOTracer*/); | |
7c673cae FG |
499 | mutex_.Lock(); |
500 | flush_job.PickMemTable(); | |
501 | ASSERT_OK(flush_job.Run()); | |
502 | mutex_.Unlock(); | |
503 | mock_table_factory_->AssertSingleFile(inserted_keys); | |
11fdf7f2 TL |
504 | HistogramData hist; |
505 | db_options_.statistics->histogramData(FLUSH_TIME, &hist); | |
506 | ASSERT_GT(hist.average, 0.0); | |
7c673cae FG |
507 | job_context.Clean(); |
508 | } | |
509 | ||
20effc67 TL |
510 | class FlushJobTimestampTest : public FlushJobTestBase { |
511 | public: | |
512 | FlushJobTimestampTest() | |
513 | : FlushJobTestBase(test::PerThreadDBPath("flush_job_ts_gc_test"), | |
514 | test::ComparatorWithU64Ts()) {} | |
515 | ||
516 | void AddKeyValueToMemtable(MemTable* memtable, std::string key, uint64_t ts, | |
517 | SequenceNumber seq, ValueType value_type, | |
518 | Slice value) { | |
519 | std::string key_str(std::move(key)); | |
520 | PutFixed64(&key_str, ts); | |
521 | memtable->Add(seq, value_type, key_str, value); | |
522 | } | |
523 | ||
524 | protected: | |
525 | static constexpr uint64_t kStartTs = 10; | |
526 | static constexpr SequenceNumber kStartSeq = 0; | |
527 | SequenceNumber curr_seq_{kStartSeq}; | |
528 | std::atomic<uint64_t> curr_ts_{kStartTs}; | |
529 | }; | |
530 | ||
531 | TEST_F(FlushJobTimestampTest, AllKeysExpired) { | |
532 | ColumnFamilyData* cfd = versions_->GetColumnFamilySet()->GetDefault(); | |
533 | autovector<MemTable*> to_delete; | |
534 | ||
535 | { | |
536 | MemTable* new_mem = cfd->ConstructNewMemtable( | |
537 | *cfd->GetLatestMutableCFOptions(), kMaxSequenceNumber); | |
538 | new_mem->Ref(); | |
539 | for (int i = 0; i < 100; ++i) { | |
540 | uint64_t ts = curr_ts_.fetch_add(1); | |
541 | SequenceNumber seq = (curr_seq_++); | |
542 | AddKeyValueToMemtable(new_mem, test::EncodeInt(0), ts, seq, | |
543 | ValueType::kTypeValue, "0_value"); | |
544 | } | |
545 | uint64_t ts = curr_ts_.fetch_add(1); | |
546 | SequenceNumber seq = (curr_seq_++); | |
547 | AddKeyValueToMemtable(new_mem, test::EncodeInt(0), ts, seq, | |
548 | ValueType::kTypeDeletionWithTimestamp, ""); | |
549 | cfd->imm()->Add(new_mem, &to_delete); | |
550 | } | |
551 | ||
552 | std::vector<SequenceNumber> snapshots; | |
553 | constexpr SnapshotChecker* const snapshot_checker = nullptr; | |
554 | JobContext job_context(0); | |
555 | EventLogger event_logger(db_options_.info_log.get()); | |
556 | std::string full_history_ts_low; | |
557 | PutFixed64(&full_history_ts_low, std::numeric_limits<uint64_t>::max()); | |
558 | FlushJob flush_job( | |
559 | dbname_, cfd, db_options_, *cfd->GetLatestMutableCFOptions(), | |
560 | nullptr /* memtable_id */, env_options_, versions_.get(), &mutex_, | |
561 | &shutting_down_, snapshots, kMaxSequenceNumber, snapshot_checker, | |
562 | &job_context, nullptr, nullptr, nullptr, kNoCompression, | |
563 | db_options_.statistics.get(), &event_logger, true, | |
564 | true /* sync_output_directory */, true /* write_manifest */, | |
565 | Env::Priority::USER, nullptr /*IOTracer*/, /*db_id=*/"", | |
566 | /*db_session_id=*/"", full_history_ts_low); | |
567 | ||
568 | FileMetaData fmeta; | |
569 | mutex_.Lock(); | |
570 | flush_job.PickMemTable(); | |
571 | ASSERT_OK(flush_job.Run(/*prep_tracker=*/nullptr, &fmeta)); | |
572 | mutex_.Unlock(); | |
573 | ||
574 | { | |
575 | std::string key = test::EncodeInt(0); | |
576 | key.append(test::EncodeInt(curr_ts_.load(std::memory_order_relaxed) - 1)); | |
577 | InternalKey ikey(key, curr_seq_ - 1, ValueType::kTypeDeletionWithTimestamp); | |
578 | ASSERT_EQ(ikey.Encode(), fmeta.smallest.Encode()); | |
579 | ASSERT_EQ(ikey.Encode(), fmeta.largest.Encode()); | |
580 | } | |
581 | ||
582 | job_context.Clean(); | |
583 | ASSERT_TRUE(to_delete.empty()); | |
584 | } | |
585 | ||
586 | TEST_F(FlushJobTimestampTest, NoKeyExpired) { | |
587 | ColumnFamilyData* cfd = versions_->GetColumnFamilySet()->GetDefault(); | |
588 | autovector<MemTable*> to_delete; | |
589 | ||
590 | { | |
591 | MemTable* new_mem = cfd->ConstructNewMemtable( | |
592 | *cfd->GetLatestMutableCFOptions(), kMaxSequenceNumber); | |
593 | new_mem->Ref(); | |
594 | for (int i = 0; i < 100; ++i) { | |
595 | uint64_t ts = curr_ts_.fetch_add(1); | |
596 | SequenceNumber seq = (curr_seq_++); | |
597 | AddKeyValueToMemtable(new_mem, test::EncodeInt(0), ts, seq, | |
598 | ValueType::kTypeValue, "0_value"); | |
599 | } | |
600 | cfd->imm()->Add(new_mem, &to_delete); | |
601 | } | |
602 | ||
603 | std::vector<SequenceNumber> snapshots; | |
604 | SnapshotChecker* const snapshot_checker = nullptr; | |
605 | JobContext job_context(0); | |
606 | EventLogger event_logger(db_options_.info_log.get()); | |
607 | std::string full_history_ts_low; | |
608 | PutFixed64(&full_history_ts_low, 0); | |
609 | FlushJob flush_job( | |
610 | dbname_, cfd, db_options_, *cfd->GetLatestMutableCFOptions(), | |
611 | nullptr /* memtable_id */, env_options_, versions_.get(), &mutex_, | |
612 | &shutting_down_, snapshots, kMaxSequenceNumber, snapshot_checker, | |
613 | &job_context, nullptr, nullptr, nullptr, kNoCompression, | |
614 | db_options_.statistics.get(), &event_logger, true, | |
615 | true /* sync_output_directory */, true /* write_manifest */, | |
616 | Env::Priority::USER, nullptr /*IOTracer*/, /*db_id=*/"", | |
617 | /*db_session_id=*/"", full_history_ts_low); | |
618 | ||
619 | FileMetaData fmeta; | |
620 | mutex_.Lock(); | |
621 | flush_job.PickMemTable(); | |
622 | ASSERT_OK(flush_job.Run(/*prep_tracker=*/nullptr, &fmeta)); | |
623 | mutex_.Unlock(); | |
624 | ||
625 | { | |
626 | std::string ukey = test::EncodeInt(0); | |
627 | std::string smallest_key = | |
628 | ukey + test::EncodeInt(curr_ts_.load(std::memory_order_relaxed) - 1); | |
629 | std::string largest_key = ukey + test::EncodeInt(kStartTs); | |
630 | InternalKey smallest(smallest_key, curr_seq_ - 1, ValueType::kTypeValue); | |
631 | InternalKey largest(largest_key, kStartSeq, ValueType::kTypeValue); | |
632 | ASSERT_EQ(smallest.Encode(), fmeta.smallest.Encode()); | |
633 | ASSERT_EQ(largest.Encode(), fmeta.largest.Encode()); | |
634 | } | |
635 | job_context.Clean(); | |
636 | ASSERT_TRUE(to_delete.empty()); | |
637 | } | |
638 | ||
f67539c2 | 639 | } // namespace ROCKSDB_NAMESPACE |
7c673cae FG |
640 | |
641 | int main(int argc, char** argv) { | |
642 | ::testing::InitGoogleTest(&argc, argv); | |
643 | return RUN_ALL_TESTS(); | |
644 | } |