]>
Commit | Line | Data |
---|---|---|
7c673cae | 1 | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
11fdf7f2 TL |
2 | // This source code is licensed under both the GPLv2 (found in the |
3 | // COPYING file in the root directory) and Apache 2.0 License | |
4 | // (found in the LICENSE.Apache file in the root directory). | |
7c673cae FG |
5 | |
6 | #include <algorithm> | |
f67539c2 | 7 | #include <array> |
7c673cae FG |
8 | #include <map> |
9 | #include <string> | |
10 | ||
f67539c2 | 11 | #include "db/blob_index.h" |
7c673cae | 12 | #include "db/column_family.h" |
f67539c2 | 13 | #include "db/db_impl/db_impl.h" |
7c673cae FG |
14 | #include "db/flush_job.h" |
15 | #include "db/version_set.h" | |
f67539c2 | 16 | #include "file/writable_file_writer.h" |
7c673cae FG |
17 | #include "rocksdb/cache.h" |
18 | #include "rocksdb/write_buffer_manager.h" | |
19 | #include "table/mock_table.h" | |
f67539c2 TL |
20 | #include "test_util/testharness.h" |
21 | #include "test_util/testutil.h" | |
7c673cae | 22 | #include "util/string_util.h" |
7c673cae | 23 | |
f67539c2 | 24 | namespace ROCKSDB_NAMESPACE { |
7c673cae FG |
25 | |
26 | // TODO(icanadi) Mock out everything else: | |
27 | // 1. VersionSet | |
28 | // 2. Memtable | |
29 | class FlushJobTest : public testing::Test { | |
30 | public: | |
31 | FlushJobTest() | |
32 | : env_(Env::Default()), | |
f67539c2 | 33 | fs_(std::make_shared<LegacyFileSystemWrapper>(env_)), |
11fdf7f2 | 34 | dbname_(test::PerThreadDBPath("flush_job_test")), |
7c673cae FG |
35 | options_(), |
36 | db_options_(options_), | |
494da23a | 37 | column_family_names_({kDefaultColumnFamilyName, "foo", "bar"}), |
7c673cae FG |
38 | table_cache_(NewLRUCache(50000, 16)), |
39 | write_buffer_manager_(db_options_.db_write_buffer_size), | |
7c673cae FG |
40 | shutting_down_(false), |
41 | mock_table_factory_(new mock::MockTableFactory()) { | |
42 | EXPECT_OK(env_->CreateDirIfMissing(dbname_)); | |
43 | db_options_.db_paths.emplace_back(dbname_, | |
44 | std::numeric_limits<uint64_t>::max()); | |
f67539c2 | 45 | db_options_.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics(); |
7c673cae FG |
46 | // TODO(icanadi) Remove this once we mock out VersionSet |
47 | NewDB(); | |
48 | std::vector<ColumnFamilyDescriptor> column_families; | |
49 | cf_options_.table_factory = mock_table_factory_; | |
494da23a TL |
50 | for (const auto& cf_name : column_family_names_) { |
51 | column_families.emplace_back(cf_name, cf_options_); | |
52 | } | |
7c673cae | 53 | |
f67539c2 TL |
54 | db_options_.env = env_; |
55 | db_options_.fs = fs_; | |
56 | versions_.reset(new VersionSet(dbname_, &db_options_, env_options_, | |
57 | table_cache_.get(), &write_buffer_manager_, | |
58 | &write_controller_, | |
59 | /*block_cache_tracer=*/nullptr)); | |
7c673cae FG |
60 | EXPECT_OK(versions_->Recover(column_families, false)); |
61 | } | |
62 | ||
63 | void NewDB() { | |
f67539c2 | 64 | SetIdentityFile(env_, dbname_); |
7c673cae | 65 | VersionEdit new_db; |
f67539c2 TL |
66 | if (db_options_.write_dbid_to_manifest) { |
67 | DBImpl* impl = new DBImpl(DBOptions(), dbname_); | |
68 | std::string db_id; | |
69 | impl->GetDbIdentityFromIdentityFile(&db_id); | |
70 | new_db.SetDBId(db_id); | |
71 | } | |
7c673cae FG |
72 | new_db.SetLogNumber(0); |
73 | new_db.SetNextFile(2); | |
74 | new_db.SetLastSequence(0); | |
75 | ||
494da23a TL |
76 | autovector<VersionEdit> new_cfs; |
77 | SequenceNumber last_seq = 1; | |
78 | uint32_t cf_id = 1; | |
79 | for (size_t i = 1; i != column_family_names_.size(); ++i) { | |
80 | VersionEdit new_cf; | |
81 | new_cf.AddColumnFamily(column_family_names_[i]); | |
82 | new_cf.SetColumnFamily(cf_id++); | |
83 | new_cf.SetLogNumber(0); | |
84 | new_cf.SetNextFile(2); | |
85 | new_cf.SetLastSequence(last_seq++); | |
86 | new_cfs.emplace_back(new_cf); | |
87 | } | |
88 | ||
7c673cae | 89 | const std::string manifest = DescriptorFileName(dbname_, 1); |
494da23a | 90 | std::unique_ptr<WritableFile> file; |
7c673cae FG |
91 | Status s = env_->NewWritableFile( |
92 | manifest, &file, env_->OptimizeForManifestWrite(env_options_)); | |
93 | ASSERT_OK(s); | |
f67539c2 TL |
94 | std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter( |
95 | NewLegacyWritableFileWrapper(std::move(file)), manifest, EnvOptions())); | |
7c673cae FG |
96 | { |
97 | log::Writer log(std::move(file_writer), 0, false); | |
98 | std::string record; | |
99 | new_db.EncodeTo(&record); | |
100 | s = log.AddRecord(record); | |
494da23a TL |
101 | |
102 | for (const auto& e : new_cfs) { | |
103 | record.clear(); | |
104 | e.EncodeTo(&record); | |
105 | s = log.AddRecord(record); | |
106 | ASSERT_OK(s); | |
107 | } | |
7c673cae FG |
108 | } |
109 | ASSERT_OK(s); | |
110 | // Make "CURRENT" file that points to the new manifest file. | |
111 | s = SetCurrentFile(env_, dbname_, 1, nullptr); | |
112 | } | |
113 | ||
114 | Env* env_; | |
f67539c2 | 115 | std::shared_ptr<FileSystem> fs_; |
7c673cae FG |
116 | std::string dbname_; |
117 | EnvOptions env_options_; | |
118 | Options options_; | |
119 | ImmutableDBOptions db_options_; | |
494da23a | 120 | const std::vector<std::string> column_family_names_; |
7c673cae FG |
121 | std::shared_ptr<Cache> table_cache_; |
122 | WriteController write_controller_; | |
123 | WriteBufferManager write_buffer_manager_; | |
124 | ColumnFamilyOptions cf_options_; | |
125 | std::unique_ptr<VersionSet> versions_; | |
126 | InstrumentedMutex mutex_; | |
127 | std::atomic<bool> shutting_down_; | |
128 | std::shared_ptr<mock::MockTableFactory> mock_table_factory_; | |
129 | }; | |
130 | ||
131 | TEST_F(FlushJobTest, Empty) { | |
132 | JobContext job_context(0); | |
133 | auto cfd = versions_->GetColumnFamilySet()->GetDefault(); | |
134 | EventLogger event_logger(db_options_.info_log.get()); | |
11fdf7f2 | 135 | SnapshotChecker* snapshot_checker = nullptr; // not relavant |
494da23a TL |
136 | FlushJob flush_job(dbname_, versions_->GetColumnFamilySet()->GetDefault(), |
137 | db_options_, *cfd->GetLatestMutableCFOptions(), | |
138 | nullptr /* memtable_id */, env_options_, versions_.get(), | |
139 | &mutex_, &shutting_down_, {}, kMaxSequenceNumber, | |
140 | snapshot_checker, &job_context, nullptr, nullptr, nullptr, | |
141 | kNoCompression, nullptr, &event_logger, false, | |
142 | true /* sync_output_directory */, | |
143 | true /* write_manifest */, Env::Priority::USER); | |
7c673cae FG |
144 | { |
145 | InstrumentedMutexLock l(&mutex_); | |
146 | flush_job.PickMemTable(); | |
147 | ASSERT_OK(flush_job.Run()); | |
148 | } | |
149 | job_context.Clean(); | |
150 | } | |
151 | ||
152 | TEST_F(FlushJobTest, NonEmpty) { | |
153 | JobContext job_context(0); | |
154 | auto cfd = versions_->GetColumnFamilySet()->GetDefault(); | |
155 | auto new_mem = cfd->ConstructNewMemtable(*cfd->GetLatestMutableCFOptions(), | |
156 | kMaxSequenceNumber); | |
157 | new_mem->Ref(); | |
158 | auto inserted_keys = mock::MakeMockFile(); | |
159 | // Test data: | |
160 | // seqno [ 1, 2 ... 8998, 8999, 9000, 9001, 9002 ... 9999 ] | |
161 | // key [ 1001, 1002 ... 9998, 9999, 0, 1, 2 ... 999 ] | |
162 | // range-delete "9995" -> "9999" at seqno 10000 | |
f67539c2 | 163 | // blob references with seqnos 10001..10006 |
7c673cae FG |
164 | for (int i = 1; i < 10000; ++i) { |
165 | std::string key(ToString((i + 1000) % 10000)); | |
166 | std::string value("value" + key); | |
167 | new_mem->Add(SequenceNumber(i), kTypeValue, key, value); | |
168 | if ((i + 1000) % 10000 < 9995) { | |
169 | InternalKey internal_key(key, SequenceNumber(i), kTypeValue); | |
170 | inserted_keys.insert({internal_key.Encode().ToString(), value}); | |
171 | } | |
172 | } | |
f67539c2 TL |
173 | |
174 | { | |
175 | new_mem->Add(SequenceNumber(10000), kTypeRangeDeletion, "9995", "9999a"); | |
176 | InternalKey internal_key("9995", SequenceNumber(10000), kTypeRangeDeletion); | |
177 | inserted_keys.insert({internal_key.Encode().ToString(), "9999a"}); | |
178 | } | |
179 | ||
180 | #ifndef ROCKSDB_LITE | |
181 | // Note: the first two blob references will not be considered when resolving | |
182 | // the oldest blob file referenced (the first one is inlined TTL, while the | |
183 | // second one is TTL and thus points to a TTL blob file). | |
184 | constexpr std::array<uint64_t, 6> blob_file_numbers{ | |
185 | kInvalidBlobFileNumber, 5, 103, 17, 102, 101}; | |
186 | for (size_t i = 0; i < blob_file_numbers.size(); ++i) { | |
187 | std::string key(ToString(i + 10001)); | |
188 | std::string blob_index; | |
189 | if (i == 0) { | |
190 | BlobIndex::EncodeInlinedTTL(&blob_index, /* expiration */ 1234567890ULL, | |
191 | "foo"); | |
192 | } else if (i == 1) { | |
193 | BlobIndex::EncodeBlobTTL(&blob_index, /* expiration */ 1234567890ULL, | |
194 | blob_file_numbers[i], /* offset */ i << 10, | |
195 | /* size */ i << 20, kNoCompression); | |
196 | } else { | |
197 | BlobIndex::EncodeBlob(&blob_index, blob_file_numbers[i], | |
198 | /* offset */ i << 10, /* size */ i << 20, | |
199 | kNoCompression); | |
200 | } | |
201 | ||
202 | const SequenceNumber seq(i + 10001); | |
203 | new_mem->Add(seq, kTypeBlobIndex, key, blob_index); | |
204 | ||
205 | InternalKey internal_key(key, seq, kTypeBlobIndex); | |
206 | inserted_keys.emplace_hint(inserted_keys.end(), | |
207 | internal_key.Encode().ToString(), blob_index); | |
208 | } | |
209 | #endif | |
7c673cae FG |
210 | |
211 | autovector<MemTable*> to_delete; | |
212 | cfd->imm()->Add(new_mem, &to_delete); | |
213 | for (auto& m : to_delete) { | |
214 | delete m; | |
215 | } | |
216 | ||
217 | EventLogger event_logger(db_options_.info_log.get()); | |
11fdf7f2 | 218 | SnapshotChecker* snapshot_checker = nullptr; // not relavant |
7c673cae FG |
219 | FlushJob flush_job(dbname_, versions_->GetColumnFamilySet()->GetDefault(), |
220 | db_options_, *cfd->GetLatestMutableCFOptions(), | |
494da23a TL |
221 | nullptr /* memtable_id */, env_options_, versions_.get(), |
222 | &mutex_, &shutting_down_, {}, kMaxSequenceNumber, | |
223 | snapshot_checker, &job_context, nullptr, nullptr, nullptr, | |
224 | kNoCompression, db_options_.statistics.get(), | |
225 | &event_logger, true, true /* sync_output_directory */, | |
226 | true /* write_manifest */, Env::Priority::USER); | |
11fdf7f2 TL |
227 | |
228 | HistogramData hist; | |
229 | FileMetaData file_meta; | |
7c673cae FG |
230 | mutex_.Lock(); |
231 | flush_job.PickMemTable(); | |
11fdf7f2 | 232 | ASSERT_OK(flush_job.Run(nullptr, &file_meta)); |
7c673cae | 233 | mutex_.Unlock(); |
11fdf7f2 TL |
234 | db_options_.statistics->histogramData(FLUSH_TIME, &hist); |
235 | ASSERT_GT(hist.average, 0.0); | |
236 | ||
237 | ASSERT_EQ(ToString(0), file_meta.smallest.user_key().ToString()); | |
f67539c2 | 238 | ASSERT_EQ("9999a", file_meta.largest.user_key().ToString()); |
11fdf7f2 | 239 | ASSERT_EQ(1, file_meta.fd.smallest_seqno); |
f67539c2 TL |
240 | #ifndef ROCKSDB_LITE |
241 | ASSERT_EQ(10006, file_meta.fd.largest_seqno); | |
242 | ASSERT_EQ(17, file_meta.oldest_blob_file_number); | |
243 | #else | |
244 | ASSERT_EQ(10000, file_meta.fd.largest_seqno); | |
245 | #endif | |
7c673cae FG |
246 | mock_table_factory_->AssertSingleFile(inserted_keys); |
247 | job_context.Clean(); | |
248 | } | |
249 | ||
494da23a TL |
250 | TEST_F(FlushJobTest, FlushMemTablesSingleColumnFamily) { |
251 | const size_t num_mems = 2; | |
252 | const size_t num_mems_to_flush = 1; | |
253 | const size_t num_keys_per_table = 100; | |
254 | JobContext job_context(0); | |
255 | ColumnFamilyData* cfd = versions_->GetColumnFamilySet()->GetDefault(); | |
256 | std::vector<uint64_t> memtable_ids; | |
257 | std::vector<MemTable*> new_mems; | |
258 | for (size_t i = 0; i != num_mems; ++i) { | |
259 | MemTable* mem = cfd->ConstructNewMemtable(*cfd->GetLatestMutableCFOptions(), | |
260 | kMaxSequenceNumber); | |
261 | mem->SetID(i); | |
262 | mem->Ref(); | |
263 | new_mems.emplace_back(mem); | |
264 | memtable_ids.push_back(mem->GetID()); | |
265 | ||
266 | for (size_t j = 0; j < num_keys_per_table; ++j) { | |
267 | std::string key(ToString(j + i * num_keys_per_table)); | |
268 | std::string value("value" + key); | |
269 | mem->Add(SequenceNumber(j + i * num_keys_per_table), kTypeValue, key, | |
270 | value); | |
271 | } | |
272 | } | |
273 | ||
274 | autovector<MemTable*> to_delete; | |
275 | for (auto mem : new_mems) { | |
276 | cfd->imm()->Add(mem, &to_delete); | |
277 | } | |
278 | ||
279 | EventLogger event_logger(db_options_.info_log.get()); | |
280 | SnapshotChecker* snapshot_checker = nullptr; // not relavant | |
281 | ||
282 | assert(memtable_ids.size() == num_mems); | |
283 | uint64_t smallest_memtable_id = memtable_ids.front(); | |
284 | uint64_t flush_memtable_id = smallest_memtable_id + num_mems_to_flush - 1; | |
285 | ||
286 | FlushJob flush_job(dbname_, versions_->GetColumnFamilySet()->GetDefault(), | |
287 | db_options_, *cfd->GetLatestMutableCFOptions(), | |
288 | &flush_memtable_id, env_options_, versions_.get(), &mutex_, | |
289 | &shutting_down_, {}, kMaxSequenceNumber, snapshot_checker, | |
290 | &job_context, nullptr, nullptr, nullptr, kNoCompression, | |
291 | db_options_.statistics.get(), &event_logger, true, | |
292 | true /* sync_output_directory */, | |
293 | true /* write_manifest */, Env::Priority::USER); | |
294 | HistogramData hist; | |
295 | FileMetaData file_meta; | |
296 | mutex_.Lock(); | |
297 | flush_job.PickMemTable(); | |
298 | ASSERT_OK(flush_job.Run(nullptr /* prep_tracker */, &file_meta)); | |
299 | mutex_.Unlock(); | |
300 | db_options_.statistics->histogramData(FLUSH_TIME, &hist); | |
301 | ASSERT_GT(hist.average, 0.0); | |
302 | ||
303 | ASSERT_EQ(ToString(0), file_meta.smallest.user_key().ToString()); | |
304 | ASSERT_EQ("99", file_meta.largest.user_key().ToString()); | |
305 | ASSERT_EQ(0, file_meta.fd.smallest_seqno); | |
306 | ASSERT_EQ(SequenceNumber(num_mems_to_flush * num_keys_per_table - 1), | |
307 | file_meta.fd.largest_seqno); | |
f67539c2 | 308 | ASSERT_EQ(kInvalidBlobFileNumber, file_meta.oldest_blob_file_number); |
494da23a TL |
309 | |
310 | for (auto m : to_delete) { | |
311 | delete m; | |
312 | } | |
313 | to_delete.clear(); | |
314 | job_context.Clean(); | |
315 | } | |
316 | ||
317 | TEST_F(FlushJobTest, FlushMemtablesMultipleColumnFamilies) { | |
318 | autovector<ColumnFamilyData*> all_cfds; | |
319 | for (auto cfd : *versions_->GetColumnFamilySet()) { | |
320 | all_cfds.push_back(cfd); | |
321 | } | |
322 | const std::vector<size_t> num_memtables = {2, 1, 3}; | |
323 | assert(num_memtables.size() == column_family_names_.size()); | |
324 | const size_t num_keys_per_memtable = 1000; | |
325 | JobContext job_context(0); | |
326 | std::vector<uint64_t> memtable_ids; | |
327 | std::vector<SequenceNumber> smallest_seqs; | |
328 | std::vector<SequenceNumber> largest_seqs; | |
329 | autovector<MemTable*> to_delete; | |
330 | SequenceNumber curr_seqno = 0; | |
331 | size_t k = 0; | |
332 | for (auto cfd : all_cfds) { | |
333 | smallest_seqs.push_back(curr_seqno); | |
334 | for (size_t i = 0; i != num_memtables[k]; ++i) { | |
335 | MemTable* mem = cfd->ConstructNewMemtable( | |
336 | *cfd->GetLatestMutableCFOptions(), kMaxSequenceNumber); | |
337 | mem->SetID(i); | |
338 | mem->Ref(); | |
339 | ||
340 | for (size_t j = 0; j != num_keys_per_memtable; ++j) { | |
341 | std::string key(ToString(j + i * num_keys_per_memtable)); | |
342 | std::string value("value" + key); | |
343 | mem->Add(curr_seqno++, kTypeValue, key, value); | |
344 | } | |
345 | ||
346 | cfd->imm()->Add(mem, &to_delete); | |
347 | } | |
348 | largest_seqs.push_back(curr_seqno - 1); | |
349 | memtable_ids.push_back(num_memtables[k++] - 1); | |
350 | } | |
351 | ||
352 | EventLogger event_logger(db_options_.info_log.get()); | |
353 | SnapshotChecker* snapshot_checker = nullptr; // not relevant | |
f67539c2 | 354 | std::vector<std::unique_ptr<FlushJob>> flush_jobs; |
494da23a TL |
355 | k = 0; |
356 | for (auto cfd : all_cfds) { | |
357 | std::vector<SequenceNumber> snapshot_seqs; | |
f67539c2 | 358 | flush_jobs.emplace_back(new FlushJob( |
494da23a TL |
359 | dbname_, cfd, db_options_, *cfd->GetLatestMutableCFOptions(), |
360 | &memtable_ids[k], env_options_, versions_.get(), &mutex_, | |
361 | &shutting_down_, snapshot_seqs, kMaxSequenceNumber, snapshot_checker, | |
362 | &job_context, nullptr, nullptr, nullptr, kNoCompression, | |
363 | db_options_.statistics.get(), &event_logger, true, | |
364 | false /* sync_output_directory */, false /* write_manifest */, | |
f67539c2 | 365 | Env::Priority::USER)); |
494da23a TL |
366 | k++; |
367 | } | |
368 | HistogramData hist; | |
369 | std::vector<FileMetaData> file_metas; | |
370 | // Call reserve to avoid auto-resizing | |
371 | file_metas.reserve(flush_jobs.size()); | |
372 | mutex_.Lock(); | |
373 | for (auto& job : flush_jobs) { | |
f67539c2 | 374 | job->PickMemTable(); |
494da23a TL |
375 | } |
376 | for (auto& job : flush_jobs) { | |
377 | FileMetaData meta; | |
378 | // Run will release and re-acquire mutex | |
f67539c2 | 379 | ASSERT_OK(job->Run(nullptr /**/, &meta)); |
494da23a TL |
380 | file_metas.emplace_back(meta); |
381 | } | |
382 | autovector<FileMetaData*> file_meta_ptrs; | |
383 | for (auto& meta : file_metas) { | |
384 | file_meta_ptrs.push_back(&meta); | |
385 | } | |
386 | autovector<const autovector<MemTable*>*> mems_list; | |
387 | for (size_t i = 0; i != all_cfds.size(); ++i) { | |
f67539c2 | 388 | const auto& mems = flush_jobs[i]->GetMemTables(); |
494da23a TL |
389 | mems_list.push_back(&mems); |
390 | } | |
391 | autovector<const MutableCFOptions*> mutable_cf_options_list; | |
392 | for (auto cfd : all_cfds) { | |
393 | mutable_cf_options_list.push_back(cfd->GetLatestMutableCFOptions()); | |
394 | } | |
395 | ||
396 | Status s = InstallMemtableAtomicFlushResults( | |
397 | nullptr /* imm_lists */, all_cfds, mutable_cf_options_list, mems_list, | |
398 | versions_.get(), &mutex_, file_meta_ptrs, &job_context.memtables_to_free, | |
399 | nullptr /* db_directory */, nullptr /* log_buffer */); | |
400 | ASSERT_OK(s); | |
401 | ||
402 | mutex_.Unlock(); | |
403 | db_options_.statistics->histogramData(FLUSH_TIME, &hist); | |
404 | ASSERT_GT(hist.average, 0.0); | |
405 | k = 0; | |
406 | for (const auto& file_meta : file_metas) { | |
407 | ASSERT_EQ(ToString(0), file_meta.smallest.user_key().ToString()); | |
408 | ASSERT_EQ("999", file_meta.largest.user_key() | |
409 | .ToString()); // max key by bytewise comparator | |
410 | ASSERT_EQ(smallest_seqs[k], file_meta.fd.smallest_seqno); | |
411 | ASSERT_EQ(largest_seqs[k], file_meta.fd.largest_seqno); | |
412 | // Verify that imm is empty | |
413 | ASSERT_EQ(std::numeric_limits<uint64_t>::max(), | |
414 | all_cfds[k]->imm()->GetEarliestMemTableID()); | |
415 | ASSERT_EQ(0, all_cfds[k]->imm()->GetLatestMemTableID()); | |
416 | ++k; | |
417 | } | |
418 | ||
419 | for (auto m : to_delete) { | |
420 | delete m; | |
421 | } | |
422 | to_delete.clear(); | |
423 | job_context.Clean(); | |
424 | } | |
425 | ||
7c673cae FG |
426 | TEST_F(FlushJobTest, Snapshots) { |
427 | JobContext job_context(0); | |
428 | auto cfd = versions_->GetColumnFamilySet()->GetDefault(); | |
429 | auto new_mem = cfd->ConstructNewMemtable(*cfd->GetLatestMutableCFOptions(), | |
430 | kMaxSequenceNumber); | |
431 | ||
7c673cae FG |
432 | std::set<SequenceNumber> snapshots_set; |
433 | int keys = 10000; | |
434 | int max_inserts_per_keys = 8; | |
435 | ||
436 | Random rnd(301); | |
437 | for (int i = 0; i < keys / 2; ++i) { | |
494da23a | 438 | snapshots_set.insert(rnd.Uniform(keys * (max_inserts_per_keys / 2)) + 1); |
7c673cae | 439 | } |
494da23a TL |
440 | // set has already removed the duplicate snapshots |
441 | std::vector<SequenceNumber> snapshots(snapshots_set.begin(), | |
442 | snapshots_set.end()); | |
7c673cae FG |
443 | |
444 | new_mem->Ref(); | |
445 | SequenceNumber current_seqno = 0; | |
446 | auto inserted_keys = mock::MakeMockFile(); | |
447 | for (int i = 1; i < keys; ++i) { | |
448 | std::string key(ToString(i)); | |
449 | int insertions = rnd.Uniform(max_inserts_per_keys); | |
450 | for (int j = 0; j < insertions; ++j) { | |
451 | std::string value(test::RandomHumanReadableString(&rnd, 10)); | |
452 | auto seqno = ++current_seqno; | |
453 | new_mem->Add(SequenceNumber(seqno), kTypeValue, key, value); | |
454 | // a key is visible only if: | |
455 | // 1. it's the last one written (j == insertions - 1) | |
456 | // 2. there's a snapshot pointing at it | |
457 | bool visible = (j == insertions - 1) || | |
458 | (snapshots_set.find(seqno) != snapshots_set.end()); | |
459 | if (visible) { | |
460 | InternalKey internal_key(key, seqno, kTypeValue); | |
461 | inserted_keys.insert({internal_key.Encode().ToString(), value}); | |
462 | } | |
463 | } | |
464 | } | |
465 | ||
466 | autovector<MemTable*> to_delete; | |
467 | cfd->imm()->Add(new_mem, &to_delete); | |
468 | for (auto& m : to_delete) { | |
469 | delete m; | |
470 | } | |
471 | ||
472 | EventLogger event_logger(db_options_.info_log.get()); | |
11fdf7f2 TL |
473 | SnapshotChecker* snapshot_checker = nullptr; // not relavant |
474 | FlushJob flush_job(dbname_, versions_->GetColumnFamilySet()->GetDefault(), | |
475 | db_options_, *cfd->GetLatestMutableCFOptions(), | |
494da23a TL |
476 | nullptr /* memtable_id */, env_options_, versions_.get(), |
477 | &mutex_, &shutting_down_, snapshots, kMaxSequenceNumber, | |
478 | snapshot_checker, &job_context, nullptr, nullptr, nullptr, | |
479 | kNoCompression, db_options_.statistics.get(), | |
480 | &event_logger, true, true /* sync_output_directory */, | |
481 | true /* write_manifest */, Env::Priority::USER); | |
7c673cae FG |
482 | mutex_.Lock(); |
483 | flush_job.PickMemTable(); | |
484 | ASSERT_OK(flush_job.Run()); | |
485 | mutex_.Unlock(); | |
486 | mock_table_factory_->AssertSingleFile(inserted_keys); | |
11fdf7f2 TL |
487 | HistogramData hist; |
488 | db_options_.statistics->histogramData(FLUSH_TIME, &hist); | |
489 | ASSERT_GT(hist.average, 0.0); | |
7c673cae FG |
490 | job_context.Clean(); |
491 | } | |
492 | ||
f67539c2 | 493 | } // namespace ROCKSDB_NAMESPACE |
7c673cae FG |
494 | |
495 | int main(int argc, char** argv) { | |
496 | ::testing::InitGoogleTest(&argc, argv); | |
497 | return RUN_ALL_TESTS(); | |
498 | } |