// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
-#ifndef __STDC_FORMAT_MACROS
-#define __STDC_FORMAT_MACROS
-#endif
-
#ifdef GFLAGS
#ifdef NUMA
#include <numa.h>
#include <unistd.h>
#endif
#include <fcntl.h>
-#include <inttypes.h>
-#include <math.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <atomic>
+#include <cinttypes>
#include <condition_variable>
#include <cstddef>
#include <memory>
#include <thread>
#include <unordered_map>
-#include "db/db_impl.h"
+#include "db/db_impl/db_impl.h"
#include "db/malloc_stats.h"
#include "db/version_set.h"
#include "hdfs/env_hdfs.h"
#include "rocksdb/rate_limiter.h"
#include "rocksdb/slice.h"
#include "rocksdb/slice_transform.h"
+#include "rocksdb/stats_history.h"
#include "rocksdb/utilities/object_registry.h"
#include "rocksdb/utilities/optimistic_transaction_db.h"
#include "rocksdb/utilities/options_util.h"
#include "rocksdb/utilities/transaction.h"
#include "rocksdb/utilities/transaction_db.h"
#include "rocksdb/write_batch.h"
+#include "test_util/testutil.h"
+#include "test_util/transaction_test_util.h"
#include "util/cast_util.h"
#include "util/compression.h"
#include "util/crc32c.h"
#include "util/random.h"
#include "util/stderr_logger.h"
#include "util/string_util.h"
-#include "util/testutil.h"
-#include "util/transaction_test_util.h"
#include "util/xxhash.h"
#include "utilities/blob_db/blob_db.h"
#include "utilities/merge_operators.h"
#include "utilities/merge_operators/bytesxor.h"
+#include "utilities/merge_operators/sortlist.h"
#include "utilities/persistent_cache/block_cache_tier.h"
#ifdef OS_WIN
"multireadrandom,"
"mixgraph,"
"readseq,"
+ "readtorowcache,"
"readtocache,"
"readreverse,"
"readwhilewriting,"
"fillseekseq,"
"randomtransaction,"
"randomreplacekeys,"
- "timeseries",
+ "timeseries,"
+ "getmergeoperands",
"Comma-separated list of operations to run in the specified"
" order. Available benchmarks:\n"
" key order and keep the shape of the LSM tree\n"
"\toverwrite -- overwrite N values in random key order in"
" async mode\n"
- "\tfillsync -- write N/100 values in random key order in "
+ "\tfillsync -- write N/1000 values in random key order in "
"sync mode\n"
"\tfill100K -- write N/1000 100K values in random order in"
" async mode\n"
"\tlevelstats -- Print the number of files and bytes per level\n"
"\tsstables -- Print sstable info\n"
"\theapprofile -- Dump a heap profile (if supported by this port)\n"
- "\treplay -- replay the trace file specified with trace_file\n");
+ "\treplay -- replay the trace file specified with trace_file\n"
+ "\tgetmergeoperands -- Insert lots of merge records which are a list of "
+ "sorted ints for a key and then compare performance of lookup for another "
+ "key "
+ "by doing a Get followed by binary searching in the large sorted list vs "
+ "doing a GetMergeOperands and binary searching in the operands which are"
+ "sorted sub-lists. The MergeOperator used is sortlist.h\n");
DEFINE_int64(num, 1000000, "Number of key/values to place in database");
DEFINE_int32(duration, 0, "Time in seconds for the random-ops tests to run."
" When 0 then num & reads determine the test duration");
-DEFINE_int32(value_size, 100, "Size of each value");
+DEFINE_string(value_size_distribution_type, "fixed",
+ "Value size distribution type: fixed, uniform, normal");
+
+DEFINE_int32(value_size, 100, "Size of each value in fixed distribution");
+static unsigned int value_size = 100;
+
+DEFINE_int32(value_size_min, 100, "Min size of random value");
+
+DEFINE_int32(value_size_max, 102400, "Max size of random value");
DEFINE_int32(seek_nexts, 0,
"How many times to call Next() after Seek() in "
"CPU and memory of same node. Use \"$numactl --hardware\" command "
"to see NUMA memory architecture.");
-DEFINE_int64(db_write_buffer_size, rocksdb::Options().db_write_buffer_size,
+DEFINE_int64(db_write_buffer_size,
+ ROCKSDB_NAMESPACE::Options().db_write_buffer_size,
"Number of bytes to buffer in all memtables before compacting");
DEFINE_bool(cost_write_buffer_to_cache, false,
"The usage of memtable is costed to the block cache");
-DEFINE_int64(write_buffer_size, rocksdb::Options().write_buffer_size,
+DEFINE_int64(write_buffer_size, ROCKSDB_NAMESPACE::Options().write_buffer_size,
"Number of bytes to buffer in memtable before compacting");
DEFINE_int32(max_write_buffer_number,
- rocksdb::Options().max_write_buffer_number,
+ ROCKSDB_NAMESPACE::Options().max_write_buffer_number,
"The number of in-memory memtables. Each memtable is of size"
" write_buffer_size bytes.");
DEFINE_int32(min_write_buffer_number_to_merge,
- rocksdb::Options().min_write_buffer_number_to_merge,
+ ROCKSDB_NAMESPACE::Options().min_write_buffer_number_to_merge,
"The minimum number of write buffers that will be merged together"
"before writing to storage. This is cheap because it is an"
"in-memory merge. If this feature is not enabled, then all these"
" in each of these individual write buffers.");
DEFINE_int32(max_write_buffer_number_to_maintain,
- rocksdb::Options().max_write_buffer_number_to_maintain,
+ ROCKSDB_NAMESPACE::Options().max_write_buffer_number_to_maintain,
"The total maximum number of write buffers to maintain in memory "
"including copies of buffers that have already been flushed. "
"Unlike max_write_buffer_number, this parameter does not affect "
"after they are flushed. If this value is set to -1, "
"'max_write_buffer_number' will be used.");
+DEFINE_int64(max_write_buffer_size_to_maintain,
+ ROCKSDB_NAMESPACE::Options().max_write_buffer_size_to_maintain,
+ "The total maximum size of write buffers to maintain in memory "
+ "including copies of buffers that have already been flushed. "
+ "Unlike max_write_buffer_number, this parameter does not affect "
+ "flushing. This controls the minimum amount of write history "
+ "that will be available in memory for conflict checking when "
+ "Transactions are used. If this value is too low, some "
+ "transactions may fail at commit time due to not being able to "
+ "determine whether there were any write conflicts. Setting this "
+ "value to 0 will cause write buffers to be freed immediately "
+ "after they are flushed. If this value is set to -1, "
+ "'max_write_buffer_number' will be used.");
+
DEFINE_int32(max_background_jobs,
- rocksdb::Options().max_background_jobs,
+ ROCKSDB_NAMESPACE::Options().max_background_jobs,
"The maximum number of concurrent background jobs that can occur "
"in parallel.");
" that can occur in parallel.");
DEFINE_int32(max_background_compactions,
- rocksdb::Options().max_background_compactions,
+ ROCKSDB_NAMESPACE::Options().max_background_compactions,
"The maximum number of concurrent background compactions"
" that can occur in parallel.");
&ValidateUint32Range);
DEFINE_int32(max_background_flushes,
- rocksdb::Options().max_background_flushes,
+ ROCKSDB_NAMESPACE::Options().max_background_flushes,
"The maximum number of concurrent background flushes"
" that can occur in parallel.");
-static rocksdb::CompactionStyle FLAGS_compaction_style_e;
-DEFINE_int32(compaction_style, (int32_t) rocksdb::Options().compaction_style,
+static ROCKSDB_NAMESPACE::CompactionStyle FLAGS_compaction_style_e;
+DEFINE_int32(compaction_style,
+ (int32_t)ROCKSDB_NAMESPACE::Options().compaction_style,
"style of compaction: level-based, universal and fifo");
-static rocksdb::CompactionPri FLAGS_compaction_pri_e;
-DEFINE_int32(compaction_pri, (int32_t)rocksdb::Options().compaction_pri,
+static ROCKSDB_NAMESPACE::CompactionPri FLAGS_compaction_pri_e;
+DEFINE_int32(compaction_pri,
+ (int32_t)ROCKSDB_NAMESPACE::Options().compaction_pri,
"priority of files to compaction: by size or by data age");
DEFINE_int32(universal_size_ratio, 0,
DEFINE_bool(partition_index, false, "Partition index blocks");
DEFINE_int64(metadata_block_size,
- rocksdb::BlockBasedTableOptions().metadata_block_size,
+ ROCKSDB_NAMESPACE::BlockBasedTableOptions().metadata_block_size,
"Max partition size when partitioning index/filters");
// The default reduces the overhead of reading time with flash. With HDD, which
"Pin top-level index of partitioned index/filter blocks in block cache.");
DEFINE_int32(block_size,
- static_cast<int32_t>(rocksdb::BlockBasedTableOptions().block_size),
+ static_cast<int32_t>(
+ ROCKSDB_NAMESPACE::BlockBasedTableOptions().block_size),
"Number of bytes in a block.");
-DEFINE_int32(
- format_version,
- static_cast<int32_t>(rocksdb::BlockBasedTableOptions().format_version),
- "Format version of SST files.");
+DEFINE_int32(format_version,
+ static_cast<int32_t>(
+ ROCKSDB_NAMESPACE::BlockBasedTableOptions().format_version),
+ "Format version of SST files.");
DEFINE_int32(block_restart_interval,
- rocksdb::BlockBasedTableOptions().block_restart_interval,
+ ROCKSDB_NAMESPACE::BlockBasedTableOptions().block_restart_interval,
"Number of keys between restart points "
"for delta encoding of keys in data block.");
-DEFINE_int32(index_block_restart_interval,
- rocksdb::BlockBasedTableOptions().index_block_restart_interval,
- "Number of keys between restart points "
- "for delta encoding of keys in index block.");
+DEFINE_int32(
+ index_block_restart_interval,
+ ROCKSDB_NAMESPACE::BlockBasedTableOptions().index_block_restart_interval,
+ "Number of keys between restart points "
+ "for delta encoding of keys in index block.");
DEFINE_int32(read_amp_bytes_per_bit,
- rocksdb::BlockBasedTableOptions().read_amp_bytes_per_bit,
+ ROCKSDB_NAMESPACE::BlockBasedTableOptions().read_amp_bytes_per_bit,
"Number of bytes per bit to be used in block read-amp bitmap");
-DEFINE_bool(enable_index_compression,
- rocksdb::BlockBasedTableOptions().enable_index_compression,
- "Compress the index block");
+DEFINE_bool(
+ enable_index_compression,
+ ROCKSDB_NAMESPACE::BlockBasedTableOptions().enable_index_compression,
+ "Compress the index block");
-DEFINE_bool(block_align, rocksdb::BlockBasedTableOptions().block_align,
+DEFINE_bool(block_align,
+ ROCKSDB_NAMESPACE::BlockBasedTableOptions().block_align,
"Align data blocks on page size");
DEFINE_bool(use_data_block_hash_index, false,
"Number of bytes to use as a cache of individual rows"
" (0 = disabled).");
-DEFINE_int32(open_files, rocksdb::Options().max_open_files,
+DEFINE_int32(open_files, ROCKSDB_NAMESPACE::Options().max_open_files,
"Maximum number of files to keep open at the same time"
" (use default if == 0)");
-DEFINE_int32(file_opening_threads, rocksdb::Options().max_file_opening_threads,
+DEFINE_int32(file_opening_threads,
+ ROCKSDB_NAMESPACE::Options().max_file_opening_threads,
"If open_files is set to -1, this option set the number of "
"threads that will be used to open files during DB::Open()");
DEFINE_int32(compaction_readahead_size, 0, "Compaction readahead size");
+DEFINE_int32(log_readahead_size, 0, "WAL and manifest readahead size");
+
DEFINE_int32(random_access_max_buffer_size, 1024 * 1024,
"Maximum windows randomaccess buffer size");
" from storage");
DEFINE_bool(statistics, false, "Database statistics");
-DEFINE_int32(stats_level, rocksdb::StatsLevel::kExceptDetailedTimers,
+DEFINE_int32(stats_level, ROCKSDB_NAMESPACE::StatsLevel::kExceptDetailedTimers,
"stats level for statistics");
DEFINE_string(statistics_string, "", "Serialized statistics string");
-static class std::shared_ptr<rocksdb::Statistics> dbstats;
+static class std::shared_ptr<ROCKSDB_NAMESPACE::Statistics> dbstats;
DEFINE_int64(writes, -1, "Number of write operations to do. If negative, do"
" --num reads.");
DEFINE_int32(num_levels, 7, "The total number of levels");
-DEFINE_int64(target_file_size_base, rocksdb::Options().target_file_size_base,
+DEFINE_int64(target_file_size_base,
+ ROCKSDB_NAMESPACE::Options().target_file_size_base,
"Target file size at level-1");
DEFINE_int32(target_file_size_multiplier,
- rocksdb::Options().target_file_size_multiplier,
+ ROCKSDB_NAMESPACE::Options().target_file_size_multiplier,
"A multiplier to compute target level-N file size (N >= 2)");
DEFINE_uint64(max_bytes_for_level_base,
- rocksdb::Options().max_bytes_for_level_base,
+ ROCKSDB_NAMESPACE::Options().max_bytes_for_level_base,
"Max bytes for level-1");
DEFINE_bool(level_compaction_dynamic_level_bytes, false,
"A vector that specifies additional fanout per level");
DEFINE_int32(level0_stop_writes_trigger,
- rocksdb::Options().level0_stop_writes_trigger,
+ ROCKSDB_NAMESPACE::Options().level0_stop_writes_trigger,
"Number of files in level-0"
" that will trigger put stop.");
DEFINE_int32(level0_slowdown_writes_trigger,
- rocksdb::Options().level0_slowdown_writes_trigger,
+ ROCKSDB_NAMESPACE::Options().level0_slowdown_writes_trigger,
"Number of files in level-0"
" that will slow down writes.");
DEFINE_int32(level0_file_num_compaction_trigger,
- rocksdb::Options().level0_file_num_compaction_trigger,
+ ROCKSDB_NAMESPACE::Options().level0_file_num_compaction_trigger,
"Number of files in level-0"
" when compactions start");
"Open a BlobDB instance. "
"Required for large value benchmark.");
-DEFINE_bool(blob_db_enable_gc, false, "Enable BlobDB garbage collection.");
+DEFINE_bool(
+ blob_db_enable_gc,
+ ROCKSDB_NAMESPACE::blob_db::BlobDBOptions().enable_garbage_collection,
+ "Enable BlobDB garbage collection.");
-DEFINE_bool(blob_db_is_fifo, false, "Enable FIFO eviction strategy in BlobDB.");
+DEFINE_double(
+ blob_db_gc_cutoff,
+ ROCKSDB_NAMESPACE::blob_db::BlobDBOptions().garbage_collection_cutoff,
+ "Cutoff ratio for BlobDB garbage collection.");
-DEFINE_uint64(blob_db_max_db_size, 0,
+DEFINE_bool(blob_db_is_fifo,
+ ROCKSDB_NAMESPACE::blob_db::BlobDBOptions().is_fifo,
+ "Enable FIFO eviction strategy in BlobDB.");
+
+DEFINE_uint64(blob_db_max_db_size,
+ ROCKSDB_NAMESPACE::blob_db::BlobDBOptions().max_db_size,
"Max size limit of the directory where blob files are stored.");
-DEFINE_uint64(blob_db_max_ttl_range, 86400,
- "TTL range to generate BlobDB data (in seconds).");
+DEFINE_uint64(
+ blob_db_max_ttl_range, 0,
+ "TTL range to generate BlobDB data (in seconds). 0 means no TTL.");
-DEFINE_uint64(blob_db_ttl_range_secs, 3600,
+DEFINE_uint64(blob_db_ttl_range_secs,
+ ROCKSDB_NAMESPACE::blob_db::BlobDBOptions().ttl_range_secs,
"TTL bucket size to use when creating blob files.");
-DEFINE_uint64(blob_db_min_blob_size, 0,
+DEFINE_uint64(blob_db_min_blob_size,
+ ROCKSDB_NAMESPACE::blob_db::BlobDBOptions().min_blob_size,
"Smallest blob to store in a file. Blobs smaller than this "
"will be inlined with the key in the LSM tree.");
-DEFINE_uint64(blob_db_bytes_per_sync, 0, "Bytes to sync blob file at.");
+DEFINE_uint64(blob_db_bytes_per_sync,
+ ROCKSDB_NAMESPACE::blob_db::BlobDBOptions().bytes_per_sync,
+ "Bytes to sync blob file at.");
-DEFINE_uint64(blob_db_file_size, 256 * 1024 * 1024,
+DEFINE_uint64(blob_db_file_size,
+ ROCKSDB_NAMESPACE::blob_db::BlobDBOptions().blob_file_size,
"Target size of each blob file.");
+DEFINE_string(blob_db_compression_type, "snappy",
+ "Algorithm to use to compress blob in blob file");
+static enum ROCKSDB_NAMESPACE::CompressionType
+ FLAGS_blob_db_compression_type_e = ROCKSDB_NAMESPACE::kSnappyCompression;
+
+// Secondary DB instance Options
+DEFINE_bool(use_secondary_db, false,
+ "Open a RocksDB secondary instance. A primary instance can be "
+ "running in another db_bench process.");
+
+DEFINE_string(secondary_path, "",
+ "Path to a directory used by the secondary instance to store "
+ "private files, e.g. info log.");
+
+DEFINE_int32(secondary_update_interval, 5,
+ "Secondary instance attempts to catch up with the primary every "
+ "secondary_update_interval seconds.");
+
#endif // ROCKSDB_LITE
DEFINE_bool(report_bg_io_stats, false,
DEFINE_string(trace_file, "", "Trace workload to a file. ");
-static enum rocksdb::CompressionType StringToCompressionType(const char* ctype) {
+DEFINE_int32(trace_replay_fast_forward, 1,
+ "Fast forward trace replay, must >= 1. ");
+DEFINE_int32(block_cache_trace_sampling_frequency, 1,
+ "Block cache trace sampling frequency, termed s. It uses spatial "
+ "downsampling and samples accesses to one out of s blocks.");
+DEFINE_int64(
+ block_cache_trace_max_trace_file_size_in_bytes,
+ uint64_t{64} * 1024 * 1024 * 1024,
+ "The maximum block cache trace file size in bytes. Block cache accesses "
+ "will not be logged if the trace file size exceeds this threshold. Default "
+ "is 64 GB.");
+DEFINE_string(block_cache_trace_file, "", "Block cache trace file path.");
+DEFINE_int32(trace_replay_threads, 1,
+ "The number of threads to replay, must >=1.");
+
+static enum ROCKSDB_NAMESPACE::CompressionType StringToCompressionType(
+ const char* ctype) {
assert(ctype);
if (!strcasecmp(ctype, "none"))
- return rocksdb::kNoCompression;
+ return ROCKSDB_NAMESPACE::kNoCompression;
else if (!strcasecmp(ctype, "snappy"))
- return rocksdb::kSnappyCompression;
+ return ROCKSDB_NAMESPACE::kSnappyCompression;
else if (!strcasecmp(ctype, "zlib"))
- return rocksdb::kZlibCompression;
+ return ROCKSDB_NAMESPACE::kZlibCompression;
else if (!strcasecmp(ctype, "bzip2"))
- return rocksdb::kBZip2Compression;
+ return ROCKSDB_NAMESPACE::kBZip2Compression;
else if (!strcasecmp(ctype, "lz4"))
- return rocksdb::kLZ4Compression;
+ return ROCKSDB_NAMESPACE::kLZ4Compression;
else if (!strcasecmp(ctype, "lz4hc"))
- return rocksdb::kLZ4HCCompression;
+ return ROCKSDB_NAMESPACE::kLZ4HCCompression;
else if (!strcasecmp(ctype, "xpress"))
- return rocksdb::kXpressCompression;
+ return ROCKSDB_NAMESPACE::kXpressCompression;
else if (!strcasecmp(ctype, "zstd"))
- return rocksdb::kZSTD;
+ return ROCKSDB_NAMESPACE::kZSTD;
fprintf(stdout, "Cannot parse compression type '%s'\n", ctype);
- return rocksdb::kSnappyCompression; // default value
+ return ROCKSDB_NAMESPACE::kSnappyCompression; // default value
}
static std::string ColumnFamilyName(size_t i) {
if (i == 0) {
- return rocksdb::kDefaultColumnFamilyName;
+ return ROCKSDB_NAMESPACE::kDefaultColumnFamilyName;
} else {
char name[100];
snprintf(name, sizeof(name), "column_family_name_%06zu", i);
DEFINE_string(compression_type, "snappy",
"Algorithm to use to compress the database");
-static enum rocksdb::CompressionType FLAGS_compression_type_e =
- rocksdb::kSnappyCompression;
+static enum ROCKSDB_NAMESPACE::CompressionType FLAGS_compression_type_e =
+ ROCKSDB_NAMESPACE::kSnappyCompression;
DEFINE_int64(sample_for_compression, 0, "Sample every N block for compression");
-DEFINE_int32(compression_level, rocksdb::CompressionOptions().level,
+DEFINE_int32(compression_level, ROCKSDB_NAMESPACE::CompressionOptions().level,
"Compression level. The meaning of this value is library-"
"dependent. If unset, we try to use the default for the library "
"specified in `--compression_type`");
DEFINE_int32(compression_max_dict_bytes,
- rocksdb::CompressionOptions().max_dict_bytes,
+ ROCKSDB_NAMESPACE::CompressionOptions().max_dict_bytes,
"Maximum size of dictionary used to prime the compression "
"library.");
DEFINE_int32(compression_zstd_max_train_bytes,
- rocksdb::CompressionOptions().zstd_max_train_bytes,
+ ROCKSDB_NAMESPACE::CompressionOptions().zstd_max_train_bytes,
"Maximum size of training data passed to zstd's dictionary "
"trainer.");
#endif // ROCKSDB_LITE
DEFINE_string(hdfs, "", "Name of hdfs environment. Mutually exclusive with"
" --env_uri.");
-static rocksdb::Env* FLAGS_env = rocksdb::Env::Default();
+
+static std::shared_ptr<ROCKSDB_NAMESPACE::Env> env_guard;
+
+static ROCKSDB_NAMESPACE::Env* FLAGS_env = ROCKSDB_NAMESPACE::Env::Default();
DEFINE_int64(stats_interval, 0, "Stats are reported every N operations when "
"this is greater than zero. When 0 the interval grows over time.");
"Takes and report a snapshot of the current status of each thread"
" when this is greater than 0.");
-DEFINE_int32(perf_level, rocksdb::PerfLevel::kDisable, "Level of perf collection");
+DEFINE_int32(perf_level, ROCKSDB_NAMESPACE::PerfLevel::kDisable,
+ "Level of perf collection");
static bool ValidateRateLimit(const char* flagname, double value) {
const double EPSILON = 1e-10;
DEFINE_bool(enable_pipelined_write, true,
"Allow WAL and memtable writes to be pipelined");
+DEFINE_bool(unordered_write, false,
+ "Allow WAL and memtable writes to be pipelined");
+
DEFINE_bool(allow_concurrent_memtable_write, true,
"Allow multi-writers to update mem tables in parallel.");
-DEFINE_bool(inplace_update_support, rocksdb::Options().inplace_update_support,
+DEFINE_bool(inplace_update_support,
+ ROCKSDB_NAMESPACE::Options().inplace_update_support,
"Support in-place memtable update for smaller or same-size values");
DEFINE_uint64(inplace_update_num_locks,
- rocksdb::Options().inplace_update_num_locks,
+ ROCKSDB_NAMESPACE::Options().inplace_update_num_locks,
"Number of RW locks to protect in-place memtable updates");
DEFINE_bool(enable_write_thread_adaptive_yield, true,
"is the global rate in bytes/second.");
// the parameters of mix_graph
+DEFINE_double(keyrange_dist_a, 0.0,
+ "The parameter 'a' of prefix average access distribution "
+ "f(x)=a*exp(b*x)+c*exp(d*x)");
+DEFINE_double(keyrange_dist_b, 0.0,
+ "The parameter 'b' of prefix average access distribution "
+ "f(x)=a*exp(b*x)+c*exp(d*x)");
+DEFINE_double(keyrange_dist_c, 0.0,
+ "The parameter 'c' of prefix average access distribution"
+ "f(x)=a*exp(b*x)+c*exp(d*x)");
+DEFINE_double(keyrange_dist_d, 0.0,
+ "The parameter 'd' of prefix average access distribution"
+ "f(x)=a*exp(b*x)+c*exp(d*x)");
+DEFINE_int64(keyrange_num, 1,
+ "The number of key ranges that are in the same prefix "
+ "group, each prefix range will have its key acccess "
+ "distribution");
DEFINE_double(key_dist_a, 0.0,
"The parameter 'a' of key access distribution model "
"f(x)=a*x^b");
"If non-zero, db_bench will rate-limit the reads from RocksDB. This "
"is the global rate in ops/second.");
-DEFINE_uint64(max_compaction_bytes, rocksdb::Options().max_compaction_bytes,
+DEFINE_uint64(max_compaction_bytes,
+ ROCKSDB_NAMESPACE::Options().max_compaction_bytes,
"Max bytes allowed in one compaction");
#ifndef ROCKSDB_LITE
" in MB.");
DEFINE_uint64(max_total_wal_size, 0, "Set total max WAL size");
-DEFINE_bool(mmap_read, rocksdb::Options().allow_mmap_reads,
+DEFINE_bool(mmap_read, ROCKSDB_NAMESPACE::Options().allow_mmap_reads,
"Allow reads to occur via mmap-ing files");
-DEFINE_bool(mmap_write, rocksdb::Options().allow_mmap_writes,
+DEFINE_bool(mmap_write, ROCKSDB_NAMESPACE::Options().allow_mmap_writes,
"Allow writes to occur via mmap-ing files");
-DEFINE_bool(use_direct_reads, rocksdb::Options().use_direct_reads,
+DEFINE_bool(use_direct_reads, ROCKSDB_NAMESPACE::Options().use_direct_reads,
"Use O_DIRECT for reading data");
DEFINE_bool(use_direct_io_for_flush_and_compaction,
- rocksdb::Options().use_direct_io_for_flush_and_compaction,
+ ROCKSDB_NAMESPACE::Options().use_direct_io_for_flush_and_compaction,
"Use O_DIRECT for background flush and compaction writes");
-DEFINE_bool(advise_random_on_open, rocksdb::Options().advise_random_on_open,
+DEFINE_bool(advise_random_on_open,
+ ROCKSDB_NAMESPACE::Options().advise_random_on_open,
"Advise random access on table file open");
DEFINE_string(compaction_fadvice, "NORMAL",
"Access pattern advice when a file is compacted");
static auto FLAGS_compaction_fadvice_e =
- rocksdb::Options().access_hint_on_compaction_start;
+ ROCKSDB_NAMESPACE::Options().access_hint_on_compaction_start;
DEFINE_bool(use_tailing_iterator, false,
"Use tailing iterator to access a series of keys instead of get");
-DEFINE_bool(use_adaptive_mutex, rocksdb::Options().use_adaptive_mutex,
+DEFINE_bool(use_adaptive_mutex, ROCKSDB_NAMESPACE::Options().use_adaptive_mutex,
"Use adaptive mutex");
-DEFINE_uint64(bytes_per_sync, rocksdb::Options().bytes_per_sync,
+DEFINE_uint64(bytes_per_sync, ROCKSDB_NAMESPACE::Options().bytes_per_sync,
"Allows OS to incrementally sync SST files to disk while they are"
" being written, in the background. Issue one request for every"
" bytes_per_sync written. 0 turns it off.");
-DEFINE_uint64(wal_bytes_per_sync, rocksdb::Options().wal_bytes_per_sync,
+DEFINE_uint64(wal_bytes_per_sync,
+ ROCKSDB_NAMESPACE::Options().wal_bytes_per_sync,
"Allows OS to incrementally sync WAL files to disk while they are"
" being written, in the background. Issue one request for every"
" wal_bytes_per_sync written. 0 turns it off.");
}
return true;
}
+
DEFINE_int32(prefix_size, 0, "control the prefix size for HashSkipList and "
"plain table");
DEFINE_int64(keys_per_prefix, 0, "control average number of keys generated "
"per prefix, 0 means no special handling of the prefix, "
"i.e. use the prefix comes with the generated random number.");
+DEFINE_bool(total_order_seek, false,
+ "Enable total order seek regardless of index format.");
+DEFINE_bool(prefix_same_as_start, false,
+ "Enforce iterator to return keys with prefix same as seek key.");
+DEFINE_bool(
+ seek_missing_prefix, false,
+ "Iterator seek to keys with non-exist prefixes. Require prefix_size > 8");
+
DEFINE_int32(memtable_insert_with_hint_prefix_size, 0,
"If non-zero, enable "
"memtable insert with hint with the given prefix size.");
"table becomes an identity function. This is only valid when key "
"is 8 bytes");
DEFINE_bool(dump_malloc_stats, true, "Dump malloc stats in LOG ");
-DEFINE_uint64(stats_dump_period_sec, rocksdb::Options().stats_dump_period_sec,
+DEFINE_uint64(stats_dump_period_sec,
+ ROCKSDB_NAMESPACE::Options().stats_dump_period_sec,
"Gap between printing stats to log in seconds");
DEFINE_uint64(stats_persist_period_sec,
- rocksdb::Options().stats_persist_period_sec,
+ ROCKSDB_NAMESPACE::Options().stats_persist_period_sec,
"Gap between persisting stats in seconds");
+DEFINE_bool(persist_stats_to_disk,
+ ROCKSDB_NAMESPACE::Options().persist_stats_to_disk,
+ "whether to persist stats to disk");
DEFINE_uint64(stats_history_buffer_size,
- rocksdb::Options().stats_history_buffer_size,
+ ROCKSDB_NAMESPACE::Options().stats_history_buffer_size,
"Max number of stats snapshots to keep in memory");
+DEFINE_int64(multiread_stride, 0,
+ "Stride length for the keys in a MultiGet batch");
+DEFINE_bool(multiread_batched, false, "Use the new MultiGet API");
enum RepFactory {
kSkipList,
"position");
DEFINE_bool(report_file_operations, false, "if report number of file "
"operations");
+DEFINE_int32(readahead_size, 0, "Iterator readahead size");
static const bool FLAGS_soft_rate_limit_dummy __attribute__((__unused__)) =
RegisterFlagValidator(&FLAGS_soft_rate_limit, &ValidateRateLimit);
RegisterFlagValidator(&FLAGS_table_cache_numshardbits,
&ValidateTableCacheNumshardbits);
-namespace rocksdb {
+namespace ROCKSDB_NAMESPACE {
namespace {
struct ReportFileOpCounters {
} // namespace
+enum DistributionType : unsigned char {
+ kFixed = 0,
+ kUniform,
+ kNormal
+};
+
+static enum DistributionType FLAGS_value_size_distribution_type_e = kFixed;
+
+static enum DistributionType StringToDistributionType(const char* ctype) {
+ assert(ctype);
+
+ if (!strcasecmp(ctype, "fixed"))
+ return kFixed;
+ else if (!strcasecmp(ctype, "uniform"))
+ return kUniform;
+ else if (!strcasecmp(ctype, "normal"))
+ return kNormal;
+
+ fprintf(stdout, "Cannot parse distribution type '%s'\n", ctype);
+ return kFixed; // default value
+}
+
+class BaseDistribution {
+ public:
+ BaseDistribution(unsigned int min, unsigned int max) :
+ min_value_size_(min),
+ max_value_size_(max) {}
+ virtual ~BaseDistribution() {}
+
+ unsigned int Generate() {
+ auto val = Get();
+ if (NeedTruncate()) {
+ val = std::max(min_value_size_, val);
+ val = std::min(max_value_size_, val);
+ }
+ return val;
+ }
+ private:
+ virtual unsigned int Get() = 0;
+ virtual bool NeedTruncate() {
+ return true;
+ }
+ unsigned int min_value_size_;
+ unsigned int max_value_size_;
+};
+
+class FixedDistribution : public BaseDistribution
+{
+ public:
+ FixedDistribution(unsigned int size) :
+ BaseDistribution(size, size),
+ size_(size) {}
+ private:
+ virtual unsigned int Get() override {
+ return size_;
+ }
+ virtual bool NeedTruncate() override {
+ return false;
+ }
+ unsigned int size_;
+};
+
+class NormalDistribution
+ : public BaseDistribution, public std::normal_distribution<double> {
+ public:
+ NormalDistribution(unsigned int min, unsigned int max) :
+ BaseDistribution(min, max),
+ // 99.7% values within the range [min, max].
+ std::normal_distribution<double>((double)(min + max) / 2.0 /*mean*/,
+ (double)(max - min) / 6.0 /*stddev*/),
+ gen_(rd_()) {}
+ private:
+ virtual unsigned int Get() override {
+ return static_cast<unsigned int>((*this)(gen_));
+ }
+ std::random_device rd_;
+ std::mt19937 gen_;
+};
+
+class UniformDistribution
+ : public BaseDistribution,
+ public std::uniform_int_distribution<unsigned int> {
+ public:
+ UniformDistribution(unsigned int min, unsigned int max) :
+ BaseDistribution(min, max),
+ std::uniform_int_distribution<unsigned int>(min, max),
+ gen_(rd_()) {}
+ private:
+ virtual unsigned int Get() override {
+ return (*this)(gen_);
+ }
+ virtual bool NeedTruncate() override {
+ return false;
+ }
+ std::random_device rd_;
+ std::mt19937 gen_;
+};
+
// Helper for quickly generating random data.
class RandomGenerator {
private:
std::string data_;
unsigned int pos_;
+ std::unique_ptr<BaseDistribution> dist_;
public:
+
RandomGenerator() {
+ auto max_value_size = FLAGS_value_size_max;
+ switch (FLAGS_value_size_distribution_type_e) {
+ case kUniform:
+ dist_.reset(new UniformDistribution(FLAGS_value_size_min,
+ FLAGS_value_size_max));
+ break;
+ case kNormal:
+ dist_.reset(new NormalDistribution(FLAGS_value_size_min,
+ FLAGS_value_size_max));
+ break;
+ case kFixed:
+ default:
+ dist_.reset(new FixedDistribution(value_size));
+ max_value_size = value_size;
+ }
// We use a limited amount of data over and over again and ensure
// that it is larger than the compression window (32KB), and also
// large enough to serve all typical value sizes we want to write.
Random rnd(301);
std::string piece;
- while (data_.size() < (unsigned)std::max(1048576, FLAGS_value_size)) {
+ while (data_.size() < (unsigned)std::max(1048576, max_value_size)) {
// Add a short fragment that is as compressible as specified
// by FLAGS_compression_ratio.
test::CompressibleString(&rnd, FLAGS_compression_ratio, 100, &piece);
return Slice(data_.data() + pos_ - len, len);
}
- Slice GenerateWithTTL(unsigned int len) {
- assert(len <= data_.size());
- if (pos_ + len > data_.size()) {
- pos_ = 0;
- }
- pos_ += len;
- return Slice(data_.data() + pos_ - len, len);
+ Slice Generate() {
+ auto len = dist_->Generate();
+ return Generate(len);
}
};
private:
std::string Header() const { return "secs_elapsed,interval_qps"; }
void SleepAndReport() {
- uint64_t kMicrosInSecond = 1000 * 1000;
auto time_started = env_->NowMicros();
while (true) {
{
std::atomic<int64_t> total_ops_done_;
int64_t last_report_;
const uint64_t report_interval_secs_;
- rocksdb::port::Thread reporting_thread_;
+ ROCKSDB_NAMESPACE::port::Thread reporting_thread_;
std::mutex mutex_;
// will notify on stop
std::condition_variable stop_cv_;
"ElapsedTime", "Stage", "State", "OperationProperties");
int64_t current_time = 0;
- Env::Default()->GetCurrentTime(¤t_time);
+ FLAGS_env->GetCurrentTime(¤t_time);
for (auto ts : thread_list) {
fprintf(stderr, "%18" PRIu64 " %10s %12s %20s %13s %45s %12s",
ts.thread_id,
DBWithColumnFamilies db_;
std::vector<DBWithColumnFamilies> multi_dbs_;
int64_t num_;
- int value_size_;
int key_size_;
int prefix_size_;
int64_t keys_per_prefix_;
Options open_options_; // keep options around to properly destroy db later
#ifndef ROCKSDB_LITE
TraceOptions trace_options_;
+ TraceOptions block_cache_trace_options_;
#endif
int64_t reads_;
int64_t deletes_;
cv_.SignalAll();
}
- bool WaitForRecovery(uint64_t /*abs_time_us*/) {
+ bool WaitForRecovery(uint64_t abs_time_us) {
InstrumentedMutexLock l(&mutex_);
if (!recovery_complete_) {
- cv_.Wait(/*abs_time_us*/);
+ cv_.TimedWait(abs_time_us);
}
if (recovery_complete_) {
recovery_complete_ = false;
const Slice& input, std::string* compressed) {
bool ok = true;
switch (FLAGS_compression_type_e) {
- case rocksdb::kSnappyCompression:
+ case ROCKSDB_NAMESPACE::kSnappyCompression:
ok = Snappy_Compress(compression_info, input.data(), input.size(),
compressed);
break;
- case rocksdb::kZlibCompression:
+ case ROCKSDB_NAMESPACE::kZlibCompression:
ok = Zlib_Compress(compression_info, 2, input.data(), input.size(),
compressed);
break;
- case rocksdb::kBZip2Compression:
+ case ROCKSDB_NAMESPACE::kBZip2Compression:
ok = BZip2_Compress(compression_info, 2, input.data(), input.size(),
compressed);
break;
- case rocksdb::kLZ4Compression:
+ case ROCKSDB_NAMESPACE::kLZ4Compression:
ok = LZ4_Compress(compression_info, 2, input.data(), input.size(),
compressed);
break;
- case rocksdb::kLZ4HCCompression:
+ case ROCKSDB_NAMESPACE::kLZ4HCCompression:
ok = LZ4HC_Compress(compression_info, 2, input.data(), input.size(),
compressed);
break;
- case rocksdb::kXpressCompression:
+ case ROCKSDB_NAMESPACE::kXpressCompression:
ok = XPRESS_Compress(input.data(),
input.size(), compressed);
break;
- case rocksdb::kZSTD:
+ case ROCKSDB_NAMESPACE::kZSTD:
ok = ZSTD_Compress(compression_info, input.data(), input.size(),
compressed);
break;
void PrintHeader() {
PrintEnvironment();
fprintf(stdout, "Keys: %d bytes each\n", FLAGS_key_size);
- fprintf(stdout, "Values: %d bytes each (%d bytes after compression)\n",
- FLAGS_value_size,
- static_cast<int>(FLAGS_value_size * FLAGS_compression_ratio + 0.5));
+ auto avg_value_size = FLAGS_value_size;
+ if (FLAGS_value_size_distribution_type_e == kFixed) {
+ fprintf(stdout, "Values: %d bytes each (%d bytes after compression)\n",
+ avg_value_size,
+ static_cast<int>(avg_value_size * FLAGS_compression_ratio + 0.5));
+ } else {
+ avg_value_size = (FLAGS_value_size_min + FLAGS_value_size_max) / 2;
+ fprintf(stdout, "Values: %d avg bytes each (%d bytes after compression)\n",
+ avg_value_size,
+ static_cast<int>(avg_value_size * FLAGS_compression_ratio + 0.5));
+ fprintf(stdout, "Values Distribution: %s (min: %d, max: %d)\n",
+ FLAGS_value_size_distribution_type.c_str(),
+ FLAGS_value_size_min, FLAGS_value_size_max);
+ }
fprintf(stdout, "Entries: %" PRIu64 "\n", num_);
fprintf(stdout, "Prefix: %d bytes\n", FLAGS_prefix_size);
fprintf(stdout, "Keys per prefix: %" PRIu64 "\n", keys_per_prefix_);
fprintf(stdout, "RawSize: %.1f MB (estimated)\n",
- ((static_cast<int64_t>(FLAGS_key_size + FLAGS_value_size) * num_)
+ ((static_cast<int64_t>(FLAGS_key_size + avg_value_size) * num_)
/ 1048576.0));
fprintf(stdout, "FileSize: %.1f MB (estimated)\n",
- (((FLAGS_key_size + FLAGS_value_size * FLAGS_compression_ratio)
+ (((FLAGS_key_size + avg_value_size * FLAGS_compression_ratio)
* num_)
/ 1048576.0));
fprintf(stdout, "Write rate: %" PRIu64 " bytes/second\n",
fprintf(stdout,
"WARNING: Assertions are enabled; benchmarks unnecessarily slow\n");
#endif
- if (FLAGS_compression_type_e != rocksdb::kNoCompression) {
+ if (FLAGS_compression_type_e != ROCKSDB_NAMESPACE::kNoCompression) {
// The test string should not be too small.
const int len = FLAGS_block_size;
std::string input_str(len, 'y');
return nullptr;
}
if (FLAGS_use_clock_cache) {
- auto cache = NewClockCache((size_t)capacity, FLAGS_cache_numshardbits);
+ auto cache = NewClockCache(static_cast<size_t>(capacity),
+ FLAGS_cache_numshardbits);
if (!cache) {
fprintf(stderr, "Clock cache not supported.");
exit(1);
}
return cache;
} else {
- return NewLRUCache((size_t)capacity, FLAGS_cache_numshardbits,
- false /*strict_capacity_limit*/,
- FLAGS_cache_high_pri_pool_ratio);
+ return NewLRUCache(
+ static_cast<size_t>(capacity), FLAGS_cache_numshardbits,
+ false /*strict_capacity_limit*/, FLAGS_cache_high_pri_pool_ratio);
}
}
: nullptr),
prefix_extractor_(NewFixedPrefixTransform(FLAGS_prefix_size)),
num_(FLAGS_num),
- value_size_(FLAGS_value_size),
key_size_(FLAGS_key_size),
prefix_size_(FLAGS_prefix_size),
keys_per_prefix_(FLAGS_keys_per_prefix),
"at the same time");
exit(1);
}
- FLAGS_env = new ReportFileOpEnv(rocksdb::Env::Default());
+ FLAGS_env = new ReportFileOpEnv(FLAGS_env);
}
if (FLAGS_prefix_size > FLAGS_key_size) {
}
if (!FLAGS_use_existing_db) {
Options options;
+ options.env = FLAGS_env;
if (!FLAGS_wal_dir.empty()) {
options.wal_dir = FLAGS_wal_dir;
}
}
}
+ void GenerateKeyFromIntForSeek(uint64_t v, int64_t num_keys, Slice* key) {
+ GenerateKeyFromInt(v, num_keys, key);
+ if (FLAGS_seek_missing_prefix) {
+ assert(prefix_size_ > 8);
+ char* key_ptr = const_cast<char*>(key->data());
+ // This rely on GenerateKeyFromInt filling paddings with '0's.
+ // Putting a '1' will create a non-existing prefix.
+ key_ptr[8] = '1';
+ }
+ }
+
std::string GetPathForMultiple(std::string base_name, size_t id) {
if (!base_name.empty()) {
#ifndef OS_WIN
return base_name + ToString(id);
}
-void VerifyDBFromDB(std::string& truth_db_name) {
- DBWithColumnFamilies truth_db;
- auto s = DB::OpenForReadOnly(open_options_, truth_db_name, &truth_db.db);
- if (!s.ok()) {
- fprintf(stderr, "open error: %s\n", s.ToString().c_str());
- exit(1);
- }
- ReadOptions ro;
- ro.total_order_seek = true;
- std::unique_ptr<Iterator> truth_iter(truth_db.db->NewIterator(ro));
- std::unique_ptr<Iterator> db_iter(db_.db->NewIterator(ro));
- // Verify that all the key/values in truth_db are retrivable in db with ::Get
- fprintf(stderr, "Verifying db >= truth_db with ::Get...\n");
- for (truth_iter->SeekToFirst(); truth_iter->Valid(); truth_iter->Next()) {
+ void VerifyDBFromDB(std::string& truth_db_name) {
+ DBWithColumnFamilies truth_db;
+ auto s = DB::OpenForReadOnly(open_options_, truth_db_name, &truth_db.db);
+ if (!s.ok()) {
+ fprintf(stderr, "open error: %s\n", s.ToString().c_str());
+ exit(1);
+ }
+ ReadOptions ro;
+ ro.total_order_seek = true;
+ std::unique_ptr<Iterator> truth_iter(truth_db.db->NewIterator(ro));
+ std::unique_ptr<Iterator> db_iter(db_.db->NewIterator(ro));
+ // Verify that all the key/values in truth_db are retrivable in db with
+ // ::Get
+ fprintf(stderr, "Verifying db >= truth_db with ::Get...\n");
+ for (truth_iter->SeekToFirst(); truth_iter->Valid(); truth_iter->Next()) {
std::string value;
s = db_.db->Get(ro, truth_iter->key(), &value);
assert(s.ok());
// TODO(myabandeh): provide debugging hints
assert(Slice(value) == truth_iter->value());
+ }
+ // Verify that the db iterator does not give any extra key/value
+ fprintf(stderr, "Verifying db == truth_db...\n");
+ for (db_iter->SeekToFirst(), truth_iter->SeekToFirst(); db_iter->Valid();
+ db_iter->Next(), truth_iter->Next()) {
+ assert(truth_iter->Valid());
+ assert(truth_iter->value() == db_iter->value());
+ }
+ // No more key should be left unchecked in truth_db
+ assert(!truth_iter->Valid());
+ fprintf(stderr, "...Verified\n");
}
- // Verify that the db iterator does not give any extra key/value
- fprintf(stderr, "Verifying db == truth_db...\n");
- for (db_iter->SeekToFirst(), truth_iter->SeekToFirst(); db_iter->Valid(); db_iter->Next(), truth_iter->Next()) {
- assert(truth_iter->Valid());
- assert(truth_iter->value() == db_iter->value());
- }
- // No more key should be left unchecked in truth_db
- assert(!truth_iter->Valid());
- fprintf(stderr, "...Verified\n");
-}
void Run() {
if (!SanityCheck()) {
reads_ = (FLAGS_reads < 0 ? FLAGS_num : FLAGS_reads);
writes_ = (FLAGS_writes < 0 ? FLAGS_num : FLAGS_writes);
deletes_ = (FLAGS_deletes < 0 ? FLAGS_num : FLAGS_deletes);
- value_size_ = FLAGS_value_size;
+ value_size = FLAGS_value_size;
key_size_ = FLAGS_key_size;
entries_per_batch_ = FLAGS_batch_size;
writes_before_delete_range_ = FLAGS_writes_before_delete_range;
} else if (name == "fill100K") {
fresh_db = true;
num_ /= 1000;
- value_size_ = 100 * 1000;
+ value_size = 100 * 1000;
method = &Benchmark::WriteRandom;
} else if (name == "readseq") {
method = &Benchmark::ReadSequential;
+ } else if (name == "readtorowcache") {
+ if (!FLAGS_use_existing_keys || !FLAGS_row_cache_size) {
+ fprintf(stderr,
+ "Please set use_existing_keys to true and specify a "
+ "row cache size in readtorowcache benchmark\n");
+ exit(1);
+ }
+ method = &Benchmark::ReadToRowCache;
} else if (name == "readtocache") {
method = &Benchmark::ReadSequential;
num_threads = 1;
} else if (name == "readreverse") {
method = &Benchmark::ReadReverse;
} else if (name == "readrandom") {
+ if (FLAGS_multiread_stride) {
+ fprintf(stderr, "entries_per_batch = %" PRIi64 "\n",
+ entries_per_batch_);
+ }
method = &Benchmark::ReadRandom;
} else if (name == "readrandomfast") {
method = &Benchmark::ReadRandomFast;
PrintStats("rocksdb.levelstats");
} else if (name == "sstables") {
PrintStats("rocksdb.sstables");
+ } else if (name == "stats_history") {
+ PrintStatsHistory();
} else if (name == "replay") {
if (num_threads > 1) {
fprintf(stderr, "Multi-threaded replay is not yet supported\n");
exit(1);
}
method = &Benchmark::Replay;
+ } else if (name == "getmergeoperands") {
+ method = &Benchmark::GetMergeOperands;
} else if (!name.empty()) { // No error message for empty name
fprintf(stderr, "unknown benchmark '%s'\n", name.c_str());
exit(1);
fprintf(stdout, "Tracing the workload to: [%s]\n",
FLAGS_trace_file.c_str());
}
+ // Start block cache tracing.
+ if (!FLAGS_block_cache_trace_file.empty()) {
+ // Sanity checks.
+ if (FLAGS_block_cache_trace_sampling_frequency <= 0) {
+ fprintf(stderr,
+ "Block cache trace sampling frequency must be higher than "
+ "0.\n");
+ exit(1);
+ }
+ if (FLAGS_block_cache_trace_max_trace_file_size_in_bytes <= 0) {
+ fprintf(stderr,
+ "The maximum file size for block cache tracing must be "
+ "higher than 0.\n");
+ exit(1);
+ }
+ block_cache_trace_options_.max_trace_file_size =
+ FLAGS_block_cache_trace_max_trace_file_size_in_bytes;
+ block_cache_trace_options_.sampling_frequency =
+ FLAGS_block_cache_trace_sampling_frequency;
+ std::unique_ptr<TraceWriter> block_cache_trace_writer;
+ Status s = NewFileTraceWriter(FLAGS_env, EnvOptions(),
+ FLAGS_block_cache_trace_file,
+ &block_cache_trace_writer);
+ if (!s.ok()) {
+ fprintf(stderr,
+ "Encountered an error when creating trace writer, %s\n",
+ s.ToString().c_str());
+ exit(1);
+ }
+ s = db_.db->StartBlockCacheTrace(block_cache_trace_options_,
+ std::move(block_cache_trace_writer));
+ if (!s.ok()) {
+ fprintf(
+ stderr,
+ "Encountered an error when starting block cache tracing, %s\n",
+ s.ToString().c_str());
+ exit(1);
+ }
+ fprintf(stdout, "Tracing block cache accesses to: [%s]\n",
+ FLAGS_block_cache_trace_file.c_str());
+ }
#endif // ROCKSDB_LITE
if (num_warmup > 0) {
}
}
+ if (secondary_update_thread_) {
+ secondary_update_stopped_.store(1, std::memory_order_relaxed);
+ secondary_update_thread_->join();
+ secondary_update_thread_.reset();
+ }
+
#ifndef ROCKSDB_LITE
if (name != "replay" && FLAGS_trace_file != "") {
Status s = db_.db->EndTrace();
s.ToString().c_str());
}
}
+ if (!FLAGS_block_cache_trace_file.empty()) {
+ Status s = db_.db->EndBlockCacheTrace();
+ if (!s.ok()) {
+ fprintf(stderr,
+ "Encountered an error ending the block cache tracing, %s\n",
+ s.ToString().c_str());
+ }
+ }
#endif // ROCKSDB_LITE
if (FLAGS_statistics) {
->ToString()
.c_str());
}
+
+#ifndef ROCKSDB_LITE
+ if (FLAGS_use_secondary_db) {
+ fprintf(stdout, "Secondary instance updated %" PRIu64 " times.\n",
+ secondary_db_updates_);
+ }
+#endif // ROCKSDB_LITE
}
private:
std::shared_ptr<TimestampEmulator> timestamp_emulator_;
-
+ std::unique_ptr<port::Thread> secondary_update_thread_;
+ std::atomic<int> secondary_update_stopped_{0};
+#ifndef ROCKSDB_LITE
+ uint64_t secondary_db_updates_ = 0;
+#endif // ROCKSDB_LITE
struct ThreadArg {
Benchmark* bm;
SharedState* shared;
while (ok && bytes < 1024 * 1048576) {
CacheAllocationPtr uncompressed;
switch (FLAGS_compression_type_e) {
- case rocksdb::kSnappyCompression: {
+ case ROCKSDB_NAMESPACE::kSnappyCompression: {
// get size and allocate here to make comparison fair
size_t ulength = 0;
if (!Snappy_GetUncompressedLength(compressed.data(),
uncompressed.get());
break;
}
- case rocksdb::kZlibCompression:
- uncompressed = Zlib_Uncompress(uncompression_info, compressed.data(),
- compressed.size(), &decompress_size, 2);
- ok = uncompressed.get() != nullptr;
- break;
- case rocksdb::kBZip2Compression:
- uncompressed = BZip2_Uncompress(compressed.data(), compressed.size(),
- &decompress_size, 2);
- ok = uncompressed.get() != nullptr;
- break;
- case rocksdb::kLZ4Compression:
- uncompressed = LZ4_Uncompress(uncompression_info, compressed.data(),
- compressed.size(), &decompress_size, 2);
- ok = uncompressed.get() != nullptr;
- break;
- case rocksdb::kLZ4HCCompression:
- uncompressed = LZ4_Uncompress(uncompression_info, compressed.data(),
- compressed.size(), &decompress_size, 2);
- ok = uncompressed.get() != nullptr;
- break;
- case rocksdb::kXpressCompression:
- uncompressed.reset(XPRESS_Uncompress(
- compressed.data(), compressed.size(), &decompress_size));
- ok = uncompressed.get() != nullptr;
- break;
- case rocksdb::kZSTD:
- uncompressed = ZSTD_Uncompress(uncompression_info, compressed.data(),
- compressed.size(), &decompress_size);
- ok = uncompressed.get() != nullptr;
- break;
- default:
- ok = false;
+ case ROCKSDB_NAMESPACE::kZlibCompression:
+ uncompressed =
+ Zlib_Uncompress(uncompression_info, compressed.data(),
+ compressed.size(), &decompress_size, 2);
+ ok = uncompressed.get() != nullptr;
+ break;
+ case ROCKSDB_NAMESPACE::kBZip2Compression:
+ uncompressed = BZip2_Uncompress(compressed.data(), compressed.size(),
+ &decompress_size, 2);
+ ok = uncompressed.get() != nullptr;
+ break;
+ case ROCKSDB_NAMESPACE::kLZ4Compression:
+ uncompressed = LZ4_Uncompress(uncompression_info, compressed.data(),
+ compressed.size(), &decompress_size, 2);
+ ok = uncompressed.get() != nullptr;
+ break;
+ case ROCKSDB_NAMESPACE::kLZ4HCCompression:
+ uncompressed = LZ4_Uncompress(uncompression_info, compressed.data(),
+ compressed.size(), &decompress_size, 2);
+ ok = uncompressed.get() != nullptr;
+ break;
+ case ROCKSDB_NAMESPACE::kXpressCompression:
+ uncompressed.reset(XPRESS_Uncompress(
+ compressed.data(), compressed.size(), &decompress_size));
+ ok = uncompressed.get() != nullptr;
+ break;
+ case ROCKSDB_NAMESPACE::kZSTD:
+ uncompressed = ZSTD_Uncompress(uncompression_info, compressed.data(),
+ compressed.size(), &decompress_size);
+ ok = uncompressed.get() != nullptr;
+ break;
+ default:
+ ok = false;
}
bytes += input.size();
thread->stats.FinishedOps(nullptr, nullptr, 1, kUncompress);
DBOptions db_opts;
std::vector<ColumnFamilyDescriptor> cf_descs;
if (FLAGS_options_file != "") {
- auto s = LoadOptionsFromFile(FLAGS_options_file, Env::Default(), &db_opts,
+ auto s = LoadOptionsFromFile(FLAGS_options_file, FLAGS_env, &db_opts,
&cf_descs);
+ db_opts.env = FLAGS_env;
if (s.ok()) {
*opts = Options(db_opts, cf_descs[0].options);
return true;
assert(db_.db == nullptr);
+ options.env = FLAGS_env;
options.max_open_files = FLAGS_open_files;
if (FLAGS_cost_write_buffer_to_cache || FLAGS_db_write_buffer_size != 0) {
options.write_buffer_manager.reset(
FLAGS_min_write_buffer_number_to_merge;
options.max_write_buffer_number_to_maintain =
FLAGS_max_write_buffer_number_to_maintain;
+ options.max_write_buffer_size_to_maintain =
+ FLAGS_max_write_buffer_size_to_maintain;
options.max_background_jobs = FLAGS_max_background_jobs;
options.max_background_compactions = FLAGS_max_background_compactions;
options.max_subcompactions = static_cast<uint32_t>(FLAGS_subcompactions);
options.new_table_reader_for_compaction_inputs =
FLAGS_new_table_reader_for_compaction_inputs;
options.compaction_readahead_size = FLAGS_compaction_readahead_size;
+ options.log_readahead_size = FLAGS_log_readahead_size;
options.random_access_max_buffer_size = FLAGS_random_access_max_buffer_size;
options.writable_file_max_buffer_size = FLAGS_writable_file_max_buffer_size;
options.use_fsync = FLAGS_use_fsync;
exit(1);
}
- rocksdb::CuckooTableOptions table_options;
+ ROCKSDB_NAMESPACE::CuckooTableOptions table_options;
table_options.hash_table_ratio = FLAGS_cuckoo_hash_ratio;
table_options.identity_as_first_hash = FLAGS_identity_as_first_hash;
options.table_factory = std::shared_ptr<TableFactory>(
block_based_options.block_align = FLAGS_block_align;
if (FLAGS_use_data_block_hash_index) {
block_based_options.data_block_index_type =
- rocksdb::BlockBasedTableOptions::kDataBlockBinaryAndHash;
+ ROCKSDB_NAMESPACE::BlockBasedTableOptions::kDataBlockBinaryAndHash;
} else {
block_based_options.data_block_index_type =
- rocksdb::BlockBasedTableOptions::kDataBlockBinarySearch;
+ ROCKSDB_NAMESPACE::BlockBasedTableOptions::kDataBlockBinarySearch;
}
block_based_options.data_block_hash_table_util_ratio =
FLAGS_data_block_hash_table_util_ratio;
}
if (FLAGS_max_bytes_for_level_multiplier_additional_v.size() > 0) {
if (FLAGS_max_bytes_for_level_multiplier_additional_v.size() !=
- (unsigned int)FLAGS_num_levels) {
+ static_cast<unsigned int>(FLAGS_num_levels)) {
fprintf(stderr, "Insufficient number of fanouts specified %d\n",
- (int)FLAGS_max_bytes_for_level_multiplier_additional_v.size());
+ static_cast<int>(
+ FLAGS_max_bytes_for_level_multiplier_additional_v.size()));
exit(1);
}
options.max_bytes_for_level_multiplier_additional =
options.enable_write_thread_adaptive_yield =
FLAGS_enable_write_thread_adaptive_yield;
options.enable_pipelined_write = FLAGS_enable_pipelined_write;
+ options.unordered_write = FLAGS_unordered_write;
options.write_thread_max_yield_usec = FLAGS_write_thread_max_yield_usec;
options.write_thread_slow_yield_usec = FLAGS_write_thread_slow_yield_usec;
options.rate_limit_delay_max_milliseconds =
fprintf(stderr, "Cannot use readonly flag with transaction_db\n");
exit(1);
}
+ if (FLAGS_use_secondary_db &&
+ (FLAGS_transaction_db || FLAGS_optimistic_transaction_db)) {
+ fprintf(stderr, "Cannot use use_secondary_db flag with transaction_db\n");
+ exit(1);
+ }
#endif // ROCKSDB_LITE
}
static_cast<unsigned int>(FLAGS_stats_dump_period_sec);
options.stats_persist_period_sec =
static_cast<unsigned int>(FLAGS_stats_persist_period_sec);
+ options.persist_stats_to_disk = FLAGS_persist_stats_to_disk;
options.stats_history_buffer_size =
static_cast<size_t>(FLAGS_stats_history_buffer_size);
} else if (FLAGS_transaction_db) {
TransactionDB* ptr;
TransactionDBOptions txn_db_options;
+ if (options.unordered_write) {
+ options.two_write_queues = true;
+ txn_db_options.skip_concurrency_control = true;
+ txn_db_options.write_policy = WRITE_PREPARED;
+ }
s = TransactionDB::Open(options, txn_db_options, db_name,
column_families, &db->cfh, &ptr);
if (s.ok()) {
} else if (FLAGS_transaction_db) {
TransactionDB* ptr = nullptr;
TransactionDBOptions txn_db_options;
+ if (options.unordered_write) {
+ options.two_write_queues = true;
+ txn_db_options.skip_concurrency_control = true;
+ txn_db_options.write_policy = WRITE_PREPARED;
+ }
s = CreateLoggerFromOptions(db_name, options, &options.info_log);
if (s.ok()) {
s = TransactionDB::Open(options, txn_db_options, db_name, &ptr);
} else if (FLAGS_use_blob_db) {
blob_db::BlobDBOptions blob_db_options;
blob_db_options.enable_garbage_collection = FLAGS_blob_db_enable_gc;
+ blob_db_options.garbage_collection_cutoff = FLAGS_blob_db_gc_cutoff;
blob_db_options.is_fifo = FLAGS_blob_db_is_fifo;
blob_db_options.max_db_size = FLAGS_blob_db_max_db_size;
blob_db_options.ttl_range_secs = FLAGS_blob_db_ttl_range_secs;
blob_db_options.min_blob_size = FLAGS_blob_db_min_blob_size;
blob_db_options.bytes_per_sync = FLAGS_blob_db_bytes_per_sync;
blob_db_options.blob_file_size = FLAGS_blob_db_file_size;
+ blob_db_options.compression = FLAGS_blob_db_compression_type_e;
blob_db::BlobDB* ptr = nullptr;
s = blob_db::BlobDB::Open(options, blob_db_options, db_name, &ptr);
if (s.ok()) {
db->db = ptr;
}
+ } else if (FLAGS_use_secondary_db) {
+ if (FLAGS_secondary_path.empty()) {
+ std::string default_secondary_path;
+ FLAGS_env->GetTestDirectory(&default_secondary_path);
+ default_secondary_path += "/dbbench_secondary";
+ FLAGS_secondary_path = default_secondary_path;
+ }
+ s = DB::OpenAsSecondary(options, db_name, FLAGS_secondary_path, &db->db);
+ if (s.ok() && FLAGS_secondary_update_interval > 0) {
+ secondary_update_thread_.reset(new port::Thread(
+ [this](int interval, DBWithColumnFamilies* _db) {
+ while (0 == secondary_update_stopped_.load(
+ std::memory_order_relaxed)) {
+ Status secondary_update_status =
+ _db->db->TryCatchUpWithPrimary();
+ if (!secondary_update_status.ok()) {
+ fprintf(stderr, "Failed to catch up with primary: %s\n",
+ secondary_update_status.ToString().c_str());
+ break;
+ }
+ ++secondary_db_updates_;
+ FLAGS_env->SleepForMicroseconds(interval * 1000000);
+ }
+ },
+ FLAGS_secondary_update_interval, db));
+ }
#endif // ROCKSDB_LITE
} else {
s = DB::Open(options, db_name, &db->db);
size_t id = thread->rand.Next() % num_key_gens;
DBWithColumnFamilies* db_with_cfh = SelectDBWithCfh(id);
batch.Clear();
-
- if (thread->shared->write_rate_limiter.get() != nullptr) {
- thread->shared->write_rate_limiter->Request(
- entries_per_batch_ * (value_size_ + key_size_), Env::IO_HIGH,
- nullptr /* stats */, RateLimiter::OpType::kWrite);
- // Set time at which last op finished to Now() to hide latency and
- // sleep from rate limiter. Also, do the check once per batch, not
- // once per write.
- thread->stats.ResetLastOpTime();
- }
+ int64_t batch_bytes = 0;
for (int64_t j = 0; j < entries_per_batch_; j++) {
int64_t rand_num = key_gens[id]->Next();
GenerateKeyFromInt(rand_num, FLAGS_num, &key);
+ Slice val = gen.Generate();
if (use_blob_db_) {
#ifndef ROCKSDB_LITE
- Slice val = gen.Generate(value_size_);
- int ttl = rand() % FLAGS_blob_db_max_ttl_range;
blob_db::BlobDB* blobdb =
static_cast<blob_db::BlobDB*>(db_with_cfh->db);
- s = blobdb->PutWithTTL(write_options_, key, val, ttl);
+ if (FLAGS_blob_db_max_ttl_range > 0) {
+ int ttl = rand() % FLAGS_blob_db_max_ttl_range;
+ s = blobdb->PutWithTTL(write_options_, key, val, ttl);
+ } else {
+ s = blobdb->Put(write_options_, key, val);
+ }
#endif // ROCKSDB_LITE
} else if (FLAGS_num_column_families <= 1) {
- batch.Put(key, gen.Generate(value_size_));
+ batch.Put(key, val);
} else {
// We use same rand_num as seed for key and column family so that we
// can deterministically find the cfh corresponding to a particular
// key while reading the key.
batch.Put(db_with_cfh->GetCfh(rand_num), key,
- gen.Generate(value_size_));
+ val);
}
- bytes += value_size_ + key_size_;
+ batch_bytes += val.size() + key_size_;
+ bytes += val.size() + key_size_;
++num_written;
if (writes_per_range_tombstone_ > 0 &&
num_written > writes_before_delete_range_ &&
}
}
}
+ if (thread->shared->write_rate_limiter.get() != nullptr) {
+ thread->shared->write_rate_limiter->Request(
+ batch_bytes, Env::IO_HIGH,
+ nullptr /* stats */, RateLimiter::OpType::kWrite);
+ // Set time at which last op finished to Now() to hide latency and
+ // sleep from rate limiter. Also, do the check once per batch, not
+ // once per write.
+ thread->stats.ResetLastOpTime();
+ }
if (!use_blob_db_) {
s = db_with_cfh->db->Write(write_options_, &batch);
}
delete iter;
thread->stats.AddBytes(bytes);
- if (FLAGS_perf_level > rocksdb::PerfLevel::kDisable) {
+ if (FLAGS_perf_level > ROCKSDB_NAMESPACE::PerfLevel::kDisable) {
+ thread->stats.AddMessage(std::string("PERF_CONTEXT:\n") +
+ get_perf_context()->ToString());
+ }
+ }
+
+ void ReadToRowCache(ThreadState* thread) {
+ int64_t read = 0;
+ int64_t found = 0;
+ int64_t bytes = 0;
+ int64_t key_rand = 0;
+ ReadOptions options(FLAGS_verify_checksum, true);
+ std::unique_ptr<const char[]> key_guard;
+ Slice key = AllocateKey(&key_guard);
+ PinnableSlice pinnable_val;
+
+ while (key_rand < FLAGS_num) {
+ DBWithColumnFamilies* db_with_cfh = SelectDBWithCfh(thread);
+ // We use same key_rand as seed for key and column family so that we can
+ // deterministically find the cfh corresponding to a particular key, as it
+ // is done in DoWrite method.
+ GenerateKeyFromInt(key_rand, FLAGS_num, &key);
+ key_rand++;
+ read++;
+ Status s;
+ if (FLAGS_num_column_families > 1) {
+ s = db_with_cfh->db->Get(options, db_with_cfh->GetCfh(key_rand), key,
+ &pinnable_val);
+ } else {
+ pinnable_val.Reset();
+ s = db_with_cfh->db->Get(options,
+ db_with_cfh->db->DefaultColumnFamily(), key,
+ &pinnable_val);
+ }
+
+ if (s.ok()) {
+ found++;
+ bytes += key.size() + pinnable_val.size();
+ } else if (!s.IsNotFound()) {
+ fprintf(stderr, "Get returned an error: %s\n", s.ToString().c_str());
+ abort();
+ }
+
+ if (thread->shared->read_rate_limiter.get() != nullptr &&
+ read % 256 == 255) {
+ thread->shared->read_rate_limiter->Request(
+ 256, Env::IO_HIGH, nullptr /* stats */, RateLimiter::OpType::kRead);
+ }
+
+ thread->stats.FinishedOps(db_with_cfh, db_with_cfh->db, 1, kRead);
+ }
+
+ char msg[100];
+ snprintf(msg, sizeof(msg), "(%" PRIu64 " of %" PRIu64 " found)\n", found,
+ read);
+
+ thread->stats.AddBytes(bytes);
+ thread->stats.AddMessage(msg);
+
+ if (FLAGS_perf_level > ROCKSDB_NAMESPACE::PerfLevel::kDisable) {
thread->stats.AddMessage(std::string("PERF_CONTEXT:\n") +
get_perf_context()->ToString());
}
thread->stats.AddMessage(msg);
- if (FLAGS_perf_level > rocksdb::PerfLevel::kDisable) {
+ if (FLAGS_perf_level > ROCKSDB_NAMESPACE::PerfLevel::kDisable) {
thread->stats.AddMessage(std::string("PERF_CONTEXT:\n") +
get_perf_context()->ToString());
}
int64_t read = 0;
int64_t found = 0;
int64_t bytes = 0;
+ int num_keys = 0;
+ int64_t key_rand = GetRandomKey(&thread->rand);
ReadOptions options(FLAGS_verify_checksum, true);
std::unique_ptr<const char[]> key_guard;
Slice key = AllocateKey(&key_guard);
// We use same key_rand as seed for key and column family so that we can
// deterministically find the cfh corresponding to a particular key, as it
// is done in DoWrite method.
- int64_t key_rand = GetRandomKey(&thread->rand);
GenerateKeyFromInt(key_rand, FLAGS_num, &key);
+ if (entries_per_batch_ > 1 && FLAGS_multiread_stride) {
+ if (++num_keys == entries_per_batch_) {
+ num_keys = 0;
+ key_rand = GetRandomKey(&thread->rand);
+ if ((key_rand + (entries_per_batch_ - 1) * FLAGS_multiread_stride) >=
+ FLAGS_num) {
+ key_rand = FLAGS_num - entries_per_batch_ * FLAGS_multiread_stride;
+ }
+ } else {
+ key_rand += FLAGS_multiread_stride;
+ }
+ } else {
+ key_rand = GetRandomKey(&thread->rand);
+ }
read++;
Status s;
if (FLAGS_num_column_families > 1) {
thread->stats.AddBytes(bytes);
thread->stats.AddMessage(msg);
- if (FLAGS_perf_level > rocksdb::PerfLevel::kDisable) {
+ if (FLAGS_perf_level > ROCKSDB_NAMESPACE::PerfLevel::kDisable) {
thread->stats.AddMessage(std::string("PERF_CONTEXT:\n") +
get_perf_context()->ToString());
}
std::vector<Slice> keys;
std::vector<std::unique_ptr<const char[]> > key_guards;
std::vector<std::string> values(entries_per_batch_);
+ PinnableSlice* pin_values = new PinnableSlice[entries_per_batch_];
+ std::unique_ptr<PinnableSlice[]> pin_values_guard(pin_values);
+ std::vector<Status> stat_list(entries_per_batch_);
while (static_cast<int64_t>(keys.size()) < entries_per_batch_) {
key_guards.push_back(std::unique_ptr<const char[]>());
keys.push_back(AllocateKey(&key_guards.back()));
Duration duration(FLAGS_duration, reads_);
while (!duration.Done(1)) {
DB* db = SelectDB(thread);
- for (int64_t i = 0; i < entries_per_batch_; ++i) {
- GenerateKeyFromInt(GetRandomKey(&thread->rand), FLAGS_num, &keys[i]);
+ if (FLAGS_multiread_stride) {
+ int64_t key = GetRandomKey(&thread->rand);
+ if ((key + (entries_per_batch_ - 1) * FLAGS_multiread_stride) >=
+ static_cast<int64_t>(FLAGS_num)) {
+ key = FLAGS_num - entries_per_batch_ * FLAGS_multiread_stride;
+ }
+ for (int64_t i = 0; i < entries_per_batch_; ++i) {
+ GenerateKeyFromInt(key, FLAGS_num, &keys[i]);
+ key += FLAGS_multiread_stride;
+ }
+ } else {
+ for (int64_t i = 0; i < entries_per_batch_; ++i) {
+ GenerateKeyFromInt(GetRandomKey(&thread->rand), FLAGS_num, &keys[i]);
+ }
}
- std::vector<Status> statuses = db->MultiGet(options, keys, &values);
- assert(static_cast<int64_t>(statuses.size()) == entries_per_batch_);
-
- read += entries_per_batch_;
- num_multireads++;
- for (int64_t i = 0; i < entries_per_batch_; ++i) {
- if (statuses[i].ok()) {
- ++found;
- } else if (!statuses[i].IsNotFound()) {
- fprintf(stderr, "MultiGet returned an error: %s\n",
- statuses[i].ToString().c_str());
- abort();
+ if (!FLAGS_multiread_batched) {
+ std::vector<Status> statuses = db->MultiGet(options, keys, &values);
+ assert(static_cast<int64_t>(statuses.size()) == entries_per_batch_);
+
+ read += entries_per_batch_;
+ num_multireads++;
+ for (int64_t i = 0; i < entries_per_batch_; ++i) {
+ if (statuses[i].ok()) {
+ ++found;
+ } else if (!statuses[i].IsNotFound()) {
+ fprintf(stderr, "MultiGet returned an error: %s\n",
+ statuses[i].ToString().c_str());
+ abort();
+ }
+ }
+ } else {
+ db->MultiGet(options, db->DefaultColumnFamily(), keys.size(),
+ keys.data(), pin_values, stat_list.data());
+
+ read += entries_per_batch_;
+ num_multireads++;
+ for (int64_t i = 0; i < entries_per_batch_; ++i) {
+ if (stat_list[i].ok()) {
+ ++found;
+ } else if (!stat_list[i].IsNotFound()) {
+ fprintf(stderr, "MultiGet returned an error: %s\n",
+ stat_list[i].ToString().c_str());
+ abort();
+ }
+ stat_list[i] = Status::OK();
+ pin_values[i].Reset();
}
}
if (thread->shared->read_rate_limiter.get() != nullptr &&
thread->stats.AddMessage(msg);
}
- // THe reverse function of Pareto function
+ // The inverse function of Pareto distribution
int64_t ParetoCdfInversion(double u, double theta, double k, double sigma) {
double ret;
if (k == 0.0) {
}
return static_cast<int64_t>(ceil(ret));
}
- // inversion of y=ax^b
+ // The inverse function of power distribution (y=ax^b)
int64_t PowerCdfInversion(double u, double a, double b) {
double ret;
ret = std::pow((u / a), (1 / b));
}
}
- // decide the query type
+ // Decide the ratio of different query types
// 0 Get, 1 Put, 2 Seek, 3 SeekForPrev, 4 Delete, 5 SingleDelete, 6 merge
class QueryDecider {
public:
}
};
- // The graph wokrload mixed with Get, Put, Iterator
+ // KeyrangeUnit is the struct of a keyrange. It is used in a keyrange vector
+ // to transfer a random value to one keyrange based on the hotness.
+ struct KeyrangeUnit {
+ int64_t keyrange_start;
+ int64_t keyrange_access;
+ int64_t keyrange_keys;
+ };
+
+ // From our observations, the prefix hotness (key-range hotness) follows
+ // the two-term-exponential distribution: f(x) = a*exp(b*x) + c*exp(d*x).
+ // However, we cannot directly use the inverse function to decide a
+ // key-range from a random distribution. To achieve it, we create a list of
+ // KeyrangeUnit, each KeyrangeUnit occupies a range of integers whose size is
+ // decided based on the hotness of the key-range. When a random value is
+ // generated based on uniform distribution, we map it to the KeyrangeUnit Vec
+ // and one KeyrangeUnit is selected. The probability of a KeyrangeUnit being
+ // selected is the same as the hotness of this KeyrangeUnit. After that, the
+ // key can be randomly allocated to the key-range of this KeyrangeUnit, or we
+ // can based on the power distribution (y=ax^b) to generate the offset of
+ // the key in the selected key-range. In this way, we generate the keyID
+ // based on the hotness of the prefix and also the key hotness distribution.
+ class GenerateTwoTermExpKeys {
+ public:
+ int64_t keyrange_rand_max_;
+ int64_t keyrange_size_;
+ int64_t keyrange_num_;
+ bool initiated_;
+ std::vector<KeyrangeUnit> keyrange_set_;
+
+ GenerateTwoTermExpKeys() {
+ keyrange_rand_max_ = FLAGS_num;
+ initiated_ = false;
+ }
+
+ ~GenerateTwoTermExpKeys() {}
+
+ // Initiate the KeyrangeUnit vector and calculate the size of each
+ // KeyrangeUnit.
+ Status InitiateExpDistribution(int64_t total_keys, double prefix_a,
+ double prefix_b, double prefix_c,
+ double prefix_d) {
+ int64_t amplify = 0;
+ int64_t keyrange_start = 0;
+ initiated_ = true;
+ if (FLAGS_keyrange_num <= 0) {
+ keyrange_num_ = 1;
+ } else {
+ keyrange_num_ = FLAGS_keyrange_num;
+ }
+ keyrange_size_ = total_keys / keyrange_num_;
+
+ // Calculate the key-range shares size based on the input parameters
+ for (int64_t pfx = keyrange_num_; pfx >= 1; pfx--) {
+ // Step 1. Calculate the probability that this key range will be
+ // accessed in a query. It is based on the two-term expoential
+ // distribution
+ double keyrange_p = prefix_a * std::exp(prefix_b * pfx) +
+ prefix_c * std::exp(prefix_d * pfx);
+ if (keyrange_p < std::pow(10.0, -16.0)) {
+ keyrange_p = 0.0;
+ }
+ // Step 2. Calculate the amplify
+ // In order to allocate a query to a key-range based on the random
+ // number generated for this query, we need to extend the probability
+ // of each key range from [0,1] to [0, amplify]. Amplify is calculated
+ // by 1/(smallest key-range probability). In this way, we ensure that
+ // all key-ranges are assigned with an Integer that >=0
+ if (amplify == 0 && keyrange_p > 0) {
+ amplify = static_cast<int64_t>(std::floor(1 / keyrange_p)) + 1;
+ }
+
+ // Step 3. For each key-range, we calculate its position in the
+ // [0, amplify] range, including the start, the size (keyrange_access)
+ KeyrangeUnit p_unit;
+ p_unit.keyrange_start = keyrange_start;
+ if (0.0 >= keyrange_p) {
+ p_unit.keyrange_access = 0;
+ } else {
+ p_unit.keyrange_access =
+ static_cast<int64_t>(std::floor(amplify * keyrange_p));
+ }
+ p_unit.keyrange_keys = keyrange_size_;
+ keyrange_set_.push_back(p_unit);
+ keyrange_start += p_unit.keyrange_access;
+ }
+ keyrange_rand_max_ = keyrange_start;
+
+ // Step 4. Shuffle the key-ranges randomly
+ // Since the access probability is calculated from small to large,
+ // If we do not re-allocate them, hot key-ranges are always at the end
+ // and cold key-ranges are at the begin of the key space. Therefore, the
+ // key-ranges are shuffled and the rand seed is only decide by the
+ // key-range hotness distribution. With the same distribution parameters
+ // the shuffle results are the same.
+ Random64 rand_loca(keyrange_rand_max_);
+ for (int64_t i = 0; i < FLAGS_keyrange_num; i++) {
+ int64_t pos = rand_loca.Next() % FLAGS_keyrange_num;
+ assert(i >= 0 && i < static_cast<int64_t>(keyrange_set_.size()) &&
+ pos >= 0 && pos < static_cast<int64_t>(keyrange_set_.size()));
+ std::swap(keyrange_set_[i], keyrange_set_[pos]);
+ }
+
+ // Step 5. Recalculate the prefix start postion after shuffling
+ int64_t offset = 0;
+ for (auto& p_unit : keyrange_set_) {
+ p_unit.keyrange_start = offset;
+ offset += p_unit.keyrange_access;
+ }
+
+ return Status::OK();
+ }
+
+ // Generate the Key ID according to the input ini_rand and key distribution
+ int64_t DistGetKeyID(int64_t ini_rand, double key_dist_a,
+ double key_dist_b) {
+ int64_t keyrange_rand = ini_rand % keyrange_rand_max_;
+
+ // Calculate and select one key-range that contains the new key
+ int64_t start = 0, end = static_cast<int64_t>(keyrange_set_.size());
+ while (start + 1 < end) {
+ int64_t mid = start + (end - start) / 2;
+ assert(mid >= 0 && mid < static_cast<int64_t>(keyrange_set_.size()));
+ if (keyrange_rand < keyrange_set_[mid].keyrange_start) {
+ end = mid;
+ } else {
+ start = mid;
+ }
+ }
+ int64_t keyrange_id = start;
+
+ // Select one key in the key-range and compose the keyID
+ int64_t key_offset = 0, key_seed;
+ if (key_dist_a == 0.0 && key_dist_b == 0.0) {
+ key_offset = ini_rand % keyrange_size_;
+ } else {
+ key_seed = static_cast<int64_t>(
+ ceil(std::pow((ini_rand / key_dist_a), (1 / key_dist_b))));
+ Random64 rand_key(key_seed);
+ key_offset = static_cast<int64_t>(rand_key.Next()) % keyrange_size_;
+ }
+ return keyrange_size_ * keyrange_id + key_offset;
+ }
+ };
+
+ // The social graph wokrload mixed with Get, Put, Iterator queries.
+ // The value size and iterator length follow Pareto distribution.
+ // The overall key access follow power distribution. If user models the
+ // workload based on different key-ranges (or different prefixes), user
+ // can use two-term-exponential distribution to fit the workload. User
+ // needs to decides the ratio between Get, Put, Iterator queries before
+ // starting the benchmark.
void MixGraph(ThreadState* thread) {
int64_t read = 0; // including single gets and Next of iterators
int64_t gets = 0;
int64_t scan_len_max = FLAGS_mix_max_scan_len;
double write_rate = 1000000.0;
double read_rate = 1000000.0;
+ bool use_prefix_modeling = false;
+ GenerateTwoTermExpKeys gen_exp;
std::vector<double> ratio{FLAGS_mix_get_ratio, FLAGS_mix_put_ratio,
FLAGS_mix_seek_ratio};
char value_buffer[default_value_max];
// the limit of qps initiation
if (FLAGS_sine_a != 0 || FLAGS_sine_d != 0) {
thread->shared->read_rate_limiter.reset(NewGenericRateLimiter(
- read_rate, 100000 /* refill_period_us */, 10 /* fairness */,
+ static_cast<int64_t>(read_rate), 100000 /* refill_period_us */, 10 /* fairness */,
RateLimiter::Mode::kReadsOnly));
thread->shared->write_rate_limiter.reset(
- NewGenericRateLimiter(write_rate));
+ NewGenericRateLimiter(static_cast<int64_t>(write_rate)));
+ }
+
+ // Decide if user wants to use prefix based key generation
+ if (FLAGS_keyrange_dist_a != 0.0 || FLAGS_keyrange_dist_b != 0.0 ||
+ FLAGS_keyrange_dist_c != 0.0 || FLAGS_keyrange_dist_d != 0.0) {
+ use_prefix_modeling = true;
+ gen_exp.InitiateExpDistribution(
+ FLAGS_num, FLAGS_keyrange_dist_a, FLAGS_keyrange_dist_b,
+ FLAGS_keyrange_dist_c, FLAGS_keyrange_dist_d);
}
Duration duration(FLAGS_duration, reads_);
while (!duration.Done(1)) {
DBWithColumnFamilies* db_with_cfh = SelectDBWithCfh(thread);
- int64_t rand_v, key_rand, key_seed;
- rand_v = GetRandomKey(&thread->rand) % FLAGS_num;
+ int64_t ini_rand, rand_v, key_rand, key_seed;
+ ini_rand = GetRandomKey(&thread->rand);
+ rand_v = ini_rand % FLAGS_num;
double u = static_cast<double>(rand_v) / FLAGS_num;
- key_seed = PowerCdfInversion(u, FLAGS_key_dist_a, FLAGS_key_dist_b);
- Random64 rand(key_seed);
- key_rand = static_cast<int64_t>(rand.Next()) % FLAGS_num;
+
+ // Generate the keyID based on the key hotness and prefix hotness
+ if (use_prefix_modeling) {
+ key_rand =
+ gen_exp.DistGetKeyID(ini_rand, FLAGS_key_dist_a, FLAGS_key_dist_b);
+ } else {
+ key_seed = PowerCdfInversion(u, FLAGS_key_dist_a, FLAGS_key_dist_b);
+ Random64 rand(key_seed);
+ key_rand = static_cast<int64_t>(rand.Next()) % FLAGS_num;
+ }
GenerateKeyFromInt(key_rand, FLAGS_num, &key);
int query_type = query.GetType(rand_v);
mix_rate_with_noise * query.ratio_[1] * FLAGS_mix_ave_kv_size;
thread->shared->write_rate_limiter.reset(
- NewGenericRateLimiter(write_rate));
+ NewGenericRateLimiter(static_cast<int64_t>(write_rate)));
thread->shared->read_rate_limiter.reset(NewGenericRateLimiter(
- read_rate,
+ static_cast<int64_t>(read_rate),
FLAGS_sine_mix_rate_interval_milliseconds * uint64_t{1000}, 10,
RateLimiter::Mode::kReadsOnly));
}
} else if (query_type == 1) {
// the Put query
puts++;
- int64_t value_size = ParetoCdfInversion(
+ int64_t val_size = ParetoCdfInversion(
u, FLAGS_value_theta, FLAGS_value_k, FLAGS_value_sigma);
- if (value_size < 0) {
- value_size = 10;
- } else if (value_size > value_max) {
- value_size = value_size % value_max;
+ if (val_size < 0) {
+ val_size = 10;
+ } else if (val_size > value_max) {
+ val_size = val_size % value_max;
}
s = db_with_cfh->db->Put(
write_options_, key,
- gen.Generate(static_cast<unsigned int>(value_size)));
+ gen.Generate(static_cast<unsigned int>(val_size)));
if (!s.ok()) {
fprintf(stderr, "put error: %s\n", s.ToString().c_str());
exit(1);
if (thread->shared->write_rate_limiter) {
thread->shared->write_rate_limiter->Request(
- key.size() + value_size, Env::IO_HIGH, nullptr /*stats*/,
+ key.size() + val_size, Env::IO_HIGH, nullptr /*stats*/,
RateLimiter::OpType::kWrite);
}
thread->stats.FinishedOps(db_with_cfh, db_with_cfh->db, 1, kWrite);
thread->stats.AddBytes(bytes);
thread->stats.AddMessage(msg);
- if (FLAGS_perf_level > rocksdb::PerfLevel::kDisable) {
+ if (FLAGS_perf_level > ROCKSDB_NAMESPACE::PerfLevel::kDisable) {
thread->stats.AddMessage(std::string("PERF_CONTEXT:\n") +
get_perf_context()->ToString());
}
int64_t found = 0;
int64_t bytes = 0;
ReadOptions options(FLAGS_verify_checksum, true);
+ options.total_order_seek = FLAGS_total_order_seek;
+ options.prefix_same_as_start = FLAGS_prefix_same_as_start;
options.tailing = FLAGS_use_tailing_iterator;
+ options.readahead_size = FLAGS_readahead_size;
Iterator* single_iter = nullptr;
std::vector<Iterator*> multi_iters;
char value_buffer[256];
while (!duration.Done(1)) {
int64_t seek_pos = thread->rand.Next() % FLAGS_num;
- GenerateKeyFromInt((uint64_t)seek_pos, FLAGS_num, &key);
+ GenerateKeyFromIntForSeek(static_cast<uint64_t>(seek_pos), FLAGS_num,
+ &key);
if (FLAGS_max_scan_distance != 0) {
if (FLAGS_reverse_iterator) {
GenerateKeyFromInt(
FLAGS_num, &lower_bound);
options.iterate_lower_bound = &lower_bound;
} else {
- GenerateKeyFromInt(
- (uint64_t)std::min(FLAGS_num, seek_pos + FLAGS_max_scan_distance),
- FLAGS_num, &upper_bound);
+ auto min_num =
+ std::min(FLAGS_num, seek_pos + FLAGS_max_scan_distance);
+ GenerateKeyFromInt(static_cast<uint64_t>(min_num), FLAGS_num,
+ &upper_bound);
options.iterate_upper_bound = &upper_bound;
}
}
found, read);
thread->stats.AddBytes(bytes);
thread->stats.AddMessage(msg);
- if (FLAGS_perf_level > rocksdb::PerfLevel::kDisable) {
+ if (FLAGS_perf_level > ROCKSDB_NAMESPACE::PerfLevel::kDisable) {
thread->stats.AddMessage(std::string("PERF_CONTEXT:\n") +
get_perf_context()->ToString());
}
// Wait for the writes to be finished
if (!hint_printed) {
fprintf(stderr, "Reads are finished. Have %d more writes to do\n",
- (int)writes_ - written);
+ static_cast<int>(writes_) - written);
hint_printed = true;
}
} else {
GenerateKeyFromInt(thread->rand.Next() % FLAGS_num, FLAGS_num, &key);
Status s;
+ Slice val = gen.Generate();
if (write_merge == kWrite) {
- s = db->Put(write_options_, key, gen.Generate(value_size_));
+ s = db->Put(write_options_, key, val);
} else {
- s = db->Merge(write_options_, key, gen.Generate(value_size_));
+ s = db->Merge(write_options_, key, val);
}
written++;
fprintf(stderr, "put or merge error: %s\n", s.ToString().c_str());
exit(1);
}
- bytes += key.size() + value_size_;
+ bytes += key.size() + val.size();
thread->stats.FinishedOps(&db_, db_.db, 1, kWrite);
if (FLAGS_benchmark_write_rate_limit > 0) {
write_rate_limiter->Request(
- entries_per_batch_ * (value_size_ + key_size_), Env::IO_HIGH,
+ key.size() + val.size(), Env::IO_HIGH,
nullptr /* stats */, RateLimiter::OpType::kWrite);
}
}
} else if (put_weight > 0) {
// then do all the corresponding number of puts
// for all the gets we have done earlier
- Status s = PutMany(db, write_options_, key, gen.Generate(value_size_));
+ Status s = PutMany(db, write_options_, key, gen.Generate());
if (!s.ok()) {
fprintf(stderr, "putmany error: %s\n", s.ToString().c_str());
exit(1);
} else if (put_weight > 0) {
// then do all the corresponding number of puts
// for all the gets we have done earlier
- Status s = db->Put(write_options_, key, gen.Generate(value_size_));
+ Status s = db->Put(write_options_, key, gen.Generate());
if (!s.ok()) {
fprintf(stderr, "put error: %s\n", s.ToString().c_str());
exit(1);
if (thread->shared->write_rate_limiter) {
thread->shared->write_rate_limiter->Request(
- key.size() + value_size_, Env::IO_HIGH, nullptr /*stats*/,
+ key.size() + value.size(), Env::IO_HIGH, nullptr /*stats*/,
RateLimiter::OpType::kWrite);
}
- Status s = db->Put(write_options_, key, gen.Generate(value_size_));
+ Slice val = gen.Generate();
+ Status s = db->Put(write_options_, key, val);
if (!s.ok()) {
fprintf(stderr, "put error: %s\n", s.ToString().c_str());
exit(1);
}
- bytes += key.size() + value_size_;
+ bytes += key.size() + val.size();
thread->stats.FinishedOps(nullptr, db, 1, kUpdate);
}
char msg[100];
exit(1);
}
- Slice value = gen.Generate(value_size_);
+ Slice value = gen.Generate(static_cast<unsigned int>(existing_value.size()));
std::string new_value;
if (status.ok()) {
}
// Update the value (by appending data)
- Slice operand = gen.Generate(value_size_);
+ Slice operand = gen.Generate();
if (value.size() > 0) {
// Use a delimiter to match the semantics for StringAppendOperator
value.append(1,',');
GenerateKeyFromInt(key_rand, merge_keys_, &key);
Status s;
+ Slice val = gen.Generate();
if (FLAGS_num_column_families > 1) {
s = db_with_cfh->db->Merge(write_options_,
db_with_cfh->GetCfh(key_rand), key,
- gen.Generate(value_size_));
+ val);
} else {
s = db_with_cfh->db->Merge(write_options_,
db_with_cfh->db->DefaultColumnFamily(), key,
- gen.Generate(value_size_));
+ val);
}
if (!s.ok()) {
fprintf(stderr, "merge error: %s\n", s.ToString().c_str());
exit(1);
}
- bytes += key.size() + value_size_;
+ bytes += key.size() + val.size();
thread->stats.FinishedOps(nullptr, db_with_cfh->db, 1, kMerge);
}
bool do_merge = int(thread->rand.Next() % 100) < FLAGS_mergereadpercent;
if (do_merge) {
- Status s = db->Merge(write_options_, key, gen.Generate(value_size_));
+ Status s = db->Merge(write_options_, key, gen.Generate());
if (!s.ok()) {
fprintf(stderr, "merge error: %s\n", s.ToString().c_str());
exit(1);
}
}
+ bool binary_search(std::vector<int>& data, int start, int end, int key) {
+ if (data.empty()) return false;
+ if (start > end) return false;
+ int mid = start + (end - start) / 2;
+ if (mid > static_cast<int>(data.size()) - 1) return false;
+ if (data[mid] == key) {
+ return true;
+ } else if (data[mid] > key) {
+ return binary_search(data, start, mid - 1, key);
+ } else {
+ return binary_search(data, mid + 1, end, key);
+ }
+ }
+
+ // Does a bunch of merge operations for a key(key1) where the merge operand
+ // is a sorted list. Next performance comparison is done between doing a Get
+ // for key1 followed by searching for another key(key2) in the large sorted
+ // list vs calling GetMergeOperands for key1 and then searching for the key2
+ // in all the sorted sub-lists. Later case is expected to be a lot faster.
+ void GetMergeOperands(ThreadState* thread) {
+ DB* db = SelectDB(thread);
+ const int kTotalValues = 100000;
+ const int kListSize = 100;
+ std::string key = "my_key";
+ std::string value;
+
+ for (int i = 1; i < kTotalValues; i++) {
+ if (i % kListSize == 0) {
+ // Remove trailing ','
+ value.pop_back();
+ db->Merge(WriteOptions(), key, value);
+ value.clear();
+ } else {
+ value.append(std::to_string(i)).append(",");
+ }
+ }
+
+ SortList s;
+ std::vector<int> data;
+ // This value can be experimented with and it will demonstrate the
+ // perf difference between doing a Get and searching for lookup_key in the
+ // resultant large sorted list vs doing GetMergeOperands and searching
+ // for lookup_key within this resultant sorted sub-lists.
+ int lookup_key = 1;
+
+ // Get API call
+ std::cout << "--- Get API call --- \n";
+ PinnableSlice p_slice;
+ uint64_t st = FLAGS_env->NowNanos();
+ db->Get(ReadOptions(), db->DefaultColumnFamily(), key, &p_slice);
+ s.MakeVector(data, p_slice);
+ bool found =
+ binary_search(data, 0, static_cast<int>(data.size() - 1), lookup_key);
+ std::cout << "Found key? " << std::to_string(found) << "\n";
+ uint64_t sp = FLAGS_env->NowNanos();
+ std::cout << "Get: " << (sp - st) / 1000000000.0 << " seconds\n";
+ std::string* dat_ = p_slice.GetSelf();
+ std::cout << "Sample data from Get API call: " << dat_->substr(0, 10)
+ << "\n";
+ data.clear();
+
+ // GetMergeOperands API call
+ std::cout << "--- GetMergeOperands API --- \n";
+ std::vector<PinnableSlice> a_slice((kTotalValues / kListSize) + 1);
+ st = FLAGS_env->NowNanos();
+ int number_of_operands = 0;
+ GetMergeOperandsOptions get_merge_operands_options;
+ get_merge_operands_options.expected_max_number_of_operands =
+ (kTotalValues / 100) + 1;
+ db->GetMergeOperands(ReadOptions(), db->DefaultColumnFamily(), key,
+ a_slice.data(), &get_merge_operands_options,
+ &number_of_operands);
+ for (PinnableSlice& psl : a_slice) {
+ s.MakeVector(data, psl);
+ found =
+ binary_search(data, 0, static_cast<int>(data.size() - 1), lookup_key);
+ data.clear();
+ if (found) break;
+ }
+ std::cout << "Found key? " << std::to_string(found) << "\n";
+ sp = FLAGS_env->NowNanos();
+ std::cout << "Get Merge operands: " << (sp - st) / 1000000000.0
+ << " seconds \n";
+ int to_print = 0;
+ std::cout << "Sample data from GetMergeOperands API call: ";
+ for (PinnableSlice& psl : a_slice) {
+ std::cout << "List: " << to_print << " : " << *psl.GetSelf() << "\n";
+ if (to_print++ > 2) break;
+ }
+ }
+
#ifndef ROCKSDB_LITE
// This benchmark stress tests Transactions. For a given --duration (or
// total number of --writes, a Transaction will perform a read-modify-write
}
thread->stats.AddMessage(msg);
- if (FLAGS_perf_level > rocksdb::PerfLevel::kDisable) {
+ if (FLAGS_perf_level > ROCKSDB_NAMESPACE::PerfLevel::kDisable) {
thread->stats.AddMessage(std::string("PERF_CONTEXT:\n") +
get_perf_context()->ToString());
}
DB* db = SelectDB(thread);
for (int64_t i = 0; i < FLAGS_numdistinct; i++) {
GenerateKeyFromInt(i * max_counter, FLAGS_num, &key);
- s = db->Put(write_options_, key, gen.Generate(value_size_));
+ s = db->Put(write_options_, key, gen.Generate());
if (!s.ok()) {
fprintf(stderr, "Operation failed: %s\n", s.ToString().c_str());
exit(1);
read);
thread->stats.AddBytes(bytes);
thread->stats.AddMessage(msg);
- if (FLAGS_perf_level > rocksdb::PerfLevel::kDisable) {
+ if (FLAGS_perf_level > ROCKSDB_NAMESPACE::PerfLevel::kDisable) {
thread->stats.AddMessage(std::string("PERF_CONTEXT:\n") +
get_perf_context()->ToString());
}
timestamp_emulator_->Inc();
Status s;
-
- s = db->Put(write_options_, key, gen.Generate(value_size_));
+ Slice val = gen.Generate();
+ s = db->Put(write_options_, key, val);
if (!s.ok()) {
fprintf(stderr, "put error: %s\n", s.ToString().c_str());
exit(1);
}
- bytes = key.size() + value_size_;
+ bytes = key.size() + val.size();
thread->stats.FinishedOps(&db_, db_.db, 1, kWrite);
thread->stats.AddBytes(bytes);
if (FLAGS_benchmark_write_rate_limit > 0) {
write_rate_limiter->Request(
- entries_per_batch_ * (value_size_ + key_size_), Env::IO_HIGH,
+ key.size() + val.size(), Env::IO_HIGH,
nullptr /* stats */, RateLimiter::OpType::kWrite);
}
}
void Compact(ThreadState* thread) {
DB* db = SelectDB(thread);
CompactRangeOptions cro;
- cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
+ cro.bottommost_level_compaction =
+ BottommostLevelCompaction::kForceOptimized;
db->CompactRange(cro, nullptr, nullptr);
}
}
}
+ void PrintStatsHistory() {
+ if (db_.db != nullptr) {
+ PrintStatsHistoryImpl(db_.db, false);
+ }
+ for (const auto& db_with_cfh : multi_dbs_) {
+ PrintStatsHistoryImpl(db_with_cfh.db, true);
+ }
+ }
+
+ void PrintStatsHistoryImpl(DB* db, bool print_header) {
+ if (print_header) {
+ fprintf(stdout, "\n==== DB: %s ===\n", db->GetName().c_str());
+ }
+
+ std::unique_ptr<StatsHistoryIterator> shi;
+ Status s = db->GetStatsHistory(0, port::kMaxUint64, &shi);
+ if (!s.ok()) {
+ fprintf(stdout, "%s\n", s.ToString().c_str());
+ return;
+ }
+ assert(shi);
+ while (shi->Valid()) {
+ uint64_t stats_time = shi->GetStatsTime();
+ fprintf(stdout, "------ %s ------\n",
+ TimeToHumanString(static_cast<int>(stats_time)).c_str());
+ for (auto& entry : shi->GetStatsMap()) {
+ fprintf(stdout, " %" PRIu64 " %s %" PRIu64 "\n", stats_time,
+ entry.first.c_str(), entry.second);
+ }
+ shi->Next();
+ }
+ }
+
void PrintStats(const char* key) {
if (db_.db != nullptr) {
PrintStats(db_.db, key, false);
}
Replayer replayer(db_with_cfh->db, db_with_cfh->cfh,
std::move(trace_reader));
- s = replayer.Replay();
+ replayer.SetFastForward(
+ static_cast<uint32_t>(FLAGS_trace_replay_fast_forward));
+ s = replayer.MultiThreadReplay(
+ static_cast<uint32_t>(FLAGS_trace_replay_threads));
if (s.ok()) {
fprintf(stdout, "Replay started from trace_file: %s\n",
FLAGS_trace_file.c_str());
};
int db_bench_tool(int argc, char** argv) {
- rocksdb::port::InstallStackTraceHandler();
+ ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
static bool initialized = false;
if (!initialized) {
SetUsageMessage(std::string("\nUSAGE:\n") + std::string(argv[0]) +
initialized = true;
}
ParseCommandLineFlags(&argc, &argv, true);
- FLAGS_compaction_style_e = (rocksdb::CompactionStyle) FLAGS_compaction_style;
+ FLAGS_compaction_style_e =
+ (ROCKSDB_NAMESPACE::CompactionStyle)FLAGS_compaction_style;
#ifndef ROCKSDB_LITE
if (FLAGS_statistics && !FLAGS_statistics_string.empty()) {
fprintf(stderr,
exit(1);
}
if (!FLAGS_statistics_string.empty()) {
- std::unique_ptr<Statistics> custom_stats_guard;
- dbstats.reset(NewCustomObject<Statistics>(FLAGS_statistics_string,
- &custom_stats_guard));
- custom_stats_guard.release();
+ Status s = ObjectRegistry::NewInstance()->NewSharedObject<Statistics>(
+ FLAGS_statistics_string, &dbstats);
if (dbstats == nullptr) {
- fprintf(stderr, "No Statistics registered matching string: %s\n",
- FLAGS_statistics_string.c_str());
+ fprintf(stderr,
+ "No Statistics registered matching string: %s status=%s\n",
+ FLAGS_statistics_string.c_str(), s.ToString().c_str());
exit(1);
}
}
#endif // ROCKSDB_LITE
if (FLAGS_statistics) {
- dbstats = rocksdb::CreateDBStatistics();
+ dbstats = ROCKSDB_NAMESPACE::CreateDBStatistics();
}
if (dbstats) {
dbstats->set_stats_level(static_cast<StatsLevel>(FLAGS_stats_level));
}
- FLAGS_compaction_pri_e = (rocksdb::CompactionPri)FLAGS_compaction_pri;
+ FLAGS_compaction_pri_e =
+ (ROCKSDB_NAMESPACE::CompactionPri)FLAGS_compaction_pri;
- std::vector<std::string> fanout = rocksdb::StringSplit(
+ std::vector<std::string> fanout = ROCKSDB_NAMESPACE::StringSplit(
FLAGS_max_bytes_for_level_multiplier_additional, ',');
for (size_t j = 0; j < fanout.size(); j++) {
FLAGS_max_bytes_for_level_multiplier_additional_v.push_back(
StringToCompressionType(FLAGS_compression_type.c_str());
#ifndef ROCKSDB_LITE
- std::unique_ptr<Env> custom_env_guard;
+ FLAGS_blob_db_compression_type_e =
+ StringToCompressionType(FLAGS_blob_db_compression_type.c_str());
+
if (!FLAGS_hdfs.empty() && !FLAGS_env_uri.empty()) {
fprintf(stderr, "Cannot provide both --hdfs and --env_uri.\n");
exit(1);
} else if (!FLAGS_env_uri.empty()) {
- FLAGS_env = NewCustomObject<Env>(FLAGS_env_uri, &custom_env_guard);
+ Status s = Env::LoadEnv(FLAGS_env_uri, &FLAGS_env, &env_guard);
if (FLAGS_env == nullptr) {
fprintf(stderr, "No Env registered for URI: %s\n", FLAGS_env_uri.c_str());
exit(1);
}
if (!FLAGS_hdfs.empty()) {
- FLAGS_env = new rocksdb::HdfsEnv(FLAGS_hdfs);
+ FLAGS_env = new ROCKSDB_NAMESPACE::HdfsEnv(FLAGS_hdfs);
}
if (!strcasecmp(FLAGS_compaction_fadvice.c_str(), "NONE"))
- FLAGS_compaction_fadvice_e = rocksdb::Options::NONE;
+ FLAGS_compaction_fadvice_e = ROCKSDB_NAMESPACE::Options::NONE;
else if (!strcasecmp(FLAGS_compaction_fadvice.c_str(), "NORMAL"))
- FLAGS_compaction_fadvice_e = rocksdb::Options::NORMAL;
+ FLAGS_compaction_fadvice_e = ROCKSDB_NAMESPACE::Options::NORMAL;
else if (!strcasecmp(FLAGS_compaction_fadvice.c_str(), "SEQUENTIAL"))
- FLAGS_compaction_fadvice_e = rocksdb::Options::SEQUENTIAL;
+ FLAGS_compaction_fadvice_e = ROCKSDB_NAMESPACE::Options::SEQUENTIAL;
else if (!strcasecmp(FLAGS_compaction_fadvice.c_str(), "WILLNEED"))
- FLAGS_compaction_fadvice_e = rocksdb::Options::WILLNEED;
+ FLAGS_compaction_fadvice_e = ROCKSDB_NAMESPACE::Options::WILLNEED;
else {
fprintf(stdout, "Unknown compaction fadvice:%s\n",
FLAGS_compaction_fadvice.c_str());
}
+ FLAGS_value_size_distribution_type_e =
+ StringToDistributionType(FLAGS_value_size_distribution_type.c_str());
+
FLAGS_rep_factory = StringToRepFactory(FLAGS_memtablerep.c_str());
// Note options sanitization may increase thread pool sizes according to
// max_background_flushes/max_background_compactions/max_background_jobs
FLAGS_env->SetBackgroundThreads(FLAGS_num_high_pri_threads,
- rocksdb::Env::Priority::HIGH);
+ ROCKSDB_NAMESPACE::Env::Priority::HIGH);
FLAGS_env->SetBackgroundThreads(FLAGS_num_bottom_pri_threads,
- rocksdb::Env::Priority::BOTTOM);
+ ROCKSDB_NAMESPACE::Env::Priority::BOTTOM);
FLAGS_env->SetBackgroundThreads(FLAGS_num_low_pri_threads,
- rocksdb::Env::Priority::LOW);
+ ROCKSDB_NAMESPACE::Env::Priority::LOW);
// Choose a location for the test database if none given with --db=<path>
if (FLAGS_db.empty()) {
std::string default_db_path;
- rocksdb::Env::Default()->GetTestDirectory(&default_db_path);
+ FLAGS_env->GetTestDirectory(&default_db_path);
default_db_path += "/dbbench";
FLAGS_db = default_db_path;
}
FLAGS_stats_interval = 1000;
}
- rocksdb::Benchmark benchmark;
+ if (FLAGS_seek_missing_prefix && FLAGS_prefix_size <= 8) {
+ fprintf(stderr, "prefix_size > 8 required by --seek_missing_prefix\n");
+ exit(1);
+ }
+
+ ROCKSDB_NAMESPACE::Benchmark benchmark;
benchmark.Run();
#ifndef ROCKSDB_LITE
if (FLAGS_print_malloc_stats) {
std::string stats_string;
- rocksdb::DumpMallocStats(&stats_string);
+ ROCKSDB_NAMESPACE::DumpMallocStats(&stats_string);
fprintf(stdout, "Malloc stats:\n%s\n", stats_string.c_str());
}
#endif // ROCKSDB_LITE
return 0;
}
-} // namespace rocksdb
+} // namespace ROCKSDB_NAMESPACE
#endif