static const bool FLAGS_seed_dummy __attribute__((__unused__)) =
RegisterFlagValidator(&FLAGS_seed, &ValidateUint32Range);
+DEFINE_bool(read_only, false, "True if open DB in read-only mode during tests");
+
DEFINE_int64(max_key, 1 * KB* KB,
"Max number of key/values to place in database");
"\t(b) No long validation at the end (more speed up)\n"
"\t(c) Test snapshot and atomicity of batch writes");
+DEFINE_bool(atomic_flush, false,
+ "If set, enables atomic flush in the options.\n");
+
+DEFINE_bool(test_atomic_flush, false,
+ "If set, runs the stress test dedicated to verifying atomic flush "
+ "functionality. Setting this implies `atomic_flush=true`.\n");
+
DEFINE_int32(threads, 32, "Number of concurrent threads to run.");
DEFINE_int32(ttl, -1,
"creates prefix blooms for memtables, each with size "
"`write_buffer_size * memtable_prefix_bloom_size_ratio`.");
+DEFINE_bool(memtable_whole_key_filtering,
+ rocksdb::Options().memtable_whole_key_filtering,
+ "Enable whole key filtering in memtables.");
+
DEFINE_int32(open_files, rocksdb::Options().max_open_files,
"Maximum number of files to keep open at the same time "
"(use default if == 0)");
DEFINE_bool(disable_wal, false, "If true, do not write WAL for write.");
+DEFINE_uint64(recycle_log_file_num, rocksdb::Options().recycle_log_file_num,
+ "Number of old WAL files to keep around for later recycling");
+
DEFINE_int64(target_file_size_base, rocksdb::Options().target_file_size_base,
"Target level-1 file size for compaction");
}
}
- void AddBytesForWrites(int nwrites, size_t nbytes) {
+ void AddBytesForWrites(long nwrites, size_t nbytes) {
writes_ += nwrites;
bytes_ += nbytes;
}
- void AddGets(int ngets, int nfounds) {
+ void AddGets(long ngets, long nfounds) {
founds_ += nfounds;
gets_ += ngets;
}
- void AddPrefixes(int nprefixes, int count) {
+ void AddPrefixes(long nprefixes, long count) {
prefixes_ += nprefixes;
iterator_size_sums_ += count;
}
- void AddIterations(int n) {
- iterations_ += n;
- }
+ void AddIterations(long n) { iterations_ += n; }
- void AddDeletes(int n) {
- deletes_ += n;
- }
+ void AddDeletes(long n) { deletes_ += n; }
void AddSingleDeletes(size_t n) { single_deletes_ += n; }
- void AddRangeDeletions(int n) {
- range_deletions_ += n;
- }
+ void AddRangeDeletions(long n) { range_deletions_ += n; }
- void AddCoveredByRangeDeletions(int n) {
- covered_by_range_deletions_ += n;
- }
+ void AddCoveredByRangeDeletions(long n) { covered_by_range_deletions_ += n; }
- void AddErrors(int n) {
- errors_ += n;
- }
+ void AddErrors(long n) { errors_ += n; }
- void AddNumCompactFilesSucceed(int n) { num_compact_files_succeed_ += n; }
+ void AddNumCompactFilesSucceed(long n) { num_compact_files_succeed_ += n; }
- void AddNumCompactFilesFailed(int n) { num_compact_files_failed_ += n; }
+ void AddNumCompactFilesFailed(long n) { num_compact_files_failed_ += n; }
void Report(const char* name) {
std::string extra;
if (status.ok()) {
status = FLAGS_env->GetFileSize(FLAGS_expected_values_path, &size);
}
- unique_ptr<WritableFile> wfile;
+ std::unique_ptr<WritableFile> wfile;
if (status.ok() && size == 0) {
const EnvOptions soptions;
status = FLAGS_env->NewWritableFile(FLAGS_expected_values_path, &wfile,
}
assert(info.job_id > 0 || FLAGS_compact_files_one_in > 0);
if (info.status.ok() && info.file_size > 0) {
- assert(info.table_properties.data_size > 0);
+ assert(info.table_properties.data_size > 0 ||
+ info.table_properties.num_range_deletions > 0);
assert(info.table_properties.raw_key_size > 0);
assert(info.table_properties.num_entries > 0);
}
txn_db_(nullptr),
#endif
new_column_family_name_(1),
- num_times_reopened_(0) {
+ num_times_reopened_(0),
+ db_preload_finished_(false) {
if (FLAGS_destroy_db_initially) {
std::vector<std::string> files;
FLAGS_env->GetChildren(FLAGS_db, &files);
FLAGS_env->DeleteFile(FLAGS_db + "/" + files[i]);
}
}
- DestroyDB(FLAGS_db, Options());
+ Options options;
+ options.env = FLAGS_env;
+ Status s = DestroyDB(FLAGS_db, options);
+ if (!s.ok()) {
+ fprintf(stderr, "Cannot destroy original db: %s\n",
+ s.ToString().c_str());
+ exit(1);
+ }
}
}
Open();
BuildOptionsTable();
SharedState shared(this);
+
+ if (FLAGS_read_only) {
+ now = FLAGS_env->NowMicros();
+ fprintf(stdout, "%s Preloading db with %" PRIu64 " KVs\n",
+ FLAGS_env->TimeToString(now / 1000000).c_str(), FLAGS_max_key);
+ PreloadDbAndReopenAsReadOnly(FLAGS_max_key, &shared);
+ }
uint32_t n = shared.GetNumThreads();
now = FLAGS_env->NowMicros();
}
}
if (snap_state.key_vec != nullptr) {
+ // When `prefix_extractor` is set, seeking to beginning and scanning
+ // across prefixes are only supported with `total_order_seek` set.
+ ropt.total_order_seek = true;
std::unique_ptr<Iterator> iterator(db->NewIterator(ropt));
std::unique_ptr<std::vector<bool>> tmp_bitvec(new std::vector<bool>(FLAGS_max_key));
for (iterator->SeekToFirst(); iterator->Valid(); iterator->Next()) {
return Status::OK();
}
+ // Currently PreloadDb has to be single-threaded.
+ void PreloadDbAndReopenAsReadOnly(int64_t number_of_keys,
+ SharedState* shared) {
+ WriteOptions write_opts;
+ write_opts.disableWAL = FLAGS_disable_wal;
+ if (FLAGS_sync) {
+ write_opts.sync = true;
+ }
+ char value[100];
+ int cf_idx = 0;
+ Status s;
+ for (auto cfh : column_families_) {
+ for (int64_t k = 0; k != number_of_keys; ++k) {
+ std::string key_str = Key(k);
+ Slice key = key_str;
+ size_t sz = GenerateValue(0 /*value_base*/, value, sizeof(value));
+ Slice v(value, sz);
+ shared->Put(cf_idx, k, 0, true /* pending */);
+
+ if (FLAGS_use_merge) {
+ if (!FLAGS_use_txn) {
+ s = db_->Merge(write_opts, cfh, key, v);
+ } else {
+#ifndef ROCKSDB_LITE
+ Transaction* txn;
+ s = NewTxn(write_opts, &txn);
+ if (s.ok()) {
+ s = txn->Merge(cfh, key, v);
+ if (s.ok()) {
+ s = CommitTxn(txn);
+ }
+ }
+#endif
+ }
+ } else {
+ if (!FLAGS_use_txn) {
+ s = db_->Put(write_opts, cfh, key, v);
+ } else {
+#ifndef ROCKSDB_LITE
+ Transaction* txn;
+ s = NewTxn(write_opts, &txn);
+ if (s.ok()) {
+ s = txn->Put(cfh, key, v);
+ if (s.ok()) {
+ s = CommitTxn(txn);
+ }
+ }
+#endif
+ }
+ }
+
+ shared->Put(cf_idx, k, 0, false /* pending */);
+ if (!s.ok()) {
+ break;
+ }
+ }
+ if (!s.ok()) {
+ break;
+ }
+ ++cf_idx;
+ }
+ if (s.ok()) {
+ s = db_->Flush(FlushOptions(), column_families_);
+ }
+ if (s.ok()) {
+ for (auto cf : column_families_) {
+ delete cf;
+ }
+ column_families_.clear();
+ delete db_;
+ db_ = nullptr;
+#ifndef ROCKSDB_LITE
+ txn_db_ = nullptr;
+#endif
+
+ db_preload_finished_.store(true);
+ auto now = FLAGS_env->NowMicros();
+ fprintf(stdout, "%s Reopening database in read-only\n",
+ FLAGS_env->TimeToString(now / 1000000).c_str());
+ // Reopen as read-only, can ignore all options related to updates
+ Open();
+ } else {
+ fprintf(stderr, "Failed to preload db");
+ exit(1);
+ }
+ }
+
Status SetOptions(ThreadState* thread) {
assert(FLAGS_set_options_one_in > 0);
std::unordered_map<std::string, std::string> opts;
if (thread->shared->AllVotedReopen()) {
thread->shared->GetStressTest()->Reopen();
thread->shared->GetCondVar()->SignalAll();
- }
- else {
+ } else {
thread->shared->GetCondVar()->Wait();
}
// Commenting this out as we don't want to reset stats on each open.
MaybeClearOneColumnFamily(thread);
#ifndef ROCKSDB_LITE
- if (FLAGS_checkpoint_one_in > 0 &&
- thread->rand.Uniform(FLAGS_checkpoint_one_in) == 0) {
- std::string checkpoint_dir =
- FLAGS_db + "/.checkpoint" + ToString(thread->tid);
- DestroyDB(checkpoint_dir, Options());
- Checkpoint* checkpoint;
- Status s = Checkpoint::Create(db_, &checkpoint);
- if (s.ok()) {
- s = checkpoint->CreateCheckpoint(checkpoint_dir);
- }
- std::vector<std::string> files;
- if (s.ok()) {
- s = FLAGS_env->GetChildren(checkpoint_dir, &files);
- }
- DestroyDB(checkpoint_dir, Options());
- delete checkpoint;
- if (!s.ok()) {
- printf("A checkpoint operation failed with: %s\n",
- s.ToString().c_str());
- }
- }
-
- if (FLAGS_backup_one_in > 0 &&
- thread->rand.Uniform(FLAGS_backup_one_in) == 0) {
- std::string backup_dir = FLAGS_db + "/.backup" + ToString(thread->tid);
- BackupableDBOptions backup_opts(backup_dir);
- BackupEngine* backup_engine = nullptr;
- Status s = BackupEngine::Open(FLAGS_env, backup_opts, &backup_engine);
- if (s.ok()) {
- s = backup_engine->CreateNewBackup(db_);
- }
- if (s.ok()) {
- s = backup_engine->PurgeOldBackups(0 /* num_backups_to_keep */);
- }
- if (!s.ok()) {
- printf("A BackupEngine operation failed with: %s\n",
- s.ToString().c_str());
- }
- if (backup_engine != nullptr) {
- delete backup_engine;
- }
- }
-
if (FLAGS_compact_files_one_in > 0 &&
thread->rand.Uniform(FLAGS_compact_files_one_in) == 0) {
auto* random_cf =
auto column_family = column_families_[rand_column_family];
- if (FLAGS_flush_one_in > 0 &&
- thread->rand.Uniform(FLAGS_flush_one_in) == 0) {
- FlushOptions flush_opts;
- Status status = db_->Flush(flush_opts, column_family);
- if (!status.ok()) {
- fprintf(stdout, "Unable to perform Flush(): %s\n", status.ToString().c_str());
- }
- }
-
if (FLAGS_compact_range_one_in > 0 &&
thread->rand.Uniform(FLAGS_compact_range_one_in) == 0) {
int64_t end_key_num;
std::vector<int> rand_column_families =
GenerateColumnFamilies(FLAGS_column_families, rand_column_family);
+
+ if (FLAGS_flush_one_in > 0 &&
+ thread->rand.Uniform(FLAGS_flush_one_in) == 0) {
+ FlushOptions flush_opts;
+ std::vector<ColumnFamilyHandle*> cfhs;
+ std::for_each(
+ rand_column_families.begin(), rand_column_families.end(),
+ [this, &cfhs](int k) { cfhs.push_back(column_families_[k]); });
+ Status status = db_->Flush(flush_opts, cfhs);
+ if (!status.ok()) {
+ fprintf(stdout, "Unable to perform Flush(): %s\n",
+ status.ToString().c_str());
+ }
+ }
+
std::vector<int64_t> rand_keys = GenerateKeys(rand_key);
if (FLAGS_ingest_external_file_one_in > 0 &&
TestIngestExternalFile(thread, rand_column_families, rand_keys, lock);
}
+ if (FLAGS_backup_one_in > 0 &&
+ thread->rand.Uniform(FLAGS_backup_one_in) == 0) {
+ Status s = TestBackupRestore(thread, rand_column_families, rand_keys);
+ if (!s.ok()) {
+ VerificationAbort(shared, "Backup/restore gave inconsistent state",
+ s);
+ }
+ }
+
+ if (FLAGS_checkpoint_one_in > 0 &&
+ thread->rand.Uniform(FLAGS_checkpoint_one_in) == 0) {
+ Status s = TestCheckpoint(thread, rand_column_families, rand_keys);
+ if (!s.ok()) {
+ VerificationAbort(shared, "Checkpoint gave inconsistent state", s);
+ }
+ }
+
if (FLAGS_acquire_snapshot_one_in > 0 &&
thread->rand.Uniform(FLAGS_acquire_snapshot_one_in) == 0) {
auto snapshot = db_->GetSnapshot();
if (FLAGS_compare_full_db_state_snapshot &&
(thread->tid == 0)) {
key_vec = new std::vector<bool>(FLAGS_max_key);
+ // When `prefix_extractor` is set, seeking to beginning and scanning
+ // across prefixes are only supported with `total_order_seek` set.
+ ropt.total_order_seek = true;
std::unique_ptr<Iterator> iterator(db_->NewIterator(ropt));
for (iterator->SeekToFirst(); iterator->Valid(); iterator->Next()) {
uint64_t key_val;
return s;
}
+#ifdef ROCKSDB_LITE
+ virtual Status TestBackupRestore(
+ ThreadState* /* thread */,
+ const std::vector<int>& /* rand_column_families */,
+ const std::vector<int64_t>& /* rand_keys */) {
+ assert(false);
+ fprintf(stderr,
+ "RocksDB lite does not support "
+ "TestBackupRestore\n");
+ std::terminate();
+ }
+
+ virtual Status TestCheckpoint(
+ ThreadState* /* thread */,
+ const std::vector<int>& /* rand_column_families */,
+ const std::vector<int64_t>& /* rand_keys */) {
+ assert(false);
+ fprintf(stderr,
+ "RocksDB lite does not support "
+ "TestCheckpoint\n");
+ std::terminate();
+ }
+#else // ROCKSDB_LITE
+ virtual Status TestBackupRestore(ThreadState* thread,
+ const std::vector<int>& rand_column_families,
+ const std::vector<int64_t>& rand_keys) {
+ // Note the column families chosen by `rand_column_families` cannot be
+ // dropped while the locks for `rand_keys` are held. So we should not have
+ // to worry about accessing those column families throughout this function.
+ assert(rand_column_families.size() == rand_keys.size());
+ std::string backup_dir = FLAGS_db + "/.backup" + ToString(thread->tid);
+ std::string restore_dir = FLAGS_db + "/.restore" + ToString(thread->tid);
+ BackupableDBOptions backup_opts(backup_dir);
+ BackupEngine* backup_engine = nullptr;
+ Status s = BackupEngine::Open(FLAGS_env, backup_opts, &backup_engine);
+ if (s.ok()) {
+ s = backup_engine->CreateNewBackup(db_);
+ }
+ if (s.ok()) {
+ delete backup_engine;
+ backup_engine = nullptr;
+ s = BackupEngine::Open(FLAGS_env, backup_opts, &backup_engine);
+ }
+ if (s.ok()) {
+ s = backup_engine->RestoreDBFromLatestBackup(restore_dir /* db_dir */,
+ restore_dir /* wal_dir */);
+ }
+ if (s.ok()) {
+ s = backup_engine->PurgeOldBackups(0 /* num_backups_to_keep */);
+ }
+ DB* restored_db = nullptr;
+ std::vector<ColumnFamilyHandle*> restored_cf_handles;
+ if (s.ok()) {
+ Options restore_options(options_);
+ restore_options.listeners.clear();
+ std::vector<ColumnFamilyDescriptor> cf_descriptors;
+ // TODO(ajkr): `column_family_names_` is not safe to access here when
+ // `clear_column_family_one_in != 0`. But we can't easily switch to
+ // `ListColumnFamilies` to get names because it won't necessarily give
+ // the same order as `column_family_names_`.
+ assert(FLAGS_clear_column_family_one_in == 0);
+ for (auto name : column_family_names_) {
+ cf_descriptors.emplace_back(name, ColumnFamilyOptions(restore_options));
+ }
+ s = DB::Open(DBOptions(restore_options), restore_dir, cf_descriptors,
+ &restored_cf_handles, &restored_db);
+ }
+ // for simplicity, currently only verifies existence/non-existence of a few
+ // keys
+ for (size_t i = 0; s.ok() && i < rand_column_families.size(); ++i) {
+ std::string key_str = Key(rand_keys[i]);
+ Slice key = key_str;
+ std::string restored_value;
+ Status get_status = restored_db->Get(
+ ReadOptions(), restored_cf_handles[rand_column_families[i]], key,
+ &restored_value);
+ bool exists =
+ thread->shared->Exists(rand_column_families[i], rand_keys[i]);
+ if (get_status.ok()) {
+ if (!exists) {
+ s = Status::Corruption(
+ "key exists in restore but not in original db");
+ }
+ } else if (get_status.IsNotFound()) {
+ if (exists) {
+ s = Status::Corruption(
+ "key exists in original db but not in restore");
+ }
+ } else {
+ s = get_status;
+ }
+ }
+ if (backup_engine != nullptr) {
+ delete backup_engine;
+ backup_engine = nullptr;
+ }
+ if (restored_db != nullptr) {
+ for (auto* cf_handle : restored_cf_handles) {
+ restored_db->DestroyColumnFamilyHandle(cf_handle);
+ }
+ delete restored_db;
+ restored_db = nullptr;
+ }
+ if (!s.ok()) {
+ printf("A backup/restore operation failed with: %s\n",
+ s.ToString().c_str());
+ }
+ return s;
+ }
+
+ virtual Status TestCheckpoint(ThreadState* thread,
+ const std::vector<int>& rand_column_families,
+ const std::vector<int64_t>& rand_keys) {
+ // Note the column families chosen by `rand_column_families` cannot be
+ // dropped while the locks for `rand_keys` are held. So we should not have
+ // to worry about accessing those column families throughout this function.
+ assert(rand_column_families.size() == rand_keys.size());
+ std::string checkpoint_dir =
+ FLAGS_db + "/.checkpoint" + ToString(thread->tid);
+ DestroyDB(checkpoint_dir, Options());
+ Checkpoint* checkpoint = nullptr;
+ Status s = Checkpoint::Create(db_, &checkpoint);
+ if (s.ok()) {
+ s = checkpoint->CreateCheckpoint(checkpoint_dir);
+ }
+ std::vector<ColumnFamilyHandle*> cf_handles;
+ DB* checkpoint_db = nullptr;
+ if (s.ok()) {
+ delete checkpoint;
+ checkpoint = nullptr;
+ Options options(options_);
+ options.listeners.clear();
+ std::vector<ColumnFamilyDescriptor> cf_descs;
+ // TODO(ajkr): `column_family_names_` is not safe to access here when
+ // `clear_column_family_one_in != 0`. But we can't easily switch to
+ // `ListColumnFamilies` to get names because it won't necessarily give
+ // the same order as `column_family_names_`.
+ if (FLAGS_clear_column_family_one_in == 0) {
+ for (const auto& name : column_family_names_) {
+ cf_descs.emplace_back(name, ColumnFamilyOptions(options));
+ }
+ s = DB::OpenForReadOnly(DBOptions(options), checkpoint_dir, cf_descs,
+ &cf_handles, &checkpoint_db);
+ }
+ }
+ if (checkpoint_db != nullptr) {
+ for (size_t i = 0; s.ok() && i < rand_column_families.size(); ++i) {
+ std::string key_str = Key(rand_keys[i]);
+ Slice key = key_str;
+ std::string value;
+ Status get_status = checkpoint_db->Get(
+ ReadOptions(), cf_handles[rand_column_families[i]], key, &value);
+ bool exists =
+ thread->shared->Exists(rand_column_families[i], rand_keys[i]);
+ if (get_status.ok()) {
+ if (!exists) {
+ s = Status::Corruption(
+ "key exists in checkpoint but not in original db");
+ }
+ } else if (get_status.IsNotFound()) {
+ if (exists) {
+ s = Status::Corruption(
+ "key exists in original db but not in checkpoint");
+ }
+ } else {
+ s = get_status;
+ }
+ }
+ for (auto cfh : cf_handles) {
+ delete cfh;
+ }
+ cf_handles.clear();
+ delete checkpoint_db;
+ checkpoint_db = nullptr;
+ }
+ DestroyDB(checkpoint_dir, Options());
+ if (!s.ok()) {
+ fprintf(stderr, "A checkpoint operation failed with: %s\n",
+ s.ToString().c_str());
+ }
+ return s;
+ }
+#endif // ROCKSDB_LITE
+
void VerificationAbort(SharedState* shared, std::string msg, Status s) const {
printf("Verification failed: %s. Status is %s\n", msg.c_str(),
s.ToString().c_str());
fprintf(stdout, "Format version : %d\n", FLAGS_format_version);
fprintf(stdout, "TransactionDB : %s\n",
FLAGS_use_txn ? "true" : "false");
+ fprintf(stdout, "Read only mode : %s\n",
+ FLAGS_read_only ? "true" : "false");
+ fprintf(stdout, "Atomic flush : %s\n",
+ FLAGS_atomic_flush ? "true" : "false");
fprintf(stdout, "Column families : %d\n", FLAGS_column_families);
if (!FLAGS_test_batches_snapshots) {
fprintf(stdout, "Clear CFs one in : %d\n",
FLAGS_max_write_buffer_number_to_maintain;
options_.memtable_prefix_bloom_size_ratio =
FLAGS_memtable_prefix_bloom_size_ratio;
+ options_.memtable_whole_key_filtering =
+ FLAGS_memtable_whole_key_filtering;
options_.max_background_compactions = FLAGS_max_background_compactions;
options_.max_background_flushes = FLAGS_max_background_flushes;
options_.compaction_style =
options_.use_direct_reads = FLAGS_use_direct_reads;
options_.use_direct_io_for_flush_and_compaction =
FLAGS_use_direct_io_for_flush_and_compaction;
+ options_.recycle_log_file_num =
+ static_cast<size_t>(FLAGS_recycle_log_file_num);
options_.target_file_size_base = FLAGS_target_file_size_base;
options_.target_file_size_multiplier = FLAGS_target_file_size_multiplier;
options_.max_bytes_for_level_base = FLAGS_max_bytes_for_level_base;
FLAGS_universal_max_merge_width;
options_.compaction_options_universal.max_size_amplification_percent =
FLAGS_universal_max_size_amplification_percent;
+ options_.atomic_flush = FLAGS_atomic_flush;
} else {
#ifdef ROCKSDB_LITE
fprintf(stderr, "--options_file not supported in lite mode\n");
new DbStressListener(FLAGS_db, options_.db_paths, cf_descriptors));
options_.create_missing_column_families = true;
if (!FLAGS_use_txn) {
- s = DB::Open(DBOptions(options_), FLAGS_db, cf_descriptors,
- &column_families_, &db_);
+ if (db_preload_finished_.load() && FLAGS_read_only) {
+ s = DB::OpenForReadOnly(DBOptions(options_), FLAGS_db, cf_descriptors,
+ &column_families_, &db_);
+ } else {
+ s = DB::Open(DBOptions(options_), FLAGS_db, cf_descriptors,
+ &column_families_, &db_);
+ }
} else {
#ifndef ROCKSDB_LITE
TransactionDBOptions txn_db_options;
int num_times_reopened_;
std::unordered_map<std::string, std::vector<std::string>> options_table_;
std::vector<std::string> options_index_;
+ std::atomic<bool> db_preload_finished_;
};
class NonBatchedOpsStressTest : public StressTest {
}
if (!thread->rand.OneIn(2)) {
// Use iterator to verify this range
- unique_ptr<Iterator> iter(
+ std::unique_ptr<Iterator> iter(
db_->NewIterator(options, column_families_[cf]));
iter->Seek(Key(start));
for (auto i = start; i < end; i++) {
}
Iterator* iter = db_->NewIterator(ro_copy, cfh);
- int64_t count = 0;
+ long count = 0;
for (iter->Seek(prefix);
iter->Valid() && iter->key().starts_with(prefix); iter->Next()) {
++count;
}
- assert(count <=
- (static_cast<int64_t>(1) << ((8 - FLAGS_prefix_size) * 8)));
+ assert(count <= (static_cast<long>(1) << ((8 - FLAGS_prefix_size) * 8)));
Status s = iter->status();
if (iter->status().ok()) {
- thread->stats.AddPrefixes(1, static_cast<int>(count));
+ thread->stats.AddPrefixes(1, count);
} else {
thread->stats.AddErrors(1);
}
iters[i]->Seek(prefix_slices[i]);
}
- int count = 0;
+ long count = 0;
while (iters[0]->Valid() && iters[0]->key().starts_with(prefix_slices[0])) {
count++;
std::string values[10];
virtual void VerifyDb(ThreadState* /* thread */) const {}
};
+class AtomicFlushStressTest : public StressTest {
+ public:
+ AtomicFlushStressTest() : batch_id_(0) {}
+
+ virtual ~AtomicFlushStressTest() {}
+
+ virtual Status TestPut(ThreadState* thread, WriteOptions& write_opts,
+ const ReadOptions& /* read_opts */,
+ const std::vector<int>& rand_column_families,
+ const std::vector<int64_t>& rand_keys,
+ char (&value)[100],
+ std::unique_ptr<MutexLock>& /* lock */) {
+ std::string key_str = Key(rand_keys[0]);
+ Slice key = key_str;
+ uint64_t value_base = batch_id_.fetch_add(1);
+ size_t sz =
+ GenerateValue(static_cast<uint32_t>(value_base), value, sizeof(value));
+ Slice v(value, sz);
+ WriteBatch batch;
+ for (auto cf : rand_column_families) {
+ ColumnFamilyHandle* cfh = column_families_[cf];
+ if (FLAGS_use_merge) {
+ batch.Merge(cfh, key, v);
+ } else { /* !FLAGS_use_merge */
+ batch.Put(cfh, key, v);
+ }
+ }
+ Status s = db_->Write(write_opts, &batch);
+ if (!s.ok()) {
+ fprintf(stderr, "multi put or merge error: %s\n", s.ToString().c_str());
+ thread->stats.AddErrors(1);
+ } else {
+ auto num = static_cast<long>(rand_column_families.size());
+ thread->stats.AddBytesForWrites(num, (sz + 1) * num);
+ }
+
+ return s;
+ }
+
+ virtual Status TestDelete(ThreadState* thread, WriteOptions& write_opts,
+ const std::vector<int>& rand_column_families,
+ const std::vector<int64_t>& rand_keys,
+ std::unique_ptr<MutexLock>& /* lock */) {
+ std::string key_str = Key(rand_keys[0]);
+ Slice key = key_str;
+ WriteBatch batch;
+ for (auto cf : rand_column_families) {
+ ColumnFamilyHandle* cfh = column_families_[cf];
+ batch.Delete(cfh, key);
+ }
+ Status s = db_->Write(write_opts, &batch);
+ if (!s.ok()) {
+ fprintf(stderr, "multidel error: %s\n", s.ToString().c_str());
+ thread->stats.AddErrors(1);
+ } else {
+ thread->stats.AddDeletes(static_cast<long>(rand_column_families.size()));
+ }
+ return s;
+ }
+
+ virtual Status TestDeleteRange(ThreadState* thread, WriteOptions& write_opts,
+ const std::vector<int>& rand_column_families,
+ const std::vector<int64_t>& rand_keys,
+ std::unique_ptr<MutexLock>& /* lock */) {
+ int64_t rand_key = rand_keys[0];
+ auto shared = thread->shared;
+ int64_t max_key = shared->GetMaxKey();
+ if (rand_key > max_key - FLAGS_range_deletion_width) {
+ rand_key =
+ thread->rand.Next() % (max_key - FLAGS_range_deletion_width + 1);
+ }
+ std::string key_str = Key(rand_key);
+ Slice key = key_str;
+ std::string end_key_str = Key(rand_key + FLAGS_range_deletion_width);
+ Slice end_key = end_key_str;
+ WriteBatch batch;
+ for (auto cf : rand_column_families) {
+ ColumnFamilyHandle* cfh = column_families_[rand_column_families[cf]];
+ batch.DeleteRange(cfh, key, end_key);
+ }
+ Status s = db_->Write(write_opts, &batch);
+ if (!s.ok()) {
+ fprintf(stderr, "multi del range error: %s\n", s.ToString().c_str());
+ thread->stats.AddErrors(1);
+ } else {
+ thread->stats.AddRangeDeletions(
+ static_cast<long>(rand_column_families.size()));
+ }
+ return s;
+ }
+
+ virtual void TestIngestExternalFile(
+ ThreadState* /* thread */,
+ const std::vector<int>& /* rand_column_families */,
+ const std::vector<int64_t>& /* rand_keys */,
+ std::unique_ptr<MutexLock>& /* lock */) {
+ assert(false);
+ fprintf(stderr,
+ "AtomicFlushStressTest does not support TestIngestExternalFile "
+ "because it's not possible to verify the result\n");
+ std::terminate();
+ }
+
+ virtual Status TestGet(ThreadState* thread, const ReadOptions& readoptions,
+ const std::vector<int>& rand_column_families,
+ const std::vector<int64_t>& rand_keys) {
+ std::string key_str = Key(rand_keys[0]);
+ Slice key = key_str;
+ auto cfh =
+ column_families_[rand_column_families[thread->rand.Next() %
+ rand_column_families.size()]];
+ std::string from_db;
+ Status s = db_->Get(readoptions, cfh, key, &from_db);
+ if (s.ok()) {
+ thread->stats.AddGets(1, 1);
+ } else if (s.IsNotFound()) {
+ thread->stats.AddGets(1, 0);
+ } else {
+ thread->stats.AddErrors(1);
+ }
+ return s;
+ }
+
+ virtual Status TestPrefixScan(ThreadState* thread,
+ const ReadOptions& readoptions,
+ const std::vector<int>& rand_column_families,
+ const std::vector<int64_t>& rand_keys) {
+ std::string key_str = Key(rand_keys[0]);
+ Slice key = key_str;
+ Slice prefix = Slice(key.data(), FLAGS_prefix_size);
+
+ std::string upper_bound;
+ Slice ub_slice;
+ ReadOptions ro_copy = readoptions;
+ if (thread->rand.OneIn(2) && GetNextPrefix(prefix, &upper_bound)) {
+ ub_slice = Slice(upper_bound);
+ ro_copy.iterate_upper_bound = &ub_slice;
+ }
+ auto cfh =
+ column_families_[rand_column_families[thread->rand.Next() %
+ rand_column_families.size()]];
+ Iterator* iter = db_->NewIterator(ro_copy, cfh);
+ long count = 0;
+ for (iter->Seek(prefix); iter->Valid() && iter->key().starts_with(prefix);
+ iter->Next()) {
+ ++count;
+ }
+ assert(count <= (static_cast<long>(1) << ((8 - FLAGS_prefix_size) * 8)));
+ Status s = iter->status();
+ if (s.ok()) {
+ thread->stats.AddPrefixes(1, count);
+ } else {
+ thread->stats.AddErrors(1);
+ }
+ delete iter;
+ return s;
+ }
+
+#ifdef ROCKSDB_LITE
+ virtual Status TestCheckpoint(
+ ThreadState* /* thread */,
+ const std::vector<int>& /* rand_column_families */,
+ const std::vector<int64_t>& /* rand_keys */) {
+ assert(false);
+ fprintf(stderr,
+ "RocksDB lite does not support "
+ "TestCheckpoint\n");
+ std::terminate();
+ }
+#else
+ virtual Status TestCheckpoint(
+ ThreadState* thread, const std::vector<int>& /* rand_column_families */,
+ const std::vector<int64_t>& /* rand_keys */) {
+ std::string checkpoint_dir =
+ FLAGS_db + "/.checkpoint" + ToString(thread->tid);
+ DestroyDB(checkpoint_dir, Options());
+ Checkpoint* checkpoint = nullptr;
+ Status s = Checkpoint::Create(db_, &checkpoint);
+ if (s.ok()) {
+ s = checkpoint->CreateCheckpoint(checkpoint_dir);
+ }
+ std::vector<ColumnFamilyHandle*> cf_handles;
+ DB* checkpoint_db = nullptr;
+ if (s.ok()) {
+ delete checkpoint;
+ checkpoint = nullptr;
+ Options options(options_);
+ options.listeners.clear();
+ std::vector<ColumnFamilyDescriptor> cf_descs;
+ // TODO(ajkr): `column_family_names_` is not safe to access here when
+ // `clear_column_family_one_in != 0`. But we can't easily switch to
+ // `ListColumnFamilies` to get names because it won't necessarily give
+ // the same order as `column_family_names_`.
+ if (FLAGS_clear_column_family_one_in == 0) {
+ for (const auto& name : column_family_names_) {
+ cf_descs.emplace_back(name, ColumnFamilyOptions(options));
+ }
+ s = DB::OpenForReadOnly(DBOptions(options), checkpoint_dir, cf_descs,
+ &cf_handles, &checkpoint_db);
+ }
+ }
+ if (checkpoint_db != nullptr) {
+ for (auto cfh : cf_handles) {
+ delete cfh;
+ }
+ cf_handles.clear();
+ delete checkpoint_db;
+ checkpoint_db = nullptr;
+ }
+ DestroyDB(checkpoint_dir, Options());
+ if (!s.ok()) {
+ fprintf(stderr, "A checkpoint operation failed with: %s\n",
+ s.ToString().c_str());
+ }
+ return s;
+ }
+#endif // !ROCKSDB_LITE
+
+ virtual void VerifyDb(ThreadState* thread) const {
+ ReadOptions options(FLAGS_verify_checksum, true);
+ // We must set total_order_seek to true because we are doing a SeekToFirst
+ // on a column family whose memtables may support (by default) prefix-based
+ // iterator. In this case, NewIterator with options.total_order_seek being
+ // false returns a prefix-based iterator. Calling SeekToFirst using this
+ // iterator causes the iterator to become invalid. That means we cannot
+ // iterate the memtable using this iterator any more, although the memtable
+ // contains the most up-to-date key-values.
+ options.total_order_seek = true;
+ assert(thread != nullptr);
+ auto shared = thread->shared;
+ std::vector<std::unique_ptr<Iterator> > iters(column_families_.size());
+ for (size_t i = 0; i != column_families_.size(); ++i) {
+ iters[i].reset(db_->NewIterator(options, column_families_[i]));
+ }
+ for (auto& iter : iters) {
+ iter->SeekToFirst();
+ }
+ size_t num = column_families_.size();
+ assert(num == iters.size());
+ std::vector<Status> statuses(num, Status::OK());
+ do {
+ size_t valid_cnt = 0;
+ size_t idx = 0;
+ for (auto& iter : iters) {
+ if (iter->Valid()) {
+ ++valid_cnt;
+ } else {
+ statuses[idx] = iter->status();
+ }
+ ++idx;
+ }
+ if (valid_cnt == 0) {
+ Status status;
+ for (size_t i = 0; i != num; ++i) {
+ const auto& s = statuses[i];
+ if (!s.ok()) {
+ status = s;
+ fprintf(stderr, "Iterator on cf %s has error: %s\n",
+ column_families_[i]->GetName().c_str(),
+ s.ToString().c_str());
+ shared->SetVerificationFailure();
+ }
+ }
+ if (status.ok()) {
+ fprintf(stdout, "Finished scanning all column families.\n");
+ }
+ break;
+ } else if (valid_cnt != iters.size()) {
+ for (size_t i = 0; i != num; ++i) {
+ if (!iters[i]->Valid()) {
+ if (statuses[i].ok()) {
+ fprintf(stderr, "Finished scanning cf %s\n",
+ column_families_[i]->GetName().c_str());
+ } else {
+ fprintf(stderr, "Iterator on cf %s has error: %s\n",
+ column_families_[i]->GetName().c_str(),
+ statuses[i].ToString().c_str());
+ }
+ } else {
+ fprintf(stderr, "cf %s has remaining data to scan\n",
+ column_families_[i]->GetName().c_str());
+ }
+ }
+ shared->SetVerificationFailure();
+ break;
+ }
+ // If the program reaches here, then all column families' iterators are
+ // still valid.
+ Slice key;
+ Slice value;
+ for (size_t i = 0; i != num; ++i) {
+ if (i == 0) {
+ key = iters[i]->key();
+ value = iters[i]->value();
+ } else {
+ if (key.compare(iters[i]->key()) != 0) {
+ fprintf(stderr, "Verification failed\n");
+ fprintf(stderr, "cf%s: %s => %s\n",
+ column_families_[0]->GetName().c_str(),
+ key.ToString(true /* hex */).c_str(),
+ value.ToString(/* hex */).c_str());
+ fprintf(stderr, "cf%s: %s => %s\n",
+ column_families_[i]->GetName().c_str(),
+ iters[i]->key().ToString(true /* hex */).c_str(),
+ iters[i]->value().ToString(true /* hex */).c_str());
+ shared->SetVerificationFailure();
+ }
+ }
+ }
+ for (auto& iter : iters) {
+ iter->Next();
+ }
+ } while (true);
+ }
+
+ virtual std::vector<int> GenerateColumnFamilies(
+ const int /* num_column_families */, int /* rand_column_family */) const {
+ std::vector<int> ret;
+ int num = static_cast<int>(column_families_.size());
+ int k = 0;
+ std::generate_n(back_inserter(ret), num, [&k]() -> int { return k++; });
+ return ret;
+ }
+
+ private:
+ std::atomic<int64_t> batch_id_;
+};
+
} // namespace rocksdb
int main(int argc, char** argv) {
"Error: nooverwritepercent must be 0 when using file ingestion\n");
exit(1);
}
+ if (FLAGS_clear_column_family_one_in > 0 && FLAGS_backup_one_in > 0) {
+ fprintf(stderr,
+ "Error: clear_column_family_one_in must be 0 when using backup\n");
+ exit(1);
+ }
+ if (FLAGS_test_atomic_flush) {
+ FLAGS_atomic_flush = true;
+ }
+ if (FLAGS_read_only) {
+ if (FLAGS_writepercent != 0 || FLAGS_delpercent != 0 ||
+ FLAGS_delrangepercent != 0) {
+ fprintf(stderr, "Error: updates are not supported in read only mode\n");
+ exit(1);
+ } else if (FLAGS_checkpoint_one_in > 0 &&
+ FLAGS_clear_column_family_one_in > 0) {
+ fprintf(stdout,
+ "Warn: checkpoint won't be validated since column families may "
+ "be dropped.\n");
+ }
+ }
// Choose a location for the test database if none given with --db=<path>
if (FLAGS_db.empty()) {
rocksdb_kill_prefix_blacklist = SplitString(FLAGS_kill_prefix_blacklist);
std::unique_ptr<rocksdb::StressTest> stress;
- if (FLAGS_test_batches_snapshots) {
+ if (FLAGS_test_atomic_flush) {
+ stress.reset(new rocksdb::AtomicFlushStressTest());
+ } else if (FLAGS_test_batches_snapshots) {
stress.reset(new rocksdb::BatchedOpsStressTest());
} else {
stress.reset(new rocksdb::NonBatchedOpsStressTest());