1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
13 #include "util/compression.h"
15 #include "db_stress_tool/db_stress_common.h"
16 #include "db_stress_tool/db_stress_compaction_filter.h"
17 #include "db_stress_tool/db_stress_driver.h"
18 #include "db_stress_tool/db_stress_table_properties_collector.h"
19 #include "rocksdb/convenience.h"
20 #include "rocksdb/filter_policy.h"
21 #include "rocksdb/secondary_cache.h"
22 #include "rocksdb/sst_file_manager.h"
23 #include "rocksdb/types.h"
24 #include "rocksdb/utilities/object_registry.h"
25 #include "rocksdb/utilities/write_batch_with_index.h"
26 #include "test_util/testutil.h"
27 #include "util/cast_util.h"
28 #include "utilities/backup/backup_engine_impl.h"
29 #include "utilities/fault_injection_fs.h"
30 #include "utilities/fault_injection_secondary_cache.h"
32 namespace ROCKSDB_NAMESPACE
{
36 std::shared_ptr
<const FilterPolicy
> CreateFilterPolicy() {
37 if (FLAGS_bloom_bits
< 0) {
38 return BlockBasedTableOptions().filter_policy
;
40 const FilterPolicy
* new_policy
;
41 if (FLAGS_ribbon_starting_level
>= 999) {
43 new_policy
= NewBloomFilterPolicy(FLAGS_bloom_bits
, false);
45 new_policy
= NewRibbonFilterPolicy(
46 FLAGS_bloom_bits
, /* bloom_before_level */ FLAGS_ribbon_starting_level
);
48 return std::shared_ptr
<const FilterPolicy
>(new_policy
);
53 StressTest::StressTest()
54 : cache_(NewCache(FLAGS_cache_size
, FLAGS_cache_numshardbits
)),
55 compressed_cache_(NewLRUCache(FLAGS_compressed_cache_size
,
56 FLAGS_compressed_cache_numshardbits
)),
57 filter_policy_(CreateFilterPolicy()),
63 clock_(db_stress_env
->GetSystemClock().get()),
64 new_column_family_name_(1),
65 num_times_reopened_(0),
66 db_preload_finished_(false),
68 is_db_stopped_(false) {
69 if (FLAGS_destroy_db_initially
) {
70 std::vector
<std::string
> files
;
71 db_stress_env
->GetChildren(FLAGS_db
, &files
);
72 for (unsigned int i
= 0; i
< files
.size(); i
++) {
73 if (Slice(files
[i
]).starts_with("heap-")) {
74 db_stress_env
->DeleteFile(FLAGS_db
+ "/" + files
[i
]);
79 options
.env
= db_stress_env
;
80 // Remove files without preserving manifest files
82 const Status s
= !FLAGS_use_blob_db
83 ? DestroyDB(FLAGS_db
, options
)
84 : blob_db::DestroyBlobDB(FLAGS_db
, options
,
85 blob_db::BlobDBOptions());
87 const Status s
= DestroyDB(FLAGS_db
, options
);
88 #endif // !ROCKSDB_LITE
91 fprintf(stderr
, "Cannot destroy original db: %s\n", s
.ToString().c_str());
97 StressTest::~StressTest() {
98 for (auto cf
: column_families_
) {
101 column_families_
.clear();
104 for (auto* cf
: cmp_cfhs_
) {
111 std::shared_ptr
<Cache
> StressTest::NewCache(size_t capacity
,
112 int32_t num_shard_bits
) {
113 ConfigOptions config_options
;
118 if (FLAGS_cache_type
== "clock_cache") {
119 fprintf(stderr
, "Old clock cache implementation has been removed.\n");
121 } else if (FLAGS_cache_type
== "hyper_clock_cache") {
122 return HyperClockCacheOptions(static_cast<size_t>(capacity
),
123 FLAGS_block_size
/*estimated_entry_charge*/,
126 } else if (FLAGS_cache_type
== "lru_cache") {
127 LRUCacheOptions opts
;
128 opts
.capacity
= capacity
;
129 opts
.num_shard_bits
= num_shard_bits
;
131 std::shared_ptr
<SecondaryCache
> secondary_cache
;
132 if (!FLAGS_secondary_cache_uri
.empty()) {
133 Status s
= SecondaryCache::CreateFromString(
134 config_options
, FLAGS_secondary_cache_uri
, &secondary_cache
);
135 if (secondary_cache
== nullptr) {
137 "No secondary cache registered matching string: %s status=%s\n",
138 FLAGS_secondary_cache_uri
.c_str(), s
.ToString().c_str());
141 if (FLAGS_secondary_cache_fault_one_in
> 0) {
142 secondary_cache
= std::make_shared
<FaultInjectionSecondaryCache
>(
143 secondary_cache
, static_cast<uint32_t>(FLAGS_seed
),
144 FLAGS_secondary_cache_fault_one_in
);
146 opts
.secondary_cache
= secondary_cache
;
149 return NewLRUCache(opts
);
151 fprintf(stderr
, "Cache type not supported.");
156 std::vector
<std::string
> StressTest::GetBlobCompressionTags() {
157 std::vector
<std::string
> compression_tags
{"kNoCompression"};
159 if (Snappy_Supported()) {
160 compression_tags
.emplace_back("kSnappyCompression");
162 if (LZ4_Supported()) {
163 compression_tags
.emplace_back("kLZ4Compression");
165 if (ZSTD_Supported()) {
166 compression_tags
.emplace_back("kZSTD");
169 return compression_tags
;
172 bool StressTest::BuildOptionsTable() {
173 if (FLAGS_set_options_one_in
<= 0) {
177 std::unordered_map
<std::string
, std::vector
<std::string
>> options_tbl
= {
178 {"write_buffer_size",
179 {std::to_string(options_
.write_buffer_size
),
180 std::to_string(options_
.write_buffer_size
* 2),
181 std::to_string(options_
.write_buffer_size
* 4)}},
182 {"max_write_buffer_number",
183 {std::to_string(options_
.max_write_buffer_number
),
184 std::to_string(options_
.max_write_buffer_number
* 2),
185 std::to_string(options_
.max_write_buffer_number
* 4)}},
188 std::to_string(options_
.arena_block_size
),
189 std::to_string(options_
.write_buffer_size
/ 4),
190 std::to_string(options_
.write_buffer_size
/ 8),
192 {"memtable_huge_page_size", {"0", std::to_string(2 * 1024 * 1024)}},
193 {"max_successive_merges", {"0", "2", "4"}},
194 {"inplace_update_num_locks", {"100", "200", "300"}},
195 // TODO: re-enable once internal task T124324915 is fixed.
196 // {"experimental_mempurge_threshold", {"0.0", "1.0"}},
197 // TODO(ljin): enable test for this option
198 // {"disable_auto_compactions", {"100", "200", "300"}},
199 {"level0_file_num_compaction_trigger",
201 std::to_string(options_
.level0_file_num_compaction_trigger
),
202 std::to_string(options_
.level0_file_num_compaction_trigger
+ 2),
203 std::to_string(options_
.level0_file_num_compaction_trigger
+ 4),
205 {"level0_slowdown_writes_trigger",
207 std::to_string(options_
.level0_slowdown_writes_trigger
),
208 std::to_string(options_
.level0_slowdown_writes_trigger
+ 2),
209 std::to_string(options_
.level0_slowdown_writes_trigger
+ 4),
211 {"level0_stop_writes_trigger",
213 std::to_string(options_
.level0_stop_writes_trigger
),
214 std::to_string(options_
.level0_stop_writes_trigger
+ 2),
215 std::to_string(options_
.level0_stop_writes_trigger
+ 4),
217 {"max_compaction_bytes",
219 std::to_string(options_
.target_file_size_base
* 5),
220 std::to_string(options_
.target_file_size_base
* 15),
221 std::to_string(options_
.target_file_size_base
* 100),
223 {"target_file_size_base",
225 std::to_string(options_
.target_file_size_base
),
226 std::to_string(options_
.target_file_size_base
* 2),
227 std::to_string(options_
.target_file_size_base
* 4),
229 {"target_file_size_multiplier",
231 std::to_string(options_
.target_file_size_multiplier
),
235 {"max_bytes_for_level_base",
237 std::to_string(options_
.max_bytes_for_level_base
/ 2),
238 std::to_string(options_
.max_bytes_for_level_base
),
239 std::to_string(options_
.max_bytes_for_level_base
* 2),
241 {"max_bytes_for_level_multiplier",
243 std::to_string(options_
.max_bytes_for_level_multiplier
),
247 {"max_sequential_skip_in_iterations", {"4", "8", "12"}},
250 if (FLAGS_allow_setting_blob_options_dynamically
) {
251 options_tbl
.emplace("enable_blob_files",
252 std::vector
<std::string
>{"false", "true"});
253 options_tbl
.emplace("min_blob_size",
254 std::vector
<std::string
>{"0", "8", "16"});
255 options_tbl
.emplace("blob_file_size",
256 std::vector
<std::string
>{"1M", "16M", "256M", "1G"});
257 options_tbl
.emplace("blob_compression_type", GetBlobCompressionTags());
258 options_tbl
.emplace("enable_blob_garbage_collection",
259 std::vector
<std::string
>{"false", "true"});
261 "blob_garbage_collection_age_cutoff",
262 std::vector
<std::string
>{"0.0", "0.25", "0.5", "0.75", "1.0"});
263 options_tbl
.emplace("blob_garbage_collection_force_threshold",
264 std::vector
<std::string
>{"0.5", "0.75", "1.0"});
265 options_tbl
.emplace("blob_compaction_readahead_size",
266 std::vector
<std::string
>{"0", "1M", "4M"});
267 options_tbl
.emplace("blob_file_starting_level",
268 std::vector
<std::string
>{"0", "1", "2"});
269 options_tbl
.emplace("prepopulate_blob_cache",
270 std::vector
<std::string
>{"kDisable", "kFlushOnly"});
273 options_table_
= std::move(options_tbl
);
275 for (const auto& iter
: options_table_
) {
276 options_index_
.push_back(iter
.first
);
281 void StressTest::InitDb(SharedState
* shared
) {
282 uint64_t now
= clock_
->NowMicros();
283 fprintf(stdout
, "%s Initializing db_stress\n",
284 clock_
->TimeToString(now
/ 1000000).c_str());
290 void StressTest::FinishInitDb(SharedState
* shared
) {
291 if (FLAGS_read_only
) {
292 uint64_t now
= clock_
->NowMicros();
293 fprintf(stdout
, "%s Preloading db with %" PRIu64
" KVs\n",
294 clock_
->TimeToString(now
/ 1000000).c_str(), FLAGS_max_key
);
295 PreloadDbAndReopenAsReadOnly(FLAGS_max_key
, shared
);
298 if (shared
->HasHistory()) {
299 // The way it works right now is, if there's any history, that means the
300 // previous run mutating the DB had all its operations traced, in which case
301 // we should always be able to `Restore()` the expected values to match the
302 // `db_`'s current seqno.
303 Status s
= shared
->Restore(db_
);
305 fprintf(stderr
, "Error restoring historical expected values: %s\n",
306 s
.ToString().c_str());
312 // It's OK here without sync because unsynced data cannot be lost at this
314 // - even with sync_fault_injection=1 as the
315 // file is still directly writable until after FinishInitDb()
316 ProcessRecoveredPreparedTxns(shared
);
319 if (FLAGS_enable_compaction_filter
) {
320 auto* compaction_filter_factory
=
321 reinterpret_cast<DbStressCompactionFilterFactory
*>(
322 options_
.compaction_filter_factory
.get());
323 assert(compaction_filter_factory
);
324 // This must be called only after any potential `SharedState::Restore()` has
325 // completed in order for the `compaction_filter_factory` to operate on the
326 // correct latest values file.
327 compaction_filter_factory
->SetSharedState(shared
);
328 fprintf(stdout
, "Compaction filter factory: %s\n",
329 compaction_filter_factory
->Name());
333 void StressTest::TrackExpectedState(SharedState
* shared
) {
334 // For `FLAGS_manual_wal_flush_one_in`: WAL
335 // data can be lost when `manual_wal_flush_one_in > 0` and `FlushWAL()` is not
336 // explicitly called by users of RocksDB (in our case, db stress).
337 // Therefore recovery from such potential WAL data loss is a prefix recovery
338 // that requires tracing.
339 if ((FLAGS_sync_fault_injection
|| FLAGS_disable_wal
||
340 FLAGS_manual_wal_flush_one_in
> 0) &&
342 Status s
= shared
->SaveAtAndAfter(db_
);
344 fprintf(stderr
, "Error enabling history tracing: %s\n",
345 s
.ToString().c_str());
351 Status
StressTest::AssertSame(DB
* db
, ColumnFamilyHandle
* cf
,
352 ThreadState::SnapshotState
& snap_state
) {
354 if (cf
->GetName() != snap_state
.cf_at_name
) {
357 // This `ReadOptions` is for validation purposes. Ignore
358 // `FLAGS_rate_limit_user_ops` to avoid slowing any validation.
360 ropt
.snapshot
= snap_state
.snapshot
;
362 if (!snap_state
.timestamp
.empty()) {
363 ts
= snap_state
.timestamp
;
364 ropt
.timestamp
= &ts
;
366 PinnableSlice
exp_v(&snap_state
.value
);
369 s
= db
->Get(ropt
, cf
, snap_state
.key
, &v
);
370 if (!s
.ok() && !s
.IsNotFound()) {
373 if (snap_state
.status
!= s
) {
374 return Status::Corruption(
375 "The snapshot gave inconsistent results for key " +
376 std::to_string(Hash(snap_state
.key
.c_str(), snap_state
.key
.size(), 0)) +
377 " in cf " + cf
->GetName() + ": (" + snap_state
.status
.ToString() +
378 ") vs. (" + s
.ToString() + ")");
382 return Status::Corruption("The snapshot gave inconsistent values: (" +
383 exp_v
.ToString() + ") vs. (" + v
.ToString() +
387 if (snap_state
.key_vec
!= nullptr) {
388 // When `prefix_extractor` is set, seeking to beginning and scanning
389 // across prefixes are only supported with `total_order_seek` set.
390 ropt
.total_order_seek
= true;
391 std::unique_ptr
<Iterator
> iterator(db
->NewIterator(ropt
));
392 std::unique_ptr
<std::vector
<bool>> tmp_bitvec(
393 new std::vector
<bool>(FLAGS_max_key
));
394 for (iterator
->SeekToFirst(); iterator
->Valid(); iterator
->Next()) {
396 if (GetIntVal(iterator
->key().ToString(), &key_val
)) {
397 (*tmp_bitvec
.get())[key_val
] = true;
400 if (!std::equal(snap_state
.key_vec
->begin(), snap_state
.key_vec
->end(),
401 tmp_bitvec
.get()->begin())) {
402 return Status::Corruption("Found inconsistent keys at this snapshot");
408 void StressTest::VerificationAbort(SharedState
* shared
, std::string msg
,
410 fprintf(stderr
, "Verification failed: %s. Status is %s\n", msg
.c_str(),
411 s
.ToString().c_str());
412 shared
->SetVerificationFailure();
415 void StressTest::VerificationAbort(SharedState
* shared
, std::string msg
, int cf
,
417 auto key_str
= Key(key
);
418 Slice key_slice
= key_str
;
420 "Verification failed for column family %d key %s (%" PRIi64
"): %s\n",
421 cf
, key_slice
.ToString(true).c_str(), key
, msg
.c_str());
422 shared
->SetVerificationFailure();
425 void StressTest::VerificationAbort(SharedState
* shared
, std::string msg
, int cf
,
426 int64_t key
, Slice value_from_db
,
427 Slice value_from_expected
) const {
428 auto key_str
= Key(key
);
430 "Verification failed for column family %d key %s (%" PRIi64
431 "): value_from_db: %s, value_from_expected: %s, msg: %s\n",
432 cf
, Slice(key_str
).ToString(true).c_str(), key
,
433 value_from_db
.ToString(true).c_str(),
434 value_from_expected
.ToString(true).c_str(), msg
.c_str());
435 shared
->SetVerificationFailure();
438 void StressTest::VerificationAbort(SharedState
* shared
, int cf
, int64_t key
,
440 const WideColumns
& columns
,
441 const WideColumns
& expected_columns
) const {
444 auto key_str
= Key(key
);
447 "Verification failed for column family %d key %s (%" PRIi64
448 "): Value and columns inconsistent: %s\n",
449 cf
, Slice(key_str
).ToString(/* hex */ true).c_str(), key
,
450 DebugString(value
, columns
, expected_columns
).c_str());
452 shared
->SetVerificationFailure();
455 std::string
StressTest::DebugString(const Slice
& value
,
456 const WideColumns
& columns
,
457 const WideColumns
& expected_columns
) {
458 std::ostringstream oss
;
460 oss
<< "value: " << value
.ToString(/* hex */ true);
462 auto dump
= [](const WideColumns
& cols
, std::ostream
& os
) {
469 auto it
= cols
.begin();
471 for (++it
; it
!= cols
.end(); ++it
) {
476 oss
<< ", columns: ";
479 oss
<< ", expected_columns: ";
480 dump(expected_columns
, oss
);
485 void StressTest::PrintStatistics() {
487 fprintf(stdout
, "STATISTICS:\n%s\n", dbstats
->ToString().c_str());
489 if (dbstats_secondaries
) {
490 fprintf(stdout
, "Secondary instances STATISTICS:\n%s\n",
491 dbstats_secondaries
->ToString().c_str());
495 // Currently PreloadDb has to be single-threaded.
496 void StressTest::PreloadDbAndReopenAsReadOnly(int64_t number_of_keys
,
497 SharedState
* shared
) {
498 WriteOptions write_opts
;
499 write_opts
.disableWAL
= FLAGS_disable_wal
;
501 write_opts
.sync
= true;
503 if (FLAGS_rate_limit_auto_wal_flush
) {
504 write_opts
.rate_limiter_priority
= Env::IO_USER
;
509 for (auto cfh
: column_families_
) {
510 for (int64_t k
= 0; k
!= number_of_keys
; ++k
) {
511 const std::string key
= Key(k
);
513 constexpr uint32_t value_base
= 0;
514 const size_t sz
= GenerateValue(value_base
, value
, sizeof(value
));
516 const Slice
v(value
, sz
);
518 shared
->Put(cf_idx
, k
, value_base
, true /* pending */);
521 if (FLAGS_user_timestamp_size
> 0) {
525 if (FLAGS_use_merge
) {
526 if (!FLAGS_use_txn
) {
527 if (FLAGS_user_timestamp_size
> 0) {
528 s
= db_
->Merge(write_opts
, cfh
, key
, ts
, v
);
530 s
= db_
->Merge(write_opts
, cfh
, key
, v
);
535 s
= NewTxn(write_opts
, &txn
);
537 s
= txn
->Merge(cfh
, key
, v
);
544 } else if (FLAGS_use_put_entity_one_in
> 0) {
545 s
= db_
->PutEntity(write_opts
, cfh
, key
,
546 GenerateWideColumns(value_base
, v
));
548 if (!FLAGS_use_txn
) {
549 if (FLAGS_user_timestamp_size
> 0) {
550 s
= db_
->Put(write_opts
, cfh
, key
, ts
, v
);
552 s
= db_
->Put(write_opts
, cfh
, key
, v
);
557 s
= NewTxn(write_opts
, &txn
);
559 s
= txn
->Put(cfh
, key
, v
);
568 shared
->Put(cf_idx
, k
, value_base
, false /* pending */);
579 s
= db_
->Flush(FlushOptions(), column_families_
);
582 for (auto cf
: column_families_
) {
585 column_families_
.clear();
592 db_preload_finished_
.store(true);
593 auto now
= clock_
->NowMicros();
594 fprintf(stdout
, "%s Reopening database in read-only\n",
595 clock_
->TimeToString(now
/ 1000000).c_str());
596 // Reopen as read-only, can ignore all options related to updates
599 fprintf(stderr
, "Failed to preload db");
604 Status
StressTest::SetOptions(ThreadState
* thread
) {
605 assert(FLAGS_set_options_one_in
> 0);
606 std::unordered_map
<std::string
, std::string
> opts
;
608 options_index_
[thread
->rand
.Next() % options_index_
.size()];
609 int value_idx
= thread
->rand
.Next() % options_table_
[name
].size();
610 if (name
== "level0_file_num_compaction_trigger" ||
611 name
== "level0_slowdown_writes_trigger" ||
612 name
== "level0_stop_writes_trigger") {
613 opts
["level0_file_num_compaction_trigger"] =
614 options_table_
["level0_file_num_compaction_trigger"][value_idx
];
615 opts
["level0_slowdown_writes_trigger"] =
616 options_table_
["level0_slowdown_writes_trigger"][value_idx
];
617 opts
["level0_stop_writes_trigger"] =
618 options_table_
["level0_stop_writes_trigger"][value_idx
];
620 opts
[name
] = options_table_
[name
][value_idx
];
623 int rand_cf_idx
= thread
->rand
.Next() % FLAGS_column_families
;
624 auto cfh
= column_families_
[rand_cf_idx
];
625 return db_
->SetOptions(cfh
, opts
);
629 void StressTest::ProcessRecoveredPreparedTxns(SharedState
* shared
) {
631 std::vector
<Transaction
*> recovered_prepared_trans
;
632 txn_db_
->GetAllPreparedTransactions(&recovered_prepared_trans
);
633 for (Transaction
* txn
: recovered_prepared_trans
) {
634 ProcessRecoveredPreparedTxnsHelper(txn
, shared
);
637 recovered_prepared_trans
.clear();
638 txn_db_
->GetAllPreparedTransactions(&recovered_prepared_trans
);
639 assert(recovered_prepared_trans
.size() == 0);
642 void StressTest::ProcessRecoveredPreparedTxnsHelper(Transaction
* txn
,
643 SharedState
* shared
) {
644 thread_local Random
rand(static_cast<uint32_t>(FLAGS_seed
));
645 for (size_t i
= 0; i
< column_families_
.size(); ++i
) {
646 std::unique_ptr
<WBWIIterator
> wbwi_iter(
647 txn
->GetWriteBatch()->NewIterator(column_families_
[i
]));
648 for (wbwi_iter
->SeekToFirst(); wbwi_iter
->Valid(); wbwi_iter
->Next()) {
650 if (GetIntVal(wbwi_iter
->Entry().key
.ToString(), &key_val
)) {
651 shared
->Put(static_cast<int>(i
) /* cf_idx */, key_val
,
652 0 /* value_base */, true /* pending */);
657 Status s
= txn
->Commit();
660 Status s
= txn
->Rollback();
665 Status
StressTest::NewTxn(WriteOptions
& write_opts
, Transaction
** txn
) {
666 if (!FLAGS_use_txn
) {
667 return Status::InvalidArgument("NewTxn when FLAGS_use_txn is not set");
669 write_opts
.disableWAL
= FLAGS_disable_wal
;
670 static std::atomic
<uint64_t> txn_id
= {0};
671 TransactionOptions txn_options
;
672 txn_options
.use_only_the_last_commit_time_batch_for_recovery
=
673 FLAGS_use_only_the_last_commit_time_batch_for_recovery
;
674 txn_options
.lock_timeout
= 600000; // 10 min
675 txn_options
.deadlock_detect
= true;
676 *txn
= txn_db_
->BeginTransaction(write_opts
, txn_options
);
677 auto istr
= std::to_string(txn_id
.fetch_add(1));
678 Status s
= (*txn
)->SetName("xid" + istr
);
682 Status
StressTest::CommitTxn(Transaction
* txn
, ThreadState
* thread
) {
683 if (!FLAGS_use_txn
) {
684 return Status::InvalidArgument("CommitTxn when FLAGS_use_txn is not set");
687 Status s
= txn
->Prepare();
688 std::shared_ptr
<const Snapshot
> timestamped_snapshot
;
690 if (thread
&& FLAGS_create_timestamped_snapshot_one_in
&&
691 thread
->rand
.OneIn(FLAGS_create_timestamped_snapshot_one_in
)) {
692 uint64_t ts
= db_stress_env
->NowNanos();
693 s
= txn
->CommitAndTryCreateSnapshot(/*notifier=*/nullptr, ts
,
694 ×tamped_snapshot
);
696 std::pair
<Status
, std::shared_ptr
<const Snapshot
>> res
;
697 if (thread
->tid
== 0) {
698 uint64_t now
= db_stress_env
->NowNanos();
699 res
= txn_db_
->CreateTimestampedSnapshot(now
);
700 if (res
.first
.ok()) {
702 assert(res
.second
->GetTimestamp() == now
);
703 if (timestamped_snapshot
) {
704 assert(res
.second
->GetTimestamp() >
705 timestamped_snapshot
->GetTimestamp());
715 if (thread
&& FLAGS_create_timestamped_snapshot_one_in
> 0 &&
716 thread
->rand
.OneInOpt(50000)) {
717 uint64_t now
= db_stress_env
->NowNanos();
718 constexpr uint64_t time_diff
= static_cast<uint64_t>(1000) * 1000 * 1000;
719 txn_db_
->ReleaseTimestampedSnapshotsOlderThan(now
- time_diff
);
725 Status
StressTest::RollbackTxn(Transaction
* txn
) {
726 if (!FLAGS_use_txn
) {
727 return Status::InvalidArgument(
728 "RollbackTxn when FLAGS_use_txn is not"
731 Status s
= txn
->Rollback();
737 void StressTest::OperateDb(ThreadState
* thread
) {
738 ReadOptions
read_opts(FLAGS_verify_checksum
, true);
739 read_opts
.rate_limiter_priority
=
740 FLAGS_rate_limit_user_ops
? Env::IO_USER
: Env::IO_TOTAL
;
741 read_opts
.async_io
= FLAGS_async_io
;
742 read_opts
.adaptive_readahead
= FLAGS_adaptive_readahead
;
743 read_opts
.readahead_size
= FLAGS_readahead_size
;
744 WriteOptions write_opts
;
745 if (FLAGS_rate_limit_auto_wal_flush
) {
746 write_opts
.rate_limiter_priority
= Env::IO_USER
;
748 auto shared
= thread
->shared
;
752 write_opts
.sync
= true;
754 write_opts
.disableWAL
= FLAGS_disable_wal
;
755 write_opts
.protection_bytes_per_key
= FLAGS_batch_protection_bytes_per_key
;
756 const int prefix_bound
= static_cast<int>(FLAGS_readpercent
) +
757 static_cast<int>(FLAGS_prefixpercent
);
758 const int write_bound
= prefix_bound
+ static_cast<int>(FLAGS_writepercent
);
759 const int del_bound
= write_bound
+ static_cast<int>(FLAGS_delpercent
);
760 const int delrange_bound
=
761 del_bound
+ static_cast<int>(FLAGS_delrangepercent
);
762 const int iterate_bound
=
763 delrange_bound
+ static_cast<int>(FLAGS_iterpercent
);
765 const uint64_t ops_per_open
= FLAGS_ops_per_thread
/ (FLAGS_reopen
+ 1);
768 if (FLAGS_read_fault_one_in
) {
769 fault_fs_guard
->SetThreadLocalReadErrorContext(thread
->shared
->GetSeed(),
770 FLAGS_read_fault_one_in
);
773 if (FLAGS_write_fault_one_in
) {
775 if (FLAGS_injest_error_severity
<= 1 || FLAGS_injest_error_severity
> 2) {
776 error_msg
= IOStatus::IOError("Retryable IO Error");
777 error_msg
.SetRetryable(true);
778 } else if (FLAGS_injest_error_severity
== 2) {
779 // Ingest the fatal error
780 error_msg
= IOStatus::IOError("Fatal IO Error");
781 error_msg
.SetDataLoss(true);
783 std::vector
<FileType
> types
= {FileType::kTableFile
,
784 FileType::kDescriptorFile
,
785 FileType::kCurrentFile
};
786 fault_fs_guard
->SetRandomWriteError(
787 thread
->shared
->GetSeed(), FLAGS_write_fault_one_in
, error_msg
,
788 /*inject_for_all_file_types=*/false, types
);
790 thread
->stats
.Start();
791 for (int open_cnt
= 0; open_cnt
<= FLAGS_reopen
; ++open_cnt
) {
792 if (thread
->shared
->HasVerificationFailedYet() ||
793 thread
->shared
->ShouldStopTest()) {
797 thread
->stats
.FinishedSingleOp();
798 MutexLock
l(thread
->shared
->GetMutex());
799 while (!thread
->snapshot_queue
.empty()) {
800 db_
->ReleaseSnapshot(thread
->snapshot_queue
.front().second
.snapshot
);
801 delete thread
->snapshot_queue
.front().second
.key_vec
;
802 thread
->snapshot_queue
.pop();
804 thread
->shared
->IncVotedReopen();
805 if (thread
->shared
->AllVotedReopen()) {
806 thread
->shared
->GetStressTest()->Reopen(thread
);
807 thread
->shared
->GetCondVar()->SignalAll();
809 thread
->shared
->GetCondVar()->Wait();
811 // Commenting this out as we don't want to reset stats on each open.
812 // thread->stats.Start();
815 for (uint64_t i
= 0; i
< ops_per_open
; i
++) {
816 if (thread
->shared
->HasVerificationFailedYet()) {
821 if (thread
->rand
.OneInOpt(FLAGS_set_options_one_in
)) {
825 if (thread
->rand
.OneInOpt(FLAGS_set_in_place_one_in
)) {
826 options_
.inplace_update_support
^= options_
.inplace_update_support
;
829 if (thread
->tid
== 0 && FLAGS_verify_db_one_in
> 0 &&
830 thread
->rand
.OneIn(FLAGS_verify_db_one_in
)) {
831 ContinuouslyVerifyDb(thread
);
832 if (thread
->shared
->ShouldStopTest()) {
837 MaybeClearOneColumnFamily(thread
);
839 if (thread
->rand
.OneInOpt(FLAGS_manual_wal_flush_one_in
)) {
840 bool sync
= thread
->rand
.OneIn(2) ? true : false;
841 Status s
= db_
->FlushWAL(sync
);
842 if (!s
.ok() && !(sync
&& s
.IsNotSupported())) {
843 fprintf(stderr
, "FlushWAL(sync=%s) failed: %s\n",
844 (sync
? "true" : "false"), s
.ToString().c_str());
848 if (thread
->rand
.OneInOpt(FLAGS_sync_wal_one_in
)) {
849 Status s
= db_
->SyncWAL();
850 if (!s
.ok() && !s
.IsNotSupported()) {
851 fprintf(stderr
, "SyncWAL() failed: %s\n", s
.ToString().c_str());
855 int rand_column_family
= thread
->rand
.Next() % FLAGS_column_families
;
856 ColumnFamilyHandle
* column_family
= column_families_
[rand_column_family
];
858 if (thread
->rand
.OneInOpt(FLAGS_compact_files_one_in
)) {
859 TestCompactFiles(thread
, column_family
);
862 int64_t rand_key
= GenerateOneKey(thread
, i
);
863 std::string keystr
= Key(rand_key
);
866 if (thread
->rand
.OneInOpt(FLAGS_compact_range_one_in
)) {
867 TestCompactRange(thread
, rand_key
, key
, column_family
);
868 if (thread
->shared
->HasVerificationFailedYet()) {
873 std::vector
<int> rand_column_families
=
874 GenerateColumnFamilies(FLAGS_column_families
, rand_column_family
);
876 if (thread
->rand
.OneInOpt(FLAGS_flush_one_in
)) {
877 Status status
= TestFlush(rand_column_families
);
879 fprintf(stdout
, "Unable to perform Flush(): %s\n",
880 status
.ToString().c_str());
885 // Verify GetLiveFiles with a 1 in N chance.
886 if (thread
->rand
.OneInOpt(FLAGS_get_live_files_one_in
) &&
887 !FLAGS_write_fault_one_in
) {
888 Status status
= VerifyGetLiveFiles();
890 VerificationAbort(shared
, "VerifyGetLiveFiles status not OK", status
);
894 // Verify GetSortedWalFiles with a 1 in N chance.
895 if (thread
->rand
.OneInOpt(FLAGS_get_sorted_wal_files_one_in
)) {
896 Status status
= VerifyGetSortedWalFiles();
898 VerificationAbort(shared
, "VerifyGetSortedWalFiles status not OK",
903 // Verify GetCurrentWalFile with a 1 in N chance.
904 if (thread
->rand
.OneInOpt(FLAGS_get_current_wal_file_one_in
)) {
905 Status status
= VerifyGetCurrentWalFile();
907 VerificationAbort(shared
, "VerifyGetCurrentWalFile status not OK",
911 #endif // !ROCKSDB_LITE
913 if (thread
->rand
.OneInOpt(FLAGS_pause_background_one_in
)) {
914 Status status
= TestPauseBackground(thread
);
917 shared
, "Pause/ContinueBackgroundWork status not OK", status
);
922 if (thread
->rand
.OneInOpt(FLAGS_verify_checksum_one_in
)) {
923 Status status
= db_
->VerifyChecksum();
925 VerificationAbort(shared
, "VerifyChecksum status not OK", status
);
929 if (thread
->rand
.OneInOpt(FLAGS_get_property_one_in
)) {
930 TestGetProperty(thread
);
934 std::vector
<int64_t> rand_keys
= GenerateKeys(rand_key
);
936 if (thread
->rand
.OneInOpt(FLAGS_ingest_external_file_one_in
)) {
937 TestIngestExternalFile(thread
, rand_column_families
, rand_keys
);
940 if (thread
->rand
.OneInOpt(FLAGS_backup_one_in
)) {
941 // Beyond a certain DB size threshold, this test becomes heavier than
943 uint64_t total_size
= 0;
944 if (FLAGS_backup_max_size
> 0) {
945 std::vector
<FileAttributes
> files
;
946 db_stress_env
->GetChildrenFileAttributes(FLAGS_db
, &files
);
947 for (auto& file
: files
) {
948 total_size
+= file
.size_bytes
;
952 if (total_size
<= FLAGS_backup_max_size
) {
953 Status s
= TestBackupRestore(thread
, rand_column_families
, rand_keys
);
955 VerificationAbort(shared
, "Backup/restore gave inconsistent state",
961 if (thread
->rand
.OneInOpt(FLAGS_checkpoint_one_in
)) {
962 Status s
= TestCheckpoint(thread
, rand_column_families
, rand_keys
);
964 VerificationAbort(shared
, "Checkpoint gave inconsistent state", s
);
969 if (thread
->rand
.OneInOpt(FLAGS_approximate_size_one_in
)) {
971 TestApproximateSize(thread
, i
, rand_column_families
, rand_keys
);
973 VerificationAbort(shared
, "ApproximateSize Failed", s
);
976 #endif // !ROCKSDB_LITE
977 if (thread
->rand
.OneInOpt(FLAGS_acquire_snapshot_one_in
)) {
978 TestAcquireSnapshot(thread
, rand_column_family
, keystr
, i
);
982 Status s
= MaybeReleaseSnapshots(thread
, i
);
984 VerificationAbort(shared
, "Snapshot gave inconsistent state", s
);
988 // Assign timestamps if necessary.
989 std::string read_ts_str
;
991 if (FLAGS_user_timestamp_size
> 0) {
992 read_ts_str
= GetNowNanos();
993 read_ts
= read_ts_str
;
994 read_opts
.timestamp
= &read_ts
;
997 int prob_op
= thread
->rand
.Uniform(100);
998 // Reset this in case we pick something other than a read op. We don't
999 // want to use a stale value when deciding at the beginning of the loop
1000 // whether to vote to reopen
1001 if (prob_op
>= 0 && prob_op
< static_cast<int>(FLAGS_readpercent
)) {
1002 assert(0 <= prob_op
);
1004 if (FLAGS_use_multiget
) {
1005 // Leave room for one more iteration of the loop with a single key
1006 // batch. This is to ensure that each thread does exactly the same
1008 int multiget_batch_size
= static_cast<int>(
1009 std::min(static_cast<uint64_t>(thread
->rand
.Uniform(64)),
1010 FLAGS_ops_per_thread
- i
- 1));
1011 // If it's the last iteration, ensure that multiget_batch_size is 1
1012 multiget_batch_size
= std::max(multiget_batch_size
, 1);
1013 rand_keys
= GenerateNKeys(thread
, multiget_batch_size
, i
);
1014 TestMultiGet(thread
, read_opts
, rand_column_families
, rand_keys
);
1015 i
+= multiget_batch_size
- 1;
1017 TestGet(thread
, read_opts
, rand_column_families
, rand_keys
);
1019 } else if (prob_op
< prefix_bound
) {
1020 assert(static_cast<int>(FLAGS_readpercent
) <= prob_op
);
1021 // OPERATION prefix scan
1022 // keys are 8 bytes long, prefix size is FLAGS_prefix_size. There are
1023 // (8 - FLAGS_prefix_size) bytes besides the prefix. So there will
1024 // be 2 ^ ((8 - FLAGS_prefix_size) * 8) possible keys with the same
1026 TestPrefixScan(thread
, read_opts
, rand_column_families
, rand_keys
);
1027 } else if (prob_op
< write_bound
) {
1028 assert(prefix_bound
<= prob_op
);
1030 TestPut(thread
, write_opts
, read_opts
, rand_column_families
, rand_keys
,
1032 } else if (prob_op
< del_bound
) {
1033 assert(write_bound
<= prob_op
);
1035 TestDelete(thread
, write_opts
, rand_column_families
, rand_keys
);
1036 } else if (prob_op
< delrange_bound
) {
1037 assert(del_bound
<= prob_op
);
1038 // OPERATION delete range
1039 TestDeleteRange(thread
, write_opts
, rand_column_families
, rand_keys
);
1040 } else if (prob_op
< iterate_bound
) {
1041 assert(delrange_bound
<= prob_op
);
1042 // OPERATION iterate
1043 if (!FLAGS_skip_verifydb
&&
1044 thread
->rand
.OneInOpt(
1045 FLAGS_verify_iterator_with_expected_state_one_in
)) {
1046 TestIterateAgainstExpected(thread
, read_opts
, rand_column_families
,
1049 int num_seeks
= static_cast<int>(
1050 std::min(static_cast<uint64_t>(thread
->rand
.Uniform(4)),
1051 FLAGS_ops_per_thread
- i
- 1));
1052 rand_keys
= GenerateNKeys(thread
, num_seeks
, i
);
1054 TestIterate(thread
, read_opts
, rand_column_families
, rand_keys
);
1057 assert(iterate_bound
<= prob_op
);
1058 TestCustomOperations(thread
, rand_column_families
);
1060 thread
->stats
.FinishedSingleOp();
1063 while (!thread
->snapshot_queue
.empty()) {
1064 db_
->ReleaseSnapshot(thread
->snapshot_queue
.front().second
.snapshot
);
1065 delete thread
->snapshot_queue
.front().second
.key_vec
;
1066 thread
->snapshot_queue
.pop();
1069 thread
->stats
.Stop();
1072 #ifndef ROCKSDB_LITE
1073 // Generates a list of keys that are close to the boundaries of SST keys.
1074 // If there isn't any SST file in the DB, return empty list.
1075 std::vector
<std::string
> StressTest::GetWhiteBoxKeys(ThreadState
* thread
,
1077 ColumnFamilyHandle
* cfh
,
1079 ColumnFamilyMetaData cfmd
;
1080 db
->GetColumnFamilyMetaData(cfh
, &cfmd
);
1081 std::vector
<std::string
> boundaries
;
1082 for (const LevelMetaData
& lmd
: cfmd
.levels
) {
1083 for (const SstFileMetaData
& sfmd
: lmd
.files
) {
1084 // If FLAGS_user_timestamp_size > 0, then both smallestkey and largestkey
1086 const auto& skey
= sfmd
.smallestkey
;
1087 const auto& lkey
= sfmd
.largestkey
;
1088 assert(skey
.size() >= FLAGS_user_timestamp_size
);
1089 assert(lkey
.size() >= FLAGS_user_timestamp_size
);
1090 boundaries
.push_back(
1091 skey
.substr(0, skey
.size() - FLAGS_user_timestamp_size
));
1092 boundaries
.push_back(
1093 lkey
.substr(0, lkey
.size() - FLAGS_user_timestamp_size
));
1096 if (boundaries
.empty()) {
1100 std::vector
<std::string
> ret
;
1101 for (size_t j
= 0; j
< num_keys
; j
++) {
1103 boundaries
[thread
->rand
.Uniform(static_cast<int>(boundaries
.size()))];
1104 if (thread
->rand
.OneIn(3)) {
1105 // Reduce one byte from the string
1106 for (int i
= static_cast<int>(k
.length()) - 1; i
>= 0; i
--) {
1109 k
[i
] = static_cast<char>(cur
- 1);
1115 } else if (thread
->rand
.OneIn(2)) {
1116 // Add one byte to the string
1117 for (int i
= static_cast<int>(k
.length()) - 1; i
>= 0; i
--) {
1120 k
[i
] = static_cast<char>(cur
+ 1);
1131 #endif // !ROCKSDB_LITE
1133 // Given a key K, this creates an iterator which scans to K and then
1134 // does a random sequence of Next/Prev operations.
1135 Status
StressTest::TestIterate(ThreadState
* thread
,
1136 const ReadOptions
& read_opts
,
1137 const std::vector
<int>& rand_column_families
,
1138 const std::vector
<int64_t>& rand_keys
) {
1139 assert(!rand_column_families
.empty());
1140 assert(!rand_keys
.empty());
1142 ManagedSnapshot
snapshot_guard(db_
);
1144 ReadOptions ro
= read_opts
;
1145 ro
.snapshot
= snapshot_guard
.snapshot();
1147 std::string read_ts_str
;
1148 Slice read_ts_slice
;
1149 MaybeUseOlderTimestampForRangeScan(thread
, read_ts_str
, read_ts_slice
, ro
);
1151 bool expect_total_order
= false;
1152 if (thread
->rand
.OneIn(16)) {
1153 // When prefix extractor is used, it's useful to cover total order seek.
1154 ro
.total_order_seek
= true;
1155 expect_total_order
= true;
1156 } else if (thread
->rand
.OneIn(4)) {
1157 ro
.total_order_seek
= false;
1158 ro
.auto_prefix_mode
= true;
1159 expect_total_order
= true;
1160 } else if (options_
.prefix_extractor
.get() == nullptr) {
1161 expect_total_order
= true;
1164 std::string upper_bound_str
;
1166 if (thread
->rand
.OneIn(16)) {
1167 // With a 1/16 chance, set an iterator upper bound.
1168 // Note: upper_bound can be smaller than the seek key.
1169 const int64_t rand_upper_key
= GenerateOneKey(thread
, FLAGS_ops_per_thread
);
1170 upper_bound_str
= Key(rand_upper_key
);
1171 upper_bound
= Slice(upper_bound_str
);
1172 ro
.iterate_upper_bound
= &upper_bound
;
1174 std::string lower_bound_str
;
1176 if (thread
->rand
.OneIn(16)) {
1177 // With a 1/16 chance, enable iterator lower bound.
1178 // Note: lower_bound can be greater than the seek key.
1179 const int64_t rand_lower_key
= GenerateOneKey(thread
, FLAGS_ops_per_thread
);
1180 lower_bound_str
= Key(rand_lower_key
);
1181 lower_bound
= Slice(lower_bound_str
);
1182 ro
.iterate_lower_bound
= &lower_bound
;
1185 ColumnFamilyHandle
* const cfh
= column_families_
[rand_column_families
[0]];
1188 std::unique_ptr
<Iterator
> iter(db_
->NewIterator(ro
, cfh
));
1190 std::vector
<std::string
> key_strs
;
1191 if (thread
->rand
.OneIn(16)) {
1192 // Generate keys close to lower or upper bound of SST files.
1193 key_strs
= GetWhiteBoxKeys(thread
, db_
, cfh
, rand_keys
.size());
1195 if (key_strs
.empty()) {
1196 // Use the random keys passed in.
1197 for (int64_t rkey
: rand_keys
) {
1198 key_strs
.push_back(Key(rkey
));
1202 std::string op_logs
;
1203 constexpr size_t kOpLogsLimit
= 10000;
1205 for (const std::string
& key_str
: key_strs
) {
1206 if (op_logs
.size() > kOpLogsLimit
) {
1207 // Shouldn't take too much memory for the history log. Clear it.
1208 op_logs
= "(cleared...)\n";
1211 if (ro
.iterate_upper_bound
!= nullptr && thread
->rand
.OneIn(2)) {
1212 // With a 1/2 chance, change the upper bound.
1213 // It is possible that it is changed before first use, but there is no
1214 // problem with that.
1215 const int64_t rand_upper_key
=
1216 GenerateOneKey(thread
, FLAGS_ops_per_thread
);
1217 upper_bound_str
= Key(rand_upper_key
);
1218 upper_bound
= Slice(upper_bound_str
);
1220 if (ro
.iterate_lower_bound
!= nullptr && thread
->rand
.OneIn(4)) {
1221 // With a 1/4 chance, change the lower bound.
1222 // It is possible that it is changed before first use, but there is no
1223 // problem with that.
1224 const int64_t rand_lower_key
=
1225 GenerateOneKey(thread
, FLAGS_ops_per_thread
);
1226 lower_bound_str
= Key(rand_lower_key
);
1227 lower_bound
= Slice(lower_bound_str
);
1230 // Record some options to op_logs
1231 op_logs
+= "total_order_seek: ";
1232 op_logs
+= (ro
.total_order_seek
? "1 " : "0 ");
1233 op_logs
+= "auto_prefix_mode: ";
1234 op_logs
+= (ro
.auto_prefix_mode
? "1 " : "0 ");
1235 if (ro
.iterate_upper_bound
!= nullptr) {
1236 op_logs
+= "ub: " + upper_bound
.ToString(true) + " ";
1238 if (ro
.iterate_lower_bound
!= nullptr) {
1239 op_logs
+= "lb: " + lower_bound
.ToString(true) + " ";
1242 // Set up an iterator, perform the same operations without bounds and with
1243 // total order seek, and compare the results. This is to identify bugs
1244 // related to bounds, prefix extractor, or reseeking. Sometimes we are
1245 // comparing iterators with the same set-up, and it doesn't hurt to check
1246 // them to be equal.
1248 // This `ReadOptions` is for validation purposes. Ignore
1249 // `FLAGS_rate_limit_user_ops` to avoid slowing any validation.
1251 cmp_ro
.timestamp
= ro
.timestamp
;
1252 cmp_ro
.iter_start_ts
= ro
.iter_start_ts
;
1253 cmp_ro
.snapshot
= snapshot_guard
.snapshot();
1254 cmp_ro
.total_order_seek
= true;
1256 ColumnFamilyHandle
* const cmp_cfh
=
1257 GetControlCfh(thread
, rand_column_families
[0]);
1260 std::unique_ptr
<Iterator
> cmp_iter(db_
->NewIterator(cmp_ro
, cmp_cfh
));
1262 bool diverged
= false;
1266 const bool support_seek_first_or_last
= expect_total_order
;
1268 LastIterateOp last_op
;
1269 if (support_seek_first_or_last
&& thread
->rand
.OneIn(100)) {
1270 iter
->SeekToFirst();
1271 cmp_iter
->SeekToFirst();
1272 last_op
= kLastOpSeekToFirst
;
1274 } else if (support_seek_first_or_last
&& thread
->rand
.OneIn(100)) {
1276 cmp_iter
->SeekToLast();
1277 last_op
= kLastOpSeekToLast
;
1279 } else if (thread
->rand
.OneIn(8)) {
1280 iter
->SeekForPrev(key
);
1281 cmp_iter
->SeekForPrev(key
);
1282 last_op
= kLastOpSeekForPrev
;
1283 op_logs
+= "SFP " + key
.ToString(true) + " ";
1286 cmp_iter
->Seek(key
);
1287 last_op
= kLastOpSeek
;
1288 op_logs
+= "S " + key
.ToString(true) + " ";
1291 VerifyIterator(thread
, cmp_cfh
, ro
, iter
.get(), cmp_iter
.get(), last_op
,
1292 key
, op_logs
, &diverged
);
1294 const bool no_reverse
=
1295 (FLAGS_memtablerep
== "prefix_hash" && !expect_total_order
);
1296 for (uint64_t i
= 0; i
< FLAGS_num_iterations
&& iter
->Valid(); ++i
) {
1297 if (no_reverse
|| thread
->rand
.OneIn(2)) {
1300 assert(cmp_iter
->Valid());
1307 assert(cmp_iter
->Valid());
1313 last_op
= kLastOpNextOrPrev
;
1315 VerifyIterator(thread
, cmp_cfh
, ro
, iter
.get(), cmp_iter
.get(), last_op
,
1316 key
, op_logs
, &diverged
);
1319 thread
->stats
.AddIterations(1);
1324 return Status::OK();
1327 #ifndef ROCKSDB_LITE
1328 // Test the return status of GetLiveFiles.
1329 Status
StressTest::VerifyGetLiveFiles() const {
1330 std::vector
<std::string
> live_file
;
1331 uint64_t manifest_size
= 0;
1332 return db_
->GetLiveFiles(live_file
, &manifest_size
);
1335 // Test the return status of GetSortedWalFiles.
1336 Status
StressTest::VerifyGetSortedWalFiles() const {
1337 VectorLogPtr log_ptr
;
1338 return db_
->GetSortedWalFiles(log_ptr
);
1341 // Test the return status of GetCurrentWalFile.
1342 Status
StressTest::VerifyGetCurrentWalFile() const {
1343 std::unique_ptr
<LogFile
> cur_wal_file
;
1344 return db_
->GetCurrentWalFile(&cur_wal_file
);
1346 #endif // !ROCKSDB_LITE
1348 // Verifies that the two iterators, iter and cmp_iter, are in the same
1349 // position, unless iter might have been invalidated or left undefined
1350 // because of upper or lower bounds, or the prefix extractor.
1351 // Will flag a failure if the verification fails.
1352 // *diverged is true if the two iterators have already diverged.
1353 // True if verification passed, false if not.
1354 void StressTest::VerifyIterator(ThreadState
* thread
,
1355 ColumnFamilyHandle
* cmp_cfh
,
1356 const ReadOptions
& ro
, Iterator
* iter
,
1357 Iterator
* cmp_iter
, LastIterateOp op
,
1358 const Slice
& seek_key
,
1359 const std::string
& op_logs
, bool* diverged
) {
1366 if (ro
.iter_start_ts
!= nullptr) {
1367 assert(FLAGS_user_timestamp_size
> 0);
1368 // We currently do not verify iterator when dumping history of internal
1374 if (op
== kLastOpSeekToFirst
&& ro
.iterate_lower_bound
!= nullptr) {
1375 // SeekToFirst() with lower bound is not well defined.
1378 } else if (op
== kLastOpSeekToLast
&& ro
.iterate_upper_bound
!= nullptr) {
1379 // SeekToLast() with higher bound is not well defined.
1382 } else if (op
== kLastOpSeek
&& ro
.iterate_lower_bound
!= nullptr &&
1383 (options_
.comparator
->CompareWithoutTimestamp(
1384 *ro
.iterate_lower_bound
, /*a_has_ts=*/false, seek_key
,
1385 /*b_has_ts=*/false) >= 0 ||
1386 (ro
.iterate_upper_bound
!= nullptr &&
1387 options_
.comparator
->CompareWithoutTimestamp(
1388 *ro
.iterate_lower_bound
, /*a_has_ts=*/false,
1389 *ro
.iterate_upper_bound
, /*b_has_ts*/ false) >= 0))) {
1390 // Lower bound behavior is not well defined if it is larger than
1391 // seek key or upper bound. Disable the check for now.
1394 } else if (op
== kLastOpSeekForPrev
&& ro
.iterate_upper_bound
!= nullptr &&
1395 (options_
.comparator
->CompareWithoutTimestamp(
1396 *ro
.iterate_upper_bound
, /*a_has_ts=*/false, seek_key
,
1397 /*b_has_ts=*/false) <= 0 ||
1398 (ro
.iterate_lower_bound
!= nullptr &&
1399 options_
.comparator
->CompareWithoutTimestamp(
1400 *ro
.iterate_lower_bound
, /*a_has_ts=*/false,
1401 *ro
.iterate_upper_bound
, /*b_has_ts=*/false) >= 0))) {
1402 // Upper bound behavior is not well defined if it is smaller than
1403 // seek key or lower bound. Disable the check for now.
1408 const SliceTransform
* pe
= (ro
.total_order_seek
|| ro
.auto_prefix_mode
)
1410 : options_
.prefix_extractor
.get();
1411 const Comparator
* cmp
= options_
.comparator
;
1413 if (iter
->Valid() && !cmp_iter
->Valid()) {
1414 if (pe
!= nullptr) {
1415 if (!pe
->InDomain(seek_key
)) {
1416 // Prefix seek a non-in-domain key is undefined. Skip checking for
1420 } else if (!pe
->InDomain(iter
->key())) {
1421 // out of range if iterator key is not in domain anymore.
1424 } else if (pe
->Transform(iter
->key()) != pe
->Transform(seek_key
)) {
1430 "Control interator is invalid but iterator has key %s "
1432 iter
->key().ToString(true).c_str(), op_logs
.c_str());
1435 } else if (cmp_iter
->Valid()) {
1436 // Iterator is not valid. It can be legitimate if it has already been
1437 // out of upper or lower bound, or filtered out by prefix iterator.
1438 const Slice
& total_order_key
= cmp_iter
->key();
1440 if (pe
!= nullptr) {
1441 if (!pe
->InDomain(seek_key
)) {
1442 // Prefix seek a non-in-domain key is undefined. Skip checking for
1448 if (!pe
->InDomain(total_order_key
) ||
1449 pe
->Transform(total_order_key
) != pe
->Transform(seek_key
)) {
1450 // If the prefix is exhausted, the only thing that needs to be checked
1451 // is that the iterator doesn't return a position in the prefix.
1452 // Either way, checking can stop from here.
1454 if (!iter
->Valid() || !pe
->InDomain(iter
->key()) ||
1455 pe
->Transform(iter
->key()) != pe
->Transform(seek_key
)) {
1459 "Iterator stays in prefix but contol doesn't"
1460 " iterator key %s control iterator key %s %s\n",
1461 iter
->key().ToString(true).c_str(),
1462 cmp_iter
->key().ToString(true).c_str(), op_logs
.c_str());
1465 // Check upper or lower bounds.
1467 if ((iter
->Valid() && iter
->key() != cmp_iter
->key()) ||
1469 (ro
.iterate_upper_bound
== nullptr ||
1470 cmp
->CompareWithoutTimestamp(total_order_key
, /*a_has_ts=*/false,
1471 *ro
.iterate_upper_bound
,
1472 /*b_has_ts=*/false) < 0) &&
1473 (ro
.iterate_lower_bound
== nullptr ||
1474 cmp
->CompareWithoutTimestamp(total_order_key
, /*a_has_ts=*/false,
1475 *ro
.iterate_lower_bound
,
1476 /*b_has_ts=*/false) > 0))) {
1478 "Iterator diverged from control iterator which"
1479 " has value %s %s\n",
1480 total_order_key
.ToString(true).c_str(), op_logs
.c_str());
1481 if (iter
->Valid()) {
1482 fprintf(stderr
, "iterator has value %s\n",
1483 iter
->key().ToString(true).c_str());
1485 fprintf(stderr
, "iterator is not valid\n");
1492 if (!*diverged
&& iter
->Valid()) {
1493 const WideColumns expected_columns
=
1494 GenerateExpectedWideColumns(GetValueBase(iter
->value()), iter
->value());
1495 if (iter
->columns() != expected_columns
) {
1496 fprintf(stderr
, "Value and columns inconsistent for iterator: %s\n",
1497 DebugString(iter
->value(), iter
->columns(), expected_columns
)
1505 fprintf(stderr
, "Control CF %s\n", cmp_cfh
->GetName().c_str());
1506 thread
->stats
.AddErrors(1);
1507 // Fail fast to preserve the DB state.
1508 thread
->shared
->SetVerificationFailure();
1513 Status
StressTest::TestBackupRestore(
1514 ThreadState
* /* thread */,
1515 const std::vector
<int>& /* rand_column_families */,
1516 const std::vector
<int64_t>& /* rand_keys */) {
1519 "RocksDB lite does not support "
1520 "TestBackupRestore\n");
1524 Status
StressTest::TestCheckpoint(
1525 ThreadState
* /* thread */,
1526 const std::vector
<int>& /* rand_column_families */,
1527 const std::vector
<int64_t>& /* rand_keys */) {
1530 "RocksDB lite does not support "
1531 "TestCheckpoint\n");
1535 void StressTest::TestCompactFiles(ThreadState
* /* thread */,
1536 ColumnFamilyHandle
* /* column_family */) {
1539 "RocksDB lite does not support "
1543 #else // ROCKSDB_LITE
1544 Status
StressTest::TestBackupRestore(
1545 ThreadState
* thread
, const std::vector
<int>& rand_column_families
,
1546 const std::vector
<int64_t>& rand_keys
) {
1547 std::vector
<std::unique_ptr
<MutexLock
>> locks
;
1548 if (ShouldAcquireMutexOnKey()) {
1549 for (int rand_column_family
: rand_column_families
) {
1550 // `rand_keys[0]` on each chosen CF will be verified.
1551 locks
.emplace_back(new MutexLock(
1552 thread
->shared
->GetMutexForKey(rand_column_family
, rand_keys
[0])));
1556 const std::string backup_dir
=
1557 FLAGS_db
+ "/.backup" + std::to_string(thread
->tid
);
1558 const std::string restore_dir
=
1559 FLAGS_db
+ "/.restore" + std::to_string(thread
->tid
);
1560 BackupEngineOptions
backup_opts(backup_dir
);
1561 // For debugging, get info_log from live options
1562 backup_opts
.info_log
= db_
->GetDBOptions().info_log
.get();
1563 if (thread
->rand
.OneIn(10)) {
1564 backup_opts
.share_table_files
= false;
1566 backup_opts
.share_table_files
= true;
1567 if (thread
->rand
.OneIn(5)) {
1568 backup_opts
.share_files_with_checksum
= false;
1570 backup_opts
.share_files_with_checksum
= true;
1571 if (thread
->rand
.OneIn(2)) {
1573 backup_opts
.share_files_with_checksum_naming
=
1574 BackupEngineOptions::kLegacyCrc32cAndFileSize
;
1577 backup_opts
.share_files_with_checksum_naming
=
1578 BackupEngineOptions::kUseDbSessionId
;
1580 if (thread
->rand
.OneIn(2)) {
1581 backup_opts
.share_files_with_checksum_naming
=
1582 backup_opts
.share_files_with_checksum_naming
|
1583 BackupEngineOptions::kFlagIncludeFileSize
;
1587 if (thread
->rand
.OneIn(2)) {
1588 backup_opts
.schema_version
= 1;
1590 backup_opts
.schema_version
= 2;
1592 BackupEngine
* backup_engine
= nullptr;
1593 std::string from
= "a backup/restore operation";
1594 Status s
= BackupEngine::Open(db_stress_env
, backup_opts
, &backup_engine
);
1596 from
= "BackupEngine::Open";
1599 if (backup_opts
.schema_version
>= 2 && thread
->rand
.OneIn(2)) {
1600 TEST_BackupMetaSchemaOptions test_opts
;
1601 test_opts
.crc32c_checksums
= thread
->rand
.OneIn(2) == 0;
1602 test_opts
.file_sizes
= thread
->rand
.OneIn(2) == 0;
1603 TEST_SetBackupMetaSchemaOptions(backup_engine
, test_opts
);
1605 CreateBackupOptions create_opts
;
1606 if (FLAGS_disable_wal
) {
1607 // The verification can only work when latest value of `key` is backed up,
1608 // which requires flushing in case of WAL disabled.
1610 // Note this triggers a flush with a key lock held. Meanwhile, operations
1611 // like flush/compaction may attempt to grab key locks like in
1612 // `DbStressCompactionFilter`. The philosophy around preventing deadlock
1613 // is the background operation key lock acquisition only tries but does
1614 // not wait for the lock. So here in the foreground it is OK to hold the
1615 // lock and wait on a background operation (flush).
1616 create_opts
.flush_before_backup
= true;
1618 s
= backup_engine
->CreateNewBackup(create_opts
, db_
);
1620 from
= "BackupEngine::CreateNewBackup";
1624 delete backup_engine
;
1625 backup_engine
= nullptr;
1626 s
= BackupEngine::Open(db_stress_env
, backup_opts
, &backup_engine
);
1628 from
= "BackupEngine::Open (again)";
1631 std::vector
<BackupInfo
> backup_info
;
1632 // If inplace_not_restore, we verify the backup by opening it as a
1633 // read-only DB. If !inplace_not_restore, we restore it to a temporary
1634 // directory for verification.
1635 bool inplace_not_restore
= thread
->rand
.OneIn(3);
1637 backup_engine
->GetBackupInfo(&backup_info
,
1638 /*include_file_details*/ inplace_not_restore
);
1639 if (backup_info
.empty()) {
1640 s
= Status::NotFound("no backups found");
1641 from
= "BackupEngine::GetBackupInfo";
1644 if (s
.ok() && thread
->rand
.OneIn(2)) {
1645 s
= backup_engine
->VerifyBackup(
1646 backup_info
.front().backup_id
,
1647 thread
->rand
.OneIn(2) /* verify_with_checksum */);
1649 from
= "BackupEngine::VerifyBackup";
1652 const bool allow_persistent
= thread
->tid
== 0; // not too many
1653 bool from_latest
= false;
1654 int count
= static_cast<int>(backup_info
.size());
1655 if (s
.ok() && !inplace_not_restore
) {
1657 s
= backup_engine
->RestoreDBFromBackup(
1658 RestoreOptions(), backup_info
[thread
->rand
.Uniform(count
)].backup_id
,
1659 restore_dir
/* db_dir */, restore_dir
/* wal_dir */);
1661 from
= "BackupEngine::RestoreDBFromBackup";
1665 s
= backup_engine
->RestoreDBFromLatestBackup(RestoreOptions(),
1666 restore_dir
/* db_dir */,
1667 restore_dir
/* wal_dir */);
1669 from
= "BackupEngine::RestoreDBFromLatestBackup";
1673 if (s
.ok() && !inplace_not_restore
) {
1674 // Purge early if restoring, to ensure the restored directory doesn't
1675 // have some secret dependency on the backup directory.
1676 uint32_t to_keep
= 0;
1677 if (allow_persistent
) {
1678 // allow one thread to keep up to 2 backups
1679 to_keep
= thread
->rand
.Uniform(3);
1681 s
= backup_engine
->PurgeOldBackups(to_keep
);
1683 from
= "BackupEngine::PurgeOldBackups";
1686 DB
* restored_db
= nullptr;
1687 std::vector
<ColumnFamilyHandle
*> restored_cf_handles
;
1688 // Not yet implemented: opening restored BlobDB or TransactionDB
1689 if (s
.ok() && !FLAGS_use_txn
&& !FLAGS_use_blob_db
) {
1690 Options
restore_options(options_
);
1691 restore_options
.best_efforts_recovery
= false;
1692 restore_options
.listeners
.clear();
1693 // Avoid dangling/shared file descriptors, for reliable destroy
1694 restore_options
.sst_file_manager
= nullptr;
1695 std::vector
<ColumnFamilyDescriptor
> cf_descriptors
;
1696 // TODO(ajkr): `column_family_names_` is not safe to access here when
1697 // `clear_column_family_one_in != 0`. But we can't easily switch to
1698 // `ListColumnFamilies` to get names because it won't necessarily give
1699 // the same order as `column_family_names_`.
1700 assert(FLAGS_clear_column_family_one_in
== 0);
1701 for (auto name
: column_family_names_
) {
1702 cf_descriptors
.emplace_back(name
, ColumnFamilyOptions(restore_options
));
1704 if (inplace_not_restore
) {
1705 BackupInfo
& info
= backup_info
[thread
->rand
.Uniform(count
)];
1706 restore_options
.env
= info
.env_for_open
.get();
1707 s
= DB::OpenForReadOnly(DBOptions(restore_options
), info
.name_for_open
,
1708 cf_descriptors
, &restored_cf_handles
,
1711 from
= "DB::OpenForReadOnly in backup/restore";
1714 s
= DB::Open(DBOptions(restore_options
), restore_dir
, cf_descriptors
,
1715 &restored_cf_handles
, &restored_db
);
1717 from
= "DB::Open in backup/restore";
1721 // Note the column families chosen by `rand_column_families` cannot be
1722 // dropped while the locks for `rand_keys` are held. So we should not have
1723 // to worry about accessing those column families throughout this function.
1725 // For simplicity, currently only verifies existence/non-existence of a
1727 for (size_t i
= 0; restored_db
&& s
.ok() && i
< rand_column_families
.size();
1729 std::string key_str
= Key(rand_keys
[0]);
1730 Slice key
= key_str
;
1731 std::string restored_value
;
1732 // This `ReadOptions` is for validation purposes. Ignore
1733 // `FLAGS_rate_limit_user_ops` to avoid slowing any validation.
1734 ReadOptions read_opts
;
1737 if (FLAGS_user_timestamp_size
> 0) {
1738 ts_str
= GetNowNanos();
1740 read_opts
.timestamp
= &ts
;
1742 Status get_status
= restored_db
->Get(
1743 read_opts
, restored_cf_handles
[rand_column_families
[i
]], key
,
1745 bool exists
= thread
->shared
->Exists(rand_column_families
[i
], rand_keys
[0]);
1746 if (get_status
.ok()) {
1747 if (!exists
&& from_latest
&& ShouldAcquireMutexOnKey()) {
1748 std::ostringstream oss
;
1749 oss
<< "0x" << key
.ToString(true)
1750 << " exists in restore but not in original db";
1751 s
= Status::Corruption(oss
.str());
1753 } else if (get_status
.IsNotFound()) {
1754 if (exists
&& from_latest
&& ShouldAcquireMutexOnKey()) {
1755 std::ostringstream oss
;
1756 oss
<< "0x" << key
.ToString(true)
1757 << " exists in original db but not in restore";
1758 s
= Status::Corruption(oss
.str());
1763 from
= "DB::Get in backup/restore";
1767 if (restored_db
!= nullptr) {
1768 for (auto* cf_handle
: restored_cf_handles
) {
1769 restored_db
->DestroyColumnFamilyHandle(cf_handle
);
1772 restored_db
= nullptr;
1774 if (s
.ok() && inplace_not_restore
) {
1775 // Purge late if inplace open read-only
1776 uint32_t to_keep
= 0;
1777 if (allow_persistent
) {
1778 // allow one thread to keep up to 2 backups
1779 to_keep
= thread
->rand
.Uniform(3);
1781 s
= backup_engine
->PurgeOldBackups(to_keep
);
1783 from
= "BackupEngine::PurgeOldBackups";
1786 if (backup_engine
!= nullptr) {
1787 delete backup_engine
;
1788 backup_engine
= nullptr;
1791 // Preserve directories on failure, or allowed persistent backup
1792 if (!allow_persistent
) {
1793 s
= DestroyDir(db_stress_env
, backup_dir
);
1795 from
= "Destroy backup dir";
1800 s
= DestroyDir(db_stress_env
, restore_dir
);
1802 from
= "Destroy restore dir";
1806 fprintf(stderr
, "Failure in %s with: %s\n", from
.c_str(),
1807 s
.ToString().c_str());
1812 Status
StressTest::TestApproximateSize(
1813 ThreadState
* thread
, uint64_t iteration
,
1814 const std::vector
<int>& rand_column_families
,
1815 const std::vector
<int64_t>& rand_keys
) {
1816 // rand_keys likely only has one key. Just use the first one.
1817 assert(!rand_keys
.empty());
1818 assert(!rand_column_families
.empty());
1819 int64_t key1
= rand_keys
[0];
1821 if (thread
->rand
.OneIn(2)) {
1822 // Two totally random keys. This tends to cover large ranges.
1823 key2
= GenerateOneKey(thread
, iteration
);
1825 std::swap(key1
, key2
);
1828 // Unless users pass a very large FLAGS_max_key, we should not worry
1829 // about overflow. It is for testing, so we skip the overflow checking
1831 key2
= key1
+ static_cast<int64_t>(thread
->rand
.Uniform(1000));
1833 std::string key1_str
= Key(key1
);
1834 std::string key2_str
= Key(key2
);
1835 Range range
{Slice(key1_str
), Slice(key2_str
)};
1836 SizeApproximationOptions sao
;
1837 sao
.include_memtables
= thread
->rand
.OneIn(2);
1838 if (sao
.include_memtables
) {
1839 sao
.include_files
= thread
->rand
.OneIn(2);
1841 if (thread
->rand
.OneIn(2)) {
1842 if (thread
->rand
.OneIn(2)) {
1843 sao
.files_size_error_margin
= 0.0;
1845 sao
.files_size_error_margin
=
1846 static_cast<double>(thread
->rand
.Uniform(3));
1850 return db_
->GetApproximateSizes(
1851 sao
, column_families_
[rand_column_families
[0]], &range
, 1, &result
);
1854 Status
StressTest::TestCheckpoint(ThreadState
* thread
,
1855 const std::vector
<int>& rand_column_families
,
1856 const std::vector
<int64_t>& rand_keys
) {
1857 std::vector
<std::unique_ptr
<MutexLock
>> locks
;
1858 if (ShouldAcquireMutexOnKey()) {
1859 for (int rand_column_family
: rand_column_families
) {
1860 // `rand_keys[0]` on each chosen CF will be verified.
1861 locks
.emplace_back(new MutexLock(
1862 thread
->shared
->GetMutexForKey(rand_column_family
, rand_keys
[0])));
1866 std::string checkpoint_dir
=
1867 FLAGS_db
+ "/.checkpoint" + std::to_string(thread
->tid
);
1868 Options
tmp_opts(options_
);
1869 tmp_opts
.listeners
.clear();
1870 tmp_opts
.env
= db_stress_env
;
1872 DestroyDB(checkpoint_dir
, tmp_opts
);
1874 if (db_stress_env
->FileExists(checkpoint_dir
).ok()) {
1875 // If the directory might still exist, try to delete the files one by one.
1876 // Likely a trash file is still there.
1877 Status my_s
= DestroyDir(db_stress_env
, checkpoint_dir
);
1879 fprintf(stderr
, "Fail to destory directory before checkpoint: %s",
1880 my_s
.ToString().c_str());
1884 Checkpoint
* checkpoint
= nullptr;
1885 Status s
= Checkpoint::Create(db_
, &checkpoint
);
1887 s
= checkpoint
->CreateCheckpoint(checkpoint_dir
);
1889 fprintf(stderr
, "Fail to create checkpoint to %s\n",
1890 checkpoint_dir
.c_str());
1891 std::vector
<std::string
> files
;
1892 Status my_s
= db_stress_env
->GetChildren(checkpoint_dir
, &files
);
1894 for (const auto& f
: files
) {
1895 fprintf(stderr
, " %s\n", f
.c_str());
1898 fprintf(stderr
, "Fail to get files under the directory to %s\n",
1899 my_s
.ToString().c_str());
1904 checkpoint
= nullptr;
1905 std::vector
<ColumnFamilyHandle
*> cf_handles
;
1906 DB
* checkpoint_db
= nullptr;
1908 Options
options(options_
);
1909 options
.best_efforts_recovery
= false;
1910 options
.listeners
.clear();
1911 // Avoid race condition in trash handling after delete checkpoint_db
1912 options
.sst_file_manager
.reset();
1913 std::vector
<ColumnFamilyDescriptor
> cf_descs
;
1914 // TODO(ajkr): `column_family_names_` is not safe to access here when
1915 // `clear_column_family_one_in != 0`. But we can't easily switch to
1916 // `ListColumnFamilies` to get names because it won't necessarily give
1917 // the same order as `column_family_names_`.
1918 assert(FLAGS_clear_column_family_one_in
== 0);
1919 if (FLAGS_clear_column_family_one_in
== 0) {
1920 for (const auto& name
: column_family_names_
) {
1921 cf_descs
.emplace_back(name
, ColumnFamilyOptions(options
));
1923 s
= DB::OpenForReadOnly(DBOptions(options
), checkpoint_dir
, cf_descs
,
1924 &cf_handles
, &checkpoint_db
);
1927 if (checkpoint_db
!= nullptr) {
1928 // Note the column families chosen by `rand_column_families` cannot be
1929 // dropped while the locks for `rand_keys` are held. So we should not have
1930 // to worry about accessing those column families throughout this function.
1931 for (size_t i
= 0; s
.ok() && i
< rand_column_families
.size(); ++i
) {
1932 std::string key_str
= Key(rand_keys
[0]);
1933 Slice key
= key_str
;
1936 ReadOptions read_opts
;
1937 if (FLAGS_user_timestamp_size
> 0) {
1938 ts_str
= GetNowNanos();
1940 read_opts
.timestamp
= &ts
;
1943 Status get_status
= checkpoint_db
->Get(
1944 read_opts
, cf_handles
[rand_column_families
[i
]], key
, &value
);
1946 thread
->shared
->Exists(rand_column_families
[i
], rand_keys
[0]);
1947 if (get_status
.ok()) {
1948 if (!exists
&& ShouldAcquireMutexOnKey()) {
1949 std::ostringstream oss
;
1950 oss
<< "0x" << key
.ToString(true) << " exists in checkpoint "
1951 << checkpoint_dir
<< " but not in original db";
1952 s
= Status::Corruption(oss
.str());
1954 } else if (get_status
.IsNotFound()) {
1955 if (exists
&& ShouldAcquireMutexOnKey()) {
1956 std::ostringstream oss
;
1957 oss
<< "0x" << key
.ToString(true)
1958 << " exists in original db but not in checkpoint "
1960 s
= Status::Corruption(oss
.str());
1966 for (auto cfh
: cf_handles
) {
1970 delete checkpoint_db
;
1971 checkpoint_db
= nullptr;
1975 fprintf(stderr
, "A checkpoint operation failed with: %s\n",
1976 s
.ToString().c_str());
1978 DestroyDB(checkpoint_dir
, tmp_opts
);
1983 void StressTest::TestGetProperty(ThreadState
* thread
) const {
1984 std::unordered_set
<std::string
> levelPropertyNames
= {
1985 DB::Properties::kAggregatedTablePropertiesAtLevel
,
1986 DB::Properties::kCompressionRatioAtLevelPrefix
,
1987 DB::Properties::kNumFilesAtLevelPrefix
,
1989 std::unordered_set
<std::string
> unknownPropertyNames
= {
1990 DB::Properties::kEstimateOldestKeyTime
,
1991 DB::Properties::kOptionsStatistics
,
1993 kLiveSstFilesSizeAtTemperature
, // similar to levelPropertyNames, it
1994 // requires a number suffix
1996 unknownPropertyNames
.insert(levelPropertyNames
.begin(),
1997 levelPropertyNames
.end());
1999 std::unordered_set
<std::string
> blobCachePropertyNames
= {
2000 DB::Properties::kBlobCacheCapacity
,
2001 DB::Properties::kBlobCacheUsage
,
2002 DB::Properties::kBlobCachePinnedUsage
,
2004 if (db_
->GetOptions().blob_cache
== nullptr) {
2005 unknownPropertyNames
.insert(blobCachePropertyNames
.begin(),
2006 blobCachePropertyNames
.end());
2010 for (const auto& ppt_name_and_info
: InternalStats::ppt_name_to_info
) {
2011 bool res
= db_
->GetProperty(ppt_name_and_info
.first
, &prop
);
2012 if (unknownPropertyNames
.find(ppt_name_and_info
.first
) ==
2013 unknownPropertyNames
.end()) {
2015 fprintf(stderr
, "Failed to get DB property: %s\n",
2016 ppt_name_and_info
.first
.c_str());
2017 thread
->shared
->SetVerificationFailure();
2019 if (ppt_name_and_info
.second
.handle_int
!= nullptr) {
2021 if (!db_
->GetIntProperty(ppt_name_and_info
.first
, &prop_int
)) {
2022 fprintf(stderr
, "Failed to get Int property: %s\n",
2023 ppt_name_and_info
.first
.c_str());
2024 thread
->shared
->SetVerificationFailure();
2027 if (ppt_name_and_info
.second
.handle_map
!= nullptr) {
2028 std::map
<std::string
, std::string
> prop_map
;
2029 if (!db_
->GetMapProperty(ppt_name_and_info
.first
, &prop_map
)) {
2030 fprintf(stderr
, "Failed to get Map property: %s\n",
2031 ppt_name_and_info
.first
.c_str());
2032 thread
->shared
->SetVerificationFailure();
2038 ROCKSDB_NAMESPACE::ColumnFamilyMetaData cf_meta_data
;
2039 db_
->GetColumnFamilyMetaData(&cf_meta_data
);
2040 int level_size
= static_cast<int>(cf_meta_data
.levels
.size());
2041 for (int level
= 0; level
< level_size
; level
++) {
2042 for (const auto& ppt_name
: levelPropertyNames
) {
2043 bool res
= db_
->GetProperty(ppt_name
+ std::to_string(level
), &prop
);
2045 fprintf(stderr
, "Failed to get DB property: %s\n",
2046 (ppt_name
+ std::to_string(level
)).c_str());
2047 thread
->shared
->SetVerificationFailure();
2052 // Test for an invalid property name
2053 if (thread
->rand
.OneIn(100)) {
2054 if (db_
->GetProperty("rocksdb.invalid_property_name", &prop
)) {
2055 fprintf(stderr
, "Failed to return false for invalid property name\n");
2056 thread
->shared
->SetVerificationFailure();
2061 void StressTest::TestCompactFiles(ThreadState
* thread
,
2062 ColumnFamilyHandle
* column_family
) {
2063 ROCKSDB_NAMESPACE::ColumnFamilyMetaData cf_meta_data
;
2064 db_
->GetColumnFamilyMetaData(column_family
, &cf_meta_data
);
2066 if (cf_meta_data
.levels
.empty()) {
2070 // Randomly compact up to three consecutive files from a level
2071 const int kMaxRetry
= 3;
2072 for (int attempt
= 0; attempt
< kMaxRetry
; ++attempt
) {
2073 size_t random_level
=
2074 thread
->rand
.Uniform(static_cast<int>(cf_meta_data
.levels
.size()));
2076 const auto& files
= cf_meta_data
.levels
[random_level
].files
;
2077 if (files
.size() > 0) {
2078 size_t random_file_index
=
2079 thread
->rand
.Uniform(static_cast<int>(files
.size()));
2080 if (files
[random_file_index
].being_compacted
) {
2081 // Retry as the selected file is currently being compacted
2085 std::vector
<std::string
> input_files
;
2086 input_files
.push_back(files
[random_file_index
].name
);
2087 if (random_file_index
> 0 &&
2088 !files
[random_file_index
- 1].being_compacted
) {
2089 input_files
.push_back(files
[random_file_index
- 1].name
);
2091 if (random_file_index
+ 1 < files
.size() &&
2092 !files
[random_file_index
+ 1].being_compacted
) {
2093 input_files
.push_back(files
[random_file_index
+ 1].name
);
2096 size_t output_level
=
2097 std::min(random_level
+ 1, cf_meta_data
.levels
.size() - 1);
2098 auto s
= db_
->CompactFiles(CompactionOptions(), column_family
,
2099 input_files
, static_cast<int>(output_level
));
2101 fprintf(stdout
, "Unable to perform CompactFiles(): %s\n",
2102 s
.ToString().c_str());
2103 thread
->stats
.AddNumCompactFilesFailed(1);
2105 thread
->stats
.AddNumCompactFilesSucceed(1);
2111 #endif // ROCKSDB_LITE
2113 Status
StressTest::TestFlush(const std::vector
<int>& rand_column_families
) {
2114 FlushOptions flush_opts
;
2115 if (FLAGS_atomic_flush
) {
2116 return db_
->Flush(flush_opts
, column_families_
);
2118 std::vector
<ColumnFamilyHandle
*> cfhs
;
2119 std::for_each(rand_column_families
.begin(), rand_column_families
.end(),
2120 [this, &cfhs
](int k
) { cfhs
.push_back(column_families_
[k
]); });
2121 return db_
->Flush(flush_opts
, cfhs
);
2124 Status
StressTest::TestPauseBackground(ThreadState
* thread
) {
2125 Status status
= db_
->PauseBackgroundWork();
2129 // To avoid stalling/deadlocking ourselves in this thread, just
2130 // sleep here during pause and let other threads do db operations.
2131 // Sleep up to ~16 seconds (2**24 microseconds), but very skewed
2132 // toward short pause. (1 chance in 25 of pausing >= 1s;
2133 // 1 chance in 625 of pausing full 16s.)
2135 std::min(thread
->rand
.Uniform(25), thread
->rand
.Uniform(25));
2136 clock_
->SleepForMicroseconds(1 << pwr2_micros
);
2137 return db_
->ContinueBackgroundWork();
2140 void StressTest::TestAcquireSnapshot(ThreadState
* thread
,
2141 int rand_column_family
,
2142 const std::string
& keystr
, uint64_t i
) {
2144 ColumnFamilyHandle
* column_family
= column_families_
[rand_column_family
];
2145 // This `ReadOptions` is for validation purposes. Ignore
2146 // `FLAGS_rate_limit_user_ops` to avoid slowing any validation.
2148 #ifndef ROCKSDB_LITE
2149 auto db_impl
= static_cast_with_check
<DBImpl
>(db_
->GetRootDB());
2150 const bool ww_snapshot
= thread
->rand
.OneIn(10);
2151 const Snapshot
* snapshot
=
2152 ww_snapshot
? db_impl
->GetSnapshotForWriteConflictBoundary()
2153 : db_
->GetSnapshot();
2155 const Snapshot
* snapshot
= db_
->GetSnapshot();
2156 #endif // !ROCKSDB_LITE
2157 ropt
.snapshot
= snapshot
;
2159 // Ideally, we want snapshot taking and timestamp generation to be atomic
2160 // here, so that the snapshot corresponds to the timestamp. However, it is
2161 // not possible with current GetSnapshot() API.
2164 if (FLAGS_user_timestamp_size
> 0) {
2165 ts_str
= GetNowNanos();
2167 ropt
.timestamp
= &ts
;
2170 std::string value_at
;
2171 // When taking a snapshot, we also read a key from that snapshot. We
2172 // will later read the same key before releasing the snapshot and
2173 // verify that the results are the same.
2174 auto status_at
= db_
->Get(ropt
, column_family
, key
, &value_at
);
2175 std::vector
<bool>* key_vec
= nullptr;
2177 if (FLAGS_compare_full_db_state_snapshot
&& (thread
->tid
== 0)) {
2178 key_vec
= new std::vector
<bool>(FLAGS_max_key
);
2179 // When `prefix_extractor` is set, seeking to beginning and scanning
2180 // across prefixes are only supported with `total_order_seek` set.
2181 ropt
.total_order_seek
= true;
2182 std::unique_ptr
<Iterator
> iterator(db_
->NewIterator(ropt
));
2183 for (iterator
->SeekToFirst(); iterator
->Valid(); iterator
->Next()) {
2185 if (GetIntVal(iterator
->key().ToString(), &key_val
)) {
2186 (*key_vec
)[key_val
] = true;
2191 ThreadState::SnapshotState snap_state
= {snapshot
,
2193 column_family
->GetName(),
2199 uint64_t hold_for
= FLAGS_snapshot_hold_ops
;
2200 if (FLAGS_long_running_snapshots
) {
2201 // Hold 10% of snapshots for 10x more
2202 if (thread
->rand
.OneIn(10)) {
2203 assert(hold_for
< std::numeric_limits
<uint64_t>::max() / 10);
2205 // Hold 1% of snapshots for 100x more
2206 if (thread
->rand
.OneIn(10)) {
2207 assert(hold_for
< std::numeric_limits
<uint64_t>::max() / 10);
2212 uint64_t release_at
= std::min(FLAGS_ops_per_thread
- 1, i
+ hold_for
);
2213 thread
->snapshot_queue
.emplace(release_at
, snap_state
);
2216 Status
StressTest::MaybeReleaseSnapshots(ThreadState
* thread
, uint64_t i
) {
2217 while (!thread
->snapshot_queue
.empty() &&
2218 i
>= thread
->snapshot_queue
.front().first
) {
2219 auto snap_state
= thread
->snapshot_queue
.front().second
;
2220 assert(snap_state
.snapshot
);
2221 // Note: this is unsafe as the cf might be dropped concurrently. But
2222 // it is ok since unclean cf drop is currently not supported by write
2223 // prepared transactions.
2224 Status s
= AssertSame(db_
, column_families_
[snap_state
.cf_at
], snap_state
);
2225 db_
->ReleaseSnapshot(snap_state
.snapshot
);
2226 delete snap_state
.key_vec
;
2227 thread
->snapshot_queue
.pop();
2232 return Status::OK();
2235 void StressTest::TestCompactRange(ThreadState
* thread
, int64_t rand_key
,
2236 const Slice
& start_key
,
2237 ColumnFamilyHandle
* column_family
) {
2238 int64_t end_key_num
;
2239 if (std::numeric_limits
<int64_t>::max() - rand_key
<
2240 FLAGS_compact_range_width
) {
2241 end_key_num
= std::numeric_limits
<int64_t>::max();
2243 end_key_num
= FLAGS_compact_range_width
+ rand_key
;
2245 std::string end_key_buf
= Key(end_key_num
);
2246 Slice
end_key(end_key_buf
);
2248 CompactRangeOptions cro
;
2249 cro
.exclusive_manual_compaction
= static_cast<bool>(thread
->rand
.Next() % 2);
2250 cro
.change_level
= static_cast<bool>(thread
->rand
.Next() % 2);
2251 std::vector
<BottommostLevelCompaction
> bottom_level_styles
= {
2252 BottommostLevelCompaction::kSkip
,
2253 BottommostLevelCompaction::kIfHaveCompactionFilter
,
2254 BottommostLevelCompaction::kForce
,
2255 BottommostLevelCompaction::kForceOptimized
};
2256 cro
.bottommost_level_compaction
=
2257 bottom_level_styles
[thread
->rand
.Next() %
2258 static_cast<uint32_t>(bottom_level_styles
.size())];
2259 cro
.allow_write_stall
= static_cast<bool>(thread
->rand
.Next() % 2);
2260 cro
.max_subcompactions
= static_cast<uint32_t>(thread
->rand
.Next() % 4);
2261 std::vector
<BlobGarbageCollectionPolicy
> blob_gc_policies
= {
2262 BlobGarbageCollectionPolicy::kForce
,
2263 BlobGarbageCollectionPolicy::kDisable
,
2264 BlobGarbageCollectionPolicy::kUseDefault
};
2265 cro
.blob_garbage_collection_policy
=
2266 blob_gc_policies
[thread
->rand
.Next() %
2267 static_cast<uint32_t>(blob_gc_policies
.size())];
2268 cro
.blob_garbage_collection_age_cutoff
=
2269 static_cast<double>(thread
->rand
.Next() % 100) / 100.0;
2271 const Snapshot
* pre_snapshot
= nullptr;
2272 uint32_t pre_hash
= 0;
2273 if (thread
->rand
.OneIn(2)) {
2274 // Do some validation by declaring a snapshot and compare the data before
2275 // and after the compaction
2276 pre_snapshot
= db_
->GetSnapshot();
2278 GetRangeHash(thread
, pre_snapshot
, column_family
, start_key
, end_key
);
2281 Status status
= db_
->CompactRange(cro
, column_family
, &start_key
, &end_key
);
2284 fprintf(stdout
, "Unable to perform CompactRange(): %s\n",
2285 status
.ToString().c_str());
2288 if (pre_snapshot
!= nullptr) {
2289 uint32_t post_hash
=
2290 GetRangeHash(thread
, pre_snapshot
, column_family
, start_key
, end_key
);
2291 if (pre_hash
!= post_hash
) {
2293 "Data hash different before and after compact range "
2294 "start_key %s end_key %s\n",
2295 start_key
.ToString(true).c_str(), end_key
.ToString(true).c_str());
2296 thread
->stats
.AddErrors(1);
2297 // Fail fast to preserve the DB state.
2298 thread
->shared
->SetVerificationFailure();
2300 db_
->ReleaseSnapshot(pre_snapshot
);
2304 uint32_t StressTest::GetRangeHash(ThreadState
* thread
, const Snapshot
* snapshot
,
2305 ColumnFamilyHandle
* column_family
,
2306 const Slice
& start_key
,
2307 const Slice
& end_key
) {
2308 // This `ReadOptions` is for validation purposes. Ignore
2309 // `FLAGS_rate_limit_user_ops` to avoid slowing any validation.
2311 ro
.snapshot
= snapshot
;
2312 ro
.total_order_seek
= true;
2315 if (FLAGS_user_timestamp_size
> 0) {
2316 ts_str
= GetNowNanos();
2321 std::unique_ptr
<Iterator
> it(db_
->NewIterator(ro
, column_family
));
2323 constexpr char kCrcCalculatorSepearator
= ';';
2327 for (it
->Seek(start_key
);
2328 it
->Valid() && options_
.comparator
->Compare(it
->key(), end_key
) <= 0;
2330 crc
= crc32c::Extend(crc
, it
->key().data(), it
->key().size());
2331 crc
= crc32c::Extend(crc
, &kCrcCalculatorSepearator
, sizeof(char));
2332 crc
= crc32c::Extend(crc
, it
->value().data(), it
->value().size());
2333 crc
= crc32c::Extend(crc
, &kCrcCalculatorSepearator
, sizeof(char));
2335 for (const auto& column
: it
->columns()) {
2336 crc
= crc32c::Extend(crc
, column
.name().data(), column
.name().size());
2337 crc
= crc32c::Extend(crc
, &kCrcCalculatorSepearator
, sizeof(char));
2338 crc
= crc32c::Extend(crc
, column
.value().data(), column
.value().size());
2339 crc
= crc32c::Extend(crc
, &kCrcCalculatorSepearator
, sizeof(char));
2343 if (!it
->status().ok()) {
2344 fprintf(stderr
, "Iterator non-OK when calculating range CRC: %s\n",
2345 it
->status().ToString().c_str());
2346 thread
->stats
.AddErrors(1);
2347 // Fail fast to preserve the DB state.
2348 thread
->shared
->SetVerificationFailure();
2354 void StressTest::PrintEnv() const {
2355 fprintf(stdout
, "RocksDB version : %d.%d\n", kMajorVersion
,
2357 fprintf(stdout
, "Format version : %d\n", FLAGS_format_version
);
2358 fprintf(stdout
, "TransactionDB : %s\n",
2359 FLAGS_use_txn
? "true" : "false");
2361 if (FLAGS_use_txn
) {
2362 #ifndef ROCKSDB_LITE
2363 fprintf(stdout
, "Two write queues: : %s\n",
2364 FLAGS_two_write_queues
? "true" : "false");
2365 fprintf(stdout
, "Write policy : %d\n",
2366 static_cast<int>(FLAGS_txn_write_policy
));
2367 if (static_cast<uint64_t>(TxnDBWritePolicy::WRITE_PREPARED
) ==
2368 FLAGS_txn_write_policy
||
2369 static_cast<uint64_t>(TxnDBWritePolicy::WRITE_UNPREPARED
) ==
2370 FLAGS_txn_write_policy
) {
2371 fprintf(stdout
, "Snapshot cache bits : %d\n",
2372 static_cast<int>(FLAGS_wp_snapshot_cache_bits
));
2373 fprintf(stdout
, "Commit cache bits : %d\n",
2374 static_cast<int>(FLAGS_wp_commit_cache_bits
));
2376 fprintf(stdout
, "last cwb for recovery : %s\n",
2377 FLAGS_use_only_the_last_commit_time_batch_for_recovery
? "true"
2379 #endif // !ROCKSDB_LITE
2382 #ifndef ROCKSDB_LITE
2383 fprintf(stdout
, "Stacked BlobDB : %s\n",
2384 FLAGS_use_blob_db
? "true" : "false");
2385 #endif // !ROCKSDB_LITE
2386 fprintf(stdout
, "Read only mode : %s\n",
2387 FLAGS_read_only
? "true" : "false");
2388 fprintf(stdout
, "Atomic flush : %s\n",
2389 FLAGS_atomic_flush
? "true" : "false");
2390 fprintf(stdout
, "Manual WAL flush : %s\n",
2391 FLAGS_manual_wal_flush_one_in
> 0 ? "true" : "false");
2392 fprintf(stdout
, "Column families : %d\n", FLAGS_column_families
);
2393 if (!FLAGS_test_batches_snapshots
) {
2394 fprintf(stdout
, "Clear CFs one in : %d\n",
2395 FLAGS_clear_column_family_one_in
);
2397 fprintf(stdout
, "Number of threads : %d\n", FLAGS_threads
);
2398 fprintf(stdout
, "Ops per thread : %lu\n",
2399 (unsigned long)FLAGS_ops_per_thread
);
2400 std::string
ttl_state("unused");
2401 if (FLAGS_ttl
> 0) {
2402 ttl_state
= std::to_string(FLAGS_ttl
);
2404 fprintf(stdout
, "Time to live(sec) : %s\n", ttl_state
.c_str());
2405 fprintf(stdout
, "Read percentage : %d%%\n", FLAGS_readpercent
);
2406 fprintf(stdout
, "Prefix percentage : %d%%\n", FLAGS_prefixpercent
);
2407 fprintf(stdout
, "Write percentage : %d%%\n", FLAGS_writepercent
);
2408 fprintf(stdout
, "Delete percentage : %d%%\n", FLAGS_delpercent
);
2409 fprintf(stdout
, "Delete range percentage : %d%%\n", FLAGS_delrangepercent
);
2410 fprintf(stdout
, "No overwrite percentage : %d%%\n",
2411 FLAGS_nooverwritepercent
);
2412 fprintf(stdout
, "Iterate percentage : %d%%\n", FLAGS_iterpercent
);
2413 fprintf(stdout
, "Custom ops percentage : %d%%\n", FLAGS_customopspercent
);
2414 fprintf(stdout
, "DB-write-buffer-size : %" PRIu64
"\n",
2415 FLAGS_db_write_buffer_size
);
2416 fprintf(stdout
, "Write-buffer-size : %d\n", FLAGS_write_buffer_size
);
2417 fprintf(stdout
, "Iterations : %lu\n",
2418 (unsigned long)FLAGS_num_iterations
);
2419 fprintf(stdout
, "Max key : %lu\n",
2420 (unsigned long)FLAGS_max_key
);
2421 fprintf(stdout
, "Ratio #ops/#keys : %f\n",
2422 (1.0 * FLAGS_ops_per_thread
* FLAGS_threads
) / FLAGS_max_key
);
2423 fprintf(stdout
, "Num times DB reopens : %d\n", FLAGS_reopen
);
2424 fprintf(stdout
, "Batches/snapshots : %d\n",
2425 FLAGS_test_batches_snapshots
);
2426 fprintf(stdout
, "Do update in place : %d\n", FLAGS_in_place_update
);
2427 fprintf(stdout
, "Num keys per lock : %d\n",
2428 1 << FLAGS_log2_keys_per_lock
);
2429 std::string compression
= CompressionTypeToString(compression_type_e
);
2430 fprintf(stdout
, "Compression : %s\n", compression
.c_str());
2431 std::string bottommost_compression
=
2432 CompressionTypeToString(bottommost_compression_type_e
);
2433 fprintf(stdout
, "Bottommost Compression : %s\n",
2434 bottommost_compression
.c_str());
2435 std::string checksum
= ChecksumTypeToString(checksum_type_e
);
2436 fprintf(stdout
, "Checksum type : %s\n", checksum
.c_str());
2437 fprintf(stdout
, "File checksum impl : %s\n",
2438 FLAGS_file_checksum_impl
.c_str());
2439 fprintf(stdout
, "Bloom bits / key : %s\n",
2440 FormatDoubleParam(FLAGS_bloom_bits
).c_str());
2441 fprintf(stdout
, "Max subcompactions : %" PRIu64
"\n",
2442 FLAGS_subcompactions
);
2443 fprintf(stdout
, "Use MultiGet : %s\n",
2444 FLAGS_use_multiget
? "true" : "false");
2446 const char* memtablerep
= "";
2447 switch (FLAGS_rep_factory
) {
2449 memtablerep
= "skip_list";
2452 memtablerep
= "prefix_hash";
2455 memtablerep
= "vector";
2459 fprintf(stdout
, "Memtablerep : %s\n", memtablerep
);
2462 KillPoint
* kp
= KillPoint::GetInstance();
2463 fprintf(stdout
, "Test kill odd : %d\n", kp
->rocksdb_kill_odds
);
2464 if (!kp
->rocksdb_kill_exclude_prefixes
.empty()) {
2465 fprintf(stdout
, "Skipping kill points prefixes:\n");
2466 for (auto& p
: kp
->rocksdb_kill_exclude_prefixes
) {
2467 fprintf(stdout
, " %s\n", p
.c_str());
2471 fprintf(stdout
, "Periodic Compaction Secs : %" PRIu64
"\n",
2472 FLAGS_periodic_compaction_seconds
);
2473 fprintf(stdout
, "Compaction TTL : %" PRIu64
"\n",
2474 FLAGS_compaction_ttl
);
2475 const char* compaction_pri
= "";
2476 switch (FLAGS_compaction_pri
) {
2477 case kByCompensatedSize
:
2478 compaction_pri
= "kByCompensatedSize";
2480 case kOldestLargestSeqFirst
:
2481 compaction_pri
= "kOldestLargestSeqFirst";
2483 case kOldestSmallestSeqFirst
:
2484 compaction_pri
= "kOldestSmallestSeqFirst";
2486 case kMinOverlappingRatio
:
2487 compaction_pri
= "kMinOverlappingRatio";
2490 compaction_pri
= "kRoundRobin";
2493 fprintf(stdout
, "Compaction Pri : %s\n", compaction_pri
);
2494 fprintf(stdout
, "Background Purge : %d\n",
2495 static_cast<int>(FLAGS_avoid_unnecessary_blocking_io
));
2496 fprintf(stdout
, "Write DB ID to manifest : %d\n",
2497 static_cast<int>(FLAGS_write_dbid_to_manifest
));
2498 fprintf(stdout
, "Max Write Batch Group Size: %" PRIu64
"\n",
2499 FLAGS_max_write_batch_group_size_bytes
);
2500 fprintf(stdout
, "Use dynamic level : %d\n",
2501 static_cast<int>(FLAGS_level_compaction_dynamic_level_bytes
));
2502 fprintf(stdout
, "Read fault one in : %d\n", FLAGS_read_fault_one_in
);
2503 fprintf(stdout
, "Write fault one in : %d\n", FLAGS_write_fault_one_in
);
2504 fprintf(stdout
, "Open metadata write fault one in:\n");
2505 fprintf(stdout
, " %d\n",
2506 FLAGS_open_metadata_write_fault_one_in
);
2507 fprintf(stdout
, "Sync fault injection : %d\n",
2508 FLAGS_sync_fault_injection
);
2509 fprintf(stdout
, "Best efforts recovery : %d\n",
2510 static_cast<int>(FLAGS_best_efforts_recovery
));
2511 fprintf(stdout
, "Fail if OPTIONS file error: %d\n",
2512 static_cast<int>(FLAGS_fail_if_options_file_error
));
2513 fprintf(stdout
, "User timestamp size bytes : %d\n",
2514 static_cast<int>(FLAGS_user_timestamp_size
));
2515 fprintf(stdout
, "WAL compression : %s\n",
2516 FLAGS_wal_compression
.c_str());
2517 fprintf(stdout
, "Try verify sst unique id : %d\n",
2518 static_cast<int>(FLAGS_verify_sst_unique_id_in_manifest
));
2520 fprintf(stdout
, "------------------------------------------------\n");
2523 void StressTest::Open(SharedState
* shared
) {
2524 assert(db_
== nullptr);
2525 #ifndef ROCKSDB_LITE
2526 assert(txn_db_
== nullptr);
2530 if (!InitializeOptionsFromFile(options_
)) {
2531 InitializeOptionsFromFlags(cache_
, compressed_cache_
, filter_policy_
,
2534 InitializeOptionsGeneral(cache_
, compressed_cache_
, filter_policy_
, options_
);
2536 if (FLAGS_prefix_size
== 0 && FLAGS_rep_factory
== kHashSkipList
) {
2538 "prefeix_size cannot be zero if memtablerep == prefix_hash\n");
2541 if (FLAGS_prefix_size
!= 0 && FLAGS_rep_factory
!= kHashSkipList
) {
2543 "WARNING: prefix_size is non-zero but "
2544 "memtablerep != prefix_hash\n");
2547 if ((options_
.enable_blob_files
|| options_
.enable_blob_garbage_collection
||
2548 FLAGS_allow_setting_blob_options_dynamically
) &&
2549 FLAGS_best_efforts_recovery
) {
2551 "Integrated BlobDB is currently incompatible with best-effort "
2557 "Integrated BlobDB: blob files enabled %d, min blob size %" PRIu64
2558 ", blob file size %" PRIu64
2559 ", blob compression type %s, blob GC enabled %d, cutoff %f, force "
2560 "threshold %f, blob compaction readahead size %" PRIu64
2561 ", blob file starting level %d\n",
2562 options_
.enable_blob_files
, options_
.min_blob_size
,
2563 options_
.blob_file_size
,
2564 CompressionTypeToString(options_
.blob_compression_type
).c_str(),
2565 options_
.enable_blob_garbage_collection
,
2566 options_
.blob_garbage_collection_age_cutoff
,
2567 options_
.blob_garbage_collection_force_threshold
,
2568 options_
.blob_compaction_readahead_size
,
2569 options_
.blob_file_starting_level
);
2571 if (FLAGS_use_blob_cache
) {
2573 "Integrated BlobDB: blob cache enabled"
2574 ", block and blob caches shared: %d",
2575 FLAGS_use_shared_block_and_blob_cache
);
2576 if (!FLAGS_use_shared_block_and_blob_cache
) {
2578 ", blob cache size %" PRIu64
", blob cache num shard bits: %d",
2579 FLAGS_blob_cache_size
, FLAGS_blob_cache_numshardbits
);
2581 fprintf(stdout
, ", blob cache prepopulated: %d\n",
2582 FLAGS_prepopulate_blob_cache
);
2584 fprintf(stdout
, "Integrated BlobDB: blob cache disabled\n");
2587 fprintf(stdout
, "DB path: [%s]\n", FLAGS_db
.c_str());
2591 if (FLAGS_ttl
== -1) {
2592 std::vector
<std::string
> existing_column_families
;
2593 s
= DB::ListColumnFamilies(DBOptions(options_
), FLAGS_db
,
2594 &existing_column_families
); // ignore errors
2597 assert(existing_column_families
.empty());
2598 assert(column_family_names_
.empty());
2599 column_family_names_
.push_back(kDefaultColumnFamilyName
);
2600 } else if (column_family_names_
.empty()) {
2601 // this is the first call to the function Open()
2602 column_family_names_
= existing_column_families
;
2604 // this is a reopen. just assert that existing column_family_names are
2605 // equivalent to what we remember
2606 auto sorted_cfn
= column_family_names_
;
2607 std::sort(sorted_cfn
.begin(), sorted_cfn
.end());
2608 std::sort(existing_column_families
.begin(),
2609 existing_column_families
.end());
2610 if (sorted_cfn
!= existing_column_families
) {
2611 fprintf(stderr
, "Expected column families differ from the existing:\n");
2612 fprintf(stderr
, "Expected: {");
2613 for (auto cf
: sorted_cfn
) {
2614 fprintf(stderr
, "%s ", cf
.c_str());
2616 fprintf(stderr
, "}\n");
2617 fprintf(stderr
, "Existing: {");
2618 for (auto cf
: existing_column_families
) {
2619 fprintf(stderr
, "%s ", cf
.c_str());
2621 fprintf(stderr
, "}\n");
2623 assert(sorted_cfn
== existing_column_families
);
2625 std::vector
<ColumnFamilyDescriptor
> cf_descriptors
;
2626 for (auto name
: column_family_names_
) {
2627 if (name
!= kDefaultColumnFamilyName
) {
2628 new_column_family_name_
=
2629 std::max(new_column_family_name_
.load(), std::stoi(name
) + 1);
2631 cf_descriptors
.emplace_back(name
, ColumnFamilyOptions(options_
));
2633 while (cf_descriptors
.size() < (size_t)FLAGS_column_families
) {
2634 std::string name
= std::to_string(new_column_family_name_
.load());
2635 new_column_family_name_
++;
2636 cf_descriptors
.emplace_back(name
, ColumnFamilyOptions(options_
));
2637 column_family_names_
.push_back(name
);
2640 options_
.listeners
.clear();
2641 #ifndef ROCKSDB_LITE
2642 options_
.listeners
.emplace_back(new DbStressListener(
2643 FLAGS_db
, options_
.db_paths
, cf_descriptors
, db_stress_listener_env
));
2644 #endif // !ROCKSDB_LITE
2645 RegisterAdditionalListeners();
2647 if (!FLAGS_use_txn
) {
2648 // Determine whether we need to ingest file metadata write failures
2649 // during DB reopen. If it does, enable it.
2650 // Only ingest metadata error if it is reopening, as initial open
2651 // failure doesn't need to be handled.
2652 // TODO: transaction DB is not covered by this fault test yet.
2653 bool ingest_meta_error
= false;
2654 bool ingest_write_error
= false;
2655 bool ingest_read_error
= false;
2656 if ((FLAGS_open_metadata_write_fault_one_in
||
2657 FLAGS_open_write_fault_one_in
|| FLAGS_open_read_fault_one_in
) &&
2659 ->FileExists(FLAGS_db
+ "/CURRENT", IOOptions(), nullptr)
2662 // When DB Stress is not in sync mode, we expect all writes to the
2663 // WAL to be durable. Buffering unsynced writes will cause false
2664 // positive in crash tests. Before we figure out a way to
2665 // solve it, skip WAL from failure injection.
2666 fault_fs_guard
->SetSkipDirectWritableTypes({kWalFile
});
2668 ingest_meta_error
= FLAGS_open_metadata_write_fault_one_in
;
2669 ingest_write_error
= FLAGS_open_write_fault_one_in
;
2670 ingest_read_error
= FLAGS_open_read_fault_one_in
;
2671 if (ingest_meta_error
) {
2672 fault_fs_guard
->EnableMetadataWriteErrorInjection();
2673 fault_fs_guard
->SetRandomMetadataWriteError(
2674 FLAGS_open_metadata_write_fault_one_in
);
2676 if (ingest_write_error
) {
2677 fault_fs_guard
->SetFilesystemDirectWritable(false);
2678 fault_fs_guard
->EnableWriteErrorInjection();
2679 fault_fs_guard
->SetRandomWriteError(
2680 static_cast<uint32_t>(FLAGS_seed
), FLAGS_open_write_fault_one_in
,
2681 IOStatus::IOError("Injected Open Error"),
2682 /*inject_for_all_file_types=*/true, /*types=*/{});
2684 if (ingest_read_error
) {
2685 fault_fs_guard
->SetRandomReadError(FLAGS_open_read_fault_one_in
);
2689 #ifndef ROCKSDB_LITE
2690 // StackableDB-based BlobDB
2691 if (FLAGS_use_blob_db
) {
2692 blob_db::BlobDBOptions blob_db_options
;
2693 blob_db_options
.min_blob_size
= FLAGS_blob_db_min_blob_size
;
2694 blob_db_options
.bytes_per_sync
= FLAGS_blob_db_bytes_per_sync
;
2695 blob_db_options
.blob_file_size
= FLAGS_blob_db_file_size
;
2696 blob_db_options
.enable_garbage_collection
= FLAGS_blob_db_enable_gc
;
2697 blob_db_options
.garbage_collection_cutoff
= FLAGS_blob_db_gc_cutoff
;
2699 blob_db::BlobDB
* blob_db
= nullptr;
2700 s
= blob_db::BlobDB::Open(options_
, blob_db_options
, FLAGS_db
,
2701 cf_descriptors
, &column_families_
,
2707 #endif // !ROCKSDB_LITE
2709 if (db_preload_finished_
.load() && FLAGS_read_only
) {
2710 s
= DB::OpenForReadOnly(DBOptions(options_
), FLAGS_db
,
2711 cf_descriptors
, &column_families_
, &db_
);
2713 s
= DB::Open(DBOptions(options_
), FLAGS_db
, cf_descriptors
,
2714 &column_families_
, &db_
);
2718 if (ingest_meta_error
|| ingest_write_error
|| ingest_read_error
) {
2719 fault_fs_guard
->SetFilesystemDirectWritable(true);
2720 fault_fs_guard
->DisableMetadataWriteErrorInjection();
2721 fault_fs_guard
->DisableWriteErrorInjection();
2722 fault_fs_guard
->SetSkipDirectWritableTypes({});
2723 fault_fs_guard
->SetRandomReadError(0);
2725 // Ingested errors might happen in background compactions. We
2726 // wait for all compactions to finish to make sure DB is in
2727 // clean state before executing queries.
2728 s
= static_cast_with_check
<DBImpl
>(db_
->GetRootDB())
2729 ->WaitForCompact(true /* wait_unscheduled */);
2731 for (auto cf
: column_families_
) {
2734 column_families_
.clear();
2740 // After failing to open a DB due to an IO error, retry should
2741 // successfully open the DB with correct data if no IO error shows
2743 ingest_meta_error
= false;
2744 ingest_write_error
= false;
2745 ingest_read_error
= false;
2747 Random
rand(static_cast<uint32_t>(FLAGS_seed
));
2748 if (rand
.OneIn(2)) {
2749 fault_fs_guard
->DeleteFilesCreatedAfterLastDirSync(IOOptions(),
2752 if (rand
.OneIn(3)) {
2753 fault_fs_guard
->DropUnsyncedFileData();
2754 } else if (rand
.OneIn(2)) {
2755 fault_fs_guard
->DropRandomUnsyncedFileData(&rand
);
2763 #ifndef ROCKSDB_LITE
2764 TransactionDBOptions txn_db_options
;
2765 assert(FLAGS_txn_write_policy
<= TxnDBWritePolicy::WRITE_UNPREPARED
);
2766 txn_db_options
.write_policy
=
2767 static_cast<TxnDBWritePolicy
>(FLAGS_txn_write_policy
);
2768 if (FLAGS_unordered_write
) {
2769 assert(txn_db_options
.write_policy
== TxnDBWritePolicy::WRITE_PREPARED
);
2770 options_
.unordered_write
= true;
2771 options_
.two_write_queues
= true;
2772 txn_db_options
.skip_concurrency_control
= true;
2774 options_
.two_write_queues
= FLAGS_two_write_queues
;
2776 txn_db_options
.wp_snapshot_cache_bits
=
2777 static_cast<size_t>(FLAGS_wp_snapshot_cache_bits
);
2778 txn_db_options
.wp_commit_cache_bits
=
2779 static_cast<size_t>(FLAGS_wp_commit_cache_bits
);
2780 PrepareTxnDbOptions(shared
, txn_db_options
);
2781 s
= TransactionDB::Open(options_
, txn_db_options
, FLAGS_db
,
2782 cf_descriptors
, &column_families_
, &txn_db_
);
2784 fprintf(stderr
, "Error in opening the TransactionDB [%s]\n",
2785 s
.ToString().c_str());
2790 // Do not swap the order of the following.
2793 db_aptr_
.store(txn_db_
, std::memory_order_release
);
2798 fprintf(stderr
, "Error in opening the DB [%s]\n", s
.ToString().c_str());
2802 assert(column_families_
.size() ==
2803 static_cast<size_t>(FLAGS_column_families
));
2805 // Secondary instance does not support write-prepared/write-unprepared
2806 // transactions, thus just disable secondary instance if we use
2808 if (s
.ok() && FLAGS_test_secondary
&& !FLAGS_use_txn
) {
2809 #ifndef ROCKSDB_LITE
2811 // TODO(yanqin) support max_open_files != -1 for secondary instance.
2812 tmp_opts
.max_open_files
= -1;
2813 tmp_opts
.env
= db_stress_env
;
2814 const std::string
& secondary_path
= FLAGS_secondaries_base
;
2815 s
= DB::OpenAsSecondary(tmp_opts
, FLAGS_db
, secondary_path
,
2816 cf_descriptors
, &cmp_cfhs_
, &cmp_db_
);
2818 assert(cmp_cfhs_
.size() == static_cast<size_t>(FLAGS_column_families
));
2820 fprintf(stderr
, "Secondary is not supported in RocksDBLite\n");
2822 #endif // !ROCKSDB_LITE
2825 #ifndef ROCKSDB_LITE
2826 DBWithTTL
* db_with_ttl
;
2827 s
= DBWithTTL::Open(options_
, FLAGS_db
, &db_with_ttl
, FLAGS_ttl
);
2830 fprintf(stderr
, "TTL is not supported in RocksDBLite\n");
2835 if (FLAGS_preserve_unverified_changes
) {
2836 // Up until now, no live file should have become obsolete due to these
2837 // options. After `DisableFileDeletions()` we can reenable auto compactions
2838 // since, even if live files become obsolete, they won't be deleted.
2839 assert(options_
.avoid_flush_during_recovery
);
2840 assert(options_
.disable_auto_compactions
);
2842 s
= db_
->DisableFileDeletions();
2845 s
= db_
->EnableAutoCompaction(column_families_
);
2850 fprintf(stderr
, "open error: %s\n", s
.ToString().c_str());
2855 void StressTest::Reopen(ThreadState
* thread
) {
2856 #ifndef ROCKSDB_LITE
2857 // BG jobs in WritePrepared must be canceled first because i) they can access
2858 // the db via a callback ii) they hold on to a snapshot and the upcoming
2859 // ::Close would complain about it.
2860 const bool write_prepared
= FLAGS_use_txn
&& FLAGS_txn_write_policy
!= 0;
2861 bool bg_canceled
__attribute__((unused
)) = false;
2862 if (write_prepared
|| thread
->rand
.OneIn(2)) {
2864 write_prepared
|| static_cast<bool>(thread
->rand
.OneIn(2));
2865 CancelAllBackgroundWork(db_
, wait
);
2868 assert(!write_prepared
|| bg_canceled
);
2873 for (auto cf
: column_families_
) {
2876 column_families_
.clear();
2878 #ifndef ROCKSDB_LITE
2879 if (thread
->rand
.OneIn(2)) {
2880 Status s
= db_
->Close();
2882 fprintf(stderr
, "Non-ok close status: %s\n", s
.ToString().c_str());
2890 #ifndef ROCKSDB_LITE
2894 num_times_reopened_
++;
2895 auto now
= clock_
->NowMicros();
2896 fprintf(stdout
, "%s Reopening database for the %dth time\n",
2897 clock_
->TimeToString(now
/ 1000000).c_str(), num_times_reopened_
);
2898 Open(thread
->shared
);
2900 if ((FLAGS_sync_fault_injection
|| FLAGS_disable_wal
||
2901 FLAGS_manual_wal_flush_one_in
> 0) &&
2903 Status s
= thread
->shared
->SaveAtAndAfter(db_
);
2905 fprintf(stderr
, "Error enabling history tracing: %s\n",
2906 s
.ToString().c_str());
2912 bool StressTest::MaybeUseOlderTimestampForPointLookup(ThreadState
* thread
,
2913 std::string
& ts_str
,
2915 ReadOptions
& read_opts
) {
2916 if (FLAGS_user_timestamp_size
== 0) {
2921 if (!thread
->rand
.OneInOpt(3)) {
2925 const SharedState
* const shared
= thread
->shared
;
2927 const uint64_t start_ts
= shared
->GetStartTimestamp();
2929 uint64_t now
= db_stress_env
->NowNanos();
2931 assert(now
> start_ts
);
2932 uint64_t time_diff
= now
- start_ts
;
2933 uint64_t ts
= start_ts
+ (thread
->rand
.Next64() % time_diff
);
2935 PutFixed64(&ts_str
, ts
);
2937 read_opts
.timestamp
= &ts_slice
;
2941 void StressTest::MaybeUseOlderTimestampForRangeScan(ThreadState
* thread
,
2942 std::string
& ts_str
,
2944 ReadOptions
& read_opts
) {
2945 if (FLAGS_user_timestamp_size
== 0) {
2950 if (!thread
->rand
.OneInOpt(3)) {
2954 const Slice
* const saved_ts
= read_opts
.timestamp
;
2955 assert(saved_ts
!= nullptr);
2957 const SharedState
* const shared
= thread
->shared
;
2959 const uint64_t start_ts
= shared
->GetStartTimestamp();
2961 uint64_t now
= db_stress_env
->NowNanos();
2963 assert(now
> start_ts
);
2964 uint64_t time_diff
= now
- start_ts
;
2965 uint64_t ts
= start_ts
+ (thread
->rand
.Next64() % time_diff
);
2967 PutFixed64(&ts_str
, ts
);
2969 read_opts
.timestamp
= &ts_slice
;
2971 // TODO (yanqin): support Merge with iter_start_ts
2972 if (!thread
->rand
.OneInOpt(3) || FLAGS_use_merge
|| FLAGS_use_full_merge_v1
) {
2977 PutFixed64(&ts_str
, start_ts
);
2979 read_opts
.iter_start_ts
= &ts_slice
;
2980 read_opts
.timestamp
= saved_ts
;
// Checks that the stress-test flags are compatible with user-defined
// timestamps and installs the timestamp-aware comparator in `options`.
//
// The comparator comes from test::BytewiseComparatorWithU64TsWrapper() and is
// assigned to `options.comparator` at the end. Before that, a message is
// written to stderr for each flag combination tested below.
//
// NOTE(review): the statements that follow each message are not visible in
// this listing (embedded line numbers skip); presumably the process exits on
// each unsupported combination -- confirm against the full file.
2983 void CheckAndSetOptionsForUserTimestamp(Options
& options
) {
// Callers are expected to invoke this only when timestamps are enabled.
2984 assert(FLAGS_user_timestamp_size
> 0);
2985 const Comparator
* const cmp
= test::BytewiseComparatorWithU64TsWrapper();
// The flag must equal the timestamp size reported by the comparator.
2987 if (FLAGS_user_timestamp_size
!= cmp
->timestamp_size()) {
2989 "Only -user_timestamp_size=%d is supported in stress test.\n",
2990 static_cast<int>(cmp
->timestamp_size()));
// Transactions combined with timestamps are reported as unsupported.
2993 if (FLAGS_use_txn
) {
2994 fprintf(stderr
, "TransactionDB does not support timestamp yet.\n");
2997 #ifndef ROCKSDB_LITE
// Either BlobDB flag combined with timestamps is reported as unsupported.
2998 if (FLAGS_enable_blob_files
|| FLAGS_use_blob_db
) {
2999 fprintf(stderr
, "BlobDB not supported with timestamp.\n");
3002 #endif // !ROCKSDB_LITE
// The cf-consistency and batched-snapshot test modes are reported as
// unsupported with timestamps.
3003 if (FLAGS_test_cf_consistency
|| FLAGS_test_batches_snapshots
) {
3005 "Due to per-key ts-seq ordering constraint, only the (default) "
3006 "non-batched test is supported with timestamp.\n");
// External file ingestion is reported as possibly unsupported.
3009 if (FLAGS_ingest_external_file_one_in
> 0) {
3010 fprintf(stderr
, "Bulk loading may not support timestamp yet.\n");
// Install the timestamp-aware comparator.
3013 options
.comparator
= cmp
;
// Populates `options` from the file named by FLAGS_options_file.
//
// In non-LITE builds, when FLAGS_options_file is non-empty, the file is loaded
// with LoadOptionsFromFile() into a DBOptions plus a list of column family
// descriptors. `options` is then rebuilt from the DBOptions and the options of
// the first descriptor, with `env` replaced by a DbStressEnvWrapper around
// `db_stress_env`. In LITE builds a "not supported" message is printed.
//
// NOTE(review): the status check, the return statements and the guard around
// the LITE-mode message are not visible in this listing (embedded line numbers
// skip), so the meaning of the bool result cannot be stated from here --
// confirm against the full file.
3016 bool InitializeOptionsFromFile(Options
& options
) {
3017 #ifndef ROCKSDB_LITE
3018 DBOptions db_options
;
3019 std::vector
<ColumnFamilyDescriptor
> cf_descriptors
;
3020 if (!FLAGS_options_file
.empty()) {
3021 Status s
= LoadOptionsFromFile(FLAGS_options_file
, db_stress_env
,
3022 &db_options
, &cf_descriptors
);
// Error report for a failed load; the condition guarding it is not visible.
3024 fprintf(stderr
, "Unable to load options file %s --- %s\n",
3025 FLAGS_options_file
.c_str(), s
.ToString().c_str());
// NOTE(review): the wrapper is allocated with `new` and no matching delete is
// visible in this function; presumably it lives for the whole process --
// confirm.
3028 db_options
.env
= new DbStressEnvWrapper(db_stress_env
);
// Only the first column family descriptor's options are used.
3029 options
= Options(db_options
, cf_descriptors
[0].options
);
3034 fprintf(stderr
, "--options_file not supported in lite mode\n");
3036 #endif //! ROCKSDB_LITE
// Builds `options` from the command-line FLAGS_* values.
//
// The function first assembles a BlockBasedTableOptions (caches, pinning
// tiers, per-role cache charging, filter/index settings, readahead) and
// installs it as the table factory. It then copies the remaining flags into
// the matching `options` fields, and finally configures BlobDB, the blob
// cache, WAL compression, tiered storage, the memtable factory, the merge
// operator, the compaction filter and user-defined timestamps.
//
// NOTE(review): this listing omits some original lines (embedded line numbers
// skip, e.g. 3043 -> 3045). `options` is not declared in the visible parameter
// list -- presumably an `Options&` parameter on the missing line -- and several
// `else`/`case`/closing-brace lines are not shown. Confirm against the full
// file.
3040 void InitializeOptionsFromFlags(
3041 const std::shared_ptr
<Cache
>& cache
,
3042 const std::shared_ptr
<Cache
>& block_cache_compressed
,
3043 const std::shared_ptr
<const FilterPolicy
>& filter_policy
,
// ---- Block-based table options: caches, pinning tiers, checksum, size ----
3045 BlockBasedTableOptions block_based_options
;
3046 block_based_options
.block_cache
= cache
;
3047 block_based_options
.cache_index_and_filter_blocks
=
3048 FLAGS_cache_index_and_filter_blocks
;
3049 block_based_options
.metadata_cache_options
.top_level_index_pinning
=
3050 static_cast<PinningTier
>(FLAGS_top_level_index_pinning
);
3051 block_based_options
.metadata_cache_options
.partition_pinning
=
3052 static_cast<PinningTier
>(FLAGS_partition_pinning
);
3053 block_based_options
.metadata_cache_options
.unpartitioned_pinning
=
3054 static_cast<PinningTier
>(FLAGS_unpartitioned_pinning
);
3055 block_based_options
.block_cache_compressed
= block_cache_compressed
;
3056 block_based_options
.checksum
= checksum_type_e
;
3057 block_based_options
.block_size
= FLAGS_block_size
;
// ---- Per-role cache charging: each FLAGS_charge_* flag selects kEnabled or
// kDisabled for its CacheEntryRole ----
3058 block_based_options
.cache_usage_options
.options_overrides
.insert(
3059 {CacheEntryRole::kCompressionDictionaryBuildingBuffer
,
3060 {/*.charged = */ FLAGS_charge_compression_dictionary_building_buffer
3061 ? CacheEntryRoleOptions::Decision::kEnabled
3062 : CacheEntryRoleOptions::Decision::kDisabled
}});
3063 block_based_options
.cache_usage_options
.options_overrides
.insert(
3064 {CacheEntryRole::kFilterConstruction
,
3065 {/*.charged = */ FLAGS_charge_filter_construction
3066 ? CacheEntryRoleOptions::Decision::kEnabled
3067 : CacheEntryRoleOptions::Decision::kDisabled
}});
3068 block_based_options
.cache_usage_options
.options_overrides
.insert(
3069 {CacheEntryRole::kBlockBasedTableReader
,
3070 {/*.charged = */ FLAGS_charge_table_reader
3071 ? CacheEntryRoleOptions::Decision::kEnabled
3072 : CacheEntryRoleOptions::Decision::kDisabled
}});
3073 block_based_options
.cache_usage_options
.options_overrides
.insert(
3074 {CacheEntryRole::kFileMetadata
,
3075 {/*.charged = */ FLAGS_charge_file_metadata
3076 ? CacheEntryRoleOptions::Decision::kEnabled
3077 : CacheEntryRoleOptions::Decision::kDisabled
}});
3078 block_based_options
.cache_usage_options
.options_overrides
.insert(
3079 {CacheEntryRole::kBlobCache
,
3080 {/*.charged = */ FLAGS_charge_blob_cache
3081 ? CacheEntryRoleOptions::Decision::kEnabled
3082 : CacheEntryRoleOptions::Decision::kDisabled
}});
// ---- Format, filter, index and readahead settings of the table ----
3083 block_based_options
.format_version
=
3084 static_cast<uint32_t>(FLAGS_format_version
);
3085 block_based_options
.index_block_restart_interval
=
3086 static_cast<int32_t>(FLAGS_index_block_restart_interval
);
3087 block_based_options
.filter_policy
= filter_policy
;
3088 block_based_options
.partition_filters
= FLAGS_partition_filters
;
3089 block_based_options
.optimize_filters_for_memory
=
3090 FLAGS_optimize_filters_for_memory
;
3091 block_based_options
.detect_filter_construct_corruption
=
3092 FLAGS_detect_filter_construct_corruption
;
3093 block_based_options
.index_type
=
3094 static_cast<BlockBasedTableOptions::IndexType
>(FLAGS_index_type
);
3095 block_based_options
.data_block_index_type
=
3096 static_cast<BlockBasedTableOptions::DataBlockIndexType
>(
3097 FLAGS_data_block_index_type
);
3098 block_based_options
.prepopulate_block_cache
=
3099 static_cast<BlockBasedTableOptions::PrepopulateBlockCache
>(
3100 FLAGS_prepopulate_block_cache
);
3101 block_based_options
.initial_auto_readahead_size
=
3102 FLAGS_initial_auto_readahead_size
;
3103 block_based_options
.max_auto_readahead_size
= FLAGS_max_auto_readahead_size
;
3104 block_based_options
.num_file_reads_for_auto_readahead
=
3105 FLAGS_num_file_reads_for_auto_readahead
;
// Install a table factory built from the options assembled above.
3106 options
.table_factory
.reset(NewBlockBasedTableFactory(block_based_options
));
// ---- Write buffer / memtable sizing ----
3107 options
.db_write_buffer_size
= FLAGS_db_write_buffer_size
;
3108 options
.write_buffer_size
= FLAGS_write_buffer_size
;
3109 options
.max_write_buffer_number
= FLAGS_max_write_buffer_number
;
3110 options
.min_write_buffer_number_to_merge
=
3111 FLAGS_min_write_buffer_number_to_merge
;
3112 options
.max_write_buffer_number_to_maintain
=
3113 FLAGS_max_write_buffer_number_to_maintain
;
3114 options
.max_write_buffer_size_to_maintain
=
3115 FLAGS_max_write_buffer_size_to_maintain
;
3116 options
.memtable_prefix_bloom_size_ratio
=
3117 FLAGS_memtable_prefix_bloom_size_ratio
;
3118 options
.memtable_whole_key_filtering
= FLAGS_memtable_whole_key_filtering
;
// ---- Background work and compaction shape ----
3119 options
.disable_auto_compactions
= FLAGS_disable_auto_compactions
;
3120 options
.max_background_compactions
= FLAGS_max_background_compactions
;
3121 options
.max_background_flushes
= FLAGS_max_background_flushes
;
3122 options
.compaction_style
=
3123 static_cast<ROCKSDB_NAMESPACE::CompactionStyle
>(FLAGS_compaction_style
);
3124 options
.compaction_pri
=
3125 static_cast<ROCKSDB_NAMESPACE::CompactionPri
>(FLAGS_compaction_pri
);
3126 options
.num_levels
= FLAGS_num_levels
;
// A fixed-length prefix extractor is installed only for a non-negative
// FLAGS_prefix_size.
3127 if (FLAGS_prefix_size
>= 0) {
3128 options
.prefix_extractor
.reset(NewFixedPrefixTransform(FLAGS_prefix_size
));
// ---- Files, statistics, env and I/O mode ----
3130 options
.max_open_files
= FLAGS_open_files
;
3131 options
.statistics
= dbstats
;
3132 options
.env
= db_stress_env
;
3133 options
.use_fsync
= FLAGS_use_fsync
;
3134 options
.compaction_readahead_size
= FLAGS_compaction_readahead_size
;
3135 options
.allow_mmap_reads
= FLAGS_mmap_read
;
3136 options
.allow_mmap_writes
= FLAGS_mmap_write
;
3137 options
.use_direct_reads
= FLAGS_use_direct_reads
;
3138 options
.use_direct_io_for_flush_and_compaction
=
3139 FLAGS_use_direct_io_for_flush_and_compaction
;
3140 options
.recycle_log_file_num
=
3141 static_cast<size_t>(FLAGS_recycle_log_file_num
);
// ---- Level sizing and level-0 triggers ----
3142 options
.target_file_size_base
= FLAGS_target_file_size_base
;
3143 options
.target_file_size_multiplier
= FLAGS_target_file_size_multiplier
;
3144 options
.max_bytes_for_level_base
= FLAGS_max_bytes_for_level_base
;
3145 options
.max_bytes_for_level_multiplier
= FLAGS_max_bytes_for_level_multiplier
;
3146 options
.level0_stop_writes_trigger
= FLAGS_level0_stop_writes_trigger
;
3147 options
.level0_slowdown_writes_trigger
= FLAGS_level0_slowdown_writes_trigger
;
3148 options
.level0_file_num_compaction_trigger
=
3149 FLAGS_level0_file_num_compaction_trigger
;
// ---- Compression ----
3150 options
.compression
= compression_type_e
;
3151 options
.bottommost_compression
= bottommost_compression_type_e
;
3152 options
.compression_opts
.max_dict_bytes
= FLAGS_compression_max_dict_bytes
;
3153 options
.compression_opts
.zstd_max_train_bytes
=
3154 FLAGS_compression_zstd_max_train_bytes
;
3155 options
.compression_opts
.parallel_threads
=
3156 FLAGS_compression_parallel_threads
;
3157 options
.compression_opts
.max_dict_buffer_bytes
=
3158 FLAGS_compression_max_dict_buffer_bytes
;
// `use_zstd_dict_trainer` is taken from the flag only when
// ZSTD_FinalizeDictionarySupported() is true. Otherwise, if the flag asked to
// disable the trainer, the warning text below applies.
3159 if (ZSTD_FinalizeDictionarySupported()) {
3160 options
.compression_opts
.use_zstd_dict_trainer
=
3161 FLAGS_compression_use_zstd_dict_trainer
;
3162 } else if (!FLAGS_compression_use_zstd_dict_trainer
) {
3165 "WARNING: use_zstd_dict_trainer is false but zstd finalizeDictionary "
3166 "cannot be used because ZSTD 1.4.5+ is not linked with the binary."
3167 " zstd dictionary trainer will be used.\n");
// ---- Miscellaneous DB behaviour copied from flags ----
3169 options
.max_manifest_file_size
= FLAGS_max_manifest_file_size
;
3170 options
.inplace_update_support
= FLAGS_in_place_update
;
3171 options
.max_subcompactions
= static_cast<uint32_t>(FLAGS_subcompactions
);
3172 options
.allow_concurrent_memtable_write
=
3173 FLAGS_allow_concurrent_memtable_write
;
3174 options
.experimental_mempurge_threshold
=
3175 FLAGS_experimental_mempurge_threshold
;
3176 options
.periodic_compaction_seconds
= FLAGS_periodic_compaction_seconds
;
3177 options
.stats_dump_period_sec
=
3178 static_cast<unsigned int>(FLAGS_stats_dump_period_sec
);
3179 options
.ttl
= FLAGS_compaction_ttl
;
3180 options
.enable_pipelined_write
= FLAGS_enable_pipelined_write
;
3181 options
.enable_write_thread_adaptive_yield
=
3182 FLAGS_enable_write_thread_adaptive_yield
;
// ---- Universal compaction tuning ----
3183 options
.compaction_options_universal
.size_ratio
= FLAGS_universal_size_ratio
;
3184 options
.compaction_options_universal
.min_merge_width
=
3185 FLAGS_universal_min_merge_width
;
3186 options
.compaction_options_universal
.max_merge_width
=
3187 FLAGS_universal_max_merge_width
;
3188 options
.compaction_options_universal
.max_size_amplification_percent
=
3189 FLAGS_universal_max_size_amplification_percent
;
3190 options
.atomic_flush
= FLAGS_atomic_flush
;
// Manual WAL flush is enabled whenever the "one in" flag is positive.
3191 options
.manual_wal_flush
= FLAGS_manual_wal_flush_one_in
> 0 ? true : false;
3192 options
.avoid_unnecessary_blocking_io
= FLAGS_avoid_unnecessary_blocking_io
;
3193 options
.write_dbid_to_manifest
= FLAGS_write_dbid_to_manifest
;
3194 options
.avoid_flush_during_recovery
= FLAGS_avoid_flush_during_recovery
;
3195 options
.max_write_batch_group_size_bytes
=
3196 FLAGS_max_write_batch_group_size_bytes
;
3197 options
.level_compaction_dynamic_level_bytes
=
3198 FLAGS_level_compaction_dynamic_level_bytes
;
// Unlike its neighbours, this option is hard-coded to true, not flag-driven.
3199 options
.track_and_verify_wals_in_manifest
= true;
3200 options
.verify_sst_unique_id_in_manifest
=
3201 FLAGS_verify_sst_unique_id_in_manifest
;
3202 options
.memtable_protection_bytes_per_key
=
3203 FLAGS_memtable_protection_bytes_per_key
;
3205 // Integrated BlobDB
3206 options
.enable_blob_files
= FLAGS_enable_blob_files
;
3207 options
.min_blob_size
= FLAGS_min_blob_size
;
3208 options
.blob_file_size
= FLAGS_blob_file_size
;
3209 options
.blob_compression_type
=
3210 StringToCompressionType(FLAGS_blob_compression_type
.c_str());
3211 options
.enable_blob_garbage_collection
= FLAGS_enable_blob_garbage_collection
;
3212 options
.blob_garbage_collection_age_cutoff
=
3213 FLAGS_blob_garbage_collection_age_cutoff
;
3214 options
.blob_garbage_collection_force_threshold
=
3215 FLAGS_blob_garbage_collection_force_threshold
;
3216 options
.blob_compaction_readahead_size
= FLAGS_blob_compaction_readahead_size
;
3217 options
.blob_file_starting_level
= FLAGS_blob_file_starting_level
;
// ---- Blob cache: either reuse `cache` or create a standalone LRU cache sized
// by FLAGS_blob_cache_size ----
// NOTE(review): the `else` lines, the declaration of `co` and the error path
// are not visible in this listing; confirm the branch structure in the full
// file.
3219 if (FLAGS_use_blob_cache
) {
3220 if (FLAGS_use_shared_block_and_blob_cache
) {
3221 options
.blob_cache
= cache
;
3223 if (FLAGS_blob_cache_size
> 0) {
3225 co
.capacity
= FLAGS_blob_cache_size
;
3226 co
.num_shard_bits
= FLAGS_blob_cache_numshardbits
;
3227 options
.blob_cache
= NewLRUCache(co
);
3230 "Unable to create a standalone blob cache if blob_cache_size "
// Map the integer flag onto PrepopulateBlobCache; the `case` labels are not
// visible in this listing.
3235 switch (FLAGS_prepopulate_blob_cache
) {
3237 options
.prepopulate_blob_cache
= PrepopulateBlobCache::kDisable
;
3240 options
.prepopulate_blob_cache
= PrepopulateBlobCache::kFlushOnly
;
3243 fprintf(stderr
, "Unknown prepopulate blob cache mode\n");
// ---- WAL compression and tiered storage ----
3248 options
.wal_compression
=
3249 StringToCompressionType(FLAGS_wal_compression
.c_str());
// With tiered storage enabled, the bottommost level is marked kCold.
3251 if (FLAGS_enable_tiered_storage
) {
3252 options
.bottommost_temperature
= Temperature::kCold
;
3254 options
.preclude_last_level_data_seconds
=
3255 FLAGS_preclude_last_level_data_seconds
;
3256 options
.preserve_internal_time_seconds
= FLAGS_preserve_internal_time_seconds
;
// ---- Memtable representation; the `case` labels are not visible in this
// listing ----
3258 switch (FLAGS_rep_factory
) {
3260 // no need to do anything
3262 #ifndef ROCKSDB_LITE
3264 options
.memtable_factory
.reset(NewHashSkipListRepFactory(10000));
3267 options
.memtable_factory
.reset(new VectorRepFactory());
3272 "RocksdbLite only supports skip list mem table. Skip "
3274 #endif // ROCKSDB_LITE
// ---- Merge operator: the deprecated put operator under
// FLAGS_use_full_merge_v1; the regular put operator is presumably the `else`
// branch (the `else` line is not visible) ----
3277 if (FLAGS_use_full_merge_v1
) {
3278 options
.merge_operator
= MergeOperators::CreateDeprecatedPutOperator();
3280 options
.merge_operator
= MergeOperators::CreatePutOperator();
// Optional stress-test compaction filter.
3283 if (FLAGS_enable_compaction_filter
) {
3284 options
.compaction_filter_factory
=
3285 std::make_shared
<DbStressCompactionFilterFactory
>();
3288 options
.best_efforts_recovery
= FLAGS_best_efforts_recovery
;
3289 options
.paranoid_file_checks
= FLAGS_paranoid_file_checks
;
3290 options
.fail_if_options_file_error
= FLAGS_fail_if_options_file_error
;
// With user-defined timestamps enabled, validate the flags and install the
// timestamp-aware comparator.
3292 if (FLAGS_user_timestamp_size
> 0) {
3293 CheckAndSetOptionsForUserTimestamp(options
);
3296 options
.allow_data_in_errors
= FLAGS_allow_data_in_errors
;
// Applies settings that the stress test needs on top of whatever `options`
// already contains, filling in only the pieces that are still unset.
//
// Visible behaviour:
//   * forces create_if_missing / create_missing_column_families to true;
//   * supplies `dbstats` and `db_stress_env` when statistics / env are still
//     at their defaults;
//   * fills in the block cache, compressed block cache and filter policy of
//     the block-based table options;
//   * creates a rate limiter, a file checksum generator factory and an
//     SstFileManager according to the corresponding flags;
//   * adjusts recovery/compaction settings for
//     FLAGS_preserve_unverified_changes;
//   * appends a DbStressTablePropertiesCollectorFactory.
//
// NOTE(review): this listing omits some original lines (embedded line numbers
// skip, e.g. 3302 -> 3304). `options` and `status` are not declared in the
// visible text and one NewGenericRateLimiter argument is missing. Confirm
// against the full file.
3299 void InitializeOptionsGeneral(
3300 const std::shared_ptr
<Cache
>& cache
,
3301 const std::shared_ptr
<Cache
>& block_cache_compressed
,
3302 const std::shared_ptr
<const FilterPolicy
>& filter_policy
,
// The stress test always creates the DB and its column families on demand.
3304 options
.create_missing_column_families
= true;
3305 options
.create_if_missing
= true;
// Keep a statistics object that is already set; otherwise use `dbstats`.
3307 if (!options
.statistics
) {
3308 options
.statistics
= dbstats
;
// Replace the env only if it is still the one a default Options() carries.
3311 if (options
.env
== Options().env
) {
3312 options
.env
= db_stress_env
;
// Fill in the block-based table options. GetOptions<> may yield a null
// pointer, hence the check on `table_options`.
3315 assert(options
.table_factory
);
3316 auto table_options
=
3317 options
.table_factory
->GetOptions
<BlockBasedTableOptions
>();
3318 if (table_options
) {
// The block cache is overwritten whenever FLAGS_cache_size is positive.
3319 if (FLAGS_cache_size
> 0) {
3320 table_options
->block_cache
= cache
;
// The compressed cache and the filter policy are set only when still unset.
3322 if (!table_options
->block_cache_compressed
&&
3323 FLAGS_compressed_cache_size
> 0) {
3324 table_options
->block_cache_compressed
= block_cache_compressed
;
3326 if (!table_options
->filter_policy
) {
3327 table_options
->filter_policy
= filter_policy
;
3331 // TODO: row_cache, thread-pool IO priority, CPU priority.
// Create a rate limiter only when none is set and the flag asks for one.
// FLAGS_rate_limit_bg_reads selects reads-only versus writes-only mode.
3333 if (!options
.rate_limiter
) {
3334 if (FLAGS_rate_limiter_bytes_per_sec
> 0) {
3335 options
.rate_limiter
.reset(NewGenericRateLimiter(
3336 FLAGS_rate_limiter_bytes_per_sec
, 1000 /* refill_period_us */,
3338 FLAGS_rate_limit_bg_reads
? RateLimiter::Mode::kReadsOnly
3339 : RateLimiter::Mode::kWritesOnly
));
// Default the file checksum generator from FLAGS_file_checksum_impl.
3343 if (!options
.file_checksum_gen_factory
) {
3344 options
.file_checksum_gen_factory
=
3345 GetFileChecksumImpl(FLAGS_file_checksum_impl
);
// An SstFileManager is created when either of its two flags is positive; a
// creation failure is reported on stderr.
3348 if (FLAGS_sst_file_manager_bytes_per_sec
> 0 ||
3349 FLAGS_sst_file_manager_bytes_per_truncate
> 0) {
3351 options
.sst_file_manager
.reset(NewSstFileManager(
3352 db_stress_env
, options
.info_log
, "" /* trash_dir */,
3353 static_cast<int64_t>(FLAGS_sst_file_manager_bytes_per_sec
),
3354 true /* delete_existing_trash */, &status
,
3355 0.25 /* max_trash_db_ratio */,
3356 FLAGS_sst_file_manager_bytes_per_truncate
));
3358 fprintf(stderr
, "SstFileManager creation failed: %s\n",
3359 status
.ToString().c_str());
// Under FLAGS_preserve_unverified_changes, avoid_flush_during_recovery is
// forced to true, with a warning if it was false.
3364 if (FLAGS_preserve_unverified_changes
) {
3365 if (!options
.avoid_flush_during_recovery
) {
3367 "WARNING: flipping `avoid_flush_during_recovery` to true for "
3368 "`preserve_unverified_changes` to keep all files\n");
3369 options
.avoid_flush_during_recovery
= true;
3371 // Together with `avoid_flush_during_recovery == true`, this will prevent
3372 // live files from becoming obsolete and deleted between `DB::Open()` and
3373 // `DisableFileDeletions()` due to flush or compaction. We do not need to
3374 // warn the user since we will reenable compaction soon.
3375 options
.disable_auto_compactions
= true;
// Register the stress test's table properties collector factory.
3378 options
.table_properties_collector_factories
.emplace_back(
3379 std::make_shared
<DbStressTablePropertiesCollectorFactory
>());
3382 } // namespace ROCKSDB_NAMESPACE