#endif
#include <fcntl.h>
#include <inttypes.h>
+#include <math.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <unordered_map>
#include "db/db_impl.h"
+#include "db/malloc_stats.h"
#include "db/version_set.h"
#include "hdfs/env_hdfs.h"
#include "monitoring/histogram.h"
#include "monitoring/statistics.h"
+#include "options/cf_options.h"
#include "port/port.h"
#include "port/stack_trace.h"
#include "rocksdb/cache.h"
#include "rocksdb/filter_policy.h"
#include "rocksdb/memtablerep.h"
#include "rocksdb/options.h"
-#include "options/cf_options.h"
#include "rocksdb/perf_context.h"
#include "rocksdb/persistent_cache.h"
#include "rocksdb/rate_limiter.h"
"compact,"
"compactall,"
"multireadrandom,"
+ "mixgraph,"
"readseq,"
"readtocache,"
"readreverse,"
"When true use Prev rather than Next for iterators that do "
"Seek and then Next");
+DEFINE_int64(max_scan_distance, 0,
+ "Used to define iterate_upper_bound (or iterate_lower_bound "
+ "if FLAGS_reverse_iterator is set to true) when value is nonzero");
+
DEFINE_bool(use_uint64_comparator, false, "use Uint64 user comparator");
DEFINE_int64(batch_size, 1, "Batch size");
DEFINE_double(memtable_bloom_size_ratio, 0,
"Ratio of memtable size used for bloom filter. 0 means no bloom "
"filter.");
+DEFINE_bool(memtable_whole_key_filtering, false,
+ "Try to use whole key bloom filter in memtables.");
DEFINE_bool(memtable_use_huge_page, false,
"Try to use huge page in memtables.");
" database. If you set this flag and also specify a benchmark that"
" wants a fresh database, that benchmark will fail.");
+DEFINE_bool(use_existing_keys, false,
+ "If true, uses existing keys in the DB, "
+ "rather than generating new ones. This involves some startup "
+ "latency to load all keys into memory. It is supported for the "
+ "same read/overwrite benchmarks as `-use_existing_db=true`, which "
+ "must also be set for this flag to be enabled. When this flag is "
+ "set, the value for `-num` will be ignored.");
+
DEFINE_bool(show_table_properties, false,
"If true, then per-level table"
" properties will be printed on every stats-interval when"
" from storage");
DEFINE_bool(statistics, false, "Database statistics");
+DEFINE_int32(stats_level, rocksdb::StatsLevel::kExceptDetailedTimers,
+ "stats level for statistics");
DEFINE_string(statistics_string, "", "Serialized statistics string");
static class std::shared_ptr<rocksdb::Statistics> dbstats;
DEFINE_uint64(delete_obsolete_files_period_micros, 0,
"Ignored. Left here for backward compatibility");
+DEFINE_int64(writes_before_delete_range, 0,
+ "Number of writes before DeleteRange starts being issued regularly.");
+
DEFINE_int64(writes_per_range_tombstone, 0,
- "Number of writes between range "
- "tombstones");
+ "Number of writes between range tombstones");
DEFINE_int64(range_tombstone_width, 100, "Number of keys in tombstone's range");
"RocksDB options related command-line arguments, all other arguments "
"that are related to RocksDB options will be ignored:\n"
"\t--use_existing_db\n"
+ "\t--use_existing_keys\n"
"\t--statistics\n"
"\t--row_cache_size\n"
"\t--row_cache_numshardbits\n"
static enum rocksdb::CompressionType FLAGS_compression_type_e =
rocksdb::kSnappyCompression;
+DEFINE_int64(sample_for_compression, 0,
+ "Sample every N blocks for compression");
+
DEFINE_int32(compression_level, rocksdb::CompressionOptions().level,
"Compression level. The meaning of this value is library-"
"dependent. If unset, we try to use the default for the library "
"If non-zero, db_bench will rate-limit the writes going into RocksDB. This "
"is the global rate in bytes/second.");
+// The parameters of the mixgraph workload
+DEFINE_double(key_dist_a, 0.0,
+ "The parameter 'a' of key access distribution model "
+ "f(x)=a*x^b");
+DEFINE_double(key_dist_b, 0.0,
+ "The parameter 'b' of key access distribution model "
+ "f(x)=a*x^b");
+DEFINE_double(value_theta, 0.0,
+ "The parameter 'theta' of Generalized Pareto Distribution "
+ "f(x)=(1/sigma)*(1+k*(x-theta)/sigma)^-(1/k+1)");
+DEFINE_double(value_k, 0.0,
+ "The parameter 'k' of Generalized Pareto Distribution "
+ "f(x)=(1/sigma)*(1+k*(x-theta)/sigma)^-(1/k+1)");
+DEFINE_double(value_sigma, 0.0,
+ "The parameter 'sigma' of Generalized Pareto Distribution "
+ "f(x)=(1/sigma)*(1+k*(x-theta)/sigma)^-(1/k+1)");
+DEFINE_double(iter_theta, 0.0,
+ "The parameter 'theta' of Generalized Pareto Distribution "
+ "f(x)=(1/sigma)*(1+k*(x-theta)/sigma)^-(1/k+1)");
+DEFINE_double(iter_k, 0.0,
+ "The parameter 'k' of Generalized Pareto Distribution "
+ "f(x)=(1/sigma)*(1+k*(x-theta)/sigma)^-(1/k+1)");
+DEFINE_double(iter_sigma, 0.0,
+ "The parameter 'sigma' of Generalized Pareto Distribution "
+ "f(x)=(1/sigma)*(1+k*(x-theta)/sigma)^-(1/k+1)");
+DEFINE_double(mix_get_ratio, 1.0,
+ "The ratio of Get queries in the mixgraph workload");
+DEFINE_double(mix_put_ratio, 0.0,
+ "The ratio of Put queries in the mixgraph workload");
+DEFINE_double(mix_seek_ratio, 0.0,
+ "The ratio of Seek queries in the mixgraph workload");
+DEFINE_int64(mix_max_scan_len, 10000, "The max scan length of iterators");
+DEFINE_int64(mix_ave_kv_size, 512,
+ "The average key-value size of this workload");
+DEFINE_int64(mix_max_value_size, 1024, "The max value size of this workload");
+DEFINE_double(
+ sine_mix_rate_noise, 0.0,
+ "Add noise to the sine rate; the value must be between 0.0 and 1.0");
+DEFINE_bool(sine_mix_rate, false,
+ "Enable sine-wave QPS control on the mix workload");
+DEFINE_uint64(
+ sine_mix_rate_interval_milliseconds, 10000,
+ "Interval at which the sine wave read_rate_limit is recalculated");
+DEFINE_int64(mix_accesses, -1,
+ "The total number of query accesses in the mixgraph workload");
+
DEFINE_uint64(
benchmark_read_rate_limit, 0,
"If non-zero, db_bench will rate-limit the reads from RocksDB. This "
#ifndef ROCKSDB_LITE
DEFINE_bool(readonly, false, "Run read only benchmarks.");
+
+DEFINE_bool(print_malloc_stats, false,
+ "Print malloc stats to stdout after benchmarks finish.");
#endif // ROCKSDB_LITE
DEFINE_bool(disable_auto_compactions, false, "Do not auto trigger compactions");
DEFINE_bool(dump_malloc_stats, true, "Dump malloc stats in LOG ");
DEFINE_uint64(stats_dump_period_sec, rocksdb::Options().stats_dump_period_sec,
"Gap between printing stats to log in seconds");
+DEFINE_uint64(stats_persist_period_sec,
+ rocksdb::Options().stats_persist_period_sec,
+ "Gap between persisting stats in seconds");
+DEFINE_uint64(stats_history_buffer_size,
+ rocksdb::Options().stats_history_buffer_size,
+ "Max number of stats snapshots to keep in memory");
enum RepFactory {
kSkipList,
kPrefixHash,
kVectorRep,
kHashLinkedList,
- kCuckoo
};
static enum RepFactory StringToRepFactory(const char* ctype) {
return kVectorRep;
else if (!strcasecmp(ctype, "hash_linkedlist"))
return kHashLinkedList;
- else if (!strcasecmp(ctype, "cuckoo"))
- return kCuckoo;
fprintf(stdout, "Cannot parse memreptable %s\n", ctype);
return kSkipList;
counters_.bytes_written_ = 0;
}
- Status NewSequentialFile(const std::string& f, unique_ptr<SequentialFile>* r,
+ Status NewSequentialFile(const std::string& f,
+ std::unique_ptr<SequentialFile>* r,
const EnvOptions& soptions) override {
class CountingFile : public SequentialFile {
private:
- unique_ptr<SequentialFile> target_;
+ std::unique_ptr<SequentialFile> target_;
ReportFileOpCounters* counters_;
public:
- CountingFile(unique_ptr<SequentialFile>&& target,
+ CountingFile(std::unique_ptr<SequentialFile>&& target,
ReportFileOpCounters* counters)
: target_(std::move(target)), counters_(counters) {}
- virtual Status Read(size_t n, Slice* result, char* scratch) override {
+ Status Read(size_t n, Slice* result, char* scratch) override {
counters_->read_counter_.fetch_add(1, std::memory_order_relaxed);
Status rv = target_->Read(n, result, scratch);
counters_->bytes_read_.fetch_add(result->size(),
return rv;
}
- virtual Status Skip(uint64_t n) override { return target_->Skip(n); }
+ Status Skip(uint64_t n) override { return target_->Skip(n); }
};
Status s = target()->NewSequentialFile(f, r, soptions);
}
Status NewRandomAccessFile(const std::string& f,
- unique_ptr<RandomAccessFile>* r,
+ std::unique_ptr<RandomAccessFile>* r,
const EnvOptions& soptions) override {
class CountingFile : public RandomAccessFile {
private:
- unique_ptr<RandomAccessFile> target_;
+ std::unique_ptr<RandomAccessFile> target_;
ReportFileOpCounters* counters_;
public:
- CountingFile(unique_ptr<RandomAccessFile>&& target,
+ CountingFile(std::unique_ptr<RandomAccessFile>&& target,
ReportFileOpCounters* counters)
: target_(std::move(target)), counters_(counters) {}
- virtual Status Read(uint64_t offset, size_t n, Slice* result,
- char* scratch) const override {
+ Status Read(uint64_t offset, size_t n, Slice* result,
+ char* scratch) const override {
counters_->read_counter_.fetch_add(1, std::memory_order_relaxed);
Status rv = target_->Read(offset, n, result, scratch);
counters_->bytes_read_.fetch_add(result->size(),
return s;
}
- Status NewWritableFile(const std::string& f, unique_ptr<WritableFile>* r,
+ Status NewWritableFile(const std::string& f, std::unique_ptr<WritableFile>* r,
const EnvOptions& soptions) override {
class CountingFile : public WritableFile {
private:
- unique_ptr<WritableFile> target_;
+ std::unique_ptr<WritableFile> target_;
ReportFileOpCounters* counters_;
public:
- CountingFile(unique_ptr<WritableFile>&& target,
+ CountingFile(std::unique_ptr<WritableFile>&& target,
ReportFileOpCounters* counters)
: target_(std::move(target)), counters_(counters) {}
fprintf(stdout, "%-12s : %11.3f micros/op %ld ops/sec;%s%s\n",
name.ToString().c_str(),
- elapsed * 1e6 / done_,
+ seconds_ * 1e6 / done_,
(long)throughput,
(extra.empty() ? "" : " "),
extra.c_str());
int prefix_size_;
int64_t keys_per_prefix_;
int64_t entries_per_batch_;
+ int64_t writes_before_delete_range_;
int64_t writes_per_range_tombstone_;
int64_t range_tombstone_width_;
int64_t max_num_range_tombstones_;
WriteOptions write_options_;
Options open_options_; // keep options around to properly destroy db later
+#ifndef ROCKSDB_LITE
TraceOptions trace_options_;
+#endif
int64_t reads_;
int64_t deletes_;
double read_random_exp_range_;
int64_t merge_keys_;
bool report_file_operations_;
bool use_blob_db_;
+ std::vector<std::string> keys_;
class ErrorHandlerListener : public EventListener {
public:
+#ifndef ROCKSDB_LITE
ErrorHandlerListener()
: mutex_(),
cv_(&mutex_),
no_auto_recovery_(false),
recovery_complete_(false) {}
- ~ErrorHandlerListener() {}
+ ~ErrorHandlerListener() override {}
void OnErrorRecoveryBegin(BackgroundErrorReason /*reason*/,
- Status /*bg_error*/, bool* auto_recovery) {
+ Status /*bg_error*/,
+ bool* auto_recovery) override {
if (*auto_recovery && no_auto_recovery_) {
*auto_recovery = false;
}
}
- void OnErrorRecoveryCompleted(Status /*old_bg_error*/) {
+ void OnErrorRecoveryCompleted(Status /*old_bg_error*/) override {
InstrumentedMutexLock l(&mutex_);
recovery_complete_ = true;
cv_.SignalAll();
InstrumentedCondVar cv_;
bool no_auto_recovery_;
bool recovery_complete_;
+#else // ROCKSDB_LITE
+ bool WaitForRecovery(uint64_t /*abs_time_us*/) { return true; }
+ void EnableAutoRecovery(bool /*enable*/) {}
+#endif // ROCKSDB_LITE
};
std::shared_ptr<ErrorHandlerListener> listener_;
return true;
}
- inline bool CompressSlice(const CompressionContext& compression_ctx,
+ inline bool CompressSlice(const CompressionInfo& compression_info,
const Slice& input, std::string* compressed) {
bool ok = true;
switch (FLAGS_compression_type_e) {
case rocksdb::kSnappyCompression:
- ok = Snappy_Compress(compression_ctx, input.data(), input.size(),
+ ok = Snappy_Compress(compression_info, input.data(), input.size(),
compressed);
break;
case rocksdb::kZlibCompression:
- ok = Zlib_Compress(compression_ctx, 2, input.data(), input.size(),
+ ok = Zlib_Compress(compression_info, 2, input.data(), input.size(),
compressed);
break;
case rocksdb::kBZip2Compression:
- ok = BZip2_Compress(compression_ctx, 2, input.data(), input.size(),
+ ok = BZip2_Compress(compression_info, 2, input.data(), input.size(),
compressed);
break;
case rocksdb::kLZ4Compression:
- ok = LZ4_Compress(compression_ctx, 2, input.data(), input.size(),
+ ok = LZ4_Compress(compression_info, 2, input.data(), input.size(),
compressed);
break;
case rocksdb::kLZ4HCCompression:
- ok = LZ4HC_Compress(compression_ctx, 2, input.data(), input.size(),
+ ok = LZ4HC_Compress(compression_info, 2, input.data(), input.size(),
compressed);
break;
case rocksdb::kXpressCompression:
input.size(), compressed);
break;
case rocksdb::kZSTD:
- ok = ZSTD_Compress(compression_ctx, input.data(), input.size(),
+ ok = ZSTD_Compress(compression_info, input.data(), input.size(),
compressed);
break;
default:
auto compression = CompressionTypeToString(FLAGS_compression_type_e);
fprintf(stdout, "Compression: %s\n", compression.c_str());
+ fprintf(stdout, "Compression sampling rate: %" PRId64 "\n",
+ FLAGS_sample_for_compression);
switch (FLAGS_rep_factory) {
case kPrefixHash:
case kHashLinkedList:
fprintf(stdout, "Memtablerep: hash_linkedlist\n");
break;
- case kCuckoo:
- fprintf(stdout, "Memtablerep: cuckoo\n");
- break;
}
fprintf(stdout, "Perf Level: %d\n", FLAGS_perf_level);
const int len = FLAGS_block_size;
std::string input_str(len, 'y');
std::string compressed;
- CompressionContext compression_ctx(FLAGS_compression_type_e,
- Options().compression_opts);
- bool result =
- CompressSlice(compression_ctx, Slice(input_str), &compressed);
+ CompressionOptions opts;
+ CompressionContext context(FLAGS_compression_type_e);
+ CompressionInfo info(opts, context, CompressionDict::GetEmptyDict(),
+ FLAGS_compression_type_e,
+ FLAGS_sample_for_compression);
+ bool result = CompressSlice(info, Slice(input_str), &compressed);
if (!result) {
fprintf(stdout, "WARNING: %s compression is not enabled\n",
class KeepFilter : public CompactionFilter {
public:
- virtual bool Filter(int /*level*/, const Slice& /*key*/,
- const Slice& /*value*/, std::string* /*new_value*/,
- bool* /*value_changed*/) const override {
+ bool Filter(int /*level*/, const Slice& /*key*/, const Slice& /*value*/,
+ std::string* /*new_value*/,
+ bool* /*value_changed*/) const override {
return false;
}
- virtual const char* Name() const override { return "KeepFilter"; }
+ const char* Name() const override { return "KeepFilter"; }
};
std::shared_ptr<Cache> NewCache(int64_t capacity) {
// | key 00000 |
// ----------------------------
void GenerateKeyFromInt(uint64_t v, int64_t num_keys, Slice* key) {
+ if (!keys_.empty()) {
+ assert(FLAGS_use_existing_keys);
+ assert(keys_.size() == static_cast<size_t>(num_keys));
+ assert(v < static_cast<uint64_t>(num_keys));
+ *key = keys_[v];
+ return;
+ }
char* start = const_cast<char*>(key->data());
char* pos = start;
if (keys_per_prefix_ > 0) {
value_size_ = FLAGS_value_size;
key_size_ = FLAGS_key_size;
entries_per_batch_ = FLAGS_batch_size;
+ writes_before_delete_range_ = FLAGS_writes_before_delete_range;
writes_per_range_tombstone_ = FLAGS_writes_per_range_tombstone;
range_tombstone_width_ = FLAGS_range_tombstone_width;
max_num_range_tombstones_ = FLAGS_max_num_range_tombstones;
fprintf(stderr, "entries_per_batch = %" PRIi64 "\n",
entries_per_batch_);
method = &Benchmark::MultiReadRandom;
+ } else if (name == "mixgraph") {
+ method = &Benchmark::MixGraph;
} else if (name == "readmissing") {
++key_size_;
method = &Benchmark::ReadRandom;
}
SetPerfLevel(static_cast<PerfLevel> (shared->perf_level));
+ perf_context.EnablePerLevelPerfContext();
thread->stats.Start(thread->tid);
(arg->bm->*(arg->method))(thread);
thread->stats.Stop();
int64_t produced = 0;
bool ok = true;
std::string compressed;
- CompressionContext compression_ctx(FLAGS_compression_type_e,
- Options().compression_opts);
-
+ CompressionOptions opts;
+ CompressionContext context(FLAGS_compression_type_e);
+ CompressionInfo info(opts, context, CompressionDict::GetEmptyDict(),
+ FLAGS_compression_type_e,
+ FLAGS_sample_for_compression);
// Compress 1G
while (ok && bytes < int64_t(1) << 30) {
compressed.clear();
- ok = CompressSlice(compression_ctx, input, &compressed);
+ ok = CompressSlice(info, input, &compressed);
produced += compressed.size();
bytes += input.size();
thread->stats.FinishedOps(nullptr, nullptr, 1, kCompress);
Slice input = gen.Generate(FLAGS_block_size);
std::string compressed;
+ CompressionContext compression_ctx(FLAGS_compression_type_e);
+ CompressionOptions compression_opts;
+ CompressionInfo compression_info(
+ compression_opts, compression_ctx, CompressionDict::GetEmptyDict(),
+ FLAGS_compression_type_e, FLAGS_sample_for_compression);
UncompressionContext uncompression_ctx(FLAGS_compression_type_e);
- CompressionContext compression_ctx(FLAGS_compression_type_e,
- Options().compression_opts);
+ UncompressionInfo uncompression_info(uncompression_ctx,
+ UncompressionDict::GetEmptyDict(),
+ FLAGS_compression_type_e);
- bool ok = CompressSlice(compression_ctx, input, &compressed);
+ bool ok = CompressSlice(compression_info, input, &compressed);
int64_t bytes = 0;
int decompress_size;
while (ok && bytes < 1024 * 1048576) {
- char *uncompressed = nullptr;
+ CacheAllocationPtr uncompressed;
switch (FLAGS_compression_type_e) {
case rocksdb::kSnappyCompression: {
// get size and allocate here to make comparison fair
ok = false;
break;
}
- uncompressed = new char[ulength];
+ uncompressed = AllocateBlock(ulength, nullptr);
ok = Snappy_Uncompress(compressed.data(), compressed.size(),
- uncompressed);
+ uncompressed.get());
break;
}
case rocksdb::kZlibCompression:
- uncompressed = Zlib_Uncompress(uncompression_ctx, compressed.data(),
+ uncompressed = Zlib_Uncompress(uncompression_info, compressed.data(),
compressed.size(), &decompress_size, 2);
- ok = uncompressed != nullptr;
+ ok = uncompressed.get() != nullptr;
break;
case rocksdb::kBZip2Compression:
uncompressed = BZip2_Uncompress(compressed.data(), compressed.size(),
&decompress_size, 2);
- ok = uncompressed != nullptr;
+ ok = uncompressed.get() != nullptr;
break;
case rocksdb::kLZ4Compression:
- uncompressed = LZ4_Uncompress(uncompression_ctx, compressed.data(),
+ uncompressed = LZ4_Uncompress(uncompression_info, compressed.data(),
compressed.size(), &decompress_size, 2);
- ok = uncompressed != nullptr;
+ ok = uncompressed.get() != nullptr;
break;
case rocksdb::kLZ4HCCompression:
- uncompressed = LZ4_Uncompress(uncompression_ctx, compressed.data(),
+ uncompressed = LZ4_Uncompress(uncompression_info, compressed.data(),
compressed.size(), &decompress_size, 2);
- ok = uncompressed != nullptr;
+ ok = uncompressed.get() != nullptr;
break;
case rocksdb::kXpressCompression:
- uncompressed = XPRESS_Uncompress(compressed.data(), compressed.size(),
- &decompress_size);
- ok = uncompressed != nullptr;
+ uncompressed.reset(XPRESS_Uncompress(
+ compressed.data(), compressed.size(), &decompress_size));
+ ok = uncompressed.get() != nullptr;
break;
case rocksdb::kZSTD:
- uncompressed = ZSTD_Uncompress(uncompression_ctx, compressed.data(),
+ uncompressed = ZSTD_Uncompress(uncompression_info, compressed.data(),
compressed.size(), &decompress_size);
- ok = uncompressed != nullptr;
+ ok = uncompressed.get() != nullptr;
break;
default:
ok = false;
}
- delete[] uncompressed;
bytes += input.size();
thread->stats.FinishedOps(nullptr, nullptr, 1, kUncompress);
}
options.use_direct_io_for_flush_and_compaction =
FLAGS_use_direct_io_for_flush_and_compaction;
#ifndef ROCKSDB_LITE
+ options.ttl = FLAGS_fifo_compaction_ttl;
options.compaction_options_fifo = CompactionOptionsFIFO(
FLAGS_fifo_compaction_max_table_files_size_mb * 1024 * 1024,
- FLAGS_fifo_compaction_allow_compaction, FLAGS_fifo_compaction_ttl);
+ FLAGS_fifo_compaction_allow_compaction);
#endif // ROCKSDB_LITE
if (FLAGS_prefix_size != 0) {
options.prefix_extractor.reset(
}
options.memtable_huge_page_size = FLAGS_memtable_use_huge_page ? 2048 : 0;
options.memtable_prefix_bloom_size_ratio = FLAGS_memtable_bloom_size_ratio;
+ options.memtable_whole_key_filtering = FLAGS_memtable_whole_key_filtering;
if (FLAGS_memtable_insert_with_hint_prefix_size > 0) {
options.memtable_insert_with_hint_prefix_extractor.reset(
NewCappedPrefixTransform(
new VectorRepFactory
);
break;
- case kCuckoo:
- options.memtable_factory.reset(NewHashCuckooRepFactory(
- options.write_buffer_size, FLAGS_key_size + FLAGS_value_size));
- break;
#else
default:
fprintf(stderr, "Only skip list is supported in lite mode\n");
options.level0_slowdown_writes_trigger =
FLAGS_level0_slowdown_writes_trigger;
options.compression = FLAGS_compression_type_e;
+ options.sample_for_compression = FLAGS_sample_for_compression;
options.WAL_ttl_seconds = FLAGS_wal_ttl_seconds;
options.WAL_size_limit_MB = FLAGS_wal_size_limit_MB;
options.max_total_wal_size = FLAGS_max_total_wal_size;
options.dump_malloc_stats = FLAGS_dump_malloc_stats;
options.stats_dump_period_sec =
static_cast<unsigned int>(FLAGS_stats_dump_period_sec);
+ options.stats_persist_period_sec =
+ static_cast<unsigned int>(FLAGS_stats_persist_period_sec);
+ options.stats_history_buffer_size =
+ static_cast<size_t>(FLAGS_stats_history_buffer_size);
options.compression_opts.level = FLAGS_compression_level;
options.compression_opts.max_dict_bytes = FLAGS_compression_max_dict_bytes;
options.compaction_filter = new KeepFilter();
fprintf(stdout, "A noop compaction filter is used\n");
}
+
+ if (FLAGS_use_existing_keys) {
+ // Only works on a single database
+ assert(db_.db != nullptr);
+ ReadOptions read_opts;
+ read_opts.total_order_seek = true;
+ Iterator* iter = db_.db->NewIterator(read_opts);
+ for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
+ keys_.emplace_back(iter->key().ToString());
+ }
+ delete iter;
+ FLAGS_num = keys_.size();
+ }
}
void Open(Options* opts) {
bytes += value_size_ + key_size_;
++num_written;
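+ // A worked example of the condition below: with
+ // writes_before_delete_range_ = 100 and writes_per_range_tombstone_ = 10,
+ // the first DeleteRange is issued at write 110 and another one every 10
+ // writes after that, until max_num_range_tombstones_ tombstones exist.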
if (writes_per_range_tombstone_ > 0 &&
- num_written / writes_per_range_tombstone_ <=
+ num_written > writes_before_delete_range_ &&
+ (num_written - writes_before_delete_range_) /
+ writes_per_range_tombstone_ <=
max_num_range_tombstones_ &&
- num_written % writes_per_range_tombstone_ == 0) {
+ (num_written - writes_before_delete_range_) %
+ writes_per_range_tombstone_ ==
+ 0) {
int64_t begin_num = key_gens[id]->Next();
if (FLAGS_expand_range_tombstones) {
for (int64_t offset = 0; offset < range_tombstone_width_;
}
if (levelMeta.level == 0) {
for (auto& fileMeta : levelMeta.files) {
- fprintf(stdout, "Level[%d]: %s(size: %" PRIu64 " bytes)\n",
+ fprintf(stdout, "Level[%d]: %s(size: %" ROCKSDB_PRIszt " bytes)\n",
levelMeta.level, fileMeta.name.c_str(), fileMeta.size);
}
} else {
thread->stats.AddMessage(msg);
}
+ // The inverse function of the Pareto CDF
+ int64_t ParetoCdfInversion(double u, double theta, double k, double sigma) {
+ double ret;
+ if (k == 0.0) {
+ ret = theta - sigma * std::log(u);
+ } else {
+ ret = theta + sigma * (std::pow(u, -1 * k) - 1) / k;
+ }
+ return static_cast<int64_t>(ceil(ret));
+ }
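+ // For reference: the GPD CDF is F(x) = 1 - (1 + k*(x - theta)/sigma)^(-1/k)
+ // (1 - exp(-(x - theta)/sigma) when k == 0), so the inversion above is
+ // x = theta + sigma*(u^(-k) - 1)/k; the code uses u in place of 1 - u,
+ // which is equivalent since both are uniform on (0, 1).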
+ // The inverse of the power function y = a * x^b
+ int64_t PowerCdfInversion(double u, double a, double b) {
+ double ret;
+ ret = std::pow((u / a), (1 / b));
+ return static_cast<int64_t>(ceil(ret));
+ }
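+ // Worked example (illustrative values): with a = 1.0 and b = 2.0,
+ // u = 0.25 gives pow(0.25 / 1.0, 1 / 2.0) = 0.5, which ceil() rounds to 1.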
+
+ // Add noise to the QPS
+ double AddNoise(double origin, double noise_ratio) {
+ if (noise_ratio < 0.0 || noise_ratio > 1.0) {
+ return origin;
+ }
+ int band_int = static_cast<int>(FLAGS_sine_a);
+ double delta = (rand() % band_int - band_int / 2) * noise_ratio;
+ if (origin + delta < 0) {
+ return origin;
+ } else {
+ return (origin + delta);
+ }
+ }
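+ // e.g. with FLAGS_sine_a = 1000 and noise_ratio = 0.5, delta is drawn
+ // from roughly [-250, 250), i.e. within +/-25% of the sine amplitude.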
+
+ // Decide the query type:
+ // 0 Get, 1 Put, 2 Seek, 3 SeekForPrev, 4 Delete, 5 SingleDelete, 6 Merge
+ class QueryDecider {
+ public:
+ std::vector<int> type_;
+ std::vector<double> ratio_;
+ int range_;
+
+ QueryDecider() {}
+ ~QueryDecider() {}
+
+ Status Initiate(std::vector<double> ratio_input) {
+ int range_max = 1000;
+ double sum = 0.0;
+ for (auto& ratio : ratio_input) {
+ sum += ratio;
+ }
+ range_ = 0;
+ for (auto& ratio : ratio_input) {
+ range_ += static_cast<int>(ceil(range_max * (ratio / sum)));
+ type_.push_back(range_);
+ ratio_.push_back(ratio / sum);
+ }
+ return Status::OK();
+ }
+
+ int GetType(int64_t rand_num) {
+ if (rand_num < 0) {
+ rand_num = rand_num * (-1);
+ }
+ assert(range_ != 0);
+ int pos = static_cast<int>(rand_num % range_);
+ for (int i = 0; i < static_cast<int>(type_.size()); i++) {
+ if (pos < type_[i]) {
+ return i;
+ }
+ }
+ return 0;
+ }
+ };
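+ // Example: Initiate({0.75, 0.20, 0.05}) sets range_ = 1000 and
+ // type_ = {750, 950, 1000}, so GetType(r) returns 0 (Get) when r % 1000
+ // is in [0, 750), 1 (Put) in [750, 950), and 2 (Seek) in [950, 1000).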
+
+ // The mixed workload of Get, Put, and Seek queries (mixgraph)
+ void MixGraph(ThreadState* thread) {
+ int64_t read = 0; // including single gets and Next of iterators
+ int64_t gets = 0;
+ int64_t puts = 0;
+ int64_t found = 0;
+ int64_t seek = 0;
+ int64_t seek_found = 0;
+ int64_t bytes = 0;
+ const int64_t default_value_max = 1 * 1024 * 1024;
+ int64_t value_max = default_value_max;
+ int64_t scan_len_max = FLAGS_mix_max_scan_len;
+ double write_rate = 1000000.0;
+ double read_rate = 1000000.0;
+ std::vector<double> ratio{FLAGS_mix_get_ratio, FLAGS_mix_put_ratio,
+ FLAGS_mix_seek_ratio};
+ char value_buffer[default_value_max];
+ QueryDecider query;
+ RandomGenerator gen;
+ Status s;
+ if (value_max > FLAGS_mix_max_value_size) {
+ value_max = FLAGS_mix_max_value_size;
+ }
+
+ ReadOptions options(FLAGS_verify_checksum, true);
+ std::unique_ptr<const char[]> key_guard;
+ Slice key = AllocateKey(&key_guard);
+ PinnableSlice pinnable_val;
+ query.Initiate(ratio);
+
+ // Initialize the rate limiters that cap the QPS
+ if (FLAGS_sine_a != 0 || FLAGS_sine_d != 0) {
+ thread->shared->read_rate_limiter.reset(NewGenericRateLimiter(
+ read_rate, 100000 /* refill_period_us */, 10 /* fairness */,
+ RateLimiter::Mode::kReadsOnly));
+ thread->shared->write_rate_limiter.reset(
+ NewGenericRateLimiter(write_rate));
+ }
+
+ Duration duration(FLAGS_duration, reads_);
+ while (!duration.Done(1)) {
+ DBWithColumnFamilies* db_with_cfh = SelectDBWithCfh(thread);
+ int64_t rand_v, key_rand, key_seed;
+ rand_v = GetRandomKey(&thread->rand) % FLAGS_num;
+ double u = static_cast<double>(rand_v) / FLAGS_num;
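+ // Map the uniform draw to a key "rank" via the power-CDF inversion, then
+ // hash the rank through Random64 so equally hot ranks map to fixed but
+ // scattered keys rather than a contiguous hot range.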
+ key_seed = PowerCdfInversion(u, FLAGS_key_dist_a, FLAGS_key_dist_b);
+ Random64 rand(key_seed);
+ key_rand = static_cast<int64_t>(rand.Next()) % FLAGS_num;
+ GenerateKeyFromInt(key_rand, FLAGS_num, &key);
+ int query_type = query.GetType(rand_v);
+
+ // Recalculate the QPS once per sine interval
+ uint64_t now = FLAGS_env->NowMicros();
+ uint64_t usecs_since_last;
+ if (now > thread->stats.GetSineInterval()) {
+ usecs_since_last = now - thread->stats.GetSineInterval();
+ } else {
+ usecs_since_last = 0;
+ }
+
+ if (usecs_since_last >
+ (FLAGS_sine_mix_rate_interval_milliseconds * uint64_t{1000})) {
+ double usecs_since_start =
+ static_cast<double>(now - thread->stats.GetStart());
+ thread->stats.ResetSineInterval();
+ double mix_rate_with_noise = AddNoise(
+ SineRate(usecs_since_start / 1000000.0), FLAGS_sine_mix_rate_noise);
+ read_rate = mix_rate_with_noise * (query.ratio_[0] + query.ratio_[2]);
+ write_rate =
+ mix_rate_with_noise * query.ratio_[1] * FLAGS_mix_ave_kv_size;
+
+ thread->shared->write_rate_limiter.reset(
+ NewGenericRateLimiter(write_rate));
+ thread->shared->read_rate_limiter.reset(NewGenericRateLimiter(
+ read_rate,
+ FLAGS_sine_mix_rate_interval_milliseconds * uint64_t{1000}, 10,
+ RateLimiter::Mode::kReadsOnly));
+ }
+ // Start the query
+ if (query_type == 0) {
+ // the Get query
+ gets++;
+ read++;
+ if (FLAGS_num_column_families > 1) {
+ s = db_with_cfh->db->Get(options, db_with_cfh->GetCfh(key_rand), key,
+ &pinnable_val);
+ } else {
+ pinnable_val.Reset();
+ s = db_with_cfh->db->Get(options,
+ db_with_cfh->db->DefaultColumnFamily(), key,
+ &pinnable_val);
+ }
+
+ if (s.ok()) {
+ found++;
+ bytes += key.size() + pinnable_val.size();
+ } else if (!s.IsNotFound()) {
+ fprintf(stderr, "Get returned an error: %s\n", s.ToString().c_str());
+ abort();
+ }
+
+ if (thread->shared->read_rate_limiter.get() != nullptr &&
+ read % 256 == 255) {
+ thread->shared->read_rate_limiter->Request(
+ 256, Env::IO_HIGH, nullptr /* stats */,
+ RateLimiter::OpType::kRead);
+ }
+ thread->stats.FinishedOps(db_with_cfh, db_with_cfh->db, 1, kRead);
+ } else if (query_type == 1) {
+ // the Put query
+ puts++;
+ int64_t value_size = ParetoCdfInversion(
+ u, FLAGS_value_theta, FLAGS_value_k, FLAGS_value_sigma);
+ if (value_size < 0) {
+ value_size = 10;
+ } else if (value_size > value_max) {
+ value_size = value_size % value_max;
+ }
+ s = db_with_cfh->db->Put(
+ write_options_, key,
+ gen.Generate(static_cast<unsigned int>(value_size)));
+ if (!s.ok()) {
+ fprintf(stderr, "put error: %s\n", s.ToString().c_str());
+ exit(1);
+ }
+
+ if (thread->shared->write_rate_limiter) {
+ thread->shared->write_rate_limiter->Request(
+ key.size() + value_size, Env::IO_HIGH, nullptr /*stats*/,
+ RateLimiter::OpType::kWrite);
+ }
+ thread->stats.FinishedOps(db_with_cfh, db_with_cfh->db, 1, kWrite);
+ } else if (query_type == 2) {
+ // Seek query
+ if (db_with_cfh->db != nullptr) {
+ Iterator* single_iter = nullptr;
+ single_iter = db_with_cfh->db->NewIterator(options);
+ if (single_iter != nullptr) {
+ single_iter->Seek(key);
+ seek++;
+ read++;
+ if (single_iter->Valid() && single_iter->key().compare(key) == 0) {
+ seek_found++;
+ }
+ int64_t scan_length =
+ ParetoCdfInversion(u, FLAGS_iter_theta, FLAGS_iter_k,
+ FLAGS_iter_sigma) %
+ scan_len_max;
+ for (int64_t j = 0; j < scan_length && single_iter->Valid(); j++) {
+ Slice value = single_iter->value();
+ memcpy(value_buffer, value.data(),
+ std::min(value.size(), sizeof(value_buffer)));
+ bytes += single_iter->key().size() + single_iter->value().size();
+ single_iter->Next();
+ assert(single_iter->status().ok());
+ }
+ }
+ delete single_iter;
+ }
+ thread->stats.FinishedOps(db_with_cfh, db_with_cfh->db, 1, kSeek);
+ }
+ }
+ char msg[256];
+ snprintf(msg, sizeof(msg),
+ "( Gets:%" PRIu64 " Puts:%" PRIu64 " Seek:%" PRIu64 " of %" PRIu64
+ " in %" PRIu64 " found)\n",
+ gets, puts, seek, found, read);
+
+ thread->stats.AddBytes(bytes);
+ thread->stats.AddMessage(msg);
+
+ if (FLAGS_perf_level > rocksdb::PerfLevel::kDisable) {
+ thread->stats.AddMessage(std::string("PERF_CONTEXT:\n") +
+ get_perf_context()->ToString());
+ }
+ }
+
void IteratorCreation(ThreadState* thread) {
Duration duration(FLAGS_duration, reads_);
ReadOptions options(FLAGS_verify_checksum, true);
std::unique_ptr<const char[]> key_guard;
Slice key = AllocateKey(&key_guard);
+ std::unique_ptr<const char[]> upper_bound_key_guard;
+ Slice upper_bound = AllocateKey(&upper_bound_key_guard);
+ std::unique_ptr<const char[]> lower_bound_key_guard;
+ Slice lower_bound = AllocateKey(&lower_bound_key_guard);
+
Duration duration(FLAGS_duration, reads_);
char value_buffer[256];
while (!duration.Done(1)) {
+ int64_t seek_pos = thread->rand.Next() % FLAGS_num;
+ GenerateKeyFromInt(static_cast<uint64_t>(seek_pos), FLAGS_num, &key);
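+ // e.g. with -max_scan_distance=100, a seek at position P scans keys in
+ // [P, min(FLAGS_num, P + 100)) going forward, or [max(0, P - 100), P]
+ // when -reverse_iterator is set.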
+ if (FLAGS_max_scan_distance != 0) {
+ if (FLAGS_reverse_iterator) {
+ GenerateKeyFromInt(
+ static_cast<uint64_t>(std::max(
+ static_cast<int64_t>(0), seek_pos - FLAGS_max_scan_distance)),
+ FLAGS_num, &lower_bound);
+ options.iterate_lower_bound = &lower_bound;
+ } else {
+ GenerateKeyFromInt(
+ static_cast<uint64_t>(
+ std::min(FLAGS_num, seek_pos + FLAGS_max_scan_distance)),
+ FLAGS_num, &upper_bound);
+ options.iterate_upper_bound = &upper_bound;
+ }
+ }
+
if (!FLAGS_use_tailing_iterator) {
if (db_.db != nullptr) {
delete single_iter;
iter_to_use = multi_iters[thread->rand.Next() % multi_iters.size()];
}
- GenerateKeyFromInt(thread->rand.Next() % FLAGS_num, FLAGS_num, &key);
iter_to_use->Seek(key);
read++;
if (iter_to_use->Valid() && iter_to_use->key().compare(key) == 0) {
void Replay(ThreadState* /*thread*/, DBWithColumnFamilies* db_with_cfh) {
Status s;
- unique_ptr<TraceReader> trace_reader;
+ std::unique_ptr<TraceReader> trace_reader;
s = NewFileTraceReader(FLAGS_env, EnvOptions(), FLAGS_trace_file,
&trace_reader);
if (!s.ok()) {
if (FLAGS_statistics) {
dbstats = rocksdb::CreateDBStatistics();
}
+ if (dbstats) {
+ dbstats->set_stats_level(static_cast<StatsLevel>(FLAGS_stats_level));
+ }
FLAGS_compaction_pri_e = (rocksdb::CompactionPri)FLAGS_compaction_pri;
std::vector<std::string> fanout = rocksdb::StringSplit(
}
}
#endif // ROCKSDB_LITE
+ if (FLAGS_use_existing_keys && !FLAGS_use_existing_db) {
+ fprintf(stderr,
+ "`-use_existing_db` must be true for `-use_existing_keys` to be "
+ "settable\n");
+ exit(1);
+ }
+
if (!FLAGS_hdfs.empty()) {
FLAGS_env = new rocksdb::HdfsEnv(FLAGS_hdfs);
}
rocksdb::Benchmark benchmark;
benchmark.Run();
+
+#ifndef ROCKSDB_LITE
+ if (FLAGS_print_malloc_stats) {
+ std::string stats_string;
+ rocksdb::DumpMallocStats(&stats_string);
+ fprintf(stdout, "Malloc stats:\n%s\n", stats_string.c_str());
+ }
+#endif // ROCKSDB_LITE
+
return 0;
}
} // namespace rocksdb