BatchedOpsStressTest() {}
virtual ~BatchedOpsStressTest() {}
- // Given a key K and value V, this puts ("0"+K, "0"+V), ("1"+K, "1"+V), ...
- // ("9"+K, "9"+V) in DB atomically i.e in a single batch.
+ bool IsStateTracked() const override { return false; }
+
+ // Given a key K and value V, this puts ("0"+K, V+"0"), ("1"+K, V+"1"), ...,
+ // ("9"+K, V+"9") in DB atomically i.e in a single batch.
// Also refer BatchedOpsStressTest::TestGet
Status TestPut(ThreadState* thread, WriteOptions& write_opts,
const ReadOptions& /* read_opts */,
const std::vector<int>& rand_column_families,
- const std::vector<int64_t>& rand_keys, char (&value)[100],
- std::unique_ptr<MutexLock>& /* lock */) override {
- uint32_t value_base =
+ const std::vector<int64_t>& rand_keys,
+ char (&value)[100]) override {
+ assert(!rand_column_families.empty());
+ assert(!rand_keys.empty());
+
+ const std::string key_body = Key(rand_keys[0]);
+
+ const uint32_t value_base =
thread->rand.Next() % thread->shared->UNKNOWN_SENTINEL;
- size_t sz = GenerateValue(value_base, value, sizeof(value));
- Slice v(value, sz);
- std::string keys[10] = {"9", "8", "7", "6", "5", "4", "3", "2", "1", "0"};
- std::string values[10] = {"9", "8", "7", "6", "5", "4", "3", "2", "1", "0"};
- Slice value_slices[10];
- WriteBatch batch;
- Status s;
- auto cfh = column_families_[rand_column_families[0]];
- std::string key_str = Key(rand_keys[0]);
- for (int i = 0; i < 10; i++) {
- keys[i] += key_str;
- values[i] += v.ToString();
- value_slices[i] = values[i];
+ const size_t sz = GenerateValue(value_base, value, sizeof(value));
+ const std::string value_body = Slice(value, sz).ToString();
+
+ WriteBatch batch(0 /* reserved_bytes */, 0 /* max_bytes */,
+ FLAGS_batch_protection_bytes_per_key,
+ FLAGS_user_timestamp_size);
+
+ ColumnFamilyHandle* const cfh = column_families_[rand_column_families[0]];
+ assert(cfh);
+
+ for (int i = 9; i >= 0; --i) {
+ const std::string num = std::to_string(i);
+
+ // Note: the digit in num is prepended to the key; however, it is appended
+ // to the value because we want the "value base" to be encoded uniformly
+ // at the beginning of the value for all types of stress tests (e.g.
+ // batched, non-batched, CF consistency).
+ const std::string k = num + key_body;
+ const std::string v = value_body + num;
+
if (FLAGS_use_merge) {
- batch.Merge(cfh, keys[i], value_slices[i]);
+ batch.Merge(cfh, k, v);
+ } else if (FLAGS_use_put_entity_one_in > 0 &&
+ (value_base % FLAGS_use_put_entity_one_in) == 0) {
+ batch.PutEntity(cfh, k, GenerateWideColumns(value_base, v));
} else {
- batch.Put(cfh, keys[i], value_slices[i]);
+ batch.Put(cfh, k, v);
}
}
- s = db_->Write(write_opts, &batch);
+ const Status s = db_->Write(write_opts, &batch);
+
if (!s.ok()) {
fprintf(stderr, "multiput error: %s\n", s.ToString().c_str());
thread->stats.AddErrors(1);
return s;
}
- // Given a key K, this deletes ("0"+K), ("1"+K),... ("9"+K)
+ // Given a key K, this deletes ("0"+K), ("1"+K), ..., ("9"+K)
// in DB atomically i.e in a single batch. Also refer MultiGet.
Status TestDelete(ThreadState* thread, WriteOptions& writeoptions,
const std::vector<int>& rand_column_families,
- const std::vector<int64_t>& rand_keys,
- std::unique_ptr<MutexLock>& /* lock */) override {
+ const std::vector<int64_t>& rand_keys) override {
std::string keys[10] = {"9", "7", "5", "3", "1", "8", "6", "4", "2", "0"};
- WriteBatch batch;
+ WriteBatch batch(0 /* reserved_bytes */, 0 /* max_bytes */,
+ FLAGS_batch_protection_bytes_per_key,
+ FLAGS_user_timestamp_size);
Status s;
auto cfh = column_families_[rand_column_families[0]];
std::string key_str = Key(rand_keys[0]);
Status TestDeleteRange(ThreadState* /* thread */,
WriteOptions& /* write_opts */,
const std::vector<int>& /* rand_column_families */,
- const std::vector<int64_t>& /* rand_keys */,
- std::unique_ptr<MutexLock>& /* lock */) override {
+ const std::vector<int64_t>& /* rand_keys */) override {
assert(false);
return Status::NotSupported(
"BatchedOpsStressTest does not support "
void TestIngestExternalFile(
ThreadState* /* thread */,
const std::vector<int>& /* rand_column_families */,
- const std::vector<int64_t>& /* rand_keys */,
- std::unique_ptr<MutexLock>& /* lock */) override {
+ const std::vector<int64_t>& /* rand_keys */) override {
assert(false);
fprintf(stderr,
"BatchedOpsStressTest does not support "
std::terminate();
}
- // Given a key K, this gets values for "0"+K, "1"+K,..."9"+K
+ // Given a key K, this gets values for "0"+K, "1"+K, ..., "9"+K
// in the same snapshot, and verifies that all the values are of the form
- // "0"+V, "1"+V,..."9"+V.
+ // V+"0", V+"1", ..., V+"9".
// ASSUMES that BatchedOpsStressTest::TestPut was used to put (K, V) into
// the DB.
Status TestGet(ThreadState* thread, const ReadOptions& readoptions,
} else {
values[i] = from_db;
- char expected_prefix = (keys[i])[0];
- char actual_prefix = (values[i])[0];
- if (actual_prefix != expected_prefix) {
- fprintf(stderr, "error expected prefix = %c actual = %c\n",
- expected_prefix, actual_prefix);
+ assert(!keys[i].empty());
+ assert(!values[i].empty());
+
+ const char expected = keys[i].front();
+ const char actual = values[i].back();
+
+ if (expected != actual) {
+ fprintf(stderr, "get error expected = %c actual = %c\n", expected,
+ actual);
}
- (values[i])[0] = ' '; // blank out the differing character
+
+ values[i].pop_back(); // get rid of the differing character
+
thread->stats.AddGets(1, 1);
}
}
// Now that we retrieved all values, check that they all match
for (int i = 1; i < 10; i++) {
if (values[i] != values[0]) {
- fprintf(stderr, "error : inconsistent values for key %s: %s, %s\n",
+ fprintf(stderr, "get error: inconsistent values for key %s: %s, %s\n",
key.ToString(true).c_str(), StringToHex(values[0]).c_str(),
StringToHex(values[i]).c_str());
// we continue after error rather than exiting so that we can
const std::vector<int64_t>& rand_keys) override {
size_t num_keys = rand_keys.size();
std::vector<Status> ret_status(num_keys);
- std::array<std::string, 10> keys = {{"0", "1", "2", "3", "4",
- "5", "6", "7", "8", "9"}};
+ std::array<std::string, 10> keys = {
+ {"0", "1", "2", "3", "4", "5", "6", "7", "8", "9"}};
size_t num_prefixes = keys.size();
for (size_t rand_key = 0; rand_key < num_keys; ++rand_key) {
std::vector<Slice> key_slices;
std::vector<Status> statuses(num_prefixes);
ReadOptions readoptionscopy = readoptions;
readoptionscopy.snapshot = db_->GetSnapshot();
+ readoptionscopy.rate_limiter_priority =
+ FLAGS_rate_limit_user_ops ? Env::IO_USER : Env::IO_TOTAL;
std::vector<std::string> key_str;
key_str.reserve(num_prefixes);
key_slices.reserve(num_prefixes);
for (size_t i = 0; i < num_prefixes; i++) {
Status s = statuses[i];
if (!s.ok() && !s.IsNotFound()) {
- fprintf(stderr, "get error: %s\n", s.ToString().c_str());
+ fprintf(stderr, "multiget error: %s\n", s.ToString().c_str());
thread->stats.AddErrors(1);
ret_status[rand_key] = s;
// we continue after error rather than exiting so that we can
thread->stats.AddGets(1, 0);
ret_status[rand_key] = s;
} else {
- char expected_prefix = (keys[i])[0];
- char actual_prefix = (values[i])[0];
- if (actual_prefix != expected_prefix) {
- fprintf(stderr, "error expected prefix = %c actual = %c\n",
- expected_prefix, actual_prefix);
+ assert(!keys[i].empty());
+ assert(!values[i].empty());
+
+ const char expected = keys[i][0];
+ const char actual = values[i][values[i].size() - 1];
+
+ if (expected != actual) {
+ fprintf(stderr, "multiget error expected = %c actual = %c\n",
+ expected, actual);
}
- std::string str;
- str.assign(values[i].data(), values[i].size());
- values[i].Reset();
- str[0] = ' '; // blank out the differing character
- values[i].PinSelf(str);
+
+ values[i].remove_suffix(1); // get rid of the differing character
+
thread->stats.AddGets(1, 1);
}
}
// Now that we retrieved all values, check that they all match
for (size_t i = 1; i < num_prefixes; i++) {
if (values[i] != values[0]) {
- fprintf(stderr, "error : inconsistent values for key %s: %s, %s\n",
- key_str[i].c_str(), StringToHex(values[0].ToString()).c_str(),
+ fprintf(stderr,
+ "multiget error: inconsistent values for key %s: %s, %s\n",
+ StringToHex(key_str[i]).c_str(),
+ StringToHex(values[0].ToString()).c_str(),
StringToHex(values[i].ToString()).c_str());
// we continue after error rather than exiting so that we can
// find more errors if any
return ret_status;
}
- // Given a key, this does prefix scans for "0"+P, "1"+P,..."9"+P
+ // Given a key, this does prefix scans for "0"+P, "1"+P, ..., "9"+P
// in the same snapshot where P is the first FLAGS_prefix_size - 1 bytes
// of the key. Each of these 10 scans returns a series of values;
// each series should be the same length, and it is verified for each
- // index i that all the i'th values are of the form "0"+V, "1"+V,..."9"+V.
+ // index i that all the i'th values are of the form V+"0", V+"1", ..., V+"9".
// ASSUMES that MultiPut was used to put (K, V)
Status TestPrefixScan(ThreadState* thread, const ReadOptions& readoptions,
const std::vector<int>& rand_column_families,
const std::vector<int64_t>& rand_keys) override {
- size_t prefix_to_use =
- (FLAGS_prefix_size < 0) ? 7 : static_cast<size_t>(FLAGS_prefix_size);
- std::string key_str = Key(rand_keys[0]);
- Slice key = key_str;
- auto cfh = column_families_[rand_column_families[0]];
- std::string prefixes[10] = {"0", "1", "2", "3", "4",
- "5", "6", "7", "8", "9"};
- Slice prefix_slices[10];
- ReadOptions readoptionscopy[10];
- const Snapshot* snapshot = db_->GetSnapshot();
- Iterator* iters[10];
- std::string upper_bounds[10];
- Slice ub_slices[10];
- Status s = Status::OK();
- for (int i = 0; i < 10; i++) {
- prefixes[i] += key.ToString();
- prefixes[i].resize(prefix_to_use);
- prefix_slices[i] = Slice(prefixes[i]);
- readoptionscopy[i] = readoptions;
- readoptionscopy[i].snapshot = snapshot;
+ assert(!rand_column_families.empty());
+ assert(!rand_keys.empty());
+
+ const std::string key = Key(rand_keys[0]);
+
+ assert(FLAGS_prefix_size > 0);
+ const size_t prefix_to_use = static_cast<size_t>(FLAGS_prefix_size);
+
+ constexpr size_t num_prefixes = 10;
+
+ std::array<std::string, num_prefixes> prefixes;
+ std::array<Slice, num_prefixes> prefix_slices;
+ std::array<ReadOptions, num_prefixes> ro_copies;
+ std::array<std::string, num_prefixes> upper_bounds;
+ std::array<Slice, num_prefixes> ub_slices;
+ std::array<std::unique_ptr<Iterator>, num_prefixes> iters;
+
+ const Snapshot* const snapshot = db_->GetSnapshot();
+
+ ColumnFamilyHandle* const cfh = column_families_[rand_column_families[0]];
+ assert(cfh);
+
+ for (size_t i = 0; i < num_prefixes; ++i) {
+ prefixes[i] = std::to_string(i) + key;
+ prefix_slices[i] = Slice(prefixes[i].data(), prefix_to_use);
+
+ ro_copies[i] = readoptions;
+ ro_copies[i].snapshot = snapshot;
if (thread->rand.OneIn(2) &&
GetNextPrefix(prefix_slices[i], &(upper_bounds[i]))) {
// For half of the time, set the upper bound to the next prefix
- ub_slices[i] = Slice(upper_bounds[i]);
- readoptionscopy[i].iterate_upper_bound = &(ub_slices[i]);
+ ub_slices[i] = upper_bounds[i];
+ ro_copies[i].iterate_upper_bound = &(ub_slices[i]);
}
- iters[i] = db_->NewIterator(readoptionscopy[i], cfh);
+
+ iters[i].reset(db_->NewIterator(ro_copies[i], cfh));
iters[i]->Seek(prefix_slices[i]);
}
- long count = 0;
+ uint64_t count = 0;
+
while (iters[0]->Valid() && iters[0]->key().starts_with(prefix_slices[0])) {
- count++;
- std::string values[10];
+ ++count;
+
+ std::array<std::string, num_prefixes> values;
+
// get list of all values for this iteration
- for (int i = 0; i < 10; i++) {
+ for (size_t i = 0; i < num_prefixes; ++i) {
// no iterator should finish before the first one
assert(iters[i]->Valid() &&
iters[i]->key().starts_with(prefix_slices[i]));
values[i] = iters[i]->value().ToString();
- char expected_first = (prefixes[i])[0];
- char actual_first = (values[i])[0];
+ // make sure the last character of the value is the expected digit
+ assert(!prefixes[i].empty());
+ assert(!values[i].empty());
+
+ const char expected = prefixes[i].front();
+ const char actual = values[i].back();
- if (actual_first != expected_first) {
- fprintf(stderr, "error expected first = %c actual = %c\n",
- expected_first, actual_first);
+ if (expected != actual) {
+ fprintf(stderr, "prefix scan error expected = %c actual = %c\n",
+ expected, actual);
}
- (values[i])[0] = ' '; // blank out the differing character
- }
- // make sure all values are equivalent
- for (int i = 0; i < 10; i++) {
+
+ values[i].pop_back(); // get rid of the differing character
+
+ // make sure all values are equivalent
if (values[i] != values[0]) {
fprintf(stderr,
- "error : %d, inconsistent values for prefix %s: %s, %s\n", i,
- prefixes[i].c_str(), StringToHex(values[0]).c_str(),
+ "prefix scan error : %" ROCKSDB_PRIszt
+ ", inconsistent values for prefix %s: %s, %s\n",
+ i, prefix_slices[i].ToString(/* hex */ true).c_str(),
+ StringToHex(values[0]).c_str(),
StringToHex(values[i]).c_str());
// we continue after error rather than exiting so that we can
// find more errors if any
}
+
+ // make sure value() and columns() are consistent
+ const WideColumns expected_columns = GenerateExpectedWideColumns(
+ GetValueBase(iters[i]->value()), iters[i]->value());
+ if (iters[i]->columns() != expected_columns) {
+ fprintf(stderr,
+ "prefix scan error : %" ROCKSDB_PRIszt
+ ", value and columns inconsistent for prefix %s: %s\n",
+ i, prefix_slices[i].ToString(/* hex */ true).c_str(),
+ DebugString(iters[i]->value(), iters[i]->columns(),
+ expected_columns)
+ .c_str());
+ }
+
iters[i]->Next();
}
}
// cleanup iterators and snapshot
- for (int i = 0; i < 10; i++) {
+ for (size_t i = 0; i < num_prefixes; ++i) {
// if the first iterator finished, they should have all finished
assert(!iters[i]->Valid() ||
!iters[i]->key().starts_with(prefix_slices[i]));
assert(iters[i]->status().ok());
- delete iters[i];
}
+
db_->ReleaseSnapshot(snapshot);
- if (s.ok()) {
- thread->stats.AddPrefixes(1, count);
- } else {
- fprintf(stderr, "TestPrefixScan error: %s\n", s.ToString().c_str());
- thread->stats.AddErrors(1);
- }
+ thread->stats.AddPrefixes(1, count);
- return s;
+ return Status::OK();
}
void VerifyDb(ThreadState* /* thread */) const override {}
+
+ void ContinuouslyVerifyDb(ThreadState* /* thread */) const override {}
};
StressTest* CreateBatchedOpsStressTest() { return new BatchedOpsStressTest(); }