// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
//

#include <ios>

#include "util/compression.h"
#ifdef GFLAGS
#include "db_stress_tool/db_stress_common.h"
#include "db_stress_tool/db_stress_compaction_filter.h"
#include "db_stress_tool/db_stress_driver.h"
#include "db_stress_tool/db_stress_table_properties_collector.h"
#include "rocksdb/convenience.h"
#include "rocksdb/filter_policy.h"
#include "rocksdb/secondary_cache.h"
#include "rocksdb/sst_file_manager.h"
#include "rocksdb/types.h"
#include "rocksdb/utilities/object_registry.h"
#include "rocksdb/utilities/write_batch_with_index.h"
#include "test_util/testutil.h"
#include "util/cast_util.h"
#include "utilities/backup/backup_engine_impl.h"
#include "utilities/fault_injection_fs.h"
#include "utilities/fault_injection_secondary_cache.h"

namespace ROCKSDB_NAMESPACE {

namespace {

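// Builds the filter policy for the block-based table factory from the
// db_stress flags: a negative FLAGS_bloom_bits keeps the
// BlockBasedTableOptions default (no filter), FLAGS_ribbon_starting_level of
// 999 or more selects the classic Bloom policy, and anything else selects
// the Ribbon policy with Bloom filters retained below the given level.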
std::shared_ptr<const FilterPolicy> CreateFilterPolicy() {
  if (FLAGS_bloom_bits < 0) {
    return BlockBasedTableOptions().filter_policy;
  }
  const FilterPolicy* new_policy;
  if (FLAGS_ribbon_starting_level >= 999) {
    // Use Bloom API
    new_policy = NewBloomFilterPolicy(FLAGS_bloom_bits, false);
  } else {
    new_policy = NewRibbonFilterPolicy(
        FLAGS_bloom_bits, /* bloom_before_level */ FLAGS_ribbon_starting_level);
  }
  return std::shared_ptr<const FilterPolicy>(new_policy);
}

}  // namespace

StressTest::StressTest()
    : cache_(NewCache(FLAGS_cache_size, FLAGS_cache_numshardbits)),
      compressed_cache_(NewLRUCache(FLAGS_compressed_cache_size,
                                    FLAGS_compressed_cache_numshardbits)),
      filter_policy_(CreateFilterPolicy()),
      db_(nullptr),
#ifndef ROCKSDB_LITE
      txn_db_(nullptr),
#endif
      db_aptr_(nullptr),
      clock_(db_stress_env->GetSystemClock().get()),
      new_column_family_name_(1),
      num_times_reopened_(0),
      db_preload_finished_(false),
      cmp_db_(nullptr),
      is_db_stopped_(false) {
  if (FLAGS_destroy_db_initially) {
    std::vector<std::string> files;
    db_stress_env->GetChildren(FLAGS_db, &files);
    for (unsigned int i = 0; i < files.size(); i++) {
      if (Slice(files[i]).starts_with("heap-")) {
        db_stress_env->DeleteFile(FLAGS_db + "/" + files[i]);
      }
    }

    Options options;
    options.env = db_stress_env;
    // Remove files without preserving manifest files
#ifndef ROCKSDB_LITE
    const Status s = !FLAGS_use_blob_db
                         ? DestroyDB(FLAGS_db, options)
                         : blob_db::DestroyBlobDB(FLAGS_db, options,
                                                  blob_db::BlobDBOptions());
#else
    const Status s = DestroyDB(FLAGS_db, options);
#endif  // !ROCKSDB_LITE

    if (!s.ok()) {
      fprintf(stderr, "Cannot destroy original db: %s\n", s.ToString().c_str());
      exit(1);
    }
  }
}

StressTest::~StressTest() {
  for (auto cf : column_families_) {
    delete cf;
  }
  column_families_.clear();
  delete db_;

  for (auto* cf : cmp_cfhs_) {
    delete cf;
  }
  cmp_cfhs_.clear();
  delete cmp_db_;
}

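// Constructs the block cache according to FLAGS_cache_type. Returns nullptr
// when the capacity is not positive (cache disabled), aborts for the removed
// clock_cache implementation, builds a HyperClockCache (using
// FLAGS_block_size as the estimated per-entry charge), or builds an LRUCache,
// optionally backed by a secondary cache created from
// FLAGS_secondary_cache_uri and, when FLAGS_secondary_cache_fault_one_in > 0,
// wrapped in a fault-injecting secondary cache.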
std::shared_ptr<Cache> StressTest::NewCache(size_t capacity,
                                            int32_t num_shard_bits) {
  ConfigOptions config_options;
  if (capacity <= 0) {
    return nullptr;
  }

  if (FLAGS_cache_type == "clock_cache") {
    fprintf(stderr, "Old clock cache implementation has been removed.\n");
    exit(1);
  } else if (FLAGS_cache_type == "hyper_clock_cache") {
    return HyperClockCacheOptions(static_cast<size_t>(capacity),
                                  FLAGS_block_size /*estimated_entry_charge*/,
                                  num_shard_bits)
        .MakeSharedCache();
  } else if (FLAGS_cache_type == "lru_cache") {
    LRUCacheOptions opts;
    opts.capacity = capacity;
    opts.num_shard_bits = num_shard_bits;
#ifndef ROCKSDB_LITE
    std::shared_ptr<SecondaryCache> secondary_cache;
    if (!FLAGS_secondary_cache_uri.empty()) {
      Status s = SecondaryCache::CreateFromString(
          config_options, FLAGS_secondary_cache_uri, &secondary_cache);
      if (secondary_cache == nullptr) {
        fprintf(stderr,
                "No secondary cache registered matching string: %s status=%s\n",
                FLAGS_secondary_cache_uri.c_str(), s.ToString().c_str());
        exit(1);
      }
      if (FLAGS_secondary_cache_fault_one_in > 0) {
        secondary_cache = std::make_shared<FaultInjectionSecondaryCache>(
            secondary_cache, static_cast<uint32_t>(FLAGS_seed),
            FLAGS_secondary_cache_fault_one_in);
      }
      opts.secondary_cache = secondary_cache;
    }
#endif
    return NewLRUCache(opts);
  } else {
    fprintf(stderr, "Cache type not supported.\n");
    exit(1);
  }
}

std::vector<std::string> StressTest::GetBlobCompressionTags() {
  std::vector<std::string> compression_tags{"kNoCompression"};

  if (Snappy_Supported()) {
    compression_tags.emplace_back("kSnappyCompression");
  }
  if (LZ4_Supported()) {
    compression_tags.emplace_back("kLZ4Compression");
  }
  if (ZSTD_Supported()) {
    compression_tags.emplace_back("kZSTD");
  }

  return compression_tags;
}

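// Populates options_table_ with candidate string values for the mutable
// options that SetOptions() below randomly applies while the test runs. Most
// entries are simple multiples or offsets of the currently configured value;
// for example, the candidates for write_buffer_size are 1x, 2x and 4x the
// configured size. Returns early (leaving the table empty) when
// FLAGS_set_options_one_in <= 0, since the table would never be used.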
bool StressTest::BuildOptionsTable() {
  if (FLAGS_set_options_one_in <= 0) {
    return true;
  }

  std::unordered_map<std::string, std::vector<std::string>> options_tbl = {
      {"write_buffer_size",
       {std::to_string(options_.write_buffer_size),
        std::to_string(options_.write_buffer_size * 2),
        std::to_string(options_.write_buffer_size * 4)}},
      {"max_write_buffer_number",
       {std::to_string(options_.max_write_buffer_number),
        std::to_string(options_.max_write_buffer_number * 2),
        std::to_string(options_.max_write_buffer_number * 4)}},
      {"arena_block_size",
       {
           std::to_string(options_.arena_block_size),
           std::to_string(options_.write_buffer_size / 4),
           std::to_string(options_.write_buffer_size / 8),
       }},
      {"memtable_huge_page_size", {"0", std::to_string(2 * 1024 * 1024)}},
      {"max_successive_merges", {"0", "2", "4"}},
      {"inplace_update_num_locks", {"100", "200", "300"}},
      // TODO: re-enable once internal task T124324915 is fixed.
      // {"experimental_mempurge_threshold", {"0.0", "1.0"}},
      // TODO(ljin): enable test for this option
      // {"disable_auto_compactions", {"100", "200", "300"}},
      {"level0_file_num_compaction_trigger",
       {
           std::to_string(options_.level0_file_num_compaction_trigger),
           std::to_string(options_.level0_file_num_compaction_trigger + 2),
           std::to_string(options_.level0_file_num_compaction_trigger + 4),
       }},
      {"level0_slowdown_writes_trigger",
       {
           std::to_string(options_.level0_slowdown_writes_trigger),
           std::to_string(options_.level0_slowdown_writes_trigger + 2),
           std::to_string(options_.level0_slowdown_writes_trigger + 4),
       }},
      {"level0_stop_writes_trigger",
       {
           std::to_string(options_.level0_stop_writes_trigger),
           std::to_string(options_.level0_stop_writes_trigger + 2),
           std::to_string(options_.level0_stop_writes_trigger + 4),
       }},
      {"max_compaction_bytes",
       {
           std::to_string(options_.target_file_size_base * 5),
           std::to_string(options_.target_file_size_base * 15),
           std::to_string(options_.target_file_size_base * 100),
       }},
      {"target_file_size_base",
       {
           std::to_string(options_.target_file_size_base),
           std::to_string(options_.target_file_size_base * 2),
           std::to_string(options_.target_file_size_base * 4),
       }},
      {"target_file_size_multiplier",
       {
           std::to_string(options_.target_file_size_multiplier),
           "1",
           "2",
       }},
      {"max_bytes_for_level_base",
       {
           std::to_string(options_.max_bytes_for_level_base / 2),
           std::to_string(options_.max_bytes_for_level_base),
           std::to_string(options_.max_bytes_for_level_base * 2),
       }},
      {"max_bytes_for_level_multiplier",
       {
           std::to_string(options_.max_bytes_for_level_multiplier),
           "1",
           "2",
       }},
      {"max_sequential_skip_in_iterations", {"4", "8", "12"}},
  };

  if (FLAGS_allow_setting_blob_options_dynamically) {
    options_tbl.emplace("enable_blob_files",
                        std::vector<std::string>{"false", "true"});
    options_tbl.emplace("min_blob_size",
                        std::vector<std::string>{"0", "8", "16"});
    options_tbl.emplace("blob_file_size",
                        std::vector<std::string>{"1M", "16M", "256M", "1G"});
    options_tbl.emplace("blob_compression_type", GetBlobCompressionTags());
    options_tbl.emplace("enable_blob_garbage_collection",
                        std::vector<std::string>{"false", "true"});
    options_tbl.emplace(
        "blob_garbage_collection_age_cutoff",
        std::vector<std::string>{"0.0", "0.25", "0.5", "0.75", "1.0"});
    options_tbl.emplace("blob_garbage_collection_force_threshold",
                        std::vector<std::string>{"0.5", "0.75", "1.0"});
    options_tbl.emplace("blob_compaction_readahead_size",
                        std::vector<std::string>{"0", "1M", "4M"});
    options_tbl.emplace("blob_file_starting_level",
                        std::vector<std::string>{"0", "1", "2"});
    options_tbl.emplace("prepopulate_blob_cache",
                        std::vector<std::string>{"kDisable", "kFlushOnly"});
  }

  options_table_ = std::move(options_tbl);

  for (const auto& iter : options_table_) {
    options_index_.push_back(iter.first);
  }
  return true;
}

void StressTest::InitDb(SharedState* shared) {
  uint64_t now = clock_->NowMicros();
  fprintf(stdout, "%s Initializing db_stress\n",
          clock_->TimeToString(now / 1000000).c_str());
  PrintEnv();
  Open(shared);
  BuildOptionsTable();
}

void StressTest::FinishInitDb(SharedState* shared) {
  if (FLAGS_read_only) {
    uint64_t now = clock_->NowMicros();
    fprintf(stdout, "%s Preloading db with %" PRIu64 " KVs\n",
            clock_->TimeToString(now / 1000000).c_str(), FLAGS_max_key);
    PreloadDbAndReopenAsReadOnly(FLAGS_max_key, shared);
  }

  if (shared->HasHistory()) {
    // The way it works right now is, if there's any history, that means the
    // previous run mutating the DB had all its operations traced, in which
    // case we should always be able to `Restore()` the expected values to
    // match the `db_`'s current seqno.
    Status s = shared->Restore(db_);
    if (!s.ok()) {
      fprintf(stderr, "Error restoring historical expected values: %s\n",
              s.ToString().c_str());
      exit(1);
    }
  }
#ifndef ROCKSDB_LITE
  if (FLAGS_use_txn) {
    // It's OK here without sync because unsynced data cannot be lost at this
    // point - even with sync_fault_injection=1, the file is still directly
    // writable until after FinishInitDb().
    ProcessRecoveredPreparedTxns(shared);
  }
#endif
  if (FLAGS_enable_compaction_filter) {
    auto* compaction_filter_factory =
        reinterpret_cast<DbStressCompactionFilterFactory*>(
            options_.compaction_filter_factory.get());
    assert(compaction_filter_factory);
    // This must be called only after any potential `SharedState::Restore()`
    // has completed in order for the `compaction_filter_factory` to operate
    // on the correct latest values file.
    compaction_filter_factory->SetSharedState(shared);
    fprintf(stdout, "Compaction filter factory: %s\n",
            compaction_filter_factory->Name());
  }
}

void StressTest::TrackExpectedState(SharedState* shared) {
  // When `FLAGS_manual_wal_flush_one_in > 0`, WAL data can be lost if
  // `FlushWAL()` is not explicitly called by users of RocksDB (in our case,
  // db_stress). Therefore recovery from such potential WAL data loss is a
  // prefix recovery that requires tracing.
  if ((FLAGS_sync_fault_injection || FLAGS_disable_wal ||
       FLAGS_manual_wal_flush_one_in > 0) &&
      IsStateTracked()) {
    Status s = shared->SaveAtAndAfter(db_);
    if (!s.ok()) {
      fprintf(stderr, "Error enabling history tracing: %s\n",
              s.ToString().c_str());
      exit(1);
    }
  }
}

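// Verifies that reading `snap_state.key` at `snap_state.snapshot` (and, with
// user timestamps enabled, at the saved timestamp) still returns the status
// and value captured when the snapshot state was recorded. When
// `snap_state.key_vec` is set, also re-scans the whole key space and checks
// that exactly the same keys are visible at the snapshot.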
Status StressTest::AssertSame(DB* db, ColumnFamilyHandle* cf,
                              ThreadState::SnapshotState& snap_state) {
  Status s;
  if (cf->GetName() != snap_state.cf_at_name) {
    return s;
  }
  // This `ReadOptions` is for validation purposes. Ignore
  // `FLAGS_rate_limit_user_ops` to avoid slowing any validation.
  ReadOptions ropt;
  ropt.snapshot = snap_state.snapshot;
  Slice ts;
  if (!snap_state.timestamp.empty()) {
    ts = snap_state.timestamp;
    ropt.timestamp = &ts;
  }
  PinnableSlice exp_v(&snap_state.value);
  exp_v.PinSelf();
  PinnableSlice v;
  s = db->Get(ropt, cf, snap_state.key, &v);
  if (!s.ok() && !s.IsNotFound()) {
    return s;
  }
  if (snap_state.status != s) {
    return Status::Corruption(
        "The snapshot gave inconsistent results for key " +
        std::to_string(Hash(snap_state.key.c_str(), snap_state.key.size(), 0)) +
        " in cf " + cf->GetName() + ": (" + snap_state.status.ToString() +
        ") vs. (" + s.ToString() + ")");
  }
  if (s.ok()) {
    if (exp_v != v) {
      return Status::Corruption("The snapshot gave inconsistent values: (" +
                                exp_v.ToString() + ") vs. (" + v.ToString() +
                                ")");
    }
  }
  if (snap_state.key_vec != nullptr) {
    // When `prefix_extractor` is set, seeking to beginning and scanning
    // across prefixes are only supported with `total_order_seek` set.
    ropt.total_order_seek = true;
    std::unique_ptr<Iterator> iterator(db->NewIterator(ropt));
    std::unique_ptr<std::vector<bool>> tmp_bitvec(
        new std::vector<bool>(FLAGS_max_key));
    for (iterator->SeekToFirst(); iterator->Valid(); iterator->Next()) {
      uint64_t key_val;
      if (GetIntVal(iterator->key().ToString(), &key_val)) {
        (*tmp_bitvec.get())[key_val] = true;
      }
    }
    if (!std::equal(snap_state.key_vec->begin(), snap_state.key_vec->end(),
                    tmp_bitvec.get()->begin())) {
      return Status::Corruption("Found inconsistent keys at this snapshot");
    }
  }
  return Status::OK();
}

void StressTest::VerificationAbort(SharedState* shared, std::string msg,
                                   Status s) const {
  fprintf(stderr, "Verification failed: %s. Status is %s\n", msg.c_str(),
          s.ToString().c_str());
  shared->SetVerificationFailure();
}

void StressTest::VerificationAbort(SharedState* shared, std::string msg, int cf,
                                   int64_t key) const {
  auto key_str = Key(key);
  Slice key_slice = key_str;
  fprintf(stderr,
          "Verification failed for column family %d key %s (%" PRIi64 "): %s\n",
          cf, key_slice.ToString(true).c_str(), key, msg.c_str());
  shared->SetVerificationFailure();
}

void StressTest::VerificationAbort(SharedState* shared, std::string msg, int cf,
                                   int64_t key, Slice value_from_db,
                                   Slice value_from_expected) const {
  auto key_str = Key(key);
  fprintf(stderr,
          "Verification failed for column family %d key %s (%" PRIi64
          "): value_from_db: %s, value_from_expected: %s, msg: %s\n",
          cf, Slice(key_str).ToString(true).c_str(), key,
          value_from_db.ToString(true).c_str(),
          value_from_expected.ToString(true).c_str(), msg.c_str());
  shared->SetVerificationFailure();
}

void StressTest::VerificationAbort(SharedState* shared, int cf, int64_t key,
                                   const Slice& value,
                                   const WideColumns& columns,
                                   const WideColumns& expected_columns) const {
  assert(shared);

  auto key_str = Key(key);

  fprintf(stderr,
          "Verification failed for column family %d key %s (%" PRIi64
          "): Value and columns inconsistent: %s\n",
          cf, Slice(key_str).ToString(/* hex */ true).c_str(), key,
          DebugString(value, columns, expected_columns).c_str());

  shared->SetVerificationFailure();
}

std::string StressTest::DebugString(const Slice& value,
                                    const WideColumns& columns,
                                    const WideColumns& expected_columns) {
  std::ostringstream oss;

  oss << "value: " << value.ToString(/* hex */ true);

  auto dump = [](const WideColumns& cols, std::ostream& os) {
    if (cols.empty()) {
      return;
    }

    os << std::hex;

    auto it = cols.begin();
    os << *it;
    for (++it; it != cols.end(); ++it) {
      os << ' ' << *it;
    }
  };

  oss << ", columns: ";
  dump(columns, oss);

  oss << ", expected_columns: ";
  dump(expected_columns, oss);

  return oss.str();
}

void StressTest::PrintStatistics() {
  if (dbstats) {
    fprintf(stdout, "STATISTICS:\n%s\n", dbstats->ToString().c_str());
  }
  if (dbstats_secondaries) {
    fprintf(stdout, "Secondary instances STATISTICS:\n%s\n",
            dbstats_secondaries->ToString().c_str());
  }
}

// Currently PreloadDb has to be single-threaded.
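// Each key is first recorded in the expected state as pending
// (shared->Put(..., true /* pending */)) before the write is issued, and
// marked committed (pending == false) once the write returns, so the
// expected-state machinery can tolerate a failure between the two steps.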
void StressTest::PreloadDbAndReopenAsReadOnly(int64_t number_of_keys,
                                              SharedState* shared) {
  WriteOptions write_opts;
  write_opts.disableWAL = FLAGS_disable_wal;
  if (FLAGS_sync) {
    write_opts.sync = true;
  }
  if (FLAGS_rate_limit_auto_wal_flush) {
    write_opts.rate_limiter_priority = Env::IO_USER;
  }
  char value[100];
  int cf_idx = 0;
  Status s;
  for (auto cfh : column_families_) {
    for (int64_t k = 0; k != number_of_keys; ++k) {
      const std::string key = Key(k);

      constexpr uint32_t value_base = 0;
      const size_t sz = GenerateValue(value_base, value, sizeof(value));

      const Slice v(value, sz);

      shared->Put(cf_idx, k, value_base, true /* pending */);

      std::string ts;
      if (FLAGS_user_timestamp_size > 0) {
        ts = GetNowNanos();
      }

      if (FLAGS_use_merge) {
        if (!FLAGS_use_txn) {
          if (FLAGS_user_timestamp_size > 0) {
            s = db_->Merge(write_opts, cfh, key, ts, v);
          } else {
            s = db_->Merge(write_opts, cfh, key, v);
          }
        } else {
#ifndef ROCKSDB_LITE
          Transaction* txn;
          s = NewTxn(write_opts, &txn);
          if (s.ok()) {
            s = txn->Merge(cfh, key, v);
            if (s.ok()) {
              s = CommitTxn(txn);
            }
          }
#endif
        }
      } else if (FLAGS_use_put_entity_one_in > 0) {
        s = db_->PutEntity(write_opts, cfh, key,
                           GenerateWideColumns(value_base, v));
      } else {
        if (!FLAGS_use_txn) {
          if (FLAGS_user_timestamp_size > 0) {
            s = db_->Put(write_opts, cfh, key, ts, v);
          } else {
            s = db_->Put(write_opts, cfh, key, v);
          }
        } else {
#ifndef ROCKSDB_LITE
          Transaction* txn;
          s = NewTxn(write_opts, &txn);
          if (s.ok()) {
            s = txn->Put(cfh, key, v);
            if (s.ok()) {
              s = CommitTxn(txn);
            }
          }
#endif
        }
      }

      shared->Put(cf_idx, k, value_base, false /* pending */);
      if (!s.ok()) {
        break;
      }
    }
    if (!s.ok()) {
      break;
    }
    ++cf_idx;
  }
  if (s.ok()) {
    s = db_->Flush(FlushOptions(), column_families_);
  }
  if (s.ok()) {
    for (auto cf : column_families_) {
      delete cf;
    }
    column_families_.clear();
    delete db_;
    db_ = nullptr;
#ifndef ROCKSDB_LITE
    txn_db_ = nullptr;
#endif

    db_preload_finished_.store(true);
    auto now = clock_->NowMicros();
    fprintf(stdout, "%s Reopening database in read-only\n",
            clock_->TimeToString(now / 1000000).c_str());
    // Reopen as read-only; we can ignore all options related to updates.
    Open(shared);
  } else {
    fprintf(stderr, "Failed to preload db\n");
    exit(1);
  }
}

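// Randomly picks one mutable option from options_table_ and applies one of
// its candidate values to a randomly chosen column family via
// DB::SetOptions(). The three level0_* triggers are always changed together,
// using the same candidate index, so their relative ordering stays
// consistent.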
Status StressTest::SetOptions(ThreadState* thread) {
  assert(FLAGS_set_options_one_in > 0);
  std::unordered_map<std::string, std::string> opts;
  std::string name =
      options_index_[thread->rand.Next() % options_index_.size()];
  int value_idx = thread->rand.Next() % options_table_[name].size();
  if (name == "level0_file_num_compaction_trigger" ||
      name == "level0_slowdown_writes_trigger" ||
      name == "level0_stop_writes_trigger") {
    opts["level0_file_num_compaction_trigger"] =
        options_table_["level0_file_num_compaction_trigger"][value_idx];
    opts["level0_slowdown_writes_trigger"] =
        options_table_["level0_slowdown_writes_trigger"][value_idx];
    opts["level0_stop_writes_trigger"] =
        options_table_["level0_stop_writes_trigger"][value_idx];
  } else {
    opts[name] = options_table_[name][value_idx];
  }

  int rand_cf_idx = thread->rand.Next() % FLAGS_column_families;
  auto cfh = column_families_[rand_cf_idx];
  return db_->SetOptions(cfh, opts);
}

#ifndef ROCKSDB_LITE
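// After a recovery with two-phase transactions enabled, any transaction that
// was prepared but neither committed nor rolled back is handed to the helper
// below, which randomly commits or rolls it back (first marking the touched
// keys pending in the expected state), so no prepared transaction is left
// dangling.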
void StressTest::ProcessRecoveredPreparedTxns(SharedState* shared) {
  assert(txn_db_);
  std::vector<Transaction*> recovered_prepared_trans;
  txn_db_->GetAllPreparedTransactions(&recovered_prepared_trans);
  for (Transaction* txn : recovered_prepared_trans) {
    ProcessRecoveredPreparedTxnsHelper(txn, shared);
    delete txn;
  }
  recovered_prepared_trans.clear();
  txn_db_->GetAllPreparedTransactions(&recovered_prepared_trans);
  assert(recovered_prepared_trans.size() == 0);
}

void StressTest::ProcessRecoveredPreparedTxnsHelper(Transaction* txn,
                                                    SharedState* shared) {
  thread_local Random rand(static_cast<uint32_t>(FLAGS_seed));
  for (size_t i = 0; i < column_families_.size(); ++i) {
    std::unique_ptr<WBWIIterator> wbwi_iter(
        txn->GetWriteBatch()->NewIterator(column_families_[i]));
    for (wbwi_iter->SeekToFirst(); wbwi_iter->Valid(); wbwi_iter->Next()) {
      uint64_t key_val;
      if (GetIntVal(wbwi_iter->Entry().key.ToString(), &key_val)) {
        shared->Put(static_cast<int>(i) /* cf_idx */, key_val,
                    0 /* value_base */, true /* pending */);
      }
    }
  }
  if (rand.OneIn(2)) {
    Status s = txn->Commit();
    assert(s.ok());
  } else {
    Status s = txn->Rollback();
    assert(s.ok());
  }
}

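// Begins a named pessimistic transaction with deadlock detection and a long
// lock timeout; the atomic txn_id counter keeps the generated XIDs ("xid0",
// "xid1", ...) unique across threads.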
Status StressTest::NewTxn(WriteOptions& write_opts, Transaction** txn) {
  if (!FLAGS_use_txn) {
    return Status::InvalidArgument("NewTxn when FLAGS_use_txn is not set");
  }
  write_opts.disableWAL = FLAGS_disable_wal;
  static std::atomic<uint64_t> txn_id = {0};
  TransactionOptions txn_options;
  txn_options.use_only_the_last_commit_time_batch_for_recovery =
      FLAGS_use_only_the_last_commit_time_batch_for_recovery;
  txn_options.lock_timeout = 600000;  // 10 min
  txn_options.deadlock_detect = true;
  *txn = txn_db_->BeginTransaction(write_opts, txn_options);
  auto istr = std::to_string(txn_id.fetch_add(1));
  Status s = (*txn)->SetName("xid" + istr);
  return s;
}

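// Prepares and commits a two-phase transaction. With
// FLAGS_create_timestamped_snapshot_one_in set, the commit occasionally goes
// through CommitAndTryCreateSnapshot() instead of a plain Commit(); thread 0
// additionally exercises TransactionDB::CreateTimestampedSnapshot() and
// asserts that snapshot timestamps are monotonically increasing. Old
// timestamped snapshots are released at a low, random rate.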
Status StressTest::CommitTxn(Transaction* txn, ThreadState* thread) {
  if (!FLAGS_use_txn) {
    return Status::InvalidArgument("CommitTxn when FLAGS_use_txn is not set");
  }
  assert(txn_db_);
  Status s = txn->Prepare();
  std::shared_ptr<const Snapshot> timestamped_snapshot;
  if (s.ok()) {
    if (thread && FLAGS_create_timestamped_snapshot_one_in &&
        thread->rand.OneIn(FLAGS_create_timestamped_snapshot_one_in)) {
      uint64_t ts = db_stress_env->NowNanos();
      s = txn->CommitAndTryCreateSnapshot(/*notifier=*/nullptr, ts,
                                          &timestamped_snapshot);

      std::pair<Status, std::shared_ptr<const Snapshot>> res;
      if (thread->tid == 0) {
        uint64_t now = db_stress_env->NowNanos();
        res = txn_db_->CreateTimestampedSnapshot(now);
        if (res.first.ok()) {
          assert(res.second);
          assert(res.second->GetTimestamp() == now);
          if (timestamped_snapshot) {
            assert(res.second->GetTimestamp() >
                   timestamped_snapshot->GetTimestamp());
          }
        } else {
          assert(!res.second);
        }
      }
    } else {
      s = txn->Commit();
    }
  }
  if (thread && FLAGS_create_timestamped_snapshot_one_in > 0 &&
      thread->rand.OneInOpt(50000)) {
    uint64_t now = db_stress_env->NowNanos();
    constexpr uint64_t time_diff = static_cast<uint64_t>(1000) * 1000 * 1000;
    txn_db_->ReleaseTimestampedSnapshotsOlderThan(now - time_diff);
  }
  delete txn;
  return s;
}

Status StressTest::RollbackTxn(Transaction* txn) {
  if (!FLAGS_use_txn) {
    return Status::InvalidArgument(
        "RollbackTxn when FLAGS_use_txn is not"
        " set");
  }
  Status s = txn->Rollback();
  delete txn;
  return s;
}
#endif

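// Main per-thread operation loop. Each iteration draws prob_op uniformly
// from [0, 100) and dispatches on cumulative percentage bounds:
//   [0, readpercent)                -> point read (Get/MultiGet)
//   [readpercent, prefix_bound)     -> prefix scan
//   [prefix_bound, write_bound)     -> write (Put/Merge/PutEntity)
//   [write_bound, del_bound)        -> point delete
//   [del_bound, delrange_bound)     -> range delete
//   [delrange_bound, iterate_bound) -> iteration
//   [iterate_bound, 100)            -> custom operations
// For example, with readpercent=45, prefixpercent=20 and writepercent=35
// (illustrative numbers, not necessarily the defaults), prob_op < 45 reads,
// 45..64 prefix scans, and 65..99 writes. Auxiliary operations (flush, WAL
// sync, backup, checkpoint, compaction, ...) are sprinkled in via OneInOpt()
// before the main dispatch.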
void StressTest::OperateDb(ThreadState* thread) {
  ReadOptions read_opts(FLAGS_verify_checksum, true);
  read_opts.rate_limiter_priority =
      FLAGS_rate_limit_user_ops ? Env::IO_USER : Env::IO_TOTAL;
  read_opts.async_io = FLAGS_async_io;
  read_opts.adaptive_readahead = FLAGS_adaptive_readahead;
  read_opts.readahead_size = FLAGS_readahead_size;
  WriteOptions write_opts;
  if (FLAGS_rate_limit_auto_wal_flush) {
    write_opts.rate_limiter_priority = Env::IO_USER;
  }
  auto shared = thread->shared;
  char value[100];
  std::string from_db;
  if (FLAGS_sync) {
    write_opts.sync = true;
  }
  write_opts.disableWAL = FLAGS_disable_wal;
  write_opts.protection_bytes_per_key = FLAGS_batch_protection_bytes_per_key;
  const int prefix_bound = static_cast<int>(FLAGS_readpercent) +
                           static_cast<int>(FLAGS_prefixpercent);
  const int write_bound = prefix_bound + static_cast<int>(FLAGS_writepercent);
  const int del_bound = write_bound + static_cast<int>(FLAGS_delpercent);
  const int delrange_bound =
      del_bound + static_cast<int>(FLAGS_delrangepercent);
  const int iterate_bound =
      delrange_bound + static_cast<int>(FLAGS_iterpercent);

  const uint64_t ops_per_open = FLAGS_ops_per_thread / (FLAGS_reopen + 1);

#ifndef NDEBUG
  if (FLAGS_read_fault_one_in) {
    fault_fs_guard->SetThreadLocalReadErrorContext(thread->shared->GetSeed(),
                                                   FLAGS_read_fault_one_in);
  }
#endif  // NDEBUG
  if (FLAGS_write_fault_one_in) {
    IOStatus error_msg;
    if (FLAGS_injest_error_severity <= 1 || FLAGS_injest_error_severity > 2) {
      error_msg = IOStatus::IOError("Retryable IO Error");
      error_msg.SetRetryable(true);
    } else if (FLAGS_injest_error_severity == 2) {
      // Ingest the fatal error
      error_msg = IOStatus::IOError("Fatal IO Error");
      error_msg.SetDataLoss(true);
    }
    std::vector<FileType> types = {FileType::kTableFile,
                                   FileType::kDescriptorFile,
                                   FileType::kCurrentFile};
    fault_fs_guard->SetRandomWriteError(
        thread->shared->GetSeed(), FLAGS_write_fault_one_in, error_msg,
        /*inject_for_all_file_types=*/false, types);
  }
  thread->stats.Start();
  for (int open_cnt = 0; open_cnt <= FLAGS_reopen; ++open_cnt) {
    if (thread->shared->HasVerificationFailedYet() ||
        thread->shared->ShouldStopTest()) {
      break;
    }
    if (open_cnt != 0) {
      thread->stats.FinishedSingleOp();
      MutexLock l(thread->shared->GetMutex());
      while (!thread->snapshot_queue.empty()) {
        db_->ReleaseSnapshot(thread->snapshot_queue.front().second.snapshot);
        delete thread->snapshot_queue.front().second.key_vec;
        thread->snapshot_queue.pop();
      }
      thread->shared->IncVotedReopen();
      if (thread->shared->AllVotedReopen()) {
        thread->shared->GetStressTest()->Reopen(thread);
        thread->shared->GetCondVar()->SignalAll();
      } else {
        thread->shared->GetCondVar()->Wait();
      }
      // Commenting this out as we don't want to reset stats on each open.
      // thread->stats.Start();
    }

    for (uint64_t i = 0; i < ops_per_open; i++) {
      if (thread->shared->HasVerificationFailedYet()) {
        break;
      }

      // Change Options
      if (thread->rand.OneInOpt(FLAGS_set_options_one_in)) {
        SetOptions(thread);
      }

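      // Note: the XOR below always clears inplace_update_support (x ^= x is
      // always zero), so this periodically forces in-place updates off rather
      // than toggling them.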
      if (thread->rand.OneInOpt(FLAGS_set_in_place_one_in)) {
        options_.inplace_update_support ^= options_.inplace_update_support;
      }

      if (thread->tid == 0 && FLAGS_verify_db_one_in > 0 &&
          thread->rand.OneIn(FLAGS_verify_db_one_in)) {
        ContinuouslyVerifyDb(thread);
        if (thread->shared->ShouldStopTest()) {
          break;
        }
      }

      MaybeClearOneColumnFamily(thread);

      if (thread->rand.OneInOpt(FLAGS_manual_wal_flush_one_in)) {
        bool sync = thread->rand.OneIn(2) ? true : false;
        Status s = db_->FlushWAL(sync);
        if (!s.ok() && !(sync && s.IsNotSupported())) {
          fprintf(stderr, "FlushWAL(sync=%s) failed: %s\n",
                  (sync ? "true" : "false"), s.ToString().c_str());
        }
      }

      if (thread->rand.OneInOpt(FLAGS_sync_wal_one_in)) {
        Status s = db_->SyncWAL();
        if (!s.ok() && !s.IsNotSupported()) {
          fprintf(stderr, "SyncWAL() failed: %s\n", s.ToString().c_str());
        }
      }

      int rand_column_family = thread->rand.Next() % FLAGS_column_families;
      ColumnFamilyHandle* column_family = column_families_[rand_column_family];

      if (thread->rand.OneInOpt(FLAGS_compact_files_one_in)) {
        TestCompactFiles(thread, column_family);
      }

      int64_t rand_key = GenerateOneKey(thread, i);
      std::string keystr = Key(rand_key);
      Slice key = keystr;

      if (thread->rand.OneInOpt(FLAGS_compact_range_one_in)) {
        TestCompactRange(thread, rand_key, key, column_family);
        if (thread->shared->HasVerificationFailedYet()) {
          break;
        }
      }

      std::vector<int> rand_column_families =
          GenerateColumnFamilies(FLAGS_column_families, rand_column_family);

      if (thread->rand.OneInOpt(FLAGS_flush_one_in)) {
        Status status = TestFlush(rand_column_families);
        if (!status.ok()) {
          fprintf(stdout, "Unable to perform Flush(): %s\n",
                  status.ToString().c_str());
        }
      }

#ifndef ROCKSDB_LITE
      // Verify GetLiveFiles with a 1 in N chance.
      if (thread->rand.OneInOpt(FLAGS_get_live_files_one_in) &&
          !FLAGS_write_fault_one_in) {
        Status status = VerifyGetLiveFiles();
        if (!status.ok()) {
          VerificationAbort(shared, "VerifyGetLiveFiles status not OK", status);
        }
      }

      // Verify GetSortedWalFiles with a 1 in N chance.
      if (thread->rand.OneInOpt(FLAGS_get_sorted_wal_files_one_in)) {
        Status status = VerifyGetSortedWalFiles();
        if (!status.ok()) {
          VerificationAbort(shared, "VerifyGetSortedWalFiles status not OK",
                            status);
        }
      }

      // Verify GetCurrentWalFile with a 1 in N chance.
      if (thread->rand.OneInOpt(FLAGS_get_current_wal_file_one_in)) {
        Status status = VerifyGetCurrentWalFile();
        if (!status.ok()) {
          VerificationAbort(shared, "VerifyGetCurrentWalFile status not OK",
                            status);
        }
      }
#endif  // !ROCKSDB_LITE

      if (thread->rand.OneInOpt(FLAGS_pause_background_one_in)) {
        Status status = TestPauseBackground(thread);
        if (!status.ok()) {
          VerificationAbort(
              shared, "Pause/ContinueBackgroundWork status not OK", status);
        }
      }

#ifndef ROCKSDB_LITE
      if (thread->rand.OneInOpt(FLAGS_verify_checksum_one_in)) {
        Status status = db_->VerifyChecksum();
        if (!status.ok()) {
          VerificationAbort(shared, "VerifyChecksum status not OK", status);
        }
      }

      if (thread->rand.OneInOpt(FLAGS_get_property_one_in)) {
        TestGetProperty(thread);
      }
#endif

      std::vector<int64_t> rand_keys = GenerateKeys(rand_key);

      if (thread->rand.OneInOpt(FLAGS_ingest_external_file_one_in)) {
        TestIngestExternalFile(thread, rand_column_families, rand_keys);
      }

      if (thread->rand.OneInOpt(FLAGS_backup_one_in)) {
        // Beyond a certain DB size threshold, this test becomes heavier than
        // it's worth.
        uint64_t total_size = 0;
        if (FLAGS_backup_max_size > 0) {
          std::vector<FileAttributes> files;
          db_stress_env->GetChildrenFileAttributes(FLAGS_db, &files);
          for (auto& file : files) {
            total_size += file.size_bytes;
          }
        }

        if (total_size <= FLAGS_backup_max_size) {
          Status s = TestBackupRestore(thread, rand_column_families, rand_keys);
          if (!s.ok()) {
            VerificationAbort(shared, "Backup/restore gave inconsistent state",
                              s);
          }
        }
      }

      if (thread->rand.OneInOpt(FLAGS_checkpoint_one_in)) {
        Status s = TestCheckpoint(thread, rand_column_families, rand_keys);
        if (!s.ok()) {
          VerificationAbort(shared, "Checkpoint gave inconsistent state", s);
        }
      }

#ifndef ROCKSDB_LITE
      if (thread->rand.OneInOpt(FLAGS_approximate_size_one_in)) {
        Status s =
            TestApproximateSize(thread, i, rand_column_families, rand_keys);
        if (!s.ok()) {
          VerificationAbort(shared, "ApproximateSize Failed", s);
        }
      }
#endif  // !ROCKSDB_LITE
      if (thread->rand.OneInOpt(FLAGS_acquire_snapshot_one_in)) {
        TestAcquireSnapshot(thread, rand_column_family, keystr, i);
      }

      /*always*/ {
        Status s = MaybeReleaseSnapshots(thread, i);
        if (!s.ok()) {
          VerificationAbort(shared, "Snapshot gave inconsistent state", s);
        }
      }

      // Assign timestamps if necessary.
      std::string read_ts_str;
      Slice read_ts;
      if (FLAGS_user_timestamp_size > 0) {
        read_ts_str = GetNowNanos();
        read_ts = read_ts_str;
        read_opts.timestamp = &read_ts;
      }

      int prob_op = thread->rand.Uniform(100);
      // Reset this in case we pick something other than a read op. We don't
      // want to use a stale value when deciding at the beginning of the loop
      // whether to vote to reopen.
      if (prob_op >= 0 && prob_op < static_cast<int>(FLAGS_readpercent)) {
        assert(0 <= prob_op);
        // OPERATION read
        if (FLAGS_use_multiget) {
          // Leave room for one more iteration of the loop with a single key
          // batch. This is to ensure that each thread does exactly the same
          // number of ops
          int multiget_batch_size = static_cast<int>(
              std::min(static_cast<uint64_t>(thread->rand.Uniform(64)),
                       FLAGS_ops_per_thread - i - 1));
          // If it's the last iteration, ensure that multiget_batch_size is 1
          multiget_batch_size = std::max(multiget_batch_size, 1);
          rand_keys = GenerateNKeys(thread, multiget_batch_size, i);
          TestMultiGet(thread, read_opts, rand_column_families, rand_keys);
          i += multiget_batch_size - 1;
        } else {
          TestGet(thread, read_opts, rand_column_families, rand_keys);
        }
      } else if (prob_op < prefix_bound) {
        assert(static_cast<int>(FLAGS_readpercent) <= prob_op);
        // OPERATION prefix scan
        // keys are 8 bytes long, prefix size is FLAGS_prefix_size. There are
        // (8 - FLAGS_prefix_size) bytes besides the prefix. So there will
        // be 2 ^ ((8 - FLAGS_prefix_size) * 8) possible keys with the same
        // prefix
        TestPrefixScan(thread, read_opts, rand_column_families, rand_keys);
      } else if (prob_op < write_bound) {
        assert(prefix_bound <= prob_op);
        // OPERATION write
        TestPut(thread, write_opts, read_opts, rand_column_families, rand_keys,
                value);
      } else if (prob_op < del_bound) {
        assert(write_bound <= prob_op);
        // OPERATION delete
        TestDelete(thread, write_opts, rand_column_families, rand_keys);
      } else if (prob_op < delrange_bound) {
        assert(del_bound <= prob_op);
        // OPERATION delete range
        TestDeleteRange(thread, write_opts, rand_column_families, rand_keys);
      } else if (prob_op < iterate_bound) {
        assert(delrange_bound <= prob_op);
        // OPERATION iterate
        if (!FLAGS_skip_verifydb &&
            thread->rand.OneInOpt(
                FLAGS_verify_iterator_with_expected_state_one_in)) {
          TestIterateAgainstExpected(thread, read_opts, rand_column_families,
                                     rand_keys);
        } else {
          int num_seeks = static_cast<int>(
              std::min(static_cast<uint64_t>(thread->rand.Uniform(4)),
                       FLAGS_ops_per_thread - i - 1));
          rand_keys = GenerateNKeys(thread, num_seeks, i);
          i += num_seeks - 1;
          TestIterate(thread, read_opts, rand_column_families, rand_keys);
        }
      } else {
        assert(iterate_bound <= prob_op);
        TestCustomOperations(thread, rand_column_families);
      }
      thread->stats.FinishedSingleOp();
    }
  }
  while (!thread->snapshot_queue.empty()) {
    db_->ReleaseSnapshot(thread->snapshot_queue.front().second.snapshot);
    delete thread->snapshot_queue.front().second.key_vec;
    thread->snapshot_queue.pop();
  }

  thread->stats.Stop();
}

#ifndef ROCKSDB_LITE
// Generates a list of keys close to the boundaries of the SST files' key
// ranges. If there isn't any SST file in the DB, returns an empty list.
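// Each candidate key is taken from a random SST boundary and then, with some
// probability, perturbed by one: decrementing with borrow (e.g. "ab\x00"
// becomes "aa\xff") or incrementing with carry, so seeks land just inside or
// just outside file boundaries.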
std::vector<std::string> StressTest::GetWhiteBoxKeys(ThreadState* thread,
                                                     DB* db,
                                                     ColumnFamilyHandle* cfh,
                                                     size_t num_keys) {
  ColumnFamilyMetaData cfmd;
  db->GetColumnFamilyMetaData(cfh, &cfmd);
  std::vector<std::string> boundaries;
  for (const LevelMetaData& lmd : cfmd.levels) {
    for (const SstFileMetaData& sfmd : lmd.files) {
      // If FLAGS_user_timestamp_size > 0, then both smallestkey and largestkey
      // have timestamps.
      const auto& skey = sfmd.smallestkey;
      const auto& lkey = sfmd.largestkey;
      assert(skey.size() >= FLAGS_user_timestamp_size);
      assert(lkey.size() >= FLAGS_user_timestamp_size);
      boundaries.push_back(
          skey.substr(0, skey.size() - FLAGS_user_timestamp_size));
      boundaries.push_back(
          lkey.substr(0, lkey.size() - FLAGS_user_timestamp_size));
    }
  }
  if (boundaries.empty()) {
    return {};
  }

  std::vector<std::string> ret;
  for (size_t j = 0; j < num_keys; j++) {
    std::string k =
        boundaries[thread->rand.Uniform(static_cast<int>(boundaries.size()))];
    if (thread->rand.OneIn(3)) {
      // Reduce one byte from the string
      for (int i = static_cast<int>(k.length()) - 1; i >= 0; i--) {
        uint8_t cur = k[i];
        if (cur > 0) {
          k[i] = static_cast<char>(cur - 1);
          break;
        } else if (i > 0) {
          k[i] = 0xFFu;
        }
      }
    } else if (thread->rand.OneIn(2)) {
      // Add one byte to the string
      for (int i = static_cast<int>(k.length()) - 1; i >= 0; i--) {
        uint8_t cur = k[i];
        if (cur < 255) {
          k[i] = static_cast<char>(cur + 1);
          break;
        } else if (i > 0) {
          k[i] = 0x00;
        }
      }
    }
    ret.push_back(k);
  }
  return ret;
}
#endif  // !ROCKSDB_LITE

// Given a key K, this creates an iterator which scans to K and then
// does a random sequence of Next/Prev operations.
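// Alongside the iterator under test, a "control" iterator is opened on a
// control column family with total_order_seek and no bounds; after every
// seek/next/prev both iterators are compared by VerifyIterator() below, and
// a log of the operations performed (op_logs) is kept for error reporting.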
Status StressTest::TestIterate(ThreadState* thread,
                               const ReadOptions& read_opts,
                               const std::vector<int>& rand_column_families,
                               const std::vector<int64_t>& rand_keys) {
  assert(!rand_column_families.empty());
  assert(!rand_keys.empty());

  ManagedSnapshot snapshot_guard(db_);

  ReadOptions ro = read_opts;
  ro.snapshot = snapshot_guard.snapshot();

  std::string read_ts_str;
  Slice read_ts_slice;
  MaybeUseOlderTimestampForRangeScan(thread, read_ts_str, read_ts_slice, ro);

  bool expect_total_order = false;
  if (thread->rand.OneIn(16)) {
    // When prefix extractor is used, it's useful to cover total order seek.
    ro.total_order_seek = true;
    expect_total_order = true;
  } else if (thread->rand.OneIn(4)) {
    ro.total_order_seek = false;
    ro.auto_prefix_mode = true;
    expect_total_order = true;
  } else if (options_.prefix_extractor.get() == nullptr) {
    expect_total_order = true;
  }

  std::string upper_bound_str;
  Slice upper_bound;
  if (thread->rand.OneIn(16)) {
    // With a 1/16 chance, set an iterator upper bound.
    // Note: upper_bound can be smaller than the seek key.
    const int64_t rand_upper_key = GenerateOneKey(thread, FLAGS_ops_per_thread);
    upper_bound_str = Key(rand_upper_key);
    upper_bound = Slice(upper_bound_str);
    ro.iterate_upper_bound = &upper_bound;
  }
  std::string lower_bound_str;
  Slice lower_bound;
  if (thread->rand.OneIn(16)) {
    // With a 1/16 chance, enable iterator lower bound.
    // Note: lower_bound can be greater than the seek key.
    const int64_t rand_lower_key = GenerateOneKey(thread, FLAGS_ops_per_thread);
    lower_bound_str = Key(rand_lower_key);
    lower_bound = Slice(lower_bound_str);
    ro.iterate_lower_bound = &lower_bound;
  }

  ColumnFamilyHandle* const cfh = column_families_[rand_column_families[0]];
  assert(cfh);

  std::unique_ptr<Iterator> iter(db_->NewIterator(ro, cfh));

  std::vector<std::string> key_strs;
  if (thread->rand.OneIn(16)) {
    // Generate keys close to lower or upper bound of SST files.
    key_strs = GetWhiteBoxKeys(thread, db_, cfh, rand_keys.size());
  }
  if (key_strs.empty()) {
    // Use the random keys passed in.
    for (int64_t rkey : rand_keys) {
      key_strs.push_back(Key(rkey));
    }
  }

  std::string op_logs;
  constexpr size_t kOpLogsLimit = 10000;

  for (const std::string& key_str : key_strs) {
    if (op_logs.size() > kOpLogsLimit) {
      // Shouldn't take too much memory for the history log. Clear it.
      op_logs = "(cleared...)\n";
    }

    if (ro.iterate_upper_bound != nullptr && thread->rand.OneIn(2)) {
      // With a 1/2 chance, change the upper bound.
      // It is possible that it is changed before first use, but there is no
      // problem with that.
      const int64_t rand_upper_key =
          GenerateOneKey(thread, FLAGS_ops_per_thread);
      upper_bound_str = Key(rand_upper_key);
      upper_bound = Slice(upper_bound_str);
    }
    if (ro.iterate_lower_bound != nullptr && thread->rand.OneIn(4)) {
      // With a 1/4 chance, change the lower bound.
      // It is possible that it is changed before first use, but there is no
      // problem with that.
      const int64_t rand_lower_key =
          GenerateOneKey(thread, FLAGS_ops_per_thread);
      lower_bound_str = Key(rand_lower_key);
      lower_bound = Slice(lower_bound_str);
    }

    // Record some options to op_logs
    op_logs += "total_order_seek: ";
    op_logs += (ro.total_order_seek ? "1 " : "0 ");
    op_logs += "auto_prefix_mode: ";
    op_logs += (ro.auto_prefix_mode ? "1 " : "0 ");
    if (ro.iterate_upper_bound != nullptr) {
      op_logs += "ub: " + upper_bound.ToString(true) + " ";
    }
    if (ro.iterate_lower_bound != nullptr) {
      op_logs += "lb: " + lower_bound.ToString(true) + " ";
    }

    // Set up an iterator, perform the same operations without bounds and with
    // total order seek, and compare the results. This is to identify bugs
    // related to bounds, prefix extractor, or reseeking. Sometimes we are
    // comparing iterators with the same set-up, and it doesn't hurt to check
    // them to be equal.
    //
    // This `ReadOptions` is for validation purposes. Ignore
    // `FLAGS_rate_limit_user_ops` to avoid slowing any validation.
    ReadOptions cmp_ro;
    cmp_ro.timestamp = ro.timestamp;
    cmp_ro.iter_start_ts = ro.iter_start_ts;
    cmp_ro.snapshot = snapshot_guard.snapshot();
    cmp_ro.total_order_seek = true;

    ColumnFamilyHandle* const cmp_cfh =
        GetControlCfh(thread, rand_column_families[0]);
    assert(cmp_cfh);

    std::unique_ptr<Iterator> cmp_iter(db_->NewIterator(cmp_ro, cmp_cfh));

    bool diverged = false;

    Slice key(key_str);

    const bool support_seek_first_or_last = expect_total_order;

    LastIterateOp last_op;
    if (support_seek_first_or_last && thread->rand.OneIn(100)) {
      iter->SeekToFirst();
      cmp_iter->SeekToFirst();
      last_op = kLastOpSeekToFirst;
      op_logs += "STF ";
    } else if (support_seek_first_or_last && thread->rand.OneIn(100)) {
      iter->SeekToLast();
      cmp_iter->SeekToLast();
      last_op = kLastOpSeekToLast;
      op_logs += "STL ";
    } else if (thread->rand.OneIn(8)) {
      iter->SeekForPrev(key);
      cmp_iter->SeekForPrev(key);
      last_op = kLastOpSeekForPrev;
      op_logs += "SFP " + key.ToString(true) + " ";
    } else {
      iter->Seek(key);
      cmp_iter->Seek(key);
      last_op = kLastOpSeek;
      op_logs += "S " + key.ToString(true) + " ";
    }

    VerifyIterator(thread, cmp_cfh, ro, iter.get(), cmp_iter.get(), last_op,
                   key, op_logs, &diverged);

    const bool no_reverse =
        (FLAGS_memtablerep == "prefix_hash" && !expect_total_order);
    for (uint64_t i = 0; i < FLAGS_num_iterations && iter->Valid(); ++i) {
      if (no_reverse || thread->rand.OneIn(2)) {
        iter->Next();
        if (!diverged) {
          assert(cmp_iter->Valid());
          cmp_iter->Next();
        }
        op_logs += "N";
      } else {
        iter->Prev();
        if (!diverged) {
          assert(cmp_iter->Valid());
          cmp_iter->Prev();
        }
        op_logs += "P";
      }

      last_op = kLastOpNextOrPrev;

      VerifyIterator(thread, cmp_cfh, ro, iter.get(), cmp_iter.get(), last_op,
                     key, op_logs, &diverged);
    }

    thread->stats.AddIterations(1);

    op_logs += "; ";
  }

  return Status::OK();
}

#ifndef ROCKSDB_LITE
// Test the return status of GetLiveFiles.
Status StressTest::VerifyGetLiveFiles() const {
  std::vector<std::string> live_file;
  uint64_t manifest_size = 0;
  return db_->GetLiveFiles(live_file, &manifest_size);
}

// Test the return status of GetSortedWalFiles.
Status StressTest::VerifyGetSortedWalFiles() const {
  VectorLogPtr log_ptr;
  return db_->GetSortedWalFiles(log_ptr);
}

// Test the return status of GetCurrentWalFile.
Status StressTest::VerifyGetCurrentWalFile() const {
  std::unique_ptr<LogFile> cur_wal_file;
  return db_->GetCurrentWalFile(&cur_wal_file);
}
#endif  // !ROCKSDB_LITE


// Compares the two iterators, which are expected to be at the same position,
// unless iter may have become invalid or undefined because of upper or lower
// bounds, or the prefix extractor.
// Flags a verification failure if the comparison fails.
// *diverged is set to true once the two iterators have diverged, after which
// no further comparison is meaningful.
void StressTest::VerifyIterator(ThreadState* thread,
                                ColumnFamilyHandle* cmp_cfh,
                                const ReadOptions& ro, Iterator* iter,
                                Iterator* cmp_iter, LastIterateOp op,
                                const Slice& seek_key,
                                const std::string& op_logs, bool* diverged) {
  assert(diverged);

  if (*diverged) {
    return;
  }

  if (ro.iter_start_ts != nullptr) {
    assert(FLAGS_user_timestamp_size > 0);
    // We currently do not verify iterator when dumping history of internal
    // keys.
    *diverged = true;
    return;
  }

  if (op == kLastOpSeekToFirst && ro.iterate_lower_bound != nullptr) {
    // SeekToFirst() with lower bound is not well defined.
    *diverged = true;
    return;
  } else if (op == kLastOpSeekToLast && ro.iterate_upper_bound != nullptr) {
    // SeekToLast() with upper bound is not well defined.
    *diverged = true;
    return;
  } else if (op == kLastOpSeek && ro.iterate_lower_bound != nullptr &&
             (options_.comparator->CompareWithoutTimestamp(
                  *ro.iterate_lower_bound, /*a_has_ts=*/false, seek_key,
                  /*b_has_ts=*/false) >= 0 ||
              (ro.iterate_upper_bound != nullptr &&
               options_.comparator->CompareWithoutTimestamp(
                   *ro.iterate_lower_bound, /*a_has_ts=*/false,
                   *ro.iterate_upper_bound, /*b_has_ts=*/false) >= 0))) {
    // Lower bound behavior is not well defined if it is larger than the
    // seek key or the upper bound. Disable the check for now.
    *diverged = true;
    return;
  } else if (op == kLastOpSeekForPrev && ro.iterate_upper_bound != nullptr &&
             (options_.comparator->CompareWithoutTimestamp(
                  *ro.iterate_upper_bound, /*a_has_ts=*/false, seek_key,
                  /*b_has_ts=*/false) <= 0 ||
              (ro.iterate_lower_bound != nullptr &&
               options_.comparator->CompareWithoutTimestamp(
                   *ro.iterate_lower_bound, /*a_has_ts=*/false,
                   *ro.iterate_upper_bound, /*b_has_ts=*/false) >= 0))) {
    // Upper bound behavior is not well defined if it is smaller than the
    // seek key or the lower bound. Disable the check for now.
    *diverged = true;
    return;
  }

  const SliceTransform* pe = (ro.total_order_seek || ro.auto_prefix_mode)
                                 ? nullptr
                                 : options_.prefix_extractor.get();
  const Comparator* cmp = options_.comparator;

  if (iter->Valid() && !cmp_iter->Valid()) {
    if (pe != nullptr) {
      if (!pe->InDomain(seek_key)) {
        // Prefix-seeking a key that is not in the prefix extractor's domain
        // is undefined. Skip checking for this scenario.
        *diverged = true;
        return;
      } else if (!pe->InDomain(iter->key())) {
        // The iterator is out of range once its key is no longer in the
        // domain.
        *diverged = true;
        return;
      } else if (pe->Transform(iter->key()) != pe->Transform(seek_key)) {
        *diverged = true;
        return;
      }
    }
    fprintf(stderr,
            "Control iterator is invalid but iterator has key %s "
            "%s\n",
            iter->key().ToString(true).c_str(), op_logs.c_str());

    *diverged = true;
  } else if (cmp_iter->Valid()) {
    // Iterator is not valid. This can be legitimate if it has already gone
    // out of the upper or lower bound, or been filtered out by the prefix
    // extractor.
    const Slice& total_order_key = cmp_iter->key();

    if (pe != nullptr) {
      if (!pe->InDomain(seek_key)) {
        // Prefix-seeking a key that is not in the prefix extractor's domain
        // is undefined. Skip checking for this scenario.
        *diverged = true;
        return;
      }

      if (!pe->InDomain(total_order_key) ||
          pe->Transform(total_order_key) != pe->Transform(seek_key)) {
        // If the prefix is exhausted, the only thing left to check is that
        // the iterator doesn't return a position within the prefix.
        // Either way, checking can stop here.
        *diverged = true;
        if (!iter->Valid() || !pe->InDomain(iter->key()) ||
            pe->Transform(iter->key()) != pe->Transform(seek_key)) {
          return;
        }
        fprintf(stderr,
                "Iterator stays in prefix but control doesn't"
                " iterator key %s control iterator key %s %s\n",
                iter->key().ToString(true).c_str(),
                cmp_iter->key().ToString(true).c_str(), op_logs.c_str());
      }
    }
    // Check upper or lower bounds.
    if (!*diverged) {
      if ((iter->Valid() && iter->key() != cmp_iter->key()) ||
          (!iter->Valid() &&
           (ro.iterate_upper_bound == nullptr ||
            cmp->CompareWithoutTimestamp(total_order_key, /*a_has_ts=*/false,
                                         *ro.iterate_upper_bound,
                                         /*b_has_ts=*/false) < 0) &&
           (ro.iterate_lower_bound == nullptr ||
            cmp->CompareWithoutTimestamp(total_order_key, /*a_has_ts=*/false,
                                         *ro.iterate_lower_bound,
                                         /*b_has_ts=*/false) > 0))) {
        fprintf(stderr,
                "Iterator diverged from control iterator which"
                " has value %s %s\n",
                total_order_key.ToString(true).c_str(), op_logs.c_str());
        if (iter->Valid()) {
          fprintf(stderr, "iterator has value %s\n",
                  iter->key().ToString(true).c_str());
        } else {
          fprintf(stderr, "iterator is not valid\n");
        }
        *diverged = true;
      }
    }
  }

  if (!*diverged && iter->Valid()) {
    const WideColumns expected_columns =
        GenerateExpectedWideColumns(GetValueBase(iter->value()), iter->value());
    if (iter->columns() != expected_columns) {
      fprintf(stderr, "Value and columns inconsistent for iterator: %s\n",
              DebugString(iter->value(), iter->columns(), expected_columns)
                  .c_str());

      *diverged = true;
    }
  }

  if (*diverged) {
    fprintf(stderr, "Control CF %s\n", cmp_cfh->GetName().c_str());
    thread->stats.AddErrors(1);
    // Fail fast to preserve the DB state.
    thread->shared->SetVerificationFailure();
  }
}

#ifdef ROCKSDB_LITE
Status StressTest::TestBackupRestore(
    ThreadState* /* thread */,
    const std::vector<int>& /* rand_column_families */,
    const std::vector<int64_t>& /* rand_keys */) {
  assert(false);
  fprintf(stderr,
          "RocksDB lite does not support "
          "TestBackupRestore\n");
  std::terminate();
}

Status StressTest::TestCheckpoint(
    ThreadState* /* thread */,
    const std::vector<int>& /* rand_column_families */,
    const std::vector<int64_t>& /* rand_keys */) {
  assert(false);
  fprintf(stderr,
          "RocksDB lite does not support "
          "TestCheckpoint\n");
  std::terminate();
}

void StressTest::TestCompactFiles(ThreadState* /* thread */,
                                  ColumnFamilyHandle* /* column_family */) {
  assert(false);
  fprintf(stderr,
          "RocksDB lite does not support "
          "CompactFiles\n");
  std::terminate();
}
#else   // ROCKSDB_LITE
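// Exercises BackupEngine end to end with randomized options: creates a
// backup of the live DB (flushing first when the WAL is disabled), reopens
// the backup engine, optionally runs VerifyBackup, then either restores the
// backup into a temporary directory or opens it in place read-only, and
// finally checks the existence/non-existence of a single locked key per
// chosen column family against the expected state.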
Status StressTest::TestBackupRestore(
    ThreadState* thread, const std::vector<int>& rand_column_families,
    const std::vector<int64_t>& rand_keys) {
  std::vector<std::unique_ptr<MutexLock>> locks;
  if (ShouldAcquireMutexOnKey()) {
    for (int rand_column_family : rand_column_families) {
      // `rand_keys[0]` on each chosen CF will be verified.
      locks.emplace_back(new MutexLock(
          thread->shared->GetMutexForKey(rand_column_family, rand_keys[0])));
    }
  }

  const std::string backup_dir =
      FLAGS_db + "/.backup" + std::to_string(thread->tid);
  const std::string restore_dir =
      FLAGS_db + "/.restore" + std::to_string(thread->tid);
  BackupEngineOptions backup_opts(backup_dir);
  // For debugging, get info_log from live options
  backup_opts.info_log = db_->GetDBOptions().info_log.get();
  if (thread->rand.OneIn(10)) {
    backup_opts.share_table_files = false;
  } else {
    backup_opts.share_table_files = true;
    if (thread->rand.OneIn(5)) {
      backup_opts.share_files_with_checksum = false;
    } else {
      backup_opts.share_files_with_checksum = true;
      if (thread->rand.OneIn(2)) {
        // old
        backup_opts.share_files_with_checksum_naming =
            BackupEngineOptions::kLegacyCrc32cAndFileSize;
      } else {
        // new
        backup_opts.share_files_with_checksum_naming =
            BackupEngineOptions::kUseDbSessionId;
      }
      if (thread->rand.OneIn(2)) {
        backup_opts.share_files_with_checksum_naming =
            backup_opts.share_files_with_checksum_naming |
            BackupEngineOptions::kFlagIncludeFileSize;
      }
    }
  }
  if (thread->rand.OneIn(2)) {
    backup_opts.schema_version = 1;
  } else {
    backup_opts.schema_version = 2;
  }
  BackupEngine* backup_engine = nullptr;
  std::string from = "a backup/restore operation";
  Status s = BackupEngine::Open(db_stress_env, backup_opts, &backup_engine);
  if (!s.ok()) {
    from = "BackupEngine::Open";
  }
  if (s.ok()) {
    if (backup_opts.schema_version >= 2 && thread->rand.OneIn(2)) {
      TEST_BackupMetaSchemaOptions test_opts;
      test_opts.crc32c_checksums = thread->rand.OneIn(2) == 0;
      test_opts.file_sizes = thread->rand.OneIn(2) == 0;
      TEST_SetBackupMetaSchemaOptions(backup_engine, test_opts);
    }
    CreateBackupOptions create_opts;
    if (FLAGS_disable_wal) {
      // The verification can only work when the latest value of `key` is
      // backed up, which requires flushing in case the WAL is disabled.
      //
      // Note this triggers a flush with a key lock held. Meanwhile, operations
      // like flush/compaction may attempt to grab key locks like in
      // `DbStressCompactionFilter`. The philosophy around preventing deadlock
      // is the background operation key lock acquisition only tries but does
      // not wait for the lock. So here in the foreground it is OK to hold the
      // lock and wait on a background operation (flush).
      create_opts.flush_before_backup = true;
    }
    s = backup_engine->CreateNewBackup(create_opts, db_);
    if (!s.ok()) {
      from = "BackupEngine::CreateNewBackup";
    }
  }
  if (s.ok()) {
    delete backup_engine;
    backup_engine = nullptr;
    s = BackupEngine::Open(db_stress_env, backup_opts, &backup_engine);
    if (!s.ok()) {
      from = "BackupEngine::Open (again)";
    }
  }
  std::vector<BackupInfo> backup_info;
  // If inplace_not_restore, we verify the backup by opening it as a
  // read-only DB. If !inplace_not_restore, we restore it to a temporary
  // directory for verification.
  bool inplace_not_restore = thread->rand.OneIn(3);
  if (s.ok()) {
    backup_engine->GetBackupInfo(&backup_info,
                                 /*include_file_details*/ inplace_not_restore);
    if (backup_info.empty()) {
      s = Status::NotFound("no backups found");
      from = "BackupEngine::GetBackupInfo";
    }
  }
  if (s.ok() && thread->rand.OneIn(2)) {
    s = backup_engine->VerifyBackup(
        backup_info.front().backup_id,
        thread->rand.OneIn(2) /* verify_with_checksum */);
    if (!s.ok()) {
      from = "BackupEngine::VerifyBackup";
    }
  }
  const bool allow_persistent = thread->tid == 0;  // not too many
  bool from_latest = false;
  int count = static_cast<int>(backup_info.size());
  if (s.ok() && !inplace_not_restore) {
    if (count > 1) {
      s = backup_engine->RestoreDBFromBackup(
          RestoreOptions(), backup_info[thread->rand.Uniform(count)].backup_id,
          restore_dir /* db_dir */, restore_dir /* wal_dir */);
      if (!s.ok()) {
        from = "BackupEngine::RestoreDBFromBackup";
      }
    } else {
      from_latest = true;
      s = backup_engine->RestoreDBFromLatestBackup(RestoreOptions(),
                                                   restore_dir /* db_dir */,
                                                   restore_dir /* wal_dir */);
      if (!s.ok()) {
        from = "BackupEngine::RestoreDBFromLatestBackup";
      }
    }
  }
  if (s.ok() && !inplace_not_restore) {
    // Purge early if restoring, to ensure the restored directory doesn't
    // have some secret dependency on the backup directory.
    uint32_t to_keep = 0;
    if (allow_persistent) {
      // allow one thread to keep up to 2 backups
      to_keep = thread->rand.Uniform(3);
    }
    s = backup_engine->PurgeOldBackups(to_keep);
    if (!s.ok()) {
      from = "BackupEngine::PurgeOldBackups";
    }
  }
  DB* restored_db = nullptr;
  std::vector<ColumnFamilyHandle*> restored_cf_handles;
  // Not yet implemented: opening restored BlobDB or TransactionDB
  if (s.ok() && !FLAGS_use_txn && !FLAGS_use_blob_db) {
    Options restore_options(options_);
    restore_options.best_efforts_recovery = false;
    restore_options.listeners.clear();
    // Avoid dangling/shared file descriptors, for reliable destroy
    restore_options.sst_file_manager = nullptr;
    std::vector<ColumnFamilyDescriptor> cf_descriptors;
    // TODO(ajkr): `column_family_names_` is not safe to access here when
    // `clear_column_family_one_in != 0`. But we can't easily switch to
    // `ListColumnFamilies` to get names because it won't necessarily give
    // the same order as `column_family_names_`.
    assert(FLAGS_clear_column_family_one_in == 0);
    for (auto name : column_family_names_) {
      cf_descriptors.emplace_back(name, ColumnFamilyOptions(restore_options));
    }
    if (inplace_not_restore) {
      BackupInfo& info = backup_info[thread->rand.Uniform(count)];
      restore_options.env = info.env_for_open.get();
      s = DB::OpenForReadOnly(DBOptions(restore_options), info.name_for_open,
                              cf_descriptors, &restored_cf_handles,
                              &restored_db);
      if (!s.ok()) {
        from = "DB::OpenForReadOnly in backup/restore";
      }
    } else {
      s = DB::Open(DBOptions(restore_options), restore_dir, cf_descriptors,
                   &restored_cf_handles, &restored_db);
      if (!s.ok()) {
        from = "DB::Open in backup/restore";
      }
    }
  }
  // Note the column families chosen by `rand_column_families` cannot be
  // dropped while the locks for `rand_keys` are held. So we should not have
  // to worry about accessing those column families throughout this function.
  //
  // For simplicity, currently only verifies existence/non-existence of a
  // single key
  for (size_t i = 0; restored_db && s.ok() && i < rand_column_families.size();
       ++i) {
    std::string key_str = Key(rand_keys[0]);
    Slice key = key_str;
    std::string restored_value;
    // This `ReadOptions` is for validation purposes. Ignore
    // `FLAGS_rate_limit_user_ops` to avoid slowing any validation.
    ReadOptions read_opts;
    std::string ts_str;
    Slice ts;
    if (FLAGS_user_timestamp_size > 0) {
      ts_str = GetNowNanos();
      ts = ts_str;
      read_opts.timestamp = &ts;
    }
    Status get_status = restored_db->Get(
        read_opts, restored_cf_handles[rand_column_families[i]], key,
1744 &restored_value);
1745 bool exists = thread->shared->Exists(rand_column_families[i], rand_keys[0]);
1746 if (get_status.ok()) {
1747 if (!exists && from_latest && ShouldAcquireMutexOnKey()) {
1748 std::ostringstream oss;
1749 oss << "0x" << key.ToString(true)
1750 << " exists in restore but not in original db";
1751 s = Status::Corruption(oss.str());
1752 }
1753 } else if (get_status.IsNotFound()) {
1754 if (exists && from_latest && ShouldAcquireMutexOnKey()) {
1755 std::ostringstream oss;
1756 oss << "0x" << key.ToString(true)
1757 << " exists in original db but not in restore";
1758 s = Status::Corruption(oss.str());
1759 }
1760 } else {
1761 s = get_status;
1762 if (!s.ok()) {
1763 from = "DB::Get in backup/restore";
1764 }
1765 }
1766 }
1767 if (restored_db != nullptr) {
1768 for (auto* cf_handle : restored_cf_handles) {
1769 restored_db->DestroyColumnFamilyHandle(cf_handle);
1770 }
1771 delete restored_db;
1772 restored_db = nullptr;
1773 }
1774 if (s.ok() && inplace_not_restore) {
1775 // Purge late if inplace open read-only
1776 uint32_t to_keep = 0;
1777 if (allow_persistent) {
1778 // allow one thread to keep up to 2 backups
1779 to_keep = thread->rand.Uniform(3);
1780 }
1781 s = backup_engine->PurgeOldBackups(to_keep);
1782 if (!s.ok()) {
1783 from = "BackupEngine::PurgeOldBackups";
1784 }
1785 }
1786 if (backup_engine != nullptr) {
1787 delete backup_engine;
1788 backup_engine = nullptr;
1789 }
1790 if (s.ok()) {
1791 // Preserve directories on failure, or when persistent backups are allowed
1792 if (!allow_persistent) {
1793 s = DestroyDir(db_stress_env, backup_dir);
1794 if (!s.ok()) {
1795 from = "Destroy backup dir";
1796 }
1797 }
1798 }
1799 if (s.ok()) {
1800 s = DestroyDir(db_stress_env, restore_dir);
1801 if (!s.ok()) {
1802 from = "Destroy restore dir";
1803 }
1804 }
1805 if (!s.ok()) {
1806 fprintf(stderr, "Failure in %s with: %s\n", from.c_str(),
1807 s.ToString().c_str());
1808 }
1809 return s;
1810 }
1811
1812 Status StressTest::TestApproximateSize(
1813 ThreadState* thread, uint64_t iteration,
1814 const std::vector<int>& rand_column_families,
1815 const std::vector<int64_t>& rand_keys) {
1816 // rand_keys likely only has one key. Just use the first one.
1817 assert(!rand_keys.empty());
1818 assert(!rand_column_families.empty());
1819 int64_t key1 = rand_keys[0];
1820 int64_t key2;
1821 if (thread->rand.OneIn(2)) {
1822 // Two totally random keys. This tends to cover large ranges.
1823 key2 = GenerateOneKey(thread, iteration);
1824 if (key2 < key1) {
1825 std::swap(key1, key2);
1826 }
1827 } else {
1828 // Unless users pass a very large FLAGS_max_key, we should not worry
1829 // about overflow. This is only for testing, so we skip the overflow
1830 // checking for simplicity.
1831 key2 = key1 + static_cast<int64_t>(thread->rand.Uniform(1000));
1832 }
1833 std::string key1_str = Key(key1);
1834 std::string key2_str = Key(key2);
1835 Range range{Slice(key1_str), Slice(key2_str)};
1836 SizeApproximationOptions sao;
1837 sao.include_memtables = thread->rand.OneIn(2);
1838 if (sao.include_memtables) {
1839 sao.include_files = thread->rand.OneIn(2);
1840 }
1841 if (thread->rand.OneIn(2)) {
1842 if (thread->rand.OneIn(2)) {
1843 sao.files_size_error_margin = 0.0;
1844 } else {
1845 sao.files_size_error_margin =
1846 static_cast<double>(thread->rand.Uniform(3));
1847 }
1848 }
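  // For reference, a worked (hypothetical) example of the call below, using
  // the same public API; `files_size_error_margin = 1.0` is assumed to allow
  // an approximation error of up to 100% of the files' total size:
  //
  //   SizeApproximationOptions opts;
  //   opts.include_memtables = true;  // also count unflushed memtable data
  //   opts.files_size_error_margin = 1.0;
  //   Range r(Slice(key1_str), Slice(key2_str));
  //   uint64_t size = 0;
  //   db->GetApproximateSizes(opts, cfh, &r, 1, &size);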
1849 uint64_t result;
1850 return db_->GetApproximateSizes(
1851 sao, column_families_[rand_column_families[0]], &range, 1, &result);
1852 }
1853
1854 Status StressTest::TestCheckpoint(ThreadState* thread,
1855 const std::vector<int>& rand_column_families,
1856 const std::vector<int64_t>& rand_keys) {
1857 std::vector<std::unique_ptr<MutexLock>> locks;
1858 if (ShouldAcquireMutexOnKey()) {
1859 for (int rand_column_family : rand_column_families) {
1860 // `rand_keys[0]` on each chosen CF will be verified.
1861 locks.emplace_back(new MutexLock(
1862 thread->shared->GetMutexForKey(rand_column_family, rand_keys[0])));
1863 }
1864 }
1865
1866 std::string checkpoint_dir =
1867 FLAGS_db + "/.checkpoint" + std::to_string(thread->tid);
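  // Each thread uses its own checkpoint directory (suffixed with the thread
  // id), so concurrent TestCheckpoint() calls do not collide.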
1868 Options tmp_opts(options_);
1869 tmp_opts.listeners.clear();
1870 tmp_opts.env = db_stress_env;
1871
1872 DestroyDB(checkpoint_dir, tmp_opts);
1873
1874 if (db_stress_env->FileExists(checkpoint_dir).ok()) {
1875 // If the directory still exists, try to delete its files one by one.
1876 // Likely a trash file is still there.
1877 Status my_s = DestroyDir(db_stress_env, checkpoint_dir);
1878 if (!my_s.ok()) {
1879 fprintf(stderr, "Failed to destroy directory before checkpoint: %s",
1880 my_s.ToString().c_str());
1881 }
1882 }
1883
1884 Checkpoint* checkpoint = nullptr;
1885 Status s = Checkpoint::Create(db_, &checkpoint);
1886 if (s.ok()) {
1887 s = checkpoint->CreateCheckpoint(checkpoint_dir);
1888 if (!s.ok()) {
1889 fprintf(stderr, "Failed to create checkpoint in %s\n",
1890 checkpoint_dir.c_str());
1891 std::vector<std::string> files;
1892 Status my_s = db_stress_env->GetChildren(checkpoint_dir, &files);
1893 if (my_s.ok()) {
1894 for (const auto& f : files) {
1895 fprintf(stderr, " %s\n", f.c_str());
1896 }
1897 } else {
1898 fprintf(stderr, "Failed to list files under the checkpoint directory: %s\n",
1899 my_s.ToString().c_str());
1900 }
1901 }
1902 }
1903 delete checkpoint;
1904 checkpoint = nullptr;
1905 std::vector<ColumnFamilyHandle*> cf_handles;
1906 DB* checkpoint_db = nullptr;
1907 if (s.ok()) {
1908 Options options(options_);
1909 options.best_efforts_recovery = false;
1910 options.listeners.clear();
1911 // Avoid a race condition in trash handling after deleting checkpoint_db
1912 options.sst_file_manager.reset();
1913 std::vector<ColumnFamilyDescriptor> cf_descs;
1914 // TODO(ajkr): `column_family_names_` is not safe to access here when
1915 // `clear_column_family_one_in != 0`. But we can't easily switch to
1916 // `ListColumnFamilies` to get names because it won't necessarily give
1917 // the same order as `column_family_names_`.
1918 assert(FLAGS_clear_column_family_one_in == 0);
1919 if (FLAGS_clear_column_family_one_in == 0) {
1920 for (const auto& name : column_family_names_) {
1921 cf_descs.emplace_back(name, ColumnFamilyOptions(options));
1922 }
1923 s = DB::OpenForReadOnly(DBOptions(options), checkpoint_dir, cf_descs,
1924 &cf_handles, &checkpoint_db);
1925 }
1926 }
1927 if (checkpoint_db != nullptr) {
1928 // Note the column families chosen by `rand_column_families` cannot be
1929 // dropped while the locks for `rand_keys` are held. So we should not have
1930 // to worry about accessing those column families throughout this function.
1931 for (size_t i = 0; s.ok() && i < rand_column_families.size(); ++i) {
1932 std::string key_str = Key(rand_keys[0]);
1933 Slice key = key_str;
1934 std::string ts_str;
1935 Slice ts;
1936 ReadOptions read_opts;
1937 if (FLAGS_user_timestamp_size > 0) {
1938 ts_str = GetNowNanos();
1939 ts = ts_str;
1940 read_opts.timestamp = &ts;
1941 }
1942 std::string value;
1943 Status get_status = checkpoint_db->Get(
1944 read_opts, cf_handles[rand_column_families[i]], key, &value);
1945 bool exists =
1946 thread->shared->Exists(rand_column_families[i], rand_keys[0]);
1947 if (get_status.ok()) {
1948 if (!exists && ShouldAcquireMutexOnKey()) {
1949 std::ostringstream oss;
1950 oss << "0x" << key.ToString(true) << " exists in checkpoint "
1951 << checkpoint_dir << " but not in original db";
1952 s = Status::Corruption(oss.str());
1953 }
1954 } else if (get_status.IsNotFound()) {
1955 if (exists && ShouldAcquireMutexOnKey()) {
1956 std::ostringstream oss;
1957 oss << "0x" << key.ToString(true)
1958 << " exists in original db but not in checkpoint "
1959 << checkpoint_dir;
1960 s = Status::Corruption(oss.str());
1961 }
1962 } else {
1963 s = get_status;
1964 }
1965 }
1966 for (auto cfh : cf_handles) {
1967 delete cfh;
1968 }
1969 cf_handles.clear();
1970 delete checkpoint_db;
1971 checkpoint_db = nullptr;
1972 }
1973
1974 if (!s.ok()) {
1975 fprintf(stderr, "A checkpoint operation failed with: %s\n",
1976 s.ToString().c_str());
1977 } else {
1978 DestroyDB(checkpoint_dir, tmp_opts);
1979 }
1980 return s;
1981 }
1982
1983 void StressTest::TestGetProperty(ThreadState* thread) const {
1984 std::unordered_set<std::string> levelPropertyNames = {
1985 DB::Properties::kAggregatedTablePropertiesAtLevel,
1986 DB::Properties::kCompressionRatioAtLevelPrefix,
1987 DB::Properties::kNumFilesAtLevelPrefix,
1988 };
1989 std::unordered_set<std::string> unknownPropertyNames = {
1990 DB::Properties::kEstimateOldestKeyTime,
1991 DB::Properties::kOptionsStatistics,
1992 DB::Properties::
1993 kLiveSstFilesSizeAtTemperature, // similar to levelPropertyNames, it
1994 // requires a number suffix
1995 };
1996 unknownPropertyNames.insert(levelPropertyNames.begin(),
1997 levelPropertyNames.end());
1998
1999 std::unordered_set<std::string> blobCachePropertyNames = {
2000 DB::Properties::kBlobCacheCapacity,
2001 DB::Properties::kBlobCacheUsage,
2002 DB::Properties::kBlobCachePinnedUsage,
2003 };
2004 if (db_->GetOptions().blob_cache == nullptr) {
2005 unknownPropertyNames.insert(blobCachePropertyNames.begin(),
2006 blobCachePropertyNames.end());
2007 }
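  // Properties in `unknownPropertyNames` may legitimately return false here:
  // they either require a numeric suffix or depend on configuration (e.g. a
  // blob cache being set up). Only the remaining properties must succeed.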
2008
2009 std::string prop;
2010 for (const auto& ppt_name_and_info : InternalStats::ppt_name_to_info) {
2011 bool res = db_->GetProperty(ppt_name_and_info.first, &prop);
2012 if (unknownPropertyNames.find(ppt_name_and_info.first) ==
2013 unknownPropertyNames.end()) {
2014 if (!res) {
2015 fprintf(stderr, "Failed to get DB property: %s\n",
2016 ppt_name_and_info.first.c_str());
2017 thread->shared->SetVerificationFailure();
2018 }
2019 if (ppt_name_and_info.second.handle_int != nullptr) {
2020 uint64_t prop_int;
2021 if (!db_->GetIntProperty(ppt_name_and_info.first, &prop_int)) {
2022 fprintf(stderr, "Failed to get Int property: %s\n",
2023 ppt_name_and_info.first.c_str());
2024 thread->shared->SetVerificationFailure();
2025 }
2026 }
2027 if (ppt_name_and_info.second.handle_map != nullptr) {
2028 std::map<std::string, std::string> prop_map;
2029 if (!db_->GetMapProperty(ppt_name_and_info.first, &prop_map)) {
2030 fprintf(stderr, "Failed to get Map property: %s\n",
2031 ppt_name_and_info.first.c_str());
2032 thread->shared->SetVerificationFailure();
2033 }
2034 }
2035 }
2036 }
2037
2038 ROCKSDB_NAMESPACE::ColumnFamilyMetaData cf_meta_data;
2039 db_->GetColumnFamilyMetaData(&cf_meta_data);
2040 int level_size = static_cast<int>(cf_meta_data.levels.size());
2041 for (int level = 0; level < level_size; level++) {
2042 for (const auto& ppt_name : levelPropertyNames) {
2043 bool res = db_->GetProperty(ppt_name + std::to_string(level), &prop);
2044 if (!res) {
2045 fprintf(stderr, "Failed to get DB property: %s\n",
2046 (ppt_name + std::to_string(level)).c_str());
2047 thread->shared->SetVerificationFailure();
2048 }
2049 }
2050 }
2051
2052 // Test for an invalid property name
2053 if (thread->rand.OneIn(100)) {
2054 if (db_->GetProperty("rocksdb.invalid_property_name", &prop)) {
2055 fprintf(stderr, "Failed to return false for invalid property name\n");
2056 thread->shared->SetVerificationFailure();
2057 }
2058 }
2059 }
2060
2061 void StressTest::TestCompactFiles(ThreadState* thread,
2062 ColumnFamilyHandle* column_family) {
2063 ROCKSDB_NAMESPACE::ColumnFamilyMetaData cf_meta_data;
2064 db_->GetColumnFamilyMetaData(column_family, &cf_meta_data);
2065
2066 if (cf_meta_data.levels.empty()) {
2067 return;
2068 }
2069
2070 // Randomly compact up to three consecutive files from a level
2071 const int kMaxRetry = 3;
2072 for (int attempt = 0; attempt < kMaxRetry; ++attempt) {
2073 size_t random_level =
2074 thread->rand.Uniform(static_cast<int>(cf_meta_data.levels.size()));
2075
2076 const auto& files = cf_meta_data.levels[random_level].files;
2077 if (files.size() > 0) {
2078 size_t random_file_index =
2079 thread->rand.Uniform(static_cast<int>(files.size()));
2080 if (files[random_file_index].being_compacted) {
2081 // Retry as the selected file is currently being compacted
2082 continue;
2083 }
2084
2085 std::vector<std::string> input_files;
2086 input_files.push_back(files[random_file_index].name);
2087 if (random_file_index > 0 &&
2088 !files[random_file_index - 1].being_compacted) {
2089 input_files.push_back(files[random_file_index - 1].name);
2090 }
2091 if (random_file_index + 1 < files.size() &&
2092 !files[random_file_index + 1].being_compacted) {
2093 input_files.push_back(files[random_file_index + 1].name);
2094 }
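      // Including up to one non-busy neighbor on each side keeps the input a
      // contiguous run of files within the level.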
2095
2096 size_t output_level =
2097 std::min(random_level + 1, cf_meta_data.levels.size() - 1);
2098 auto s = db_->CompactFiles(CompactionOptions(), column_family,
2099 input_files, static_cast<int>(output_level));
2100 if (!s.ok()) {
2101 fprintf(stdout, "Unable to perform CompactFiles(): %s\n",
2102 s.ToString().c_str());
2103 thread->stats.AddNumCompactFilesFailed(1);
2104 } else {
2105 thread->stats.AddNumCompactFilesSucceed(1);
2106 }
2107 break;
2108 }
2109 }
2110 }
2111 #endif // ROCKSDB_LITE
2112
2113 Status StressTest::TestFlush(const std::vector<int>& rand_column_families) {
2114 FlushOptions flush_opts;
2115 if (FLAGS_atomic_flush) {
2116 return db_->Flush(flush_opts, column_families_);
2117 }
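  // Without atomic flush we can flush an arbitrary subset of column
  // families; with it (handled above) all column families are flushed
  // together so their flushed state stays mutually consistent.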
2118 std::vector<ColumnFamilyHandle*> cfhs;
2119 std::for_each(rand_column_families.begin(), rand_column_families.end(),
2120 [this, &cfhs](int k) { cfhs.push_back(column_families_[k]); });
2121 return db_->Flush(flush_opts, cfhs);
2122 }
2123
2124 Status StressTest::TestPauseBackground(ThreadState* thread) {
2125 Status status = db_->PauseBackgroundWork();
2126 if (!status.ok()) {
2127 return status;
2128 }
2129 // To avoid stalling/deadlocking ourselves in this thread, just
2130 // sleep here during pause and let other threads do db operations.
2131 // Sleep up to ~16 seconds (2**24 microseconds), but very skewed
2132 // toward short pause. (1 chance in 25 of pausing >= 1s;
2133 // 1 chance in 625 of pausing full 16s.)
2134 int pwr2_micros =
2135 std::min(thread->rand.Uniform(25), thread->rand.Uniform(25));
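  // Checking the arithmetic above: sleeping >= 2^20 us (~1s) requires both
  // Uniform(25) draws to be >= 20, probability (5/25)^2 = 1/25; the full
  // 2^24 us (~16s) requires both draws to equal 24, probability
  // (1/25)^2 = 1/625, matching the comment above.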
2136 clock_->SleepForMicroseconds(1 << pwr2_micros);
2137 return db_->ContinueBackgroundWork();
2138 }
2139
2140 void StressTest::TestAcquireSnapshot(ThreadState* thread,
2141 int rand_column_family,
2142 const std::string& keystr, uint64_t i) {
2143 Slice key = keystr;
2144 ColumnFamilyHandle* column_family = column_families_[rand_column_family];
2145 // This `ReadOptions` is for validation purposes. Ignore
2146 // `FLAGS_rate_limit_user_ops` to avoid slowing any validation.
2147 ReadOptions ropt;
2148 #ifndef ROCKSDB_LITE
2149 auto db_impl = static_cast_with_check<DBImpl>(db_->GetRootDB());
2150 const bool ww_snapshot = thread->rand.OneIn(10);
2151 const Snapshot* snapshot =
2152 ww_snapshot ? db_impl->GetSnapshotForWriteConflictBoundary()
2153 : db_->GetSnapshot();
2154 #else
2155 const Snapshot* snapshot = db_->GetSnapshot();
2156 #endif // !ROCKSDB_LITE
2157 ropt.snapshot = snapshot;
2158
2159 // Ideally, we want snapshot taking and timestamp generation to be atomic
2160 // here, so that the snapshot corresponds to the timestamp. However, that is
2161 // not possible with the current GetSnapshot() API.
2162 std::string ts_str;
2163 Slice ts;
2164 if (FLAGS_user_timestamp_size > 0) {
2165 ts_str = GetNowNanos();
2166 ts = ts_str;
2167 ropt.timestamp = &ts;
2168 }
2169
2170 std::string value_at;
2171 // When taking a snapshot, we also read a key from that snapshot. We
2172 // will later read the same key before releasing the snapshot and
2173 // verify that the results are the same.
2174 auto status_at = db_->Get(ropt, column_family, key, &value_at);
2175 std::vector<bool>* key_vec = nullptr;
2176
2177 if (FLAGS_compare_full_db_state_snapshot && (thread->tid == 0)) {
2178 key_vec = new std::vector<bool>(FLAGS_max_key);
2179 // When `prefix_extractor` is set, seeking to beginning and scanning
2180 // across prefixes are only supported with `total_order_seek` set.
2181 ropt.total_order_seek = true;
2182 std::unique_ptr<Iterator> iterator(db_->NewIterator(ropt));
2183 for (iterator->SeekToFirst(); iterator->Valid(); iterator->Next()) {
2184 uint64_t key_val;
2185 if (GetIntVal(iterator->key().ToString(), &key_val)) {
2186 (*key_vec)[key_val] = true;
2187 }
2188 }
2189 }
2190
2191 ThreadState::SnapshotState snap_state = {snapshot,
2192 rand_column_family,
2193 column_family->GetName(),
2194 keystr,
2195 status_at,
2196 value_at,
2197 key_vec,
2198 ts_str};
2199 uint64_t hold_for = FLAGS_snapshot_hold_ops;
2200 if (FLAGS_long_running_snapshots) {
2201 // Hold 10% of snapshots for 10x longer
2202 if (thread->rand.OneIn(10)) {
2203 assert(hold_for < std::numeric_limits<uint64_t>::max() / 10);
2204 hold_for *= 10;
2205 // Hold 1% of snapshots for 100x longer
2206 if (thread->rand.OneIn(10)) {
2207 assert(hold_for < std::numeric_limits<uint64_t>::max() / 10);
2208 hold_for *= 10;
2209 }
2210 }
2211 }
2212 uint64_t release_at = std::min(FLAGS_ops_per_thread - 1, i + hold_for);
2213 thread->snapshot_queue.emplace(release_at, snap_state);
2214 }
2215
2216 Status StressTest::MaybeReleaseSnapshots(ThreadState* thread, uint64_t i) {
2217 while (!thread->snapshot_queue.empty() &&
2218 i >= thread->snapshot_queue.front().first) {
2219 auto snap_state = thread->snapshot_queue.front().second;
2220 assert(snap_state.snapshot);
2221 // Note: this is unsafe as the cf might be dropped concurrently. But
2222 // it is ok since unclean cf drop is currently not supported by
2223 // write-prepared transactions.
2224 Status s = AssertSame(db_, column_families_[snap_state.cf_at], snap_state);
2225 db_->ReleaseSnapshot(snap_state.snapshot);
2226 delete snap_state.key_vec;
2227 thread->snapshot_queue.pop();
2228 if (!s.ok()) {
2229 return s;
2230 }
2231 }
2232 return Status::OK();
2233 }
2234
2235 void StressTest::TestCompactRange(ThreadState* thread, int64_t rand_key,
2236 const Slice& start_key,
2237 ColumnFamilyHandle* column_family) {
2238 int64_t end_key_num;
2239 if (std::numeric_limits<int64_t>::max() - rand_key <
2240 FLAGS_compact_range_width) {
2241 end_key_num = std::numeric_limits<int64_t>::max();
2242 } else {
2243 end_key_num = FLAGS_compact_range_width + rand_key;
2244 }
2245 std::string end_key_buf = Key(end_key_num);
2246 Slice end_key(end_key_buf);
2247
2248 CompactRangeOptions cro;
2249 cro.exclusive_manual_compaction = static_cast<bool>(thread->rand.Next() % 2);
2250 cro.change_level = static_cast<bool>(thread->rand.Next() % 2);
2251 std::vector<BottommostLevelCompaction> bottom_level_styles = {
2252 BottommostLevelCompaction::kSkip,
2253 BottommostLevelCompaction::kIfHaveCompactionFilter,
2254 BottommostLevelCompaction::kForce,
2255 BottommostLevelCompaction::kForceOptimized};
2256 cro.bottommost_level_compaction =
2257 bottom_level_styles[thread->rand.Next() %
2258 static_cast<uint32_t>(bottom_level_styles.size())];
2259 cro.allow_write_stall = static_cast<bool>(thread->rand.Next() % 2);
2260 cro.max_subcompactions = static_cast<uint32_t>(thread->rand.Next() % 4);
2261 std::vector<BlobGarbageCollectionPolicy> blob_gc_policies = {
2262 BlobGarbageCollectionPolicy::kForce,
2263 BlobGarbageCollectionPolicy::kDisable,
2264 BlobGarbageCollectionPolicy::kUseDefault};
2265 cro.blob_garbage_collection_policy =
2266 blob_gc_policies[thread->rand.Next() %
2267 static_cast<uint32_t>(blob_gc_policies.size())];
2268 cro.blob_garbage_collection_age_cutoff =
2269 static_cast<double>(thread->rand.Next() % 100) / 100.0;
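  // For reference, a deterministic CompactRange() call outside the stress
  // test might look like the following sketch (same public API; values are
  // illustrative):
  //
  //   CompactRangeOptions cro;
  //   cro.change_level = true;
  //   cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
  //   db->CompactRange(cro, cfh, &begin_key, &end_key);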
2270
2271 const Snapshot* pre_snapshot = nullptr;
2272 uint32_t pre_hash = 0;
2273 if (thread->rand.OneIn(2)) {
2274 // Do some validation by declaring a snapshot and compare the data before
2275 // and after the compaction
2276 pre_snapshot = db_->GetSnapshot();
2277 pre_hash =
2278 GetRangeHash(thread, pre_snapshot, column_family, start_key, end_key);
2279 }
2280
2281 Status status = db_->CompactRange(cro, column_family, &start_key, &end_key);
2282
2283 if (!status.ok()) {
2284 fprintf(stdout, "Unable to perform CompactRange(): %s\n",
2285 status.ToString().c_str());
2286 }
2287
2288 if (pre_snapshot != nullptr) {
2289 uint32_t post_hash =
2290 GetRangeHash(thread, pre_snapshot, column_family, start_key, end_key);
2291 if (pre_hash != post_hash) {
2292 fprintf(stderr,
2293 "Data hash different before and after compact range "
2294 "start_key %s end_key %s\n",
2295 start_key.ToString(true).c_str(), end_key.ToString(true).c_str());
2296 thread->stats.AddErrors(1);
2297 // Fail fast to preserve the DB state.
2298 thread->shared->SetVerificationFailure();
2299 }
2300 db_->ReleaseSnapshot(pre_snapshot);
2301 }
2302 }
2303
2304 uint32_t StressTest::GetRangeHash(ThreadState* thread, const Snapshot* snapshot,
2305 ColumnFamilyHandle* column_family,
2306 const Slice& start_key,
2307 const Slice& end_key) {
2308 // This `ReadOptions` is for validation purposes. Ignore
2309 // `FLAGS_rate_limit_user_ops` to avoid slowing any validation.
2310 ReadOptions ro;
2311 ro.snapshot = snapshot;
2312 ro.total_order_seek = true;
2313 std::string ts_str;
2314 Slice ts;
2315 if (FLAGS_user_timestamp_size > 0) {
2316 ts_str = GetNowNanos();
2317 ts = ts_str;
2318 ro.timestamp = &ts;
2319 }
2320
2321 std::unique_ptr<Iterator> it(db_->NewIterator(ro, column_family));
2322
2323 constexpr char kCrcCalculatorSeparator = ';';
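  // The separator byte is hashed after every field so adjacent fields cannot
  // be confused, e.g. key "ab" with value "c" hashes differently from key
  // "a" with value "bc".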
2324
2325 uint32_t crc = 0;
2326
2327 for (it->Seek(start_key);
2328 it->Valid() && options_.comparator->Compare(it->key(), end_key) <= 0;
2329 it->Next()) {
2330 crc = crc32c::Extend(crc, it->key().data(), it->key().size());
2331 crc = crc32c::Extend(crc, &kCrcCalculatorSeparator, sizeof(char));
2332 crc = crc32c::Extend(crc, it->value().data(), it->value().size());
2333 crc = crc32c::Extend(crc, &kCrcCalculatorSeparator, sizeof(char));
2334
2335 for (const auto& column : it->columns()) {
2336 crc = crc32c::Extend(crc, column.name().data(), column.name().size());
2337 crc = crc32c::Extend(crc, &kCrcCalculatorSeparator, sizeof(char));
2338 crc = crc32c::Extend(crc, column.value().data(), column.value().size());
2339 crc = crc32c::Extend(crc, &kCrcCalculatorSeparator, sizeof(char));
2340 }
2341 }
2342
2343 if (!it->status().ok()) {
2344 fprintf(stderr, "Iterator non-OK when calculating range CRC: %s\n",
2345 it->status().ToString().c_str());
2346 thread->stats.AddErrors(1);
2347 // Fail fast to preserve the DB state.
2348 thread->shared->SetVerificationFailure();
2349 }
2350
2351 return crc;
2352 }
2353
2354 void StressTest::PrintEnv() const {
2355 fprintf(stdout, "RocksDB version : %d.%d\n", kMajorVersion,
2356 kMinorVersion);
2357 fprintf(stdout, "Format version : %d\n", FLAGS_format_version);
2358 fprintf(stdout, "TransactionDB : %s\n",
2359 FLAGS_use_txn ? "true" : "false");
2360
2361 if (FLAGS_use_txn) {
2362 #ifndef ROCKSDB_LITE
2363 fprintf(stdout, "Two write queues : %s\n",
2364 FLAGS_two_write_queues ? "true" : "false");
2365 fprintf(stdout, "Write policy : %d\n",
2366 static_cast<int>(FLAGS_txn_write_policy));
2367 if (static_cast<uint64_t>(TxnDBWritePolicy::WRITE_PREPARED) ==
2368 FLAGS_txn_write_policy ||
2369 static_cast<uint64_t>(TxnDBWritePolicy::WRITE_UNPREPARED) ==
2370 FLAGS_txn_write_policy) {
2371 fprintf(stdout, "Snapshot cache bits : %d\n",
2372 static_cast<int>(FLAGS_wp_snapshot_cache_bits));
2373 fprintf(stdout, "Commit cache bits : %d\n",
2374 static_cast<int>(FLAGS_wp_commit_cache_bits));
2375 }
2376 fprintf(stdout, "last cwb for recovery : %s\n",
2377 FLAGS_use_only_the_last_commit_time_batch_for_recovery ? "true"
2378 : "false");
2379 #endif // !ROCKSDB_LITE
2380 }
2381
2382 #ifndef ROCKSDB_LITE
2383 fprintf(stdout, "Stacked BlobDB : %s\n",
2384 FLAGS_use_blob_db ? "true" : "false");
2385 #endif // !ROCKSDB_LITE
2386 fprintf(stdout, "Read only mode : %s\n",
2387 FLAGS_read_only ? "true" : "false");
2388 fprintf(stdout, "Atomic flush : %s\n",
2389 FLAGS_atomic_flush ? "true" : "false");
2390 fprintf(stdout, "Manual WAL flush : %s\n",
2391 FLAGS_manual_wal_flush_one_in > 0 ? "true" : "false");
2392 fprintf(stdout, "Column families : %d\n", FLAGS_column_families);
2393 if (!FLAGS_test_batches_snapshots) {
2394 fprintf(stdout, "Clear CFs one in : %d\n",
2395 FLAGS_clear_column_family_one_in);
2396 }
2397 fprintf(stdout, "Number of threads : %d\n", FLAGS_threads);
2398 fprintf(stdout, "Ops per thread : %lu\n",
2399 (unsigned long)FLAGS_ops_per_thread);
2400 std::string ttl_state("unused");
2401 if (FLAGS_ttl > 0) {
2402 ttl_state = std::to_string(FLAGS_ttl);
2403 }
2404 fprintf(stdout, "Time to live(sec) : %s\n", ttl_state.c_str());
2405 fprintf(stdout, "Read percentage : %d%%\n", FLAGS_readpercent);
2406 fprintf(stdout, "Prefix percentage : %d%%\n", FLAGS_prefixpercent);
2407 fprintf(stdout, "Write percentage : %d%%\n", FLAGS_writepercent);
2408 fprintf(stdout, "Delete percentage : %d%%\n", FLAGS_delpercent);
2409 fprintf(stdout, "Delete range percentage : %d%%\n", FLAGS_delrangepercent);
2410 fprintf(stdout, "No overwrite percentage : %d%%\n",
2411 FLAGS_nooverwritepercent);
2412 fprintf(stdout, "Iterate percentage : %d%%\n", FLAGS_iterpercent);
2413 fprintf(stdout, "Custom ops percentage : %d%%\n", FLAGS_customopspercent);
2414 fprintf(stdout, "DB-write-buffer-size : %" PRIu64 "\n",
2415 FLAGS_db_write_buffer_size);
2416 fprintf(stdout, "Write-buffer-size : %d\n", FLAGS_write_buffer_size);
2417 fprintf(stdout, "Iterations : %lu\n",
2418 (unsigned long)FLAGS_num_iterations);
2419 fprintf(stdout, "Max key : %lu\n",
2420 (unsigned long)FLAGS_max_key);
2421 fprintf(stdout, "Ratio #ops/#keys : %f\n",
2422 (1.0 * FLAGS_ops_per_thread * FLAGS_threads) / FLAGS_max_key);
2423 fprintf(stdout, "Num times DB reopens : %d\n", FLAGS_reopen);
2424 fprintf(stdout, "Batches/snapshots : %d\n",
2425 FLAGS_test_batches_snapshots);
2426 fprintf(stdout, "Do update in place : %d\n", FLAGS_in_place_update);
2427 fprintf(stdout, "Num keys per lock : %d\n",
2428 1 << FLAGS_log2_keys_per_lock);
2429 std::string compression = CompressionTypeToString(compression_type_e);
2430 fprintf(stdout, "Compression : %s\n", compression.c_str());
2431 std::string bottommost_compression =
2432 CompressionTypeToString(bottommost_compression_type_e);
2433 fprintf(stdout, "Bottommost Compression : %s\n",
2434 bottommost_compression.c_str());
2435 std::string checksum = ChecksumTypeToString(checksum_type_e);
2436 fprintf(stdout, "Checksum type : %s\n", checksum.c_str());
2437 fprintf(stdout, "File checksum impl : %s\n",
2438 FLAGS_file_checksum_impl.c_str());
2439 fprintf(stdout, "Bloom bits / key : %s\n",
2440 FormatDoubleParam(FLAGS_bloom_bits).c_str());
2441 fprintf(stdout, "Max subcompactions : %" PRIu64 "\n",
2442 FLAGS_subcompactions);
2443 fprintf(stdout, "Use MultiGet : %s\n",
2444 FLAGS_use_multiget ? "true" : "false");
2445
2446 const char* memtablerep = "";
2447 switch (FLAGS_rep_factory) {
2448 case kSkipList:
2449 memtablerep = "skip_list";
2450 break;
2451 case kHashSkipList:
2452 memtablerep = "prefix_hash";
2453 break;
2454 case kVectorRep:
2455 memtablerep = "vector";
2456 break;
2457 }
2458
2459 fprintf(stdout, "Memtablerep : %s\n", memtablerep);
2460
2461 #ifndef NDEBUG
2462 KillPoint* kp = KillPoint::GetInstance();
2463 fprintf(stdout, "Test kill odd : %d\n", kp->rocksdb_kill_odds);
2464 if (!kp->rocksdb_kill_exclude_prefixes.empty()) {
2465 fprintf(stdout, "Skipping kill points prefixes:\n");
2466 for (auto& p : kp->rocksdb_kill_exclude_prefixes) {
2467 fprintf(stdout, " %s\n", p.c_str());
2468 }
2469 }
2470 #endif
2471 fprintf(stdout, "Periodic Compaction Secs : %" PRIu64 "\n",
2472 FLAGS_periodic_compaction_seconds);
2473 fprintf(stdout, "Compaction TTL : %" PRIu64 "\n",
2474 FLAGS_compaction_ttl);
2475 const char* compaction_pri = "";
2476 switch (FLAGS_compaction_pri) {
2477 case kByCompensatedSize:
2478 compaction_pri = "kByCompensatedSize";
2479 break;
2480 case kOldestLargestSeqFirst:
2481 compaction_pri = "kOldestLargestSeqFirst";
2482 break;
2483 case kOldestSmallestSeqFirst:
2484 compaction_pri = "kOldestSmallestSeqFirst";
2485 break;
2486 case kMinOverlappingRatio:
2487 compaction_pri = "kMinOverlappingRatio";
2488 break;
2489 case kRoundRobin:
2490 compaction_pri = "kRoundRobin";
2491 break;
2492 }
2493 fprintf(stdout, "Compaction Pri : %s\n", compaction_pri);
2494 fprintf(stdout, "Background Purge : %d\n",
2495 static_cast<int>(FLAGS_avoid_unnecessary_blocking_io));
2496 fprintf(stdout, "Write DB ID to manifest : %d\n",
2497 static_cast<int>(FLAGS_write_dbid_to_manifest));
2498 fprintf(stdout, "Max Write Batch Group Size: %" PRIu64 "\n",
2499 FLAGS_max_write_batch_group_size_bytes);
2500 fprintf(stdout, "Use dynamic level : %d\n",
2501 static_cast<int>(FLAGS_level_compaction_dynamic_level_bytes));
2502 fprintf(stdout, "Read fault one in : %d\n", FLAGS_read_fault_one_in);
2503 fprintf(stdout, "Write fault one in : %d\n", FLAGS_write_fault_one_in);
2504 fprintf(stdout, "Open metadata write fault one in : %d\n",
2505 FLAGS_open_metadata_write_fault_one_in);
2507 fprintf(stdout, "Sync fault injection : %d\n",
2508 FLAGS_sync_fault_injection);
2509 fprintf(stdout, "Best efforts recovery : %d\n",
2510 static_cast<int>(FLAGS_best_efforts_recovery));
2511 fprintf(stdout, "Fail if OPTIONS file error: %d\n",
2512 static_cast<int>(FLAGS_fail_if_options_file_error));
2513 fprintf(stdout, "User timestamp size bytes : %d\n",
2514 static_cast<int>(FLAGS_user_timestamp_size));
2515 fprintf(stdout, "WAL compression : %s\n",
2516 FLAGS_wal_compression.c_str());
2517 fprintf(stdout, "Try verify sst unique id : %d\n",
2518 static_cast<int>(FLAGS_verify_sst_unique_id_in_manifest));
2519
2520 fprintf(stdout, "------------------------------------------------\n");
2521 }
2522
2523 void StressTest::Open(SharedState* shared) {
2524 assert(db_ == nullptr);
2525 #ifndef ROCKSDB_LITE
2526 assert(txn_db_ == nullptr);
2527 #else
2528 (void)shared;
2529 #endif
2530 if (!InitializeOptionsFromFile(options_)) {
2531 InitializeOptionsFromFlags(cache_, compressed_cache_, filter_policy_,
2532 options_);
2533 }
2534 InitializeOptionsGeneral(cache_, compressed_cache_, filter_policy_, options_);
2535
2536 if (FLAGS_prefix_size == 0 && FLAGS_rep_factory == kHashSkipList) {
2537 fprintf(stderr,
2538 "prefeix_size cannot be zero if memtablerep == prefix_hash\n");
2539 exit(1);
2540 }
2541 if (FLAGS_prefix_size != 0 && FLAGS_rep_factory != kHashSkipList) {
2542 fprintf(stderr,
2543 "WARNING: prefix_size is non-zero but "
2544 "memtablerep != prefix_hash\n");
2545 }
2546
2547 if ((options_.enable_blob_files || options_.enable_blob_garbage_collection ||
2548 FLAGS_allow_setting_blob_options_dynamically) &&
2549 FLAGS_best_efforts_recovery) {
2550 fprintf(stderr,
2551 "Integrated BlobDB is currently incompatible with best-effort "
2552 "recovery\n");
2553 exit(1);
2554 }
2555
2556 fprintf(stdout,
2557 "Integrated BlobDB: blob files enabled %d, min blob size %" PRIu64
2558 ", blob file size %" PRIu64
2559 ", blob compression type %s, blob GC enabled %d, cutoff %f, force "
2560 "threshold %f, blob compaction readahead size %" PRIu64
2561 ", blob file starting level %d\n",
2562 options_.enable_blob_files, options_.min_blob_size,
2563 options_.blob_file_size,
2564 CompressionTypeToString(options_.blob_compression_type).c_str(),
2565 options_.enable_blob_garbage_collection,
2566 options_.blob_garbage_collection_age_cutoff,
2567 options_.blob_garbage_collection_force_threshold,
2568 options_.blob_compaction_readahead_size,
2569 options_.blob_file_starting_level);
2570
2571 if (FLAGS_use_blob_cache) {
2572 fprintf(stdout,
2573 "Integrated BlobDB: blob cache enabled"
2574 ", block and blob caches shared: %d",
2575 FLAGS_use_shared_block_and_blob_cache);
2576 if (!FLAGS_use_shared_block_and_blob_cache) {
2577 fprintf(stdout,
2578 ", blob cache size %" PRIu64 ", blob cache num shard bits: %d",
2579 FLAGS_blob_cache_size, FLAGS_blob_cache_numshardbits);
2580 }
2581 fprintf(stdout, ", blob cache prepopulated: %d\n",
2582 FLAGS_prepopulate_blob_cache);
2583 } else {
2584 fprintf(stdout, "Integrated BlobDB: blob cache disabled\n");
2585 }
2586
2587 fprintf(stdout, "DB path: [%s]\n", FLAGS_db.c_str());
2588
2589 Status s;
2590
2591 if (FLAGS_ttl == -1) {
2592 std::vector<std::string> existing_column_families;
2593 s = DB::ListColumnFamilies(DBOptions(options_), FLAGS_db,
2594 &existing_column_families); // ignore errors
2595 if (!s.ok()) {
2596 // DB doesn't exist
2597 assert(existing_column_families.empty());
2598 assert(column_family_names_.empty());
2599 column_family_names_.push_back(kDefaultColumnFamilyName);
2600 } else if (column_family_names_.empty()) {
2601 // this is the first call to the function Open()
2602 column_family_names_ = existing_column_families;
2603 } else {
2604 // this is a reopen. just assert that existing column_family_names are
2605 // equivalent to what we remember
2606 auto sorted_cfn = column_family_names_;
2607 std::sort(sorted_cfn.begin(), sorted_cfn.end());
2608 std::sort(existing_column_families.begin(),
2609 existing_column_families.end());
2610 if (sorted_cfn != existing_column_families) {
2611 fprintf(stderr, "Expected column families differ from the existing:\n");
2612 fprintf(stderr, "Expected: {");
2613 for (auto cf : sorted_cfn) {
2614 fprintf(stderr, "%s ", cf.c_str());
2615 }
2616 fprintf(stderr, "}\n");
2617 fprintf(stderr, "Existing: {");
2618 for (auto cf : existing_column_families) {
2619 fprintf(stderr, "%s ", cf.c_str());
2620 }
2621 fprintf(stderr, "}\n");
2622 }
2623 assert(sorted_cfn == existing_column_families);
2624 }
2625 std::vector<ColumnFamilyDescriptor> cf_descriptors;
2626 for (auto name : column_family_names_) {
2627 if (name != kDefaultColumnFamilyName) {
2628 new_column_family_name_ =
2629 std::max(new_column_family_name_.load(), std::stoi(name) + 1);
2630 }
2631 cf_descriptors.emplace_back(name, ColumnFamilyOptions(options_));
2632 }
2633 while (cf_descriptors.size() < (size_t)FLAGS_column_families) {
2634 std::string name = std::to_string(new_column_family_name_.load());
2635 new_column_family_name_++;
2636 cf_descriptors.emplace_back(name, ColumnFamilyOptions(options_));
2637 column_family_names_.push_back(name);
2638 }
2639
2640 options_.listeners.clear();
2641 #ifndef ROCKSDB_LITE
2642 options_.listeners.emplace_back(new DbStressListener(
2643 FLAGS_db, options_.db_paths, cf_descriptors, db_stress_listener_env));
2644 #endif // !ROCKSDB_LITE
2645 RegisterAdditionalListeners();
2646
2647 if (!FLAGS_use_txn) {
2648 // Determine whether we need to inject file metadata write failures
2649 // during DB reopen and, if so, enable them.
2650 // Only inject metadata errors on reopen, as an initial open failure
2651 // doesn't need to be handled.
2652 // TODO: TransactionDB is not yet covered by this fault injection test.
2653 bool ingest_meta_error = false;
2654 bool ingest_write_error = false;
2655 bool ingest_read_error = false;
2656 if ((FLAGS_open_metadata_write_fault_one_in ||
2657 FLAGS_open_write_fault_one_in || FLAGS_open_read_fault_one_in) &&
2658 fault_fs_guard
2659 ->FileExists(FLAGS_db + "/CURRENT", IOOptions(), nullptr)
2660 .ok()) {
2661 if (!FLAGS_sync) {
2662 // When db_stress is not in sync mode, we expect all WAL writes to
2663 // be durable. Buffering unsynced writes would cause false
2664 // positives in crash tests. Until we figure out a way to
2665 // solve that, exclude the WAL from failure injection.
2666 fault_fs_guard->SetSkipDirectWritableTypes({kWalFile});
2667 }
2668 ingest_meta_error = FLAGS_open_metadata_write_fault_one_in;
2669 ingest_write_error = FLAGS_open_write_fault_one_in;
2670 ingest_read_error = FLAGS_open_read_fault_one_in;
2671 if (ingest_meta_error) {
2672 fault_fs_guard->EnableMetadataWriteErrorInjection();
2673 fault_fs_guard->SetRandomMetadataWriteError(
2674 FLAGS_open_metadata_write_fault_one_in);
2675 }
2676 if (ingest_write_error) {
2677 fault_fs_guard->SetFilesystemDirectWritable(false);
2678 fault_fs_guard->EnableWriteErrorInjection();
2679 fault_fs_guard->SetRandomWriteError(
2680 static_cast<uint32_t>(FLAGS_seed), FLAGS_open_write_fault_one_in,
2681 IOStatus::IOError("Injected Open Error"),
2682 /*inject_for_all_file_types=*/true, /*types=*/{});
2683 }
2684 if (ingest_read_error) {
2685 fault_fs_guard->SetRandomReadError(FLAGS_open_read_fault_one_in);
2686 }
2687 }
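    // Any error injection enabled above is only meant to cover the open
    // attempt(s) in the retry loop below; it is reverted right after each
    // attempt, whether or not the open succeeded.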
2688 while (true) {
2689 #ifndef ROCKSDB_LITE
2690 // StackableDB-based BlobDB
2691 if (FLAGS_use_blob_db) {
2692 blob_db::BlobDBOptions blob_db_options;
2693 blob_db_options.min_blob_size = FLAGS_blob_db_min_blob_size;
2694 blob_db_options.bytes_per_sync = FLAGS_blob_db_bytes_per_sync;
2695 blob_db_options.blob_file_size = FLAGS_blob_db_file_size;
2696 blob_db_options.enable_garbage_collection = FLAGS_blob_db_enable_gc;
2697 blob_db_options.garbage_collection_cutoff = FLAGS_blob_db_gc_cutoff;
2698
2699 blob_db::BlobDB* blob_db = nullptr;
2700 s = blob_db::BlobDB::Open(options_, blob_db_options, FLAGS_db,
2701 cf_descriptors, &column_families_,
2702 &blob_db);
2703 if (s.ok()) {
2704 db_ = blob_db;
2705 }
2706 } else
2707 #endif // !ROCKSDB_LITE
2708 {
2709 if (db_preload_finished_.load() && FLAGS_read_only) {
2710 s = DB::OpenForReadOnly(DBOptions(options_), FLAGS_db,
2711 cf_descriptors, &column_families_, &db_);
2712 } else {
2713 s = DB::Open(DBOptions(options_), FLAGS_db, cf_descriptors,
2714 &column_families_, &db_);
2715 }
2716 }
2717
2718 if (ingest_meta_error || ingest_write_error || ingest_read_error) {
2719 fault_fs_guard->SetFilesystemDirectWritable(true);
2720 fault_fs_guard->DisableMetadataWriteErrorInjection();
2721 fault_fs_guard->DisableWriteErrorInjection();
2722 fault_fs_guard->SetSkipDirectWritableTypes({});
2723 fault_fs_guard->SetRandomReadError(0);
2724 if (s.ok()) {
2725 // Injected errors might happen in background compactions. We
2726 // wait for all compactions to finish to make sure the DB is in a
2727 // clean state before executing queries.
2728 s = static_cast_with_check<DBImpl>(db_->GetRootDB())
2729 ->WaitForCompact(true /* wait_unscheduled */);
2730 if (!s.ok()) {
2731 for (auto cf : column_families_) {
2732 delete cf;
2733 }
2734 column_families_.clear();
2735 delete db_;
2736 db_ = nullptr;
2737 }
2738 }
2739 if (!s.ok()) {
2740 // After failing to open the DB due to an injected IO error, a
2741 // retry should successfully open it with correct data if no
2742 // further IO error shows up.
2743 ingest_meta_error = false;
2744 ingest_write_error = false;
2745 ingest_read_error = false;
2746
2747 Random rand(static_cast<uint32_t>(FLAGS_seed));
2748 if (rand.OneIn(2)) {
2749 fault_fs_guard->DeleteFilesCreatedAfterLastDirSync(IOOptions(),
2750 nullptr);
2751 }
2752 if (rand.OneIn(3)) {
2753 fault_fs_guard->DropUnsyncedFileData();
2754 } else if (rand.OneIn(2)) {
2755 fault_fs_guard->DropRandomUnsyncedFileData(&rand);
2756 }
2757 continue;
2758 }
2759 }
2760 break;
2761 }
2762 } else {
2763 #ifndef ROCKSDB_LITE
2764 TransactionDBOptions txn_db_options;
2765 assert(FLAGS_txn_write_policy <= TxnDBWritePolicy::WRITE_UNPREPARED);
2766 txn_db_options.write_policy =
2767 static_cast<TxnDBWritePolicy>(FLAGS_txn_write_policy);
2768 if (FLAGS_unordered_write) {
2769 assert(txn_db_options.write_policy == TxnDBWritePolicy::WRITE_PREPARED);
2770 options_.unordered_write = true;
2771 options_.two_write_queues = true;
2772 txn_db_options.skip_concurrency_control = true;
2773 } else {
2774 options_.two_write_queues = FLAGS_two_write_queues;
2775 }
2776 txn_db_options.wp_snapshot_cache_bits =
2777 static_cast<size_t>(FLAGS_wp_snapshot_cache_bits);
2778 txn_db_options.wp_commit_cache_bits =
2779 static_cast<size_t>(FLAGS_wp_commit_cache_bits);
2780 PrepareTxnDbOptions(shared, txn_db_options);
2781 s = TransactionDB::Open(options_, txn_db_options, FLAGS_db,
2782 cf_descriptors, &column_families_, &txn_db_);
2783 if (!s.ok()) {
2784 fprintf(stderr, "Error in opening the TransactionDB [%s]\n",
2785 s.ToString().c_str());
2786 fflush(stderr);
2787 }
2788 assert(s.ok());
2789
2790 // Do not swap the order of the following.
2791 {
2792 db_ = txn_db_;
2793 db_aptr_.store(txn_db_, std::memory_order_release);
2794 }
2795 #endif
2796 }
2797 if (!s.ok()) {
2798 fprintf(stderr, "Error in opening the DB [%s]\n", s.ToString().c_str());
2799 fflush(stderr);
2800 }
2801 assert(s.ok());
2802 assert(column_families_.size() ==
2803 static_cast<size_t>(FLAGS_column_families));
2804
2805 // Secondary instance does not support write-prepared/write-unprepared
2806 // transactions, thus just disable the secondary instance if we use
2807 // transactions.
2808 if (s.ok() && FLAGS_test_secondary && !FLAGS_use_txn) {
2809 #ifndef ROCKSDB_LITE
2810 Options tmp_opts;
2811 // TODO(yanqin) support max_open_files != -1 for secondary instance.
2812 tmp_opts.max_open_files = -1;
2813 tmp_opts.env = db_stress_env;
2814 const std::string& secondary_path = FLAGS_secondaries_base;
2815 s = DB::OpenAsSecondary(tmp_opts, FLAGS_db, secondary_path,
2816 cf_descriptors, &cmp_cfhs_, &cmp_db_);
2817 assert(s.ok());
2818 assert(cmp_cfhs_.size() == static_cast<size_t>(FLAGS_column_families));
2819 #else
2820 fprintf(stderr, "Secondary is not supported in RocksDBLite\n");
2821 exit(1);
2822 #endif // !ROCKSDB_LITE
2823 }
2824 } else {
2825 #ifndef ROCKSDB_LITE
2826 DBWithTTL* db_with_ttl;
2827 s = DBWithTTL::Open(options_, FLAGS_db, &db_with_ttl, FLAGS_ttl);
2828 db_ = db_with_ttl;
2829 #else
2830 fprintf(stderr, "TTL is not supported in RocksDBLite\n");
2831 exit(1);
2832 #endif
2833 }
2834
2835 if (FLAGS_preserve_unverified_changes) {
2836 // Up until now, no live file should have become obsolete due to these
2837 // options. After `DisableFileDeletions()` we can reenable auto compactions
2838 // since, even if live files become obsolete, they won't be deleted.
2839 assert(options_.avoid_flush_during_recovery);
2840 assert(options_.disable_auto_compactions);
2841 if (s.ok()) {
2842 s = db_->DisableFileDeletions();
2843 }
2844 if (s.ok()) {
2845 s = db_->EnableAutoCompaction(column_families_);
2846 }
2847 }
2848
2849 if (!s.ok()) {
2850 fprintf(stderr, "open error: %s\n", s.ToString().c_str());
2851 exit(1);
2852 }
2853 }
2854
2855 void StressTest::Reopen(ThreadState* thread) {
2856 #ifndef ROCKSDB_LITE
2857 // BG jobs in WritePrepared must be canceled first because i) they can access
2858 // the db via a callback and ii) they hold on to a snapshot and the upcoming
2859 // ::Close would complain about it.
2860 const bool write_prepared = FLAGS_use_txn && FLAGS_txn_write_policy != 0;
2861 bool bg_canceled __attribute__((unused)) = false;
2862 if (write_prepared || thread->rand.OneIn(2)) {
2863 const bool wait =
2864 write_prepared || static_cast<bool>(thread->rand.OneIn(2));
2865 CancelAllBackgroundWork(db_, wait);
2866 bg_canceled = wait;
2867 }
2868 assert(!write_prepared || bg_canceled);
2869 #else
2870 (void)thread;
2871 #endif
2872
2873 for (auto cf : column_families_) {
2874 delete cf;
2875 }
2876 column_families_.clear();
2877
2878 #ifndef ROCKSDB_LITE
2879 if (thread->rand.OneIn(2)) {
2880 Status s = db_->Close();
2881 if (!s.ok()) {
2882 fprintf(stderr, "Non-ok close status: %s\n", s.ToString().c_str());
2883 fflush(stderr);
2884 }
2885 assert(s.ok());
2886 }
2887 #endif
2888 delete db_;
2889 db_ = nullptr;
2890 #ifndef ROCKSDB_LITE
2891 txn_db_ = nullptr;
2892 #endif
2893
2894 num_times_reopened_++;
2895 auto now = clock_->NowMicros();
2896 fprintf(stdout, "%s Reopening database for the %dth time\n",
2897 clock_->TimeToString(now / 1000000).c_str(), num_times_reopened_);
2898 Open(thread->shared);
2899
2900 if ((FLAGS_sync_fault_injection || FLAGS_disable_wal ||
2901 FLAGS_manual_wal_flush_one_in > 0) &&
2902 IsStateTracked()) {
2903 Status s = thread->shared->SaveAtAndAfter(db_);
2904 if (!s.ok()) {
2905 fprintf(stderr, "Error enabling history tracing: %s\n",
2906 s.ToString().c_str());
2907 exit(1);
2908 }
2909 }
2910 }
2911
2912 bool StressTest::MaybeUseOlderTimestampForPointLookup(ThreadState* thread,
2913 std::string& ts_str,
2914 Slice& ts_slice,
2915 ReadOptions& read_opts) {
2916 if (FLAGS_user_timestamp_size == 0) {
2917 return false;
2918 }
2919
2920 assert(thread);
2921 if (!thread->rand.OneInOpt(3)) {
2922 return false;
2923 }
2924
2925 const SharedState* const shared = thread->shared;
2926 assert(shared);
2927 const uint64_t start_ts = shared->GetStartTimestamp();
2928
2929 uint64_t now = db_stress_env->NowNanos();
2930
2931 assert(now > start_ts);
2932 uint64_t time_diff = now - start_ts;
2933 uint64_t ts = start_ts + (thread->rand.Next64() % time_diff);
2934 ts_str.clear();
2935 PutFixed64(&ts_str, ts);
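  // PutFixed64 encodes the timestamp as 8 fixed-width little-endian bytes,
  // matching the u64 timestamp encoding expected by the comparator set up in
  // CheckAndSetOptionsForUserTimestamp().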
2936 ts_slice = ts_str;
2937 read_opts.timestamp = &ts_slice;
2938 return true;
2939 }
2940
2941 void StressTest::MaybeUseOlderTimestampForRangeScan(ThreadState* thread,
2942 std::string& ts_str,
2943 Slice& ts_slice,
2944 ReadOptions& read_opts) {
2945 if (FLAGS_user_timestamp_size == 0) {
2946 return;
2947 }
2948
2949 assert(thread);
2950 if (!thread->rand.OneInOpt(3)) {
2951 return;
2952 }
2953
2954 const Slice* const saved_ts = read_opts.timestamp;
2955 assert(saved_ts != nullptr);
2956
2957 const SharedState* const shared = thread->shared;
2958 assert(shared);
2959 const uint64_t start_ts = shared->GetStartTimestamp();
2960
2961 uint64_t now = db_stress_env->NowNanos();
2962
2963 assert(now > start_ts);
2964 uint64_t time_diff = now - start_ts;
2965 uint64_t ts = start_ts + (thread->rand.Next64() % time_diff);
2966 ts_str.clear();
2967 PutFixed64(&ts_str, ts);
2968 ts_slice = ts_str;
2969 read_opts.timestamp = &ts_slice;
2970
2971 // TODO (yanqin): support Merge with iter_start_ts
2972 if (!thread->rand.OneInOpt(3) || FLAGS_use_merge || FLAGS_use_full_merge_v1) {
2973 return;
2974 }
2975
2976 ts_str.clear();
2977 PutFixed64(&ts_str, start_ts);
2978 ts_slice = ts_str;
2979 read_opts.iter_start_ts = &ts_slice;
2980 read_opts.timestamp = saved_ts;
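  // With `iter_start_ts` set to the test's start timestamp and `timestamp`
  // restored to the caller's upper bound, an iterator returns the versions
  // of each key whose timestamps fall between the two bounds, instead of a
  // single version.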
2981 }
2982
2983 void CheckAndSetOptionsForUserTimestamp(Options& options) {
2984 assert(FLAGS_user_timestamp_size > 0);
2985 const Comparator* const cmp = test::BytewiseComparatorWithU64TsWrapper();
2986 assert(cmp);
2987 if (FLAGS_user_timestamp_size != cmp->timestamp_size()) {
2988 fprintf(stderr,
2989 "Only -user_timestamp_size=%d is supported in stress test.\n",
2990 static_cast<int>(cmp->timestamp_size()));
2991 exit(1);
2992 }
2993 if (FLAGS_use_txn) {
2994 fprintf(stderr, "TransactionDB does not support timestamp yet.\n");
2995 exit(1);
2996 }
2997 #ifndef ROCKSDB_LITE
2998 if (FLAGS_enable_blob_files || FLAGS_use_blob_db) {
2999 fprintf(stderr, "BlobDB not supported with timestamp.\n");
3000 exit(1);
3001 }
3002 #endif // !ROCKSDB_LITE
3003 if (FLAGS_test_cf_consistency || FLAGS_test_batches_snapshots) {
3004 fprintf(stderr,
3005 "Due to per-key ts-seq ordering constraint, only the (default) "
3006 "non-batched test is supported with timestamp.\n");
3007 exit(1);
3008 }
3009 if (FLAGS_ingest_external_file_one_in > 0) {
3010 fprintf(stderr, "Bulk loading may not support timestamp yet.\n");
3011 exit(1);
3012 }
3013 options.comparator = cmp;
3014 }
3015
3016 bool InitializeOptionsFromFile(Options& options) {
3017 #ifndef ROCKSDB_LITE
3018 DBOptions db_options;
3019 std::vector<ColumnFamilyDescriptor> cf_descriptors;
3020 if (!FLAGS_options_file.empty()) {
3021 Status s = LoadOptionsFromFile(FLAGS_options_file, db_stress_env,
3022 &db_options, &cf_descriptors);
3023 if (!s.ok()) {
3024 fprintf(stderr, "Unable to load options file %s --- %s\n",
3025 FLAGS_options_file.c_str(), s.ToString().c_str());
3026 exit(1);
3027 }
3028 db_options.env = new DbStressEnvWrapper(db_stress_env);
3029 options = Options(db_options, cf_descriptors[0].options);
3030 return true;
3031 }
3032 #else
3033 (void)options;
3034 fprintf(stderr, "--options_file not supported in lite mode\n");
3035 exit(1);
3036 #endif //! ROCKSDB_LITE
3037 return false;
3038 }
3039
3040 void InitializeOptionsFromFlags(
3041 const std::shared_ptr<Cache>& cache,
3042 const std::shared_ptr<Cache>& block_cache_compressed,
3043 const std::shared_ptr<const FilterPolicy>& filter_policy,
3044 Options& options) {
3045 BlockBasedTableOptions block_based_options;
3046 block_based_options.block_cache = cache;
3047 block_based_options.cache_index_and_filter_blocks =
3048 FLAGS_cache_index_and_filter_blocks;
3049 block_based_options.metadata_cache_options.top_level_index_pinning =
3050 static_cast<PinningTier>(FLAGS_top_level_index_pinning);
3051 block_based_options.metadata_cache_options.partition_pinning =
3052 static_cast<PinningTier>(FLAGS_partition_pinning);
3053 block_based_options.metadata_cache_options.unpartitioned_pinning =
3054 static_cast<PinningTier>(FLAGS_unpartitioned_pinning);
3055 block_based_options.block_cache_compressed = block_cache_compressed;
3056 block_based_options.checksum = checksum_type_e;
3057 block_based_options.block_size = FLAGS_block_size;
3058 block_based_options.cache_usage_options.options_overrides.insert(
3059 {CacheEntryRole::kCompressionDictionaryBuildingBuffer,
3060 {/*.charged = */ FLAGS_charge_compression_dictionary_building_buffer
3061 ? CacheEntryRoleOptions::Decision::kEnabled
3062 : CacheEntryRoleOptions::Decision::kDisabled}});
3063 block_based_options.cache_usage_options.options_overrides.insert(
3064 {CacheEntryRole::kFilterConstruction,
3065 {/*.charged = */ FLAGS_charge_filter_construction
3066 ? CacheEntryRoleOptions::Decision::kEnabled
3067 : CacheEntryRoleOptions::Decision::kDisabled}});
3068 block_based_options.cache_usage_options.options_overrides.insert(
3069 {CacheEntryRole::kBlockBasedTableReader,
3070 {/*.charged = */ FLAGS_charge_table_reader
3071 ? CacheEntryRoleOptions::Decision::kEnabled
3072 : CacheEntryRoleOptions::Decision::kDisabled}});
3073 block_based_options.cache_usage_options.options_overrides.insert(
3074 {CacheEntryRole::kFileMetadata,
3075 {/*.charged = */ FLAGS_charge_file_metadata
3076 ? CacheEntryRoleOptions::Decision::kEnabled
3077 : CacheEntryRoleOptions::Decision::kDisabled}});
3078 block_based_options.cache_usage_options.options_overrides.insert(
3079 {CacheEntryRole::kBlobCache,
3080 {/*.charged = */ FLAGS_charge_blob_cache
3081 ? CacheEntryRoleOptions::Decision::kEnabled
3082 : CacheEntryRoleOptions::Decision::kDisabled}});
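  // Each override above charges the memory of the given CacheEntryRole
  // against the block cache when the corresponding charge_* flag is set.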
3083 block_based_options.format_version =
3084 static_cast<uint32_t>(FLAGS_format_version);
3085 block_based_options.index_block_restart_interval =
3086 static_cast<int32_t>(FLAGS_index_block_restart_interval);
3087 block_based_options.filter_policy = filter_policy;
3088 block_based_options.partition_filters = FLAGS_partition_filters;
3089 block_based_options.optimize_filters_for_memory =
3090 FLAGS_optimize_filters_for_memory;
3091 block_based_options.detect_filter_construct_corruption =
3092 FLAGS_detect_filter_construct_corruption;
3093 block_based_options.index_type =
3094 static_cast<BlockBasedTableOptions::IndexType>(FLAGS_index_type);
3095 block_based_options.data_block_index_type =
3096 static_cast<BlockBasedTableOptions::DataBlockIndexType>(
3097 FLAGS_data_block_index_type);
3098 block_based_options.prepopulate_block_cache =
3099 static_cast<BlockBasedTableOptions::PrepopulateBlockCache>(
3100 FLAGS_prepopulate_block_cache);
3101 block_based_options.initial_auto_readahead_size =
3102 FLAGS_initial_auto_readahead_size;
3103 block_based_options.max_auto_readahead_size = FLAGS_max_auto_readahead_size;
3104 block_based_options.num_file_reads_for_auto_readahead =
3105 FLAGS_num_file_reads_for_auto_readahead;
3106 options.table_factory.reset(NewBlockBasedTableFactory(block_based_options));
3107 options.db_write_buffer_size = FLAGS_db_write_buffer_size;
3108 options.write_buffer_size = FLAGS_write_buffer_size;
3109 options.max_write_buffer_number = FLAGS_max_write_buffer_number;
3110 options.min_write_buffer_number_to_merge =
3111 FLAGS_min_write_buffer_number_to_merge;
3112 options.max_write_buffer_number_to_maintain =
3113 FLAGS_max_write_buffer_number_to_maintain;
3114 options.max_write_buffer_size_to_maintain =
3115 FLAGS_max_write_buffer_size_to_maintain;
3116 options.memtable_prefix_bloom_size_ratio =
3117 FLAGS_memtable_prefix_bloom_size_ratio;
3118 options.memtable_whole_key_filtering = FLAGS_memtable_whole_key_filtering;
3119 options.disable_auto_compactions = FLAGS_disable_auto_compactions;
3120 options.max_background_compactions = FLAGS_max_background_compactions;
3121 options.max_background_flushes = FLAGS_max_background_flushes;
3122 options.compaction_style =
3123 static_cast<ROCKSDB_NAMESPACE::CompactionStyle>(FLAGS_compaction_style);
3124 options.compaction_pri =
3125 static_cast<ROCKSDB_NAMESPACE::CompactionPri>(FLAGS_compaction_pri);
3126 options.num_levels = FLAGS_num_levels;
3127 if (FLAGS_prefix_size >= 0) {
3128 options.prefix_extractor.reset(NewFixedPrefixTransform(FLAGS_prefix_size));
3129 }
3130 options.max_open_files = FLAGS_open_files;
3131 options.statistics = dbstats;
3132 options.env = db_stress_env;
3133 options.use_fsync = FLAGS_use_fsync;
3134 options.compaction_readahead_size = FLAGS_compaction_readahead_size;
3135 options.allow_mmap_reads = FLAGS_mmap_read;
3136 options.allow_mmap_writes = FLAGS_mmap_write;
3137 options.use_direct_reads = FLAGS_use_direct_reads;
3138 options.use_direct_io_for_flush_and_compaction =
3139 FLAGS_use_direct_io_for_flush_and_compaction;
3140 options.recycle_log_file_num =
3141 static_cast<size_t>(FLAGS_recycle_log_file_num);
3142 options.target_file_size_base = FLAGS_target_file_size_base;
3143 options.target_file_size_multiplier = FLAGS_target_file_size_multiplier;
3144 options.max_bytes_for_level_base = FLAGS_max_bytes_for_level_base;
3145 options.max_bytes_for_level_multiplier = FLAGS_max_bytes_for_level_multiplier;
3146 options.level0_stop_writes_trigger = FLAGS_level0_stop_writes_trigger;
3147 options.level0_slowdown_writes_trigger = FLAGS_level0_slowdown_writes_trigger;
3148 options.level0_file_num_compaction_trigger =
3149 FLAGS_level0_file_num_compaction_trigger;
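// Compression: per-level and bottommost compression types plus dictionary
// compression knobs (dictionary size, zstd training buffer, parallelism).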
3150 options.compression = compression_type_e;
3151 options.bottommost_compression = bottommost_compression_type_e;
3152 options.compression_opts.max_dict_bytes = FLAGS_compression_max_dict_bytes;
3153 options.compression_opts.zstd_max_train_bytes =
3154 FLAGS_compression_zstd_max_train_bytes;
3155 options.compression_opts.parallel_threads =
3156 FLAGS_compression_parallel_threads;
3157 options.compression_opts.max_dict_buffer_bytes =
3158 FLAGS_compression_max_dict_buffer_bytes;
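// zstd FinalizeDictionary is a cheaper alternative to full dictionary
// training, but it requires linking against ZSTD 1.4.5+; otherwise fall
// back to the trainer and warn.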
3159 if (ZSTD_FinalizeDictionarySupported()) {
3160 options.compression_opts.use_zstd_dict_trainer =
3161 FLAGS_compression_use_zstd_dict_trainer;
3162 } else if (!FLAGS_compression_use_zstd_dict_trainer) {
3163 fprintf(
3164 stderr,
3165 "WARNING: use_zstd_dict_trainer is false, but zstd FinalizeDictionary "
3166 "cannot be used because the binary is not linked against ZSTD 1.4.5+. "
3167 "The zstd dictionary trainer will be used instead.\n");
3168 }
3169 options.max_manifest_file_size = FLAGS_max_manifest_file_size;
3170 options.inplace_update_support = FLAGS_in_place_update;
3171 options.max_subcompactions = static_cast<uint32_t>(FLAGS_subcompactions);
3172 options.allow_concurrent_memtable_write =
3173 FLAGS_allow_concurrent_memtable_write;
3174 options.experimental_mempurge_threshold =
3175 FLAGS_experimental_mempurge_threshold;
3176 options.periodic_compaction_seconds = FLAGS_periodic_compaction_seconds;
3177 options.stats_dump_period_sec =
3178 static_cast<unsigned int>(FLAGS_stats_dump_period_sec);
3179 options.ttl = FLAGS_compaction_ttl;
3180 options.enable_pipelined_write = FLAGS_enable_pipelined_write;
3181 options.enable_write_thread_adaptive_yield =
3182 FLAGS_enable_write_thread_adaptive_yield;
3183 options.compaction_options_universal.size_ratio = FLAGS_universal_size_ratio;
3184 options.compaction_options_universal.min_merge_width =
3185 FLAGS_universal_min_merge_width;
3186 options.compaction_options_universal.max_merge_width =
3187 FLAGS_universal_max_merge_width;
3188 options.compaction_options_universal.max_size_amplification_percent =
3189 FLAGS_universal_max_size_amplification_percent;
3190 options.atomic_flush = FLAGS_atomic_flush;
3191 options.manual_wal_flush = FLAGS_manual_wal_flush_one_in > 0;
3192 options.avoid_unnecessary_blocking_io = FLAGS_avoid_unnecessary_blocking_io;
3193 options.write_dbid_to_manifest = FLAGS_write_dbid_to_manifest;
3194 options.avoid_flush_during_recovery = FLAGS_avoid_flush_during_recovery;
3195 options.max_write_batch_group_size_bytes =
3196 FLAGS_max_write_batch_group_size_bytes;
3197 options.level_compaction_dynamic_level_bytes =
3198 FLAGS_level_compaction_dynamic_level_bytes;
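// WAL tracking/verification in the MANIFEST is hard-coded on for this test;
// the SST unique-id and memtable protection checks below come from flags.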
3199 options.track_and_verify_wals_in_manifest = true;
3200 options.verify_sst_unique_id_in_manifest =
3201 FLAGS_verify_sst_unique_id_in_manifest;
3202 options.memtable_protection_bytes_per_key =
3203 FLAGS_memtable_protection_bytes_per_key;
3204
3205 // Integrated BlobDB
3206 options.enable_blob_files = FLAGS_enable_blob_files;
3207 options.min_blob_size = FLAGS_min_blob_size;
3208 options.blob_file_size = FLAGS_blob_file_size;
3209 options.blob_compression_type =
3210 StringToCompressionType(FLAGS_blob_compression_type.c_str());
3211 options.enable_blob_garbage_collection = FLAGS_enable_blob_garbage_collection;
3212 options.blob_garbage_collection_age_cutoff =
3213 FLAGS_blob_garbage_collection_age_cutoff;
3214 options.blob_garbage_collection_force_threshold =
3215 FLAGS_blob_garbage_collection_force_threshold;
3216 options.blob_compaction_readahead_size = FLAGS_blob_compaction_readahead_size;
3217 options.blob_file_starting_level = FLAGS_blob_file_starting_level;
3218
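// Optional blob cache: either share the block cache with blobs or build a
// standalone LRU cache. For example (values illustrative), a run with
//   --use_blob_cache --blob_cache_size=8388608
// exercises the standalone-cache path; a zero or negative size is rejected.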
3219 if (FLAGS_use_blob_cache) {
3220 if (FLAGS_use_shared_block_and_blob_cache) {
3221 options.blob_cache = cache;
3222 } else {
3223 if (FLAGS_blob_cache_size > 0) {
3224 LRUCacheOptions co;
3225 co.capacity = FLAGS_blob_cache_size;
3226 co.num_shard_bits = FLAGS_blob_cache_numshardbits;
3227 options.blob_cache = NewLRUCache(co);
3228 } else {
3229 fprintf(stderr,
3230 "Cannot create a standalone blob cache when blob_cache_size "
3231 "<= 0.\n");
3232 exit(1);
3233 }
3234 }
3235 switch (FLAGS_prepopulate_blob_cache) {
3236 case 0:
3237 options.prepopulate_blob_cache = PrepopulateBlobCache::kDisable;
3238 break;
3239 case 1:
3240 options.prepopulate_blob_cache = PrepopulateBlobCache::kFlushOnly;
3241 break;
3242 default:
3243 fprintf(stderr, "Unknown prepopulate blob cache mode\n");
3244 exit(1);
3245 }
3246 }
3247
3248 options.wal_compression =
3249 StringToCompressionType(FLAGS_wal_compression.c_str());
3250
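// Tiered storage: with tiering enabled, data on the bottommost level is
// written with the kCold temperature.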
3251 if (FLAGS_enable_tiered_storage) {
3252 options.bottommost_temperature = Temperature::kCold;
3253 }
3254 options.preclude_last_level_data_seconds =
3255 FLAGS_preclude_last_level_data_seconds;
3256 options.preserve_internal_time_seconds = FLAGS_preserve_internal_time_seconds;
3257
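// Memtable representation: skip list is the default; the hash-skip-list and
// vector reps are only available outside ROCKSDB_LITE.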
3258 switch (FLAGS_rep_factory) {
3259 case kSkipList:
3260 // skip list is the default memtable; nothing to configure
3261 break;
3262 #ifndef ROCKSDB_LITE
3263 case kHashSkipList:
3264 options.memtable_factory.reset(NewHashSkipListRepFactory(10000));
3265 break;
3266 case kVectorRep:
3267 options.memtable_factory.reset(new VectorRepFactory());
3268 break;
3269 #else
3270 default:
3271 fprintf(stderr,
3272 "RocksDB Lite only supports the skip list memtable; ignoring "
3273 "--rep_factory\n");
3274 #endif // ROCKSDB_LITE
3275 }
3276
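// Both merge operators implement put-like semantics (the merge result is the
// newest operand); use_full_merge_v1 exercises the deprecated FullMerge (v1)
// interface instead of FullMergeV2.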
3277 if (FLAGS_use_full_merge_v1) {
3278 options.merge_operator = MergeOperators::CreateDeprecatedPutOperator();
3279 } else {
3280 options.merge_operator = MergeOperators::CreatePutOperator();
3281 }
3282
3283 if (FLAGS_enable_compaction_filter) {
3284 options.compaction_filter_factory =
3285 std::make_shared<DbStressCompactionFilterFactory>();
3286 }
3287
3288 options.best_efforts_recovery = FLAGS_best_efforts_recovery;
3289 options.paranoid_file_checks = FLAGS_paranoid_file_checks;
3290 options.fail_if_options_file_error = FLAGS_fail_if_options_file_error;
3291
3292 if (FLAGS_user_timestamp_size > 0) {
3293 CheckAndSetOptionsForUserTimestamp(options);
3294 }
3295
3296 options.allow_data_in_errors = FLAGS_allow_data_in_errors;
3297 }
3298
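// Applies settings common to both flag-based and options-file-based runs:
// defaults for statistics and env, the shared caches, rate limiter, checksum
// factory, SST file manager, and the stress test's table properties
// collector.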
3299 void InitializeOptionsGeneral(
3300 const std::shared_ptr<Cache>& cache,
3301 const std::shared_ptr<Cache>& block_cache_compressed,
3302 const std::shared_ptr<const FilterPolicy>& filter_policy,
3303 Options& options) {
3304 options.create_missing_column_families = true;
3305 options.create_if_missing = true;
3306
3307 if (!options.statistics) {
3308 options.statistics = dbstats;
3309 }
3310
3311 if (options.env == Options().env) {
3312 options.env = db_stress_env;
3313 }
3314
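// When the table factory is block-based, wire in the shared block cache and,
// where not already set (e.g. by a loaded options file), the compressed
// block cache and filter policy.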
3315 assert(options.table_factory);
3316 auto table_options =
3317 options.table_factory->GetOptions<BlockBasedTableOptions>();
3318 if (table_options) {
3319 if (FLAGS_cache_size > 0) {
3320 table_options->block_cache = cache;
3321 }
3322 if (!table_options->block_cache_compressed &&
3323 FLAGS_compressed_cache_size > 0) {
3324 table_options->block_cache_compressed = block_cache_compressed;
3325 }
3326 if (!table_options->filter_policy) {
3327 table_options->filter_policy = filter_policy;
3328 }
3329 }
3330
3331 // TODO: row_cache, thread-pool IO priority, CPU priority.
3332
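// Background I/O rate limiting. For example (values illustrative), a run
// with --rate_limiter_bytes_per_sec=4194304 --rate_limit_bg_reads=1 would
// cap background reads at roughly 4 MB/s via this generic rate limiter.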
3333 if (!options.rate_limiter) {
3334 if (FLAGS_rate_limiter_bytes_per_sec > 0) {
3335 options.rate_limiter.reset(NewGenericRateLimiter(
3336 FLAGS_rate_limiter_bytes_per_sec, 1000 /* refill_period_us */,
3337 10 /* fairness */,
3338 FLAGS_rate_limit_bg_reads ? RateLimiter::Mode::kReadsOnly
3339 : RateLimiter::Mode::kWritesOnly));
3340 }
3341 }
3342
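// File checksum generation: FLAGS_file_checksum_impl selects the generator
// (values such as crc32c or xxh64 in this tool) used to stamp table files.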
3343 if (!options.file_checksum_gen_factory) {
3344 options.file_checksum_gen_factory =
3345 GetFileChecksumImpl(FLAGS_file_checksum_impl);
3346 }
3347
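// An SstFileManager is installed only when deletion rate limiting is
// requested; it throttles file deletions by bytes per second and bytes per
// truncate, and a creation failure aborts the run.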
3348 if (FLAGS_sst_file_manager_bytes_per_sec > 0 ||
3349 FLAGS_sst_file_manager_bytes_per_truncate > 0) {
3350 Status status;
3351 options.sst_file_manager.reset(NewSstFileManager(
3352 db_stress_env, options.info_log, "" /* trash_dir */,
3353 static_cast<int64_t>(FLAGS_sst_file_manager_bytes_per_sec),
3354 true /* delete_existing_trash */, &status,
3355 0.25 /* max_trash_db_ratio */,
3356 FLAGS_sst_file_manager_bytes_per_truncate));
3357 if (!status.ok()) {
3358 fprintf(stderr, "SstFileManager creation failed: %s\n",
3359 status.ToString().c_str());
3360 exit(1);
3361 }
3362 }
3363
3364 if (FLAGS_preserve_unverified_changes) {
3365 if (!options.avoid_flush_during_recovery) {
3366 fprintf(stderr,
3367 "WARNING: flipping `avoid_flush_during_recovery` to true for "
3368 "`preserve_unverified_changes` to keep all files\n");
3369 options.avoid_flush_during_recovery = true;
3370 }
3371 // Together with `avoid_flush_during_recovery == true`, this will prevent
3372 // live files from becoming obsolete and deleted between `DB::Open()` and
3373 // `DisableFileDeletions()` due to flush or compaction. We do not need to
3374 // warn the user since we will re-enable compaction soon.
3375 options.disable_auto_compactions = true;
3376 }
3377
3378 options.table_properties_collector_factories.emplace_back(
3379 std::make_shared<DbStressTablePropertiesCollectorFactory>());
3380 }
3381
3382 } // namespace ROCKSDB_NAMESPACE
3383 #endif // GFLAGS