update source to Ceph Pacific 16.2.2

diff --git a/ceph/src/rocksdb/tools/db_bench_tool.cc b/ceph/src/rocksdb/tools/db_bench_tool.cc
index 0cb4e0eb27ec185d085d9db4f4c8f74e4ec1b6ac..5c2ca01e60aa1de9aed4212f9785cacba3edc782 100644
--- a/ceph/src/rocksdb/tools/db_bench_tool.cc
+++ b/ceph/src/rocksdb/tools/db_bench_tool.cc
@@ -7,10 +7,6 @@
 // Use of this source code is governed by a BSD-style license that can be
 // found in the LICENSE file. See the AUTHORS file for names of contributors.
 
-#ifndef __STDC_FORMAT_MACROS
-#define __STDC_FORMAT_MACROS
-#endif
-
 #ifdef GFLAGS
 #ifdef NUMA
 #include <numa.h>
 #include <unistd.h>
 #endif
 #include <fcntl.h>
-#include <inttypes.h>
-#include <math.h>
 #include <stdio.h>
 #include <stdlib.h>
 #include <sys/types.h>
 #include <atomic>
+#include <cinttypes>
 #include <condition_variable>
 #include <cstddef>
 #include <memory>
@@ -33,7 +28,7 @@
 #include <thread>
 #include <unordered_map>
 
-#include "db/db_impl.h"
+#include "db/db_impl/db_impl.h"
 #include "db/malloc_stats.h"
 #include "db/version_set.h"
 #include "hdfs/env_hdfs.h"
@@ -53,6 +48,7 @@
 #include "rocksdb/rate_limiter.h"
 #include "rocksdb/slice.h"
 #include "rocksdb/slice_transform.h"
+#include "rocksdb/stats_history.h"
 #include "rocksdb/utilities/object_registry.h"
 #include "rocksdb/utilities/optimistic_transaction_db.h"
 #include "rocksdb/utilities/options_util.h"
@@ -60,6 +56,8 @@
 #include "rocksdb/utilities/transaction.h"
 #include "rocksdb/utilities/transaction_db.h"
 #include "rocksdb/write_batch.h"
+#include "test_util/testutil.h"
+#include "test_util/transaction_test_util.h"
 #include "util/cast_util.h"
 #include "util/compression.h"
 #include "util/crc32c.h"
 #include "util/random.h"
 #include "util/stderr_logger.h"
 #include "util/string_util.h"
-#include "util/testutil.h"
-#include "util/transaction_test_util.h"
 #include "util/xxhash.h"
 #include "utilities/blob_db/blob_db.h"
 #include "utilities/merge_operators.h"
 #include "utilities/merge_operators/bytesxor.h"
+#include "utilities/merge_operators/sortlist.h"
 #include "utilities/persistent_cache/block_cache_tier.h"
 
 #ifdef OS_WIN
@@ -105,6 +102,7 @@ DEFINE_string(
     "multireadrandom,"
     "mixgraph,"
     "readseq,"
+    "readtorowcache,"
     "readtocache,"
     "readreverse,"
     "readwhilewriting,"
@@ -123,7 +121,8 @@ DEFINE_string(
     "fillseekseq,"
     "randomtransaction,"
     "randomreplacekeys,"
-    "timeseries",
+    "timeseries,"
+    "getmergeoperands",
 
     "Comma-separated list of operations to run in the specified"
     " order. Available benchmarks:\n"
@@ -137,7 +136,7 @@ DEFINE_string(
     " key order and keep the shape of the LSM tree\n"
     "\toverwrite     -- overwrite N values in random key order in"
     " async mode\n"
-    "\tfillsync      -- write N/100 values in random key order in "
+    "\tfillsync      -- write N/1000 values in random key order in "
     "sync mode\n"
     "\tfill100K      -- write N/1000 100K values in random order in"
     " async mode\n"
@@ -193,7 +192,13 @@ DEFINE_string(
     "\tlevelstats  -- Print the number of files and bytes per level\n"
     "\tsstables    -- Print sstable info\n"
     "\theapprofile -- Dump a heap profile (if supported by this port)\n"
-    "\treplay      -- replay the trace file specified with trace_file\n");
+    "\treplay      -- replay the trace file specified with trace_file\n"
+    "\tgetmergeoperands -- Insert lots of merge records which are a list of "
+    "sorted ints for a key and then compare performance of lookup for another "
+    "key "
+    "by doing a Get followed by binary searching in the large sorted list vs "
+    "doing a GetMergeOperands and binary searching in the operands which are"
+    "sorted sub-lists. The MergeOperator used is sortlist.h\n");
 
 DEFINE_int64(num, 1000000, "Number of key/values to place in database");
 
@@ -240,7 +245,15 @@ DEFINE_int32(threads, 1, "Number of concurrent threads to run.");
 DEFINE_int32(duration, 0, "Time in seconds for the random-ops tests to run."
              " When 0 then num & reads determine the test duration");
 
-DEFINE_int32(value_size, 100, "Size of each value");
+DEFINE_string(value_size_distribution_type, "fixed",
+              "Value size distribution type: fixed, uniform, normal");
+
+DEFINE_int32(value_size, 100, "Size of each value in fixed distribution");
+static unsigned int value_size = 100;
+
+DEFINE_int32(value_size_min, 100, "Min size of random value");
+
+DEFINE_int32(value_size_max, 102400, "Max size of random value");
 
 DEFINE_int32(seek_nexts, 0,
              "How many times to call Next() after Seek() in "
@@ -296,22 +309,23 @@ DEFINE_bool(enable_numa, false,
             "CPU and memory of same node. Use \"$numactl --hardware\" command "
             "to see NUMA memory architecture.");
 
-DEFINE_int64(db_write_buffer_size, rocksdb::Options().db_write_buffer_size,
+DEFINE_int64(db_write_buffer_size,
+             ROCKSDB_NAMESPACE::Options().db_write_buffer_size,
              "Number of bytes to buffer in all memtables before compacting");
 
 DEFINE_bool(cost_write_buffer_to_cache, false,
             "Charge memtable memory usage to the block cache");
 
-DEFINE_int64(write_buffer_size, rocksdb::Options().write_buffer_size,
+DEFINE_int64(write_buffer_size, ROCKSDB_NAMESPACE::Options().write_buffer_size,
              "Number of bytes to buffer in memtable before compacting");
 
 DEFINE_int32(max_write_buffer_number,
-             rocksdb::Options().max_write_buffer_number,
+             ROCKSDB_NAMESPACE::Options().max_write_buffer_number,
              "The number of in-memory memtables. Each memtable is of size"
              " write_buffer_size bytes.");
 
 DEFINE_int32(min_write_buffer_number_to_merge,
-             rocksdb::Options().min_write_buffer_number_to_merge,
+             ROCKSDB_NAMESPACE::Options().min_write_buffer_number_to_merge,
              "The minimum number of write buffers that will be merged together"
              "before writing to storage. This is cheap because it is an"
              "in-memory merge. If this feature is not enabled, then all these"
@@ -322,7 +336,7 @@ DEFINE_int32(min_write_buffer_number_to_merge,
              " in each of these individual write buffers.");
 
 DEFINE_int32(max_write_buffer_number_to_maintain,
-             rocksdb::Options().max_write_buffer_number_to_maintain,
+             ROCKSDB_NAMESPACE::Options().max_write_buffer_number_to_maintain,
              "The total maximum number of write buffers to maintain in memory "
              "including copies of buffers that have already been flushed. "
              "Unlike max_write_buffer_number, this parameter does not affect "
@@ -335,8 +349,22 @@ DEFINE_int32(max_write_buffer_number_to_maintain,
              "after they are flushed.  If this value is set to -1, "
              "'max_write_buffer_number' will be used.");
 
+DEFINE_int64(max_write_buffer_size_to_maintain,
+             ROCKSDB_NAMESPACE::Options().max_write_buffer_size_to_maintain,
+             "The total maximum size of write buffers to maintain in memory "
+             "including copies of buffers that have already been flushed. "
+             "Unlike max_write_buffer_number, this parameter does not affect "
+             "flushing. This controls the minimum amount of write history "
+             "that will be available in memory for conflict checking when "
+             "Transactions are used. If this value is too low, some "
+             "transactions may fail at commit time due to not being able to "
+             "determine whether there were any write conflicts. Setting this "
+             "value to 0 will cause write buffers to be freed immediately "
+             "after they are flushed.  If this value is set to -1, "
+             "'max_write_buffer_number' will be used.");
+
 DEFINE_int32(max_background_jobs,
-             rocksdb::Options().max_background_jobs,
+             ROCKSDB_NAMESPACE::Options().max_background_jobs,
              "The maximum number of concurrent background jobs that can occur "
              "in parallel.");
 
@@ -353,7 +381,7 @@ DEFINE_int32(num_low_pri_threads, 0,
              " that can occur in parallel.");
 
 DEFINE_int32(max_background_compactions,
-             rocksdb::Options().max_background_compactions,
+             ROCKSDB_NAMESPACE::Options().max_background_compactions,
              "The maximum number of concurrent background compactions"
              " that can occur in parallel.");
 
@@ -367,16 +395,18 @@ static const bool FLAGS_subcompactions_dummy
                                                     &ValidateUint32Range);
 
 DEFINE_int32(max_background_flushes,
-             rocksdb::Options().max_background_flushes,
+             ROCKSDB_NAMESPACE::Options().max_background_flushes,
              "The maximum number of concurrent background flushes"
              " that can occur in parallel.");
 
-static rocksdb::CompactionStyle FLAGS_compaction_style_e;
-DEFINE_int32(compaction_style, (int32_t) rocksdb::Options().compaction_style,
+static ROCKSDB_NAMESPACE::CompactionStyle FLAGS_compaction_style_e;
+DEFINE_int32(compaction_style,
+             (int32_t)ROCKSDB_NAMESPACE::Options().compaction_style,
              "style of compaction: level-based, universal and fifo");
 
-static rocksdb::CompactionPri FLAGS_compaction_pri_e;
-DEFINE_int32(compaction_pri, (int32_t)rocksdb::Options().compaction_pri,
+static ROCKSDB_NAMESPACE::CompactionPri FLAGS_compaction_pri_e;
+DEFINE_int32(compaction_pri,
+             (int32_t)ROCKSDB_NAMESPACE::Options().compaction_pri,
              "priority of files to compaction: by size or by data age");
 
 DEFINE_int32(universal_size_ratio, 0,
@@ -428,7 +458,7 @@ DEFINE_bool(partition_index_and_filters, false,
 DEFINE_bool(partition_index, false, "Partition index blocks");
 
 DEFINE_int64(metadata_block_size,
-             rocksdb::BlockBasedTableOptions().metadata_block_size,
+             ROCKSDB_NAMESPACE::BlockBasedTableOptions().metadata_block_size,
              "Max partition size when partitioning index/filters");
 
 // The default reduces the overhead of reading time with flash. With HDD, which
@@ -444,33 +474,37 @@ DEFINE_bool(
     "Pin top-level index of partitioned index/filter blocks in block cache.");
 
 DEFINE_int32(block_size,
-             static_cast<int32_t>(rocksdb::BlockBasedTableOptions().block_size),
+             static_cast<int32_t>(
+                 ROCKSDB_NAMESPACE::BlockBasedTableOptions().block_size),
              "Number of bytes in a block.");
 
-DEFINE_int32(
-    format_version,
-    static_cast<int32_t>(rocksdb::BlockBasedTableOptions().format_version),
-    "Format version of SST files.");
+DEFINE_int32(format_version,
+             static_cast<int32_t>(
+                 ROCKSDB_NAMESPACE::BlockBasedTableOptions().format_version),
+             "Format version of SST files.");
 
 DEFINE_int32(block_restart_interval,
-             rocksdb::BlockBasedTableOptions().block_restart_interval,
+             ROCKSDB_NAMESPACE::BlockBasedTableOptions().block_restart_interval,
              "Number of keys between restart points "
              "for delta encoding of keys in data block.");
 
-DEFINE_int32(index_block_restart_interval,
-             rocksdb::BlockBasedTableOptions().index_block_restart_interval,
-             "Number of keys between restart points "
-             "for delta encoding of keys in index block.");
+DEFINE_int32(
+    index_block_restart_interval,
+    ROCKSDB_NAMESPACE::BlockBasedTableOptions().index_block_restart_interval,
+    "Number of keys between restart points "
+    "for delta encoding of keys in index block.");
 
 DEFINE_int32(read_amp_bytes_per_bit,
-             rocksdb::BlockBasedTableOptions().read_amp_bytes_per_bit,
+             ROCKSDB_NAMESPACE::BlockBasedTableOptions().read_amp_bytes_per_bit,
              "Number of bytes per bit to be used in block read-amp bitmap");
 
-DEFINE_bool(enable_index_compression,
-            rocksdb::BlockBasedTableOptions().enable_index_compression,
-            "Compress the index block");
+DEFINE_bool(
+    enable_index_compression,
+    ROCKSDB_NAMESPACE::BlockBasedTableOptions().enable_index_compression,
+    "Compress the index block");
 
-DEFINE_bool(block_align, rocksdb::BlockBasedTableOptions().block_align,
+DEFINE_bool(block_align,
+            ROCKSDB_NAMESPACE::BlockBasedTableOptions().block_align,
             "Align data blocks on page size");
 
 DEFINE_bool(use_data_block_hash_index, false,
@@ -490,11 +524,12 @@ DEFINE_int64(row_cache_size, 0,
              "Number of bytes to use as a cache of individual rows"
              " (0 = disabled).");
 
-DEFINE_int32(open_files, rocksdb::Options().max_open_files,
+DEFINE_int32(open_files, ROCKSDB_NAMESPACE::Options().max_open_files,
              "Maximum number of files to keep open at the same time"
              " (use default if == 0)");
 
-DEFINE_int32(file_opening_threads, rocksdb::Options().max_file_opening_threads,
+DEFINE_int32(file_opening_threads,
+             ROCKSDB_NAMESPACE::Options().max_file_opening_threads,
              "If open_files is set to -1, this option set the number of "
              "threads that will be used to open files during DB::Open()");
 
@@ -503,6 +538,8 @@ DEFINE_bool(new_table_reader_for_compaction_inputs, true,
 
 DEFINE_int32(compaction_readahead_size, 0, "Compaction readahead size");
 
+DEFINE_int32(log_readahead_size, 0, "WAL and manifest readahead size");
+
 DEFINE_int32(random_access_max_buffer_size, 1024 * 1024,
              "Maximum windows randomaccess buffer size");
 
@@ -568,10 +605,10 @@ DEFINE_bool(verify_checksum, true,
             " from storage");
 
 DEFINE_bool(statistics, false, "Database statistics");
-DEFINE_int32(stats_level, rocksdb::StatsLevel::kExceptDetailedTimers,
+DEFINE_int32(stats_level, ROCKSDB_NAMESPACE::StatsLevel::kExceptDetailedTimers,
              "stats level for statistics");
 DEFINE_string(statistics_string, "", "Serialized statistics string");
-static class std::shared_ptr<rocksdb::Statistics> dbstats;
+static class std::shared_ptr<ROCKSDB_NAMESPACE::Statistics> dbstats;
 
 DEFINE_int64(writes, -1, "Number of write operations to do. If negative, do"
              " --num reads.");
@@ -591,15 +628,16 @@ DEFINE_string(truth_db, "/dev/shm/truth_db/dbbench",
 
 DEFINE_int32(num_levels, 7, "The total number of levels");
 
-DEFINE_int64(target_file_size_base, rocksdb::Options().target_file_size_base,
+DEFINE_int64(target_file_size_base,
+             ROCKSDB_NAMESPACE::Options().target_file_size_base,
              "Target file size at level-1");
 
 DEFINE_int32(target_file_size_multiplier,
-             rocksdb::Options().target_file_size_multiplier,
+             ROCKSDB_NAMESPACE::Options().target_file_size_multiplier,
              "A multiplier to compute target level-N file size (N >= 2)");
 
 DEFINE_uint64(max_bytes_for_level_base,
-              rocksdb::Options().max_bytes_for_level_base,
+              ROCKSDB_NAMESPACE::Options().max_bytes_for_level_base,
               "Max bytes for level-1");
 
 DEFINE_bool(level_compaction_dynamic_level_bytes, false,
@@ -613,17 +651,17 @@ DEFINE_string(max_bytes_for_level_multiplier_additional, "",
               "A vector that specifies additional fanout per level");
 
 DEFINE_int32(level0_stop_writes_trigger,
-             rocksdb::Options().level0_stop_writes_trigger,
+             ROCKSDB_NAMESPACE::Options().level0_stop_writes_trigger,
              "Number of files in level-0"
              " that will trigger put stop.");
 
 DEFINE_int32(level0_slowdown_writes_trigger,
-             rocksdb::Options().level0_slowdown_writes_trigger,
+             ROCKSDB_NAMESPACE::Options().level0_slowdown_writes_trigger,
              "Number of files in level-0"
              " that will slow down writes.");
 
 DEFINE_int32(level0_file_num_compaction_trigger,
-             rocksdb::Options().level0_file_num_compaction_trigger,
+             ROCKSDB_NAMESPACE::Options().level0_file_num_compaction_trigger,
              "Number of files in level-0"
              " when compactions start");
 
@@ -730,28 +768,63 @@ DEFINE_bool(use_blob_db, false,
             "Open a BlobDB instance. "
             "Required for large value benchmark.");
 
-DEFINE_bool(blob_db_enable_gc, false, "Enable BlobDB garbage collection.");
+DEFINE_bool(
+    blob_db_enable_gc,
+    ROCKSDB_NAMESPACE::blob_db::BlobDBOptions().enable_garbage_collection,
+    "Enable BlobDB garbage collection.");
 
-DEFINE_bool(blob_db_is_fifo, false, "Enable FIFO eviction strategy in BlobDB.");
+DEFINE_double(
+    blob_db_gc_cutoff,
+    ROCKSDB_NAMESPACE::blob_db::BlobDBOptions().garbage_collection_cutoff,
+    "Cutoff ratio for BlobDB garbage collection.");
 
-DEFINE_uint64(blob_db_max_db_size, 0,
+DEFINE_bool(blob_db_is_fifo,
+            ROCKSDB_NAMESPACE::blob_db::BlobDBOptions().is_fifo,
+            "Enable FIFO eviction strategy in BlobDB.");
+
+DEFINE_uint64(blob_db_max_db_size,
+              ROCKSDB_NAMESPACE::blob_db::BlobDBOptions().max_db_size,
               "Max size limit of the directory where blob files are stored.");
 
-DEFINE_uint64(blob_db_max_ttl_range, 86400,
-              "TTL range to generate BlobDB data (in seconds).");
+DEFINE_uint64(
+    blob_db_max_ttl_range, 0,
+    "TTL range to generate BlobDB data (in seconds). 0 means no TTL.");
 
-DEFINE_uint64(blob_db_ttl_range_secs, 3600,
+DEFINE_uint64(blob_db_ttl_range_secs,
+              ROCKSDB_NAMESPACE::blob_db::BlobDBOptions().ttl_range_secs,
               "TTL bucket size to use when creating blob files.");
 
-DEFINE_uint64(blob_db_min_blob_size, 0,
+DEFINE_uint64(blob_db_min_blob_size,
+              ROCKSDB_NAMESPACE::blob_db::BlobDBOptions().min_blob_size,
               "Smallest blob to store in a file. Blobs smaller than this "
               "will be inlined with the key in the LSM tree.");
 
-DEFINE_uint64(blob_db_bytes_per_sync, 0, "Bytes to sync blob file at.");
+DEFINE_uint64(blob_db_bytes_per_sync,
+              ROCKSDB_NAMESPACE::blob_db::BlobDBOptions().bytes_per_sync,
+              "Bytes to sync blob file at.");
 
-DEFINE_uint64(blob_db_file_size, 256 * 1024 * 1024,
+DEFINE_uint64(blob_db_file_size,
+              ROCKSDB_NAMESPACE::blob_db::BlobDBOptions().blob_file_size,
               "Target size of each blob file.");
 
+DEFINE_string(blob_db_compression_type, "snappy",
+              "Algorithm to use to compress blob in blob file");
+static enum ROCKSDB_NAMESPACE::CompressionType
+    FLAGS_blob_db_compression_type_e = ROCKSDB_NAMESPACE::kSnappyCompression;
+
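
For reference, a minimal sketch (the path and sizes are hypothetical) of opening the BlobDB instance that the blob_db_* flags above configure, via utilities/blob_db/blob_db.h:

#include "utilities/blob_db/blob_db.h"

void BlobDBExample() {
  rocksdb::Options options;
  options.create_if_missing = true;
  rocksdb::blob_db::BlobDBOptions bdb_options;
  bdb_options.min_blob_size = 0;           // store every value as a blob
  bdb_options.blob_file_size = 256 << 20;  // target size of each blob file
  rocksdb::blob_db::BlobDB* blob_db = nullptr;
  rocksdb::Status s = rocksdb::blob_db::BlobDB::Open(
      options, bdb_options, "/tmp/blobdb" /* hypothetical */, &blob_db);
}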
+// Secondary DB instance Options
+DEFINE_bool(use_secondary_db, false,
+            "Open a RocksDB secondary instance. A primary instance can be "
+            "running in another db_bench process.");
+
+DEFINE_string(secondary_path, "",
+              "Path to a directory used by the secondary instance to store "
+              "private files, e.g. info log.");
+
+DEFINE_int32(secondary_update_interval, 5,
+             "Secondary instance attempts to catch up with the primary every "
+             "secondary_update_interval seconds.");
+
 #endif  // ROCKSDB_LITE
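
For reference, a minimal sketch (paths are hypothetical) of the secondary-instance API behind use_secondary_db: DB::OpenAsSecondary() plus periodic DB::TryCatchUpWithPrimary() calls.

#include "rocksdb/db.h"

void SecondaryExample() {
  rocksdb::Options options;
  options.max_open_files = -1;  // a secondary must keep all files open
  rocksdb::DB* secondary = nullptr;
  rocksdb::Status s = rocksdb::DB::OpenAsSecondary(
      options, "/tmp/primary_db", "/tmp/secondary_info_logs", &secondary);
  if (s.ok()) {
    // Replay whatever the primary has written since the last catch-up;
    // db_bench does this every secondary_update_interval seconds.
    s = secondary->TryCatchUpWithPrimary();
  }
}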
 
 DEFINE_bool(report_bg_io_stats, false,
@@ -762,33 +835,49 @@ DEFINE_bool(use_stderr_info_logger, false,
 
 DEFINE_string(trace_file, "", "Trace workload to a file. ");
 
-static enum rocksdb::CompressionType StringToCompressionType(const char* ctype) {
+DEFINE_int32(trace_replay_fast_forward, 1,
+             "Fast forward factor for trace replay; must be >= 1.");
+DEFINE_int32(block_cache_trace_sampling_frequency, 1,
+             "Block cache trace sampling frequency, termed s. It uses spatial "
+             "downsampling and samples accesses to one out of s blocks.");
+DEFINE_int64(
+    block_cache_trace_max_trace_file_size_in_bytes,
+    uint64_t{64} * 1024 * 1024 * 1024,
+    "The maximum block cache trace file size in bytes. Block cache accesses "
+    "will not be logged if the trace file size exceeds this threshold. Default "
+    "is 64 GB.");
+DEFINE_string(block_cache_trace_file, "", "Block cache trace file path.");
+DEFINE_int32(trace_replay_threads, 1,
+             "The number of threads used for replay; must be >= 1.");
+
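
For reference, a minimal sketch (the file path is hypothetical) of the workload-tracing API that trace_file and the replay benchmark drive: NewFileTraceWriter() from rocksdb/trace_reader_writer.h plus DB::StartTrace()/EndTrace().

#include <memory>
#include <utility>

#include "rocksdb/db.h"
#include "rocksdb/trace_reader_writer.h"

void TraceExample(rocksdb::DB* db) {
  std::unique_ptr<rocksdb::TraceWriter> trace_writer;
  rocksdb::Status s = rocksdb::NewFileTraceWriter(
      rocksdb::Env::Default(), rocksdb::EnvOptions(),
      "/tmp/trace" /* hypothetical */, &trace_writer);
  if (s.ok()) {
    s = db->StartTrace(rocksdb::TraceOptions(), std::move(trace_writer));
    // ... run the workload to be captured, then stop tracing ...
    s = db->EndTrace();
  }
}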
+static enum ROCKSDB_NAMESPACE::CompressionType StringToCompressionType(
+    const char* ctype) {
   assert(ctype);
 
   if (!strcasecmp(ctype, "none"))
-    return rocksdb::kNoCompression;
+    return ROCKSDB_NAMESPACE::kNoCompression;
   else if (!strcasecmp(ctype, "snappy"))
-    return rocksdb::kSnappyCompression;
+    return ROCKSDB_NAMESPACE::kSnappyCompression;
   else if (!strcasecmp(ctype, "zlib"))
-    return rocksdb::kZlibCompression;
+    return ROCKSDB_NAMESPACE::kZlibCompression;
   else if (!strcasecmp(ctype, "bzip2"))
-    return rocksdb::kBZip2Compression;
+    return ROCKSDB_NAMESPACE::kBZip2Compression;
   else if (!strcasecmp(ctype, "lz4"))
-    return rocksdb::kLZ4Compression;
+    return ROCKSDB_NAMESPACE::kLZ4Compression;
   else if (!strcasecmp(ctype, "lz4hc"))
-    return rocksdb::kLZ4HCCompression;
+    return ROCKSDB_NAMESPACE::kLZ4HCCompression;
   else if (!strcasecmp(ctype, "xpress"))
-    return rocksdb::kXpressCompression;
+    return ROCKSDB_NAMESPACE::kXpressCompression;
   else if (!strcasecmp(ctype, "zstd"))
-    return rocksdb::kZSTD;
+    return ROCKSDB_NAMESPACE::kZSTD;
 
   fprintf(stdout, "Cannot parse compression type '%s'\n", ctype);
-  return rocksdb::kSnappyCompression;  // default value
+  return ROCKSDB_NAMESPACE::kSnappyCompression;  // default value
 }
 
 static std::string ColumnFamilyName(size_t i) {
   if (i == 0) {
-    return rocksdb::kDefaultColumnFamilyName;
+    return ROCKSDB_NAMESPACE::kDefaultColumnFamilyName;
   } else {
     char name[100];
     snprintf(name, sizeof(name), "column_family_name_%06zu", i);
@@ -798,23 +887,23 @@ static std::string ColumnFamilyName(size_t i) {
 
 DEFINE_string(compression_type, "snappy",
               "Algorithm to use to compress the database");
-static enum rocksdb::CompressionType FLAGS_compression_type_e =
-    rocksdb::kSnappyCompression;
+static enum ROCKSDB_NAMESPACE::CompressionType FLAGS_compression_type_e =
+    ROCKSDB_NAMESPACE::kSnappyCompression;
 
 DEFINE_int64(sample_for_compression, 0, "Sample every N block for compression");
 
-DEFINE_int32(compression_level, rocksdb::CompressionOptions().level,
+DEFINE_int32(compression_level, ROCKSDB_NAMESPACE::CompressionOptions().level,
              "Compression level. The meaning of this value is library-"
              "dependent. If unset, we try to use the default for the library "
              "specified in `--compression_type`");
 
 DEFINE_int32(compression_max_dict_bytes,
-             rocksdb::CompressionOptions().max_dict_bytes,
+             ROCKSDB_NAMESPACE::CompressionOptions().max_dict_bytes,
              "Maximum size of dictionary used to prime the compression "
              "library.");
 
 DEFINE_int32(compression_zstd_max_train_bytes,
-             rocksdb::CompressionOptions().zstd_max_train_bytes,
+             ROCKSDB_NAMESPACE::CompressionOptions().zstd_max_train_bytes,
              "Maximum size of training data passed to zstd's dictionary "
              "trainer.");
 
@@ -840,7 +929,10 @@ DEFINE_string(env_uri, "", "URI for registry Env lookup. Mutually exclusive"
 #endif  // ROCKSDB_LITE
 DEFINE_string(hdfs, "", "Name of hdfs environment. Mutually exclusive with"
               " --env_uri.");
-static rocksdb::Env* FLAGS_env = rocksdb::Env::Default();
+
+static std::shared_ptr<ROCKSDB_NAMESPACE::Env> env_guard;
+
+static ROCKSDB_NAMESPACE::Env* FLAGS_env = ROCKSDB_NAMESPACE::Env::Default();
 
 DEFINE_int64(stats_interval, 0, "Stats are reported every N operations when "
              "this is greater than zero. When 0 the interval grows over time.");
@@ -863,7 +955,8 @@ DEFINE_int32(thread_status_per_interval, 0,
              "Takes and report a snapshot of the current status of each thread"
              " when this is greater than 0.");
 
-DEFINE_int32(perf_level, rocksdb::PerfLevel::kDisable, "Level of perf collection");
+DEFINE_int32(perf_level, ROCKSDB_NAMESPACE::PerfLevel::kDisable,
+             "Level of perf collection");
 
 static bool ValidateRateLimit(const char* flagname, double value) {
   const double EPSILON = 1e-10;
@@ -891,14 +984,18 @@ DEFINE_uint64(delayed_write_rate, 8388608u,
 DEFINE_bool(enable_pipelined_write, true,
             "Allow WAL and memtable writes to be pipelined");
 
+DEFINE_bool(unordered_write, false,
+            "Enable unordered writes, trading the immutability guarantee of "
+            "snapshots for higher write throughput");
+
 DEFINE_bool(allow_concurrent_memtable_write, true,
             "Allow multi-writers to update mem tables in parallel.");
 
-DEFINE_bool(inplace_update_support, rocksdb::Options().inplace_update_support,
+DEFINE_bool(inplace_update_support,
+            ROCKSDB_NAMESPACE::Options().inplace_update_support,
             "Support in-place memtable update for smaller or same-size values");
 
 DEFINE_uint64(inplace_update_num_locks,
-              rocksdb::Options().inplace_update_num_locks,
+              ROCKSDB_NAMESPACE::Options().inplace_update_num_locks,
               "Number of RW locks to protect in-place memtable updates");
 
 DEFINE_bool(enable_write_thread_adaptive_yield, true,
@@ -950,6 +1047,22 @@ DEFINE_uint64(
     "is the global rate in bytes/second.");
 
 // the parameters of mix_graph
+DEFINE_double(keyrange_dist_a, 0.0,
+              "The parameter 'a' of prefix average access distribution "
+              "f(x)=a*exp(b*x)+c*exp(d*x)");
+DEFINE_double(keyrange_dist_b, 0.0,
+              "The parameter 'b' of prefix average access distribution "
+              "f(x)=a*exp(b*x)+c*exp(d*x)");
+DEFINE_double(keyrange_dist_c, 0.0,
+              "The parameter 'c' of prefix average access distribution "
+              "f(x)=a*exp(b*x)+c*exp(d*x)");
+DEFINE_double(keyrange_dist_d, 0.0,
+              "The parameter 'd' of prefix average access distribution "
+              "f(x)=a*exp(b*x)+c*exp(d*x)");
+DEFINE_int64(keyrange_num, 1,
+             "The number of key ranges that are in the same prefix "
+             "group; each key range will have its own key access "
+             "distribution");
 DEFINE_double(key_dist_a, 0.0,
               "The parameter 'a' of key access distribution model "
               "f(x)=a*x^b");
@@ -1000,7 +1113,8 @@ DEFINE_uint64(
     "If non-zero, db_bench will rate-limit the reads from RocksDB. This "
     "is the global rate in ops/second.");
 
-DEFINE_uint64(max_compaction_bytes, rocksdb::Options().max_compaction_bytes,
+DEFINE_uint64(max_compaction_bytes,
+              ROCKSDB_NAMESPACE::Options().max_compaction_bytes,
               "Max bytes allowed in one compaction");
 
 #ifndef ROCKSDB_LITE
@@ -1017,39 +1131,41 @@ DEFINE_uint64(wal_size_limit_MB, 0, "Set the size limit for the WAL Files"
               " in MB.");
 DEFINE_uint64(max_total_wal_size, 0, "Set total max WAL size");
 
-DEFINE_bool(mmap_read, rocksdb::Options().allow_mmap_reads,
+DEFINE_bool(mmap_read, ROCKSDB_NAMESPACE::Options().allow_mmap_reads,
             "Allow reads to occur via mmap-ing files");
 
-DEFINE_bool(mmap_write, rocksdb::Options().allow_mmap_writes,
+DEFINE_bool(mmap_write, ROCKSDB_NAMESPACE::Options().allow_mmap_writes,
             "Allow writes to occur via mmap-ing files");
 
-DEFINE_bool(use_direct_reads, rocksdb::Options().use_direct_reads,
+DEFINE_bool(use_direct_reads, ROCKSDB_NAMESPACE::Options().use_direct_reads,
             "Use O_DIRECT for reading data");
 
 DEFINE_bool(use_direct_io_for_flush_and_compaction,
-            rocksdb::Options().use_direct_io_for_flush_and_compaction,
+            ROCKSDB_NAMESPACE::Options().use_direct_io_for_flush_and_compaction,
             "Use O_DIRECT for background flush and compaction writes");
 
-DEFINE_bool(advise_random_on_open, rocksdb::Options().advise_random_on_open,
+DEFINE_bool(advise_random_on_open,
+            ROCKSDB_NAMESPACE::Options().advise_random_on_open,
             "Advise random access on table file open");
 
 DEFINE_string(compaction_fadvice, "NORMAL",
               "Access pattern advice when a file is compacted");
 static auto FLAGS_compaction_fadvice_e =
-  rocksdb::Options().access_hint_on_compaction_start;
+    ROCKSDB_NAMESPACE::Options().access_hint_on_compaction_start;
 
 DEFINE_bool(use_tailing_iterator, false,
             "Use tailing iterator to access a series of keys instead of get");
 
-DEFINE_bool(use_adaptive_mutex, rocksdb::Options().use_adaptive_mutex,
+DEFINE_bool(use_adaptive_mutex, ROCKSDB_NAMESPACE::Options().use_adaptive_mutex,
             "Use adaptive mutex");
 
-DEFINE_uint64(bytes_per_sync,  rocksdb::Options().bytes_per_sync,
+DEFINE_uint64(bytes_per_sync, ROCKSDB_NAMESPACE::Options().bytes_per_sync,
               "Allows OS to incrementally sync SST files to disk while they are"
               " being written, in the background. Issue one request for every"
               " bytes_per_sync written. 0 turns it off.");
 
-DEFINE_uint64(wal_bytes_per_sync,  rocksdb::Options().wal_bytes_per_sync,
+DEFINE_uint64(wal_bytes_per_sync,
+              ROCKSDB_NAMESPACE::Options().wal_bytes_per_sync,
               "Allows OS to incrementally sync WAL files to disk while they are"
               " being written, in the background. Issue one request for every"
               " wal_bytes_per_sync written. 0 turns it off.");
@@ -1090,11 +1206,20 @@ static bool ValidatePrefixSize(const char* flagname, int32_t value) {
   }
   return true;
 }
+
 DEFINE_int32(prefix_size, 0, "control the prefix size for HashSkipList and "
              "plain table");
 DEFINE_int64(keys_per_prefix, 0, "control average number of keys generated "
              "per prefix, 0 means no special handling of the prefix, "
              "i.e. use the prefix comes with the generated random number.");
+DEFINE_bool(total_order_seek, false,
+            "Enable total order seek regardless of index format.");
+DEFINE_bool(prefix_same_as_start, false,
+            "Enforce that the iterator only returns keys with the same "
+            "prefix as the seek key.");
+DEFINE_bool(
+    seek_missing_prefix, false,
+    "Seek the iterator to keys with non-existent prefixes. Requires "
+    "prefix_size > 8");
+
 DEFINE_int32(memtable_insert_with_hint_prefix_size, 0,
              "If non-zero, enable "
              "memtable insert with hint with the given prefix size.");
@@ -1106,14 +1231,21 @@ DEFINE_bool(identity_as_first_hash, false, "the first hash function of cuckoo "
             "table becomes an identity function. This is only valid when key "
             "is 8 bytes");
 DEFINE_bool(dump_malloc_stats, true, "Dump malloc stats in LOG ");
-DEFINE_uint64(stats_dump_period_sec, rocksdb::Options().stats_dump_period_sec,
+DEFINE_uint64(stats_dump_period_sec,
+              ROCKSDB_NAMESPACE::Options().stats_dump_period_sec,
               "Gap between printing stats to log in seconds");
 DEFINE_uint64(stats_persist_period_sec,
-              rocksdb::Options().stats_persist_period_sec,
+              ROCKSDB_NAMESPACE::Options().stats_persist_period_sec,
               "Gap between persisting stats in seconds");
+DEFINE_bool(persist_stats_to_disk,
+            ROCKSDB_NAMESPACE::Options().persist_stats_to_disk,
+            "Whether to persist stats to disk");
 DEFINE_uint64(stats_history_buffer_size,
-              rocksdb::Options().stats_history_buffer_size,
+              ROCKSDB_NAMESPACE::Options().stats_history_buffer_size,
               "Max number of stats snapshots to keep in memory");
+DEFINE_int64(multiread_stride, 0,
+             "Stride length for the keys in a MultiGet batch");
+DEFINE_bool(multiread_batched, false, "Use the new MultiGet API");
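
For reference, a minimal sketch (the DB* handle and keys are illustrative) of the batched MultiGet overload that multiread_batched selects; it fills caller-provided arrays and reports one Status per key.

#include "rocksdb/db.h"

void BatchedMultiGetExample(rocksdb::DB* db) {
  constexpr size_t kNumKeys = 2;
  rocksdb::Slice keys[kNumKeys] = {"k1", "k2"};
  rocksdb::PinnableSlice values[kNumKeys];
  rocksdb::Status statuses[kNumKeys];
  db->MultiGet(rocksdb::ReadOptions(), db->DefaultColumnFamily(), kNumKeys,
               keys, values, statuses);
}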
 
 enum RepFactory {
   kSkipList,
@@ -1160,6 +1292,7 @@ DEFINE_int32(skip_list_lookahead, 0, "Used with skip_list memtablerep; try "
              "position");
 DEFINE_bool(report_file_operations, false,
             "If true, report the number of file operations");
+DEFINE_int32(readahead_size, 0, "Iterator readahead size");
 
 static const bool FLAGS_soft_rate_limit_dummy __attribute__((__unused__)) =
     RegisterFlagValidator(&FLAGS_soft_rate_limit, &ValidateRateLimit);
@@ -1189,7 +1322,7 @@ static const bool FLAGS_table_cache_numshardbits_dummy __attribute__((__unused__
     RegisterFlagValidator(&FLAGS_table_cache_numshardbits,
                           &ValidateTableCacheNumshardbits);
 
-namespace rocksdb {
+namespace ROCKSDB_NAMESPACE {
 
 namespace {
 struct ReportFileOpCounters {
@@ -1318,20 +1451,135 @@ class ReportFileOpEnv : public EnvWrapper {
 
 }  // namespace
 
+enum DistributionType : unsigned char {
+  kFixed = 0,
+  kUniform,
+  kNormal
+};
+
+static enum DistributionType FLAGS_value_size_distribution_type_e = kFixed;
+
+static enum DistributionType StringToDistributionType(const char* ctype) {
+  assert(ctype);
+
+  if (!strcasecmp(ctype, "fixed"))
+    return kFixed;
+  else if (!strcasecmp(ctype, "uniform"))
+    return kUniform;
+  else if (!strcasecmp(ctype, "normal"))
+    return kNormal;
+
+  fprintf(stdout, "Cannot parse distribution type '%s'\n", ctype);
+  return kFixed;  // default value
+}
+
+class BaseDistribution {
+ public:
+  BaseDistribution(unsigned int min, unsigned int max) :
+    min_value_size_(min),
+    max_value_size_(max) {}
+  virtual ~BaseDistribution() {}
+
+  unsigned int Generate() {
+    auto val = Get();
+    if (NeedTruncate()) {
+      val = std::max(min_value_size_, val);
+      val = std::min(max_value_size_, val);
+    }
+    return val;
+  }
+ private:
+  virtual unsigned int Get() = 0;
+  virtual bool NeedTruncate() {
+    return true;
+  }
+  unsigned int min_value_size_;
+  unsigned int max_value_size_;
+};
+
+class FixedDistribution : public BaseDistribution
+{
+ public:
+  FixedDistribution(unsigned int size) :
+    BaseDistribution(size, size),
+    size_(size) {}
+ private:
+  virtual unsigned int Get() override {
+    return size_;
+  }
+  virtual bool NeedTruncate() override {
+    return false;
+  }
+  unsigned int size_;
+};
+
+class NormalDistribution
+    : public BaseDistribution, public std::normal_distribution<double> {
+ public:
+  NormalDistribution(unsigned int min, unsigned int max) :
+    BaseDistribution(min, max),
+    // 99.7% values within the range [min, max].
+    std::normal_distribution<double>((double)(min + max) / 2.0 /*mean*/,
+                                     (double)(max - min) / 6.0 /*stddev*/),
+    gen_(rd_()) {}
+ private:
+  virtual unsigned int Get() override {
+    return static_cast<unsigned int>((*this)(gen_));
+  }
+  std::random_device rd_;
+  std::mt19937 gen_;
+};
+
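
The mean and standard deviation above encode the empirical three-sigma rule: with \mu = (\min + \max)/2 and \sigma = (\max - \min)/6, the interval [\mu - 3\sigma, \mu + 3\sigma] is exactly [min, max], which a normal variate stays inside with probability \approx 99.7\%; the remaining \sim 0.3\% of samples are clamped into range by BaseDistribution::Generate() via NeedTruncate().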
+class UniformDistribution
+    : public BaseDistribution,
+      public std::uniform_int_distribution<unsigned int> {
+ public:
+  UniformDistribution(unsigned int min, unsigned int max) :
+    BaseDistribution(min, max),
+    std::uniform_int_distribution<unsigned int>(min, max),
+    gen_(rd_()) {}
+ private:
+  virtual unsigned int Get() override {
+    return (*this)(gen_);
+  }
+  virtual bool NeedTruncate() override {
+    return false;
+  }
+  std::random_device rd_;
+  std::mt19937 gen_;
+};
+
 // Helper for quickly generating random data.
 class RandomGenerator {
  private:
   std::string data_;
   unsigned int pos_;
+  std::unique_ptr<BaseDistribution> dist_;
 
  public:
+
   RandomGenerator() {
+    auto max_value_size = FLAGS_value_size_max;
+    switch (FLAGS_value_size_distribution_type_e) {
+      case kUniform:
+        dist_.reset(new UniformDistribution(FLAGS_value_size_min,
+                                            FLAGS_value_size_max));
+        break;
+      case kNormal:
+        dist_.reset(new NormalDistribution(FLAGS_value_size_min,
+                                           FLAGS_value_size_max));
+        break;
+      case kFixed:
+      default:
+        dist_.reset(new FixedDistribution(value_size));
+        max_value_size = value_size;
+    }
     // We use a limited amount of data over and over again and ensure
     // that it is larger than the compression window (32KB), and also
     // large enough to serve all typical value sizes we want to write.
     Random rnd(301);
     std::string piece;
-    while (data_.size() < (unsigned)std::max(1048576, FLAGS_value_size)) {
+    while (data_.size() < (unsigned)std::max(1048576, max_value_size)) {
       // Add a short fragment that is as compressible as specified
       // by FLAGS_compression_ratio.
       test::CompressibleString(&rnd, FLAGS_compression_ratio, 100, &piece);
@@ -1349,13 +1597,9 @@ class RandomGenerator {
     return Slice(data_.data() + pos_ - len, len);
   }
 
-  Slice GenerateWithTTL(unsigned int len) {
-    assert(len <= data_.size());
-    if (pos_ + len > data_.size()) {
-      pos_ = 0;
-    }
-    pos_ += len;
-    return Slice(data_.data() + pos_ - len, len);
+  Slice Generate() {
+    auto len = dist_->Generate();
+    return Generate(len);
   }
 };
 
@@ -1506,7 +1750,6 @@ class ReporterAgent {
  private:
   std::string Header() const { return "secs_elapsed,interval_qps"; }
   void SleepAndReport() {
-    uint64_t kMicrosInSecond = 1000 * 1000;
     auto time_started = env_->NowMicros();
     while (true) {
       {
@@ -1546,7 +1789,7 @@ class ReporterAgent {
   std::atomic<int64_t> total_ops_done_;
   int64_t last_report_;
   const uint64_t report_interval_secs_;
-  rocksdb::port::Thread reporting_thread_;
+  ROCKSDB_NAMESPACE::port::Thread reporting_thread_;
   std::mutex mutex_;
   // will notify on stop
   std::condition_variable stop_cv_;
@@ -1672,7 +1915,7 @@ class Stats {
         "ElapsedTime", "Stage", "State", "OperationProperties");
 
     int64_t current_time = 0;
-    Env::Default()->GetCurrentTime(&current_time);
+    FLAGS_env->GetCurrentTime(&current_time);
     for (auto ts : thread_list) {
       fprintf(stderr, "%18" PRIu64 " %10s %12s %20s %13s %45s %12s",
           ts.thread_id,
@@ -2040,7 +2283,6 @@ class Benchmark {
   DBWithColumnFamilies db_;
   std::vector<DBWithColumnFamilies> multi_dbs_;
   int64_t num_;
-  int value_size_;
   int key_size_;
   int prefix_size_;
   int64_t keys_per_prefix_;
@@ -2053,6 +2295,7 @@ class Benchmark {
   Options open_options_;  // keep options around to properly destroy db later
 #ifndef ROCKSDB_LITE
   TraceOptions trace_options_;
+  TraceOptions block_cache_trace_options_;
 #endif
   int64_t reads_;
   int64_t deletes_;
@@ -2089,10 +2332,10 @@ class Benchmark {
       cv_.SignalAll();
     }
 
-    bool WaitForRecovery(uint64_t /*abs_time_us*/) {
+    bool WaitForRecovery(uint64_t abs_time_us) {
       InstrumentedMutexLock l(&mutex_);
       if (!recovery_complete_) {
-        cv_.Wait(/*abs_time_us*/);
+        cv_.TimedWait(abs_time_us);
       }
       if (recovery_complete_) {
         recovery_complete_ = false;
@@ -2128,31 +2371,31 @@ class Benchmark {
                             const Slice& input, std::string* compressed) {
     bool ok = true;
     switch (FLAGS_compression_type_e) {
-      case rocksdb::kSnappyCompression:
+      case ROCKSDB_NAMESPACE::kSnappyCompression:
         ok = Snappy_Compress(compression_info, input.data(), input.size(),
                              compressed);
         break;
-      case rocksdb::kZlibCompression:
+      case ROCKSDB_NAMESPACE::kZlibCompression:
         ok = Zlib_Compress(compression_info, 2, input.data(), input.size(),
                            compressed);
         break;
-      case rocksdb::kBZip2Compression:
+      case ROCKSDB_NAMESPACE::kBZip2Compression:
         ok = BZip2_Compress(compression_info, 2, input.data(), input.size(),
                             compressed);
         break;
-      case rocksdb::kLZ4Compression:
+      case ROCKSDB_NAMESPACE::kLZ4Compression:
         ok = LZ4_Compress(compression_info, 2, input.data(), input.size(),
                           compressed);
         break;
-      case rocksdb::kLZ4HCCompression:
+      case ROCKSDB_NAMESPACE::kLZ4HCCompression:
         ok = LZ4HC_Compress(compression_info, 2, input.data(), input.size(),
                             compressed);
         break;
-      case rocksdb::kXpressCompression:
+      case ROCKSDB_NAMESPACE::kXpressCompression:
         ok = XPRESS_Compress(input.data(),
           input.size(), compressed);
         break;
-      case rocksdb::kZSTD:
+      case ROCKSDB_NAMESPACE::kZSTD:
         ok = ZSTD_Compress(compression_info, input.data(), input.size(),
                            compressed);
         break;
@@ -2165,17 +2408,28 @@ class Benchmark {
   void PrintHeader() {
     PrintEnvironment();
     fprintf(stdout, "Keys:       %d bytes each\n", FLAGS_key_size);
-    fprintf(stdout, "Values:     %d bytes each (%d bytes after compression)\n",
-            FLAGS_value_size,
-            static_cast<int>(FLAGS_value_size * FLAGS_compression_ratio + 0.5));
+    auto avg_value_size = FLAGS_value_size;
+    if (FLAGS_value_size_distribution_type_e == kFixed) {
+      fprintf(stdout, "Values:     %d bytes each (%d bytes after compression)\n",
+              avg_value_size,
+              static_cast<int>(avg_value_size * FLAGS_compression_ratio + 0.5));
+    } else {
+      avg_value_size = (FLAGS_value_size_min + FLAGS_value_size_max) / 2;
+      fprintf(stdout, "Values:     %d avg bytes each (%d bytes after compression)\n",
+              avg_value_size,
+              static_cast<int>(avg_value_size * FLAGS_compression_ratio + 0.5));
+      fprintf(stdout, "Values Distribution: %s (min: %d, max: %d)\n",
+              FLAGS_value_size_distribution_type.c_str(),
+              FLAGS_value_size_min, FLAGS_value_size_max);
+    }
     fprintf(stdout, "Entries:    %" PRIu64 "\n", num_);
     fprintf(stdout, "Prefix:    %d bytes\n", FLAGS_prefix_size);
     fprintf(stdout, "Keys per prefix:    %" PRIu64 "\n", keys_per_prefix_);
     fprintf(stdout, "RawSize:    %.1f MB (estimated)\n",
-            ((static_cast<int64_t>(FLAGS_key_size + FLAGS_value_size) * num_)
+            ((static_cast<int64_t>(FLAGS_key_size + avg_value_size) * num_)
              / 1048576.0));
     fprintf(stdout, "FileSize:   %.1f MB (estimated)\n",
-            (((FLAGS_key_size + FLAGS_value_size * FLAGS_compression_ratio)
+            (((FLAGS_key_size + avg_value_size * FLAGS_compression_ratio)
               * num_)
              / 1048576.0));
     fprintf(stdout, "Write rate: %" PRIu64 " bytes/second\n",
@@ -2230,7 +2484,7 @@ class Benchmark {
     fprintf(stdout,
             "WARNING: Assertions are enabled; benchmarks unnecessarily slow\n");
 #endif
-    if (FLAGS_compression_type_e != rocksdb::kNoCompression) {
+    if (FLAGS_compression_type_e != ROCKSDB_NAMESPACE::kNoCompression) {
       // The test string should not be too small.
       const int len = FLAGS_block_size;
       std::string input_str(len, 'y');
@@ -2355,16 +2609,17 @@ class Benchmark {
       return nullptr;
     }
     if (FLAGS_use_clock_cache) {
-      auto cache = NewClockCache((size_t)capacity, FLAGS_cache_numshardbits);
+      auto cache = NewClockCache(static_cast<size_t>(capacity),
+                                 FLAGS_cache_numshardbits);
       if (!cache) {
         fprintf(stderr, "Clock cache not supported.");
         exit(1);
       }
       return cache;
     } else {
-      return NewLRUCache((size_t)capacity, FLAGS_cache_numshardbits,
-                         false /*strict_capacity_limit*/,
-                         FLAGS_cache_high_pri_pool_ratio);
+      return NewLRUCache(
+          static_cast<size_t>(capacity), FLAGS_cache_numshardbits,
+          false /*strict_capacity_limit*/, FLAGS_cache_high_pri_pool_ratio);
     }
   }
 
@@ -2378,7 +2633,6 @@ class Benchmark {
                            : nullptr),
         prefix_extractor_(NewFixedPrefixTransform(FLAGS_prefix_size)),
         num_(FLAGS_num),
-        value_size_(FLAGS_value_size),
         key_size_(FLAGS_key_size),
         prefix_size_(FLAGS_prefix_size),
         keys_per_prefix_(FLAGS_keys_per_prefix),
@@ -2415,7 +2669,7 @@ class Benchmark {
                 "at the same time");
         exit(1);
       }
-      FLAGS_env = new ReportFileOpEnv(rocksdb::Env::Default());
+      FLAGS_env = new ReportFileOpEnv(FLAGS_env);
     }
 
     if (FLAGS_prefix_size > FLAGS_key_size) {
@@ -2432,6 +2686,7 @@ class Benchmark {
     }
     if (!FLAGS_use_existing_db) {
       Options options;
+      options.env = FLAGS_env;
       if (!FLAGS_wal_dir.empty()) {
         options.wal_dir = FLAGS_wal_dir;
       }
@@ -2526,6 +2781,17 @@ class Benchmark {
     }
   }
 
+  void GenerateKeyFromIntForSeek(uint64_t v, int64_t num_keys, Slice* key) {
+    GenerateKeyFromInt(v, num_keys, key);
+    if (FLAGS_seek_missing_prefix) {
+      assert(prefix_size_ > 8);
+      char* key_ptr = const_cast<char*>(key->data());
+      // This relies on GenerateKeyFromInt filling the padding with '0's.
+      // Putting a '1' here creates a non-existent prefix.
+      key_ptr[8] = '1';
+    }
+  }
+
   std::string GetPathForMultiple(std::string base_name, size_t id) {
     if (!base_name.empty()) {
 #ifndef OS_WIN
@@ -2541,36 +2807,38 @@ class Benchmark {
     return base_name + ToString(id);
   }
 
-void VerifyDBFromDB(std::string& truth_db_name) {
-  DBWithColumnFamilies truth_db;
-  auto s = DB::OpenForReadOnly(open_options_, truth_db_name, &truth_db.db);
-  if (!s.ok()) {
-    fprintf(stderr, "open error: %s\n", s.ToString().c_str());
-    exit(1);
-  }
-  ReadOptions ro;
-  ro.total_order_seek = true;
-  std::unique_ptr<Iterator> truth_iter(truth_db.db->NewIterator(ro));
-  std::unique_ptr<Iterator> db_iter(db_.db->NewIterator(ro));
-  // Verify that all the key/values in truth_db are retrivable in db with ::Get
-  fprintf(stderr, "Verifying db >= truth_db with ::Get...\n");
-  for (truth_iter->SeekToFirst(); truth_iter->Valid(); truth_iter->Next()) {
+  void VerifyDBFromDB(std::string& truth_db_name) {
+    DBWithColumnFamilies truth_db;
+    auto s = DB::OpenForReadOnly(open_options_, truth_db_name, &truth_db.db);
+    if (!s.ok()) {
+      fprintf(stderr, "open error: %s\n", s.ToString().c_str());
+      exit(1);
+    }
+    ReadOptions ro;
+    ro.total_order_seek = true;
+    std::unique_ptr<Iterator> truth_iter(truth_db.db->NewIterator(ro));
+    std::unique_ptr<Iterator> db_iter(db_.db->NewIterator(ro));
+    // Verify that all the key/values in truth_db are retrievable in db with
+    // ::Get
+    fprintf(stderr, "Verifying db >= truth_db with ::Get...\n");
+    for (truth_iter->SeekToFirst(); truth_iter->Valid(); truth_iter->Next()) {
       std::string value;
       s = db_.db->Get(ro, truth_iter->key(), &value);
       assert(s.ok());
       // TODO(myabandeh): provide debugging hints
       assert(Slice(value) == truth_iter->value());
+    }
+    // Verify that the db iterator does not give any extra key/value
+    fprintf(stderr, "Verifying db == truth_db...\n");
+    for (db_iter->SeekToFirst(), truth_iter->SeekToFirst(); db_iter->Valid();
+         db_iter->Next(), truth_iter->Next()) {
+      assert(truth_iter->Valid());
+      assert(truth_iter->value() == db_iter->value());
+    }
+    // No more key should be left unchecked in truth_db
+    assert(!truth_iter->Valid());
+    fprintf(stderr, "...Verified\n");
   }
-  // Verify that the db iterator does not give any extra key/value
-  fprintf(stderr, "Verifying db == truth_db...\n");
-  for (db_iter->SeekToFirst(), truth_iter->SeekToFirst(); db_iter->Valid(); db_iter->Next(), truth_iter->Next()) {
-    assert(truth_iter->Valid());
-    assert(truth_iter->value() == db_iter->value());
-  }
-  // No more key should be left unchecked in truth_db
-  assert(!truth_iter->Valid());
-  fprintf(stderr, "...Verified\n");
-}
 
   void Run() {
     if (!SanityCheck()) {
@@ -2587,7 +2855,7 @@ void VerifyDBFromDB(std::string& truth_db_name) {
       reads_ = (FLAGS_reads < 0 ? FLAGS_num : FLAGS_reads);
       writes_ = (FLAGS_writes < 0 ? FLAGS_num : FLAGS_writes);
       deletes_ = (FLAGS_deletes < 0 ? FLAGS_num : FLAGS_deletes);
-      value_size_ = FLAGS_value_size;
+      value_size = FLAGS_value_size;
       key_size_ = FLAGS_key_size;
       entries_per_batch_ = FLAGS_batch_size;
       writes_before_delete_range_ = FLAGS_writes_before_delete_range;
@@ -2689,10 +2957,18 @@ void VerifyDBFromDB(std::string& truth_db_name) {
       } else if (name == "fill100K") {
         fresh_db = true;
         num_ /= 1000;
-        value_size_ = 100 * 1000;
+        value_size = 100 * 1000;
         method = &Benchmark::WriteRandom;
       } else if (name == "readseq") {
         method = &Benchmark::ReadSequential;
+      } else if (name == "readtorowcache") {
+        if (!FLAGS_use_existing_keys || !FLAGS_row_cache_size) {
+          fprintf(stderr,
+                  "Please set use_existing_keys to true and specify a "
+                  "row cache size in readtorowcache benchmark\n");
+          exit(1);
+        }
+        method = &Benchmark::ReadToRowCache;
       } else if (name == "readtocache") {
         method = &Benchmark::ReadSequential;
         num_threads = 1;
@@ -2700,6 +2976,10 @@ void VerifyDBFromDB(std::string& truth_db_name) {
       } else if (name == "readreverse") {
         method = &Benchmark::ReadReverse;
       } else if (name == "readrandom") {
+        if (FLAGS_multiread_stride) {
+          fprintf(stderr, "entries_per_batch = %" PRIi64 "\n",
+                  entries_per_batch_);
+        }
         method = &Benchmark::ReadRandom;
       } else if (name == "readrandomfast") {
         method = &Benchmark::ReadRandomFast;
@@ -2808,6 +3088,8 @@ void VerifyDBFromDB(std::string& truth_db_name) {
         PrintStats("rocksdb.levelstats");
       } else if (name == "sstables") {
         PrintStats("rocksdb.sstables");
+      } else if (name == "stats_history") {
+        PrintStatsHistory();
       } else if (name == "replay") {
         if (num_threads > 1) {
           fprintf(stderr, "Multi-threaded replay is not yet supported\n");
@@ -2818,6 +3100,8 @@ void VerifyDBFromDB(std::string& truth_db_name) {
           exit(1);
         }
         method = &Benchmark::Replay;
+      } else if (name == "getmergeoperands") {
+        method = &Benchmark::GetMergeOperands;
       } else if (!name.empty()) {  // No error message for empty name
         fprintf(stderr, "unknown benchmark '%s'\n", name.c_str());
         exit(1);
@@ -2872,6 +3156,47 @@ void VerifyDBFromDB(std::string& truth_db_name) {
           fprintf(stdout, "Tracing the workload to: [%s]\n",
                   FLAGS_trace_file.c_str());
         }
+        // Start block cache tracing.
+        if (!FLAGS_block_cache_trace_file.empty()) {
+          // Sanity checks.
+          if (FLAGS_block_cache_trace_sampling_frequency <= 0) {
+            fprintf(stderr,
+                    "Block cache trace sampling frequency must be higher than "
+                    "0.\n");
+            exit(1);
+          }
+          if (FLAGS_block_cache_trace_max_trace_file_size_in_bytes <= 0) {
+            fprintf(stderr,
+                    "The maximum file size for block cache tracing must be "
+                    "higher than 0.\n");
+            exit(1);
+          }
+          block_cache_trace_options_.max_trace_file_size =
+              FLAGS_block_cache_trace_max_trace_file_size_in_bytes;
+          block_cache_trace_options_.sampling_frequency =
+              FLAGS_block_cache_trace_sampling_frequency;
+          std::unique_ptr<TraceWriter> block_cache_trace_writer;
+          Status s = NewFileTraceWriter(FLAGS_env, EnvOptions(),
+                                        FLAGS_block_cache_trace_file,
+                                        &block_cache_trace_writer);
+          if (!s.ok()) {
+            fprintf(stderr,
+                    "Encountered an error when creating trace writer, %s\n",
+                    s.ToString().c_str());
+            exit(1);
+          }
+          s = db_.db->StartBlockCacheTrace(block_cache_trace_options_,
+                                           std::move(block_cache_trace_writer));
+          if (!s.ok()) {
+            fprintf(
+                stderr,
+                "Encountered an error when starting block cache tracing, %s\n",
+                s.ToString().c_str());
+            exit(1);
+          }
+          fprintf(stdout, "Tracing block cache accesses to: [%s]\n",
+                  FLAGS_block_cache_trace_file.c_str());
+        }
 #endif  // ROCKSDB_LITE
 
         if (num_warmup > 0) {
@@ -2900,6 +3225,12 @@ void VerifyDBFromDB(std::string& truth_db_name) {
       }
     }
 
+    if (secondary_update_thread_) {
+      secondary_update_stopped_.store(1, std::memory_order_relaxed);
+      secondary_update_thread_->join();
+      secondary_update_thread_.reset();
+    }
+
 #ifndef ROCKSDB_LITE
     if (name != "replay" && FLAGS_trace_file != "") {
       Status s = db_.db->EndTrace();
@@ -2908,6 +3239,14 @@ void VerifyDBFromDB(std::string& truth_db_name) {
                 s.ToString().c_str());
       }
     }
+    if (!FLAGS_block_cache_trace_file.empty()) {
+      Status s = db_.db->EndBlockCacheTrace();
+      if (!s.ok()) {
+        fprintf(stderr,
+                "Encountered an error ending the block cache tracing, %s\n",
+                s.ToString().c_str());
+      }
+    }
 #endif  // ROCKSDB_LITE
 
     if (FLAGS_statistics) {
@@ -2919,11 +3258,22 @@ void VerifyDBFromDB(std::string& truth_db_name) {
                   ->ToString()
                   .c_str());
     }
+
+#ifndef ROCKSDB_LITE
+    if (FLAGS_use_secondary_db) {
+      fprintf(stdout, "Secondary instance updated  %" PRIu64 " times.\n",
+              secondary_db_updates_);
+    }
+#endif  // ROCKSDB_LITE
   }
 
  private:
   std::shared_ptr<TimestampEmulator> timestamp_emulator_;
-
+  std::unique_ptr<port::Thread> secondary_update_thread_;
+  std::atomic<int> secondary_update_stopped_{0};
+#ifndef ROCKSDB_LITE
+  uint64_t secondary_db_updates_ = 0;
+#endif  // ROCKSDB_LITE
   struct ThreadArg {
     Benchmark* bm;
     SharedState* shared;
@@ -3149,7 +3499,7 @@ void VerifyDBFromDB(std::string& truth_db_name) {
     while (ok && bytes < 1024 * 1048576) {
       CacheAllocationPtr uncompressed;
       switch (FLAGS_compression_type_e) {
-        case rocksdb::kSnappyCompression: {
+        case ROCKSDB_NAMESPACE::kSnappyCompression: {
           // get size and allocate here to make comparison fair
           size_t ulength = 0;
           if (!Snappy_GetUncompressedLength(compressed.data(),
@@ -3162,38 +3512,39 @@ void VerifyDBFromDB(std::string& truth_db_name) {
                                  uncompressed.get());
           break;
         }
-      case rocksdb::kZlibCompression:
-        uncompressed = Zlib_Uncompress(uncompression_info, compressed.data(),
-                                       compressed.size(), &decompress_size, 2);
-        ok = uncompressed.get() != nullptr;
-        break;
-      case rocksdb::kBZip2Compression:
-        uncompressed = BZip2_Uncompress(compressed.data(), compressed.size(),
-                                        &decompress_size, 2);
-        ok = uncompressed.get() != nullptr;
-        break;
-      case rocksdb::kLZ4Compression:
-        uncompressed = LZ4_Uncompress(uncompression_info, compressed.data(),
-                                      compressed.size(), &decompress_size, 2);
-        ok = uncompressed.get() != nullptr;
-        break;
-      case rocksdb::kLZ4HCCompression:
-        uncompressed = LZ4_Uncompress(uncompression_info, compressed.data(),
-                                      compressed.size(), &decompress_size, 2);
-        ok = uncompressed.get() != nullptr;
-        break;
-      case rocksdb::kXpressCompression:
-        uncompressed.reset(XPRESS_Uncompress(
-            compressed.data(), compressed.size(), &decompress_size));
-        ok = uncompressed.get() != nullptr;
-        break;
-      case rocksdb::kZSTD:
-        uncompressed = ZSTD_Uncompress(uncompression_info, compressed.data(),
-                                       compressed.size(), &decompress_size);
-        ok = uncompressed.get() != nullptr;
-        break;
-      default:
-        ok = false;
+        case ROCKSDB_NAMESPACE::kZlibCompression:
+          uncompressed =
+              Zlib_Uncompress(uncompression_info, compressed.data(),
+                              compressed.size(), &decompress_size, 2);
+          ok = uncompressed.get() != nullptr;
+          break;
+        case ROCKSDB_NAMESPACE::kBZip2Compression:
+          uncompressed = BZip2_Uncompress(compressed.data(), compressed.size(),
+                                          &decompress_size, 2);
+          ok = uncompressed.get() != nullptr;
+          break;
+        case ROCKSDB_NAMESPACE::kLZ4Compression:
+          uncompressed = LZ4_Uncompress(uncompression_info, compressed.data(),
+                                        compressed.size(), &decompress_size, 2);
+          ok = uncompressed.get() != nullptr;
+          break;
+        case ROCKSDB_NAMESPACE::kLZ4HCCompression:
+          uncompressed = LZ4_Uncompress(uncompression_info, compressed.data(),
+                                        compressed.size(), &decompress_size, 2);
+          ok = uncompressed.get() != nullptr;
+          break;
+        case ROCKSDB_NAMESPACE::kXpressCompression:
+          uncompressed.reset(XPRESS_Uncompress(
+              compressed.data(), compressed.size(), &decompress_size));
+          ok = uncompressed.get() != nullptr;
+          break;
+        case ROCKSDB_NAMESPACE::kZSTD:
+          uncompressed = ZSTD_Uncompress(uncompression_info, compressed.data(),
+                                         compressed.size(), &decompress_size);
+          ok = uncompressed.get() != nullptr;
+          break;
+        default:
+          ok = false;
       }
       bytes += input.size();
       thread->stats.FinishedOps(nullptr, nullptr, 1, kUncompress);
@@ -3214,8 +3565,9 @@ void VerifyDBFromDB(std::string& truth_db_name) {
     DBOptions db_opts;
     std::vector<ColumnFamilyDescriptor> cf_descs;
     if (FLAGS_options_file != "") {
-      auto s = LoadOptionsFromFile(FLAGS_options_file, Env::Default(), &db_opts,
+      auto s = LoadOptionsFromFile(FLAGS_options_file, FLAGS_env, &db_opts,
                                    &cf_descs);
+      db_opts.env = FLAGS_env;
       if (s.ok()) {
         *opts = Options(db_opts, cf_descs[0].options);
         return true;
@@ -3236,6 +3588,7 @@ void VerifyDBFromDB(std::string& truth_db_name) {
 
     assert(db_.db == nullptr);
 
+    options.env = FLAGS_env;
     options.max_open_files = FLAGS_open_files;
     if (FLAGS_cost_write_buffer_to_cache || FLAGS_db_write_buffer_size != 0) {
       options.write_buffer_manager.reset(
@@ -3247,6 +3600,8 @@ void VerifyDBFromDB(std::string& truth_db_name) {
       FLAGS_min_write_buffer_number_to_merge;
     options.max_write_buffer_number_to_maintain =
         FLAGS_max_write_buffer_number_to_maintain;
+    options.max_write_buffer_size_to_maintain =
+        FLAGS_max_write_buffer_size_to_maintain;
     options.max_background_jobs = FLAGS_max_background_jobs;
     options.max_background_compactions = FLAGS_max_background_compactions;
     options.max_subcompactions = static_cast<uint32_t>(FLAGS_subcompactions);
@@ -3291,6 +3646,7 @@ void VerifyDBFromDB(std::string& truth_db_name) {
     options.new_table_reader_for_compaction_inputs =
         FLAGS_new_table_reader_for_compaction_inputs;
     options.compaction_readahead_size = FLAGS_compaction_readahead_size;
+    options.log_readahead_size = FLAGS_log_readahead_size;
     options.random_access_max_buffer_size = FLAGS_random_access_max_buffer_size;
     options.writable_file_max_buffer_size = FLAGS_writable_file_max_buffer_size;
     options.use_fsync = FLAGS_use_fsync;
@@ -3367,7 +3723,7 @@ void VerifyDBFromDB(std::string& truth_db_name) {
         exit(1);
       }
 
-      rocksdb::CuckooTableOptions table_options;
+      ROCKSDB_NAMESPACE::CuckooTableOptions table_options;
       table_options.hash_table_ratio = FLAGS_cuckoo_hash_ratio;
       table_options.identity_as_first_hash = FLAGS_identity_as_first_hash;
       options.table_factory = std::shared_ptr<TableFactory>(
@@ -3429,10 +3785,10 @@ void VerifyDBFromDB(std::string& truth_db_name) {
       block_based_options.block_align = FLAGS_block_align;
       if (FLAGS_use_data_block_hash_index) {
         block_based_options.data_block_index_type =
-            rocksdb::BlockBasedTableOptions::kDataBlockBinaryAndHash;
+            ROCKSDB_NAMESPACE::BlockBasedTableOptions::kDataBlockBinaryAndHash;
       } else {
         block_based_options.data_block_index_type =
-            rocksdb::BlockBasedTableOptions::kDataBlockBinarySearch;
+            ROCKSDB_NAMESPACE::BlockBasedTableOptions::kDataBlockBinarySearch;
       }
       block_based_options.data_block_hash_table_util_ratio =
           FLAGS_data_block_hash_table_util_ratio;
@@ -3480,9 +3836,10 @@ void VerifyDBFromDB(std::string& truth_db_name) {
     }
     if (FLAGS_max_bytes_for_level_multiplier_additional_v.size() > 0) {
       if (FLAGS_max_bytes_for_level_multiplier_additional_v.size() !=
-          (unsigned int)FLAGS_num_levels) {
+          static_cast<unsigned int>(FLAGS_num_levels)) {
         fprintf(stderr, "Insufficient number of fanouts specified %d\n",
-                (int)FLAGS_max_bytes_for_level_multiplier_additional_v.size());
+                static_cast<int>(
+                    FLAGS_max_bytes_for_level_multiplier_additional_v.size()));
         exit(1);
       }
       options.max_bytes_for_level_multiplier_additional =
@@ -3524,6 +3881,7 @@ void VerifyDBFromDB(std::string& truth_db_name) {
     options.enable_write_thread_adaptive_yield =
         FLAGS_enable_write_thread_adaptive_yield;
     options.enable_pipelined_write = FLAGS_enable_pipelined_write;
+    options.unordered_write = FLAGS_unordered_write;
     options.write_thread_max_yield_usec = FLAGS_write_thread_max_yield_usec;
     options.write_thread_slow_yield_usec = FLAGS_write_thread_slow_yield_usec;
     options.rate_limit_delay_max_milliseconds =
@@ -3583,6 +3941,11 @@ void VerifyDBFromDB(std::string& truth_db_name) {
       fprintf(stderr, "Cannot use readonly flag with transaction_db\n");
       exit(1);
     }
+    if (FLAGS_use_secondary_db &&
+        (FLAGS_transaction_db || FLAGS_optimistic_transaction_db)) {
+      fprintf(stderr, "Cannot use use_secondary_db flag with transaction_db\n");
+      exit(1);
+    }
 #endif  // ROCKSDB_LITE
 
   }
@@ -3599,6 +3962,7 @@ void VerifyDBFromDB(std::string& truth_db_name) {
         static_cast<unsigned int>(FLAGS_stats_dump_period_sec);
     options.stats_persist_period_sec =
         static_cast<unsigned int>(FLAGS_stats_persist_period_sec);
+    options.persist_stats_to_disk = FLAGS_persist_stats_to_disk;
     options.stats_history_buffer_size =
         static_cast<size_t>(FLAGS_stats_history_buffer_size);
 
@@ -3753,6 +4117,11 @@ void VerifyDBFromDB(std::string& truth_db_name) {
       } else if (FLAGS_transaction_db) {
         TransactionDB* ptr;
         TransactionDBOptions txn_db_options;
+        if (options.unordered_write) {
+          options.two_write_queues = true;
+          txn_db_options.skip_concurrency_control = true;
+          txn_db_options.write_policy = WRITE_PREPARED;
+        }
         s = TransactionDB::Open(options, txn_db_options, db_name,
                                 column_families, &db->cfh, &ptr);
         if (s.ok()) {
@@ -3779,6 +4148,11 @@ void VerifyDBFromDB(std::string& truth_db_name) {
     } else if (FLAGS_transaction_db) {
       TransactionDB* ptr = nullptr;
       TransactionDBOptions txn_db_options;
+      if (options.unordered_write) {
+        options.two_write_queues = true;
+        txn_db_options.skip_concurrency_control = true;
+        txn_db_options.write_policy = WRITE_PREPARED;
+      }
       s = CreateLoggerFromOptions(db_name, options, &options.info_log);
       if (s.ok()) {
         s = TransactionDB::Open(options, txn_db_options, db_name, &ptr);
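Both hunks above encode the same constraint: unordered_write gives up inter-write ordering inside the memtable, and snapshot consistency is restored by routing writes through a two-queue, WritePrepared TransactionDB. A minimal standalone sketch of that configuration, using only the options named in the hunks (the DB path is illustrative):

    #include "rocksdb/utilities/transaction_db.h"

    // Open a TransactionDB configured the way the benchmark does when
    // options.unordered_write is set.
    rocksdb::Status OpenUnorderedWriteDB(rocksdb::TransactionDB** txn_db) {
      rocksdb::Options options;
      options.create_if_missing = true;
      options.unordered_write = true;
      options.two_write_queues = true;
      rocksdb::TransactionDBOptions txn_db_options;
      txn_db_options.skip_concurrency_control = true;
      txn_db_options.write_policy = rocksdb::WRITE_PREPARED;
      return rocksdb::TransactionDB::Open(options, txn_db_options,
                                          "/tmp/unordered_txn_db", txn_db);
    }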
@@ -3789,17 +4163,45 @@ void VerifyDBFromDB(std::string& truth_db_name) {
     } else if (FLAGS_use_blob_db) {
       blob_db::BlobDBOptions blob_db_options;
       blob_db_options.enable_garbage_collection = FLAGS_blob_db_enable_gc;
+      blob_db_options.garbage_collection_cutoff = FLAGS_blob_db_gc_cutoff;
       blob_db_options.is_fifo = FLAGS_blob_db_is_fifo;
       blob_db_options.max_db_size = FLAGS_blob_db_max_db_size;
       blob_db_options.ttl_range_secs = FLAGS_blob_db_ttl_range_secs;
       blob_db_options.min_blob_size = FLAGS_blob_db_min_blob_size;
       blob_db_options.bytes_per_sync = FLAGS_blob_db_bytes_per_sync;
       blob_db_options.blob_file_size = FLAGS_blob_db_file_size;
+      blob_db_options.compression = FLAGS_blob_db_compression_type_e;
       blob_db::BlobDB* ptr = nullptr;
       s = blob_db::BlobDB::Open(options, blob_db_options, db_name, &ptr);
       if (s.ok()) {
         db->db = ptr;
       }
+    } else if (FLAGS_use_secondary_db) {
+      if (FLAGS_secondary_path.empty()) {
+        std::string default_secondary_path;
+        FLAGS_env->GetTestDirectory(&default_secondary_path);
+        default_secondary_path += "/dbbench_secondary";
+        FLAGS_secondary_path = default_secondary_path;
+      }
+      s = DB::OpenAsSecondary(options, db_name, FLAGS_secondary_path, &db->db);
+      if (s.ok() && FLAGS_secondary_update_interval > 0) {
+        secondary_update_thread_.reset(new port::Thread(
+            [this](int interval, DBWithColumnFamilies* _db) {
+              while (0 == secondary_update_stopped_.load(
+                              std::memory_order_relaxed)) {
+                Status secondary_update_status =
+                    _db->db->TryCatchUpWithPrimary();
+                if (!secondary_update_status.ok()) {
+                  fprintf(stderr, "Failed to catch up with primary: %s\n",
+                          secondary_update_status.ToString().c_str());
+                  break;
+                }
+                ++secondary_db_updates_;
+                FLAGS_env->SleepForMicroseconds(interval * 1000000);
+              }
+            },
+            FLAGS_secondary_update_interval, db));
+      }
 #endif  // ROCKSDB_LITE
     } else {
       s = DB::Open(options, db_name, &db->db);
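The use_secondary_db branch above combines DB::OpenAsSecondary with a background thread that periodically calls TryCatchUpWithPrimary. Outside the benchmark harness the same pattern reduces to the sketch below (paths illustrative; secondaries generally want max_open_files = -1 so table files stay pinned):

    #include "rocksdb/db.h"

    // Open a read-only secondary against a live primary and replay its
    // MANIFEST/WAL once; a real deployment would call TryCatchUpWithPrimary
    // on a timer, as the benchmark thread above does.
    rocksdb::Status CatchUpOnce(const std::string& primary_path,
                                const std::string& secondary_path) {
      rocksdb::Options options;
      options.max_open_files = -1;
      rocksdb::DB* secondary = nullptr;
      rocksdb::Status s = rocksdb::DB::OpenAsSecondary(
          options, primary_path, secondary_path, &secondary);
      if (!s.ok()) return s;
      s = secondary->TryCatchUpWithPrimary();
      delete secondary;
      return s;
    }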
@@ -3964,38 +4366,34 @@ void VerifyDBFromDB(std::string& truth_db_name) {
       size_t id = thread->rand.Next() % num_key_gens;
       DBWithColumnFamilies* db_with_cfh = SelectDBWithCfh(id);
       batch.Clear();
-
-      if (thread->shared->write_rate_limiter.get() != nullptr) {
-        thread->shared->write_rate_limiter->Request(
-            entries_per_batch_ * (value_size_ + key_size_), Env::IO_HIGH,
-            nullptr /* stats */, RateLimiter::OpType::kWrite);
-        // Set time at which last op finished to Now() to hide latency and
-        // sleep from rate limiter. Also, do the check once per batch, not
-        // once per write.
-        thread->stats.ResetLastOpTime();
-      }
+      int64_t batch_bytes = 0;
 
       for (int64_t j = 0; j < entries_per_batch_; j++) {
         int64_t rand_num = key_gens[id]->Next();
         GenerateKeyFromInt(rand_num, FLAGS_num, &key);
+        Slice val = gen.Generate();
         if (use_blob_db_) {
 #ifndef ROCKSDB_LITE
-          Slice val = gen.Generate(value_size_);
-          int ttl = rand() % FLAGS_blob_db_max_ttl_range;
           blob_db::BlobDB* blobdb =
               static_cast<blob_db::BlobDB*>(db_with_cfh->db);
-          s = blobdb->PutWithTTL(write_options_, key, val, ttl);
+          if (FLAGS_blob_db_max_ttl_range > 0) {
+            int ttl = rand() % FLAGS_blob_db_max_ttl_range;
+            s = blobdb->PutWithTTL(write_options_, key, val, ttl);
+          } else {
+            s = blobdb->Put(write_options_, key, val);
+          }
 #endif  //  ROCKSDB_LITE
         } else if (FLAGS_num_column_families <= 1) {
-          batch.Put(key, gen.Generate(value_size_));
+          batch.Put(key, val);
         } else {
           // We use same rand_num as seed for key and column family so that we
           // can deterministically find the cfh corresponding to a particular
           // key while reading the key.
           batch.Put(db_with_cfh->GetCfh(rand_num), key,
-                    gen.Generate(value_size_));
+                    val);
         }
-        bytes += value_size_ + key_size_;
+        batch_bytes += val.size() + key_size_;
+        bytes += val.size() + key_size_;
         ++num_written;
         if (writes_per_range_tombstone_ > 0 &&
             num_written > writes_before_delete_range_ &&
@@ -4042,6 +4440,15 @@ void VerifyDBFromDB(std::string& truth_db_name) {
           }
         }
       }
+      if (thread->shared->write_rate_limiter.get() != nullptr) {
+        thread->shared->write_rate_limiter->Request(
+            batch_bytes, Env::IO_HIGH,
+            nullptr /* stats */, RateLimiter::OpType::kWrite);
+        // Set time at which last op finished to Now() to hide latency and
+        // sleep from rate limiter. Also, do the check once per batch, not
+        // once per write.
+        thread->stats.ResetLastOpTime();
+      }
       if (!use_blob_db_) {
         s = db_with_cfh->db->Write(write_options_, &batch);
       }
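Moving the limiter call after batch assembly (the hunk above) means the Request() charge reflects the bytes actually generated (batch_bytes) rather than the nominal entries_per_batch_ * (value_size_ + key_size_), which matters once value sizes vary. A minimal sketch of the same RateLimiter call in isolation (the 10 MB/s limit is illustrative; note a single request must stay below the limiter's burst size):

    #include <memory>
    #include "rocksdb/env.h"
    #include "rocksdb/rate_limiter.h"

    // Charge one write batch against a shared limiter, once per batch.
    void ChargeBatch(rocksdb::RateLimiter* limiter, int64_t batch_bytes) {
      limiter->Request(batch_bytes, rocksdb::Env::IO_HIGH, nullptr /* stats */,
                       rocksdb::RateLimiter::OpType::kWrite);
    }

    // std::shared_ptr<rocksdb::RateLimiter> limiter(
    //     rocksdb::NewGenericRateLimiter(10 << 20 /* 10 MB/s */));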
@@ -4417,7 +4824,66 @@ void VerifyDBFromDB(std::string& truth_db_name) {
 
     delete iter;
     thread->stats.AddBytes(bytes);
-    if (FLAGS_perf_level > rocksdb::PerfLevel::kDisable) {
+    if (FLAGS_perf_level > ROCKSDB_NAMESPACE::PerfLevel::kDisable) {
+      thread->stats.AddMessage(std::string("PERF_CONTEXT:\n") +
+                               get_perf_context()->ToString());
+    }
+  }
+
+  void ReadToRowCache(ThreadState* thread) {
+    int64_t read = 0;
+    int64_t found = 0;
+    int64_t bytes = 0;
+    int64_t key_rand = 0;
+    ReadOptions options(FLAGS_verify_checksum, true);
+    std::unique_ptr<const char[]> key_guard;
+    Slice key = AllocateKey(&key_guard);
+    PinnableSlice pinnable_val;
+
+    while (key_rand < FLAGS_num) {
+      DBWithColumnFamilies* db_with_cfh = SelectDBWithCfh(thread);
+      // We use the same key_rand as the seed for both the key and the column
+      // family so that we can deterministically find the cfh corresponding to
+      // a particular key, as is done in the DoWrite method.
+      GenerateKeyFromInt(key_rand, FLAGS_num, &key);
+      read++;
+      Status s;
+      if (FLAGS_num_column_families > 1) {
+        s = db_with_cfh->db->Get(options, db_with_cfh->GetCfh(key_rand), key,
+                                 &pinnable_val);
+      } else {
+        pinnable_val.Reset();
+        s = db_with_cfh->db->Get(options,
+                                 db_with_cfh->db->DefaultColumnFamily(), key,
+                                 &pinnable_val);
+      }
+      key_rand++;
+
+      if (s.ok()) {
+        found++;
+        bytes += key.size() + pinnable_val.size();
+      } else if (!s.IsNotFound()) {
+        fprintf(stderr, "Get returned an error: %s\n", s.ToString().c_str());
+        abort();
+      }
+
+      if (thread->shared->read_rate_limiter.get() != nullptr &&
+          read % 256 == 255) {
+        thread->shared->read_rate_limiter->Request(
+            256, Env::IO_HIGH, nullptr /* stats */, RateLimiter::OpType::kRead);
+      }
+
+      thread->stats.FinishedOps(db_with_cfh, db_with_cfh->db, 1, kRead);
+    }
+
+    char msg[100];
+    snprintf(msg, sizeof(msg), "(%" PRIu64 " of %" PRIu64 " found)\n", found,
+             read);
+
+    thread->stats.AddBytes(bytes);
+    thread->stats.AddMessage(msg);
+
+    if (FLAGS_perf_level > ROCKSDB_NAMESPACE::PerfLevel::kDisable) {
       thread->stats.AddMessage(std::string("PERF_CONTEXT:\n") +
                                get_perf_context()->ToString());
     }
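ReadToRowCache above is only interesting when a row cache is configured; db_bench normally does that from a row-cache size flag (not visible in this hunk), which ends up as the Options field sketched below (the 64 MB size is illustrative):

    #include "rocksdb/cache.h"
    #include "rocksdb/options.h"

    // Point lookups that hit the row cache skip the block cache and the
    // table-reader path entirely, which is what this benchmark measures.
    rocksdb::Options WithRowCache() {
      rocksdb::Options options;
      options.row_cache = rocksdb::NewLRUCache(64 << 20 /* 64 MB */);
      return options;
    }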
@@ -4500,7 +4966,7 @@ void VerifyDBFromDB(std::string& truth_db_name) {
 
     thread->stats.AddMessage(msg);
 
-    if (FLAGS_perf_level > rocksdb::PerfLevel::kDisable) {
+    if (FLAGS_perf_level > ROCKSDB_NAMESPACE::PerfLevel::kDisable) {
       thread->stats.AddMessage(std::string("PERF_CONTEXT:\n") +
                                get_perf_context()->ToString());
     }
@@ -4531,6 +4997,8 @@ void VerifyDBFromDB(std::string& truth_db_name) {
     int64_t read = 0;
     int64_t found = 0;
     int64_t bytes = 0;
+    int num_keys = 0;
+    int64_t key_rand = GetRandomKey(&thread->rand);
     ReadOptions options(FLAGS_verify_checksum, true);
     std::unique_ptr<const char[]> key_guard;
     Slice key = AllocateKey(&key_guard);
@@ -4542,8 +5010,21 @@ void VerifyDBFromDB(std::string& truth_db_name) {
       // We use same key_rand as seed for key and column family so that we can
       // deterministically find the cfh corresponding to a particular key, as it
       // is done in DoWrite method.
-      int64_t key_rand = GetRandomKey(&thread->rand);
       GenerateKeyFromInt(key_rand, FLAGS_num, &key);
+      if (entries_per_batch_ > 1 && FLAGS_multiread_stride) {
+        if (++num_keys == entries_per_batch_) {
+          num_keys = 0;
+          key_rand = GetRandomKey(&thread->rand);
+          if ((key_rand + (entries_per_batch_ - 1) * FLAGS_multiread_stride) >=
+              FLAGS_num) {
+            key_rand = FLAGS_num - entries_per_batch_ * FLAGS_multiread_stride;
+          }
+        } else {
+          key_rand += FLAGS_multiread_stride;
+        }
+      } else {
+        key_rand = GetRandomKey(&thread->rand);
+      }
       read++;
       Status s;
       if (FLAGS_num_column_families > 1) {
@@ -4579,7 +5060,7 @@ void VerifyDBFromDB(std::string& truth_db_name) {
     thread->stats.AddBytes(bytes);
     thread->stats.AddMessage(msg);
 
-    if (FLAGS_perf_level > rocksdb::PerfLevel::kDisable) {
+    if (FLAGS_perf_level > ROCKSDB_NAMESPACE::PerfLevel::kDisable) {
       thread->stats.AddMessage(std::string("PERF_CONTEXT:\n") +
                                get_perf_context()->ToString());
     }
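To make the stride clamp above concrete: with FLAGS_num = 1000, entries_per_batch_ = 4, and FLAGS_multiread_stride = 100, a fresh draw of key_rand = 950 would put the last key of the batch at 950 + 3*100 = 1250 >= 1000, so key_rand is reset to 1000 - 4*100 = 600 and the batch walks keys 600, 700, 800, 900.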
@@ -4595,6 +5076,9 @@ void VerifyDBFromDB(std::string& truth_db_name) {
     std::vector<Slice> keys;
     std::vector<std::unique_ptr<const char[]> > key_guards;
     std::vector<std::string> values(entries_per_batch_);
+    PinnableSlice* pin_values = new PinnableSlice[entries_per_batch_];
+    std::unique_ptr<PinnableSlice[]> pin_values_guard(pin_values);
+    std::vector<Status> stat_list(entries_per_batch_);
     while (static_cast<int64_t>(keys.size()) < entries_per_batch_) {
       key_guards.push_back(std::unique_ptr<const char[]>());
       keys.push_back(AllocateKey(&key_guards.back()));
@@ -4603,21 +5087,52 @@ void VerifyDBFromDB(std::string& truth_db_name) {
     Duration duration(FLAGS_duration, reads_);
     while (!duration.Done(1)) {
       DB* db = SelectDB(thread);
-      for (int64_t i = 0; i < entries_per_batch_; ++i) {
-        GenerateKeyFromInt(GetRandomKey(&thread->rand), FLAGS_num, &keys[i]);
+      if (FLAGS_multiread_stride) {
+        int64_t key = GetRandomKey(&thread->rand);
+        if ((key + (entries_per_batch_ - 1) * FLAGS_multiread_stride) >=
+            static_cast<int64_t>(FLAGS_num)) {
+          key = FLAGS_num - entries_per_batch_ * FLAGS_multiread_stride;
+        }
+        for (int64_t i = 0; i < entries_per_batch_; ++i) {
+          GenerateKeyFromInt(key, FLAGS_num, &keys[i]);
+          key += FLAGS_multiread_stride;
+        }
+      } else {
+        for (int64_t i = 0; i < entries_per_batch_; ++i) {
+          GenerateKeyFromInt(GetRandomKey(&thread->rand), FLAGS_num, &keys[i]);
+        }
       }
-      std::vector<Status> statuses = db->MultiGet(options, keys, &values);
-      assert(static_cast<int64_t>(statuses.size()) == entries_per_batch_);
-
-      read += entries_per_batch_;
-      num_multireads++;
-      for (int64_t i = 0; i < entries_per_batch_; ++i) {
-        if (statuses[i].ok()) {
-          ++found;
-        } else if (!statuses[i].IsNotFound()) {
-          fprintf(stderr, "MultiGet returned an error: %s\n",
-                  statuses[i].ToString().c_str());
-          abort();
+      if (!FLAGS_multiread_batched) {
+        std::vector<Status> statuses = db->MultiGet(options, keys, &values);
+        assert(static_cast<int64_t>(statuses.size()) == entries_per_batch_);
+
+        read += entries_per_batch_;
+        num_multireads++;
+        for (int64_t i = 0; i < entries_per_batch_; ++i) {
+          if (statuses[i].ok()) {
+            ++found;
+          } else if (!statuses[i].IsNotFound()) {
+            fprintf(stderr, "MultiGet returned an error: %s\n",
+                    statuses[i].ToString().c_str());
+            abort();
+          }
+        }
+      } else {
+        db->MultiGet(options, db->DefaultColumnFamily(), keys.size(),
+                     keys.data(), pin_values, stat_list.data());
+
+        read += entries_per_batch_;
+        num_multireads++;
+        for (int64_t i = 0; i < entries_per_batch_; ++i) {
+          if (stat_list[i].ok()) {
+            ++found;
+          } else if (!stat_list[i].IsNotFound()) {
+            fprintf(stderr, "MultiGet returned an error: %s\n",
+                    stat_list[i].ToString().c_str());
+            abort();
+          }
+          stat_list[i] = Status::OK();
+          pin_values[i].Reset();
         }
       }
       if (thread->shared->read_rate_limiter.get() != nullptr &&
@@ -4635,7 +5150,7 @@ void VerifyDBFromDB(std::string& truth_db_name) {
     thread->stats.AddMessage(msg);
   }
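The batched branch above uses the array-based MultiGet overload, which writes into caller-owned PinnableSlice/Status arrays instead of returning std::string copies. A minimal sketch of that call shape (error handling trimmed):

    #include <vector>
    #include "rocksdb/db.h"

    // Batched MultiGet over the default column family; values stay pinned
    // until Reset() or destruction, mirroring the pin_values reuse above.
    void BatchedMultiGet(rocksdb::DB* db,
                         const std::vector<rocksdb::Slice>& keys) {
      std::vector<rocksdb::PinnableSlice> values(keys.size());
      std::vector<rocksdb::Status> statuses(keys.size());
      db->MultiGet(rocksdb::ReadOptions(), db->DefaultColumnFamily(),
                   keys.size(), keys.data(), values.data(), statuses.data());
      for (auto& v : values) {
        v.Reset();  // release pinned blocks before reusing the buffers
      }
    }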
 
-  // THe reverse function of Pareto function
+  // The inverse function of the Pareto distribution
   int64_t ParetoCdfInversion(double u, double theta, double k, double sigma) {
     double ret;
     if (k == 0.0) {
@@ -4645,7 +5160,7 @@ void VerifyDBFromDB(std::string& truth_db_name) {
     }
     return static_cast<int64_t>(ceil(ret));
   }
-  // inversion of y=ax^b
+  // The inverse function of the power distribution (y=ax^b)
   int64_t PowerCdfInversion(double u, double a, double b) {
     double ret;
     ret = std::pow((u / a), (1 / b));
@@ -4666,7 +5181,7 @@ void VerifyDBFromDB(std::string& truth_db_name) {
     }
   }
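Both helpers are plain inverse-transform sampling: feed a uniform u in [0,1) through the inverse CDF of the target distribution. For the power model y = a*x^b, inverting gives x = (u/a)^(1/b), which is exactly the std::pow((u / a), (1 / b)) visible above. For the generalized Pareto distribution with location theta, scale sigma, and shape k, the quantile function is x = theta + sigma*(u^(-k) - 1)/k for k != 0, degenerating to x = theta - sigma*ln(u) as k -> 0, consistent with the k == 0 branch visible above (for uniform u, u and 1-u are identically distributed, so either convention yields the same distribution).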
 
-  // decide the query type
+  // Decide the ratio of different query types
   // 0 Get, 1 Put, 2 Seek, 3 SeekForPrev, 4 Delete, 5 SingleDelete, 6 merge
   class QueryDecider {
    public:
@@ -4707,7 +5222,157 @@ void VerifyDBFromDB(std::string& truth_db_name) {
     }
   };
 
-  // The graph wokrload mixed with Get, Put, Iterator
+  // KeyrangeUnit describes one key-range. It is used in a key-range vector
+  // to map a random value to one key-range based on its hotness.
+  struct KeyrangeUnit {
+    int64_t keyrange_start;
+    int64_t keyrange_access;
+    int64_t keyrange_keys;
+  };
+
+  // From our observations, the prefix hotness (key-range hotness) follows
+  // the two-term-exponential distribution: f(x) = a*exp(b*x) + c*exp(d*x).
+  // However, we cannot directly use the inverse function to pick a
+  // key-range from a random value. To achieve that, we create a list of
+  // KeyrangeUnit; each KeyrangeUnit occupies a range of integers whose size
+  // is decided by the hotness of its key-range. When a random value is
+  // drawn from a uniform distribution, we map it to the KeyrangeUnit vector
+  // and one KeyrangeUnit is selected. The probability of a KeyrangeUnit
+  // being selected equals the hotness of that KeyrangeUnit. After that, the
+  // key can be allocated uniformly within the key-range of this KeyrangeUnit,
+  // or we can use the power distribution (y=ax^b) to generate the offset of
+  // the key in the selected key-range. In this way, we generate the keyID
+  // based on both the prefix hotness and the key hotness distribution.
+  class GenerateTwoTermExpKeys {
+   public:
+    int64_t keyrange_rand_max_;
+    int64_t keyrange_size_;
+    int64_t keyrange_num_;
+    bool initiated_;
+    std::vector<KeyrangeUnit> keyrange_set_;
+
+    GenerateTwoTermExpKeys() {
+      keyrange_rand_max_ = FLAGS_num;
+      initiated_ = false;
+    }
+
+    ~GenerateTwoTermExpKeys() {}
+
+    // Initiate the KeyrangeUnit vector and calculate the size of each
+    // KeyrangeUnit.
+    Status InitiateExpDistribution(int64_t total_keys, double prefix_a,
+                                   double prefix_b, double prefix_c,
+                                   double prefix_d) {
+      int64_t amplify = 0;
+      int64_t keyrange_start = 0;
+      initiated_ = true;
+      if (FLAGS_keyrange_num <= 0) {
+        keyrange_num_ = 1;
+      } else {
+        keyrange_num_ = FLAGS_keyrange_num;
+      }
+      keyrange_size_ = total_keys / keyrange_num_;
+
+      // Calculate the key-range shares size based on the input parameters
+      for (int64_t pfx = keyrange_num_; pfx >= 1; pfx--) {
+        // Step 1. Calculate the probability that this key-range will be
+        // accessed in a query. It is based on the two-term exponential
+        // distribution.
+        double keyrange_p = prefix_a * std::exp(prefix_b * pfx) +
+                            prefix_c * std::exp(prefix_d * pfx);
+        if (keyrange_p < std::pow(10.0, -16.0)) {
+          keyrange_p = 0.0;
+        }
+        // Step 2. Calculate the amplification factor.
+        // In order to allocate a query to a key-range based on the random
+        // number generated for this query, we need to extend the probability
+        // of each key-range from [0,1] to [0, amplify]. Amplify is calculated
+        // as 1/(smallest non-zero key-range probability). In this way, we
+        // ensure that every key-range is assigned an integer size >= 0.
+        if (amplify == 0 && keyrange_p > 0) {
+          amplify = static_cast<int64_t>(std::floor(1 / keyrange_p)) + 1;
+        }
+
+        // Step 3. For each key-range, calculate its position in the
+        // [0, amplify] range: the start and the size (keyrange_access).
+        KeyrangeUnit p_unit;
+        p_unit.keyrange_start = keyrange_start;
+        if (0.0 >= keyrange_p) {
+          p_unit.keyrange_access = 0;
+        } else {
+          p_unit.keyrange_access =
+              static_cast<int64_t>(std::floor(amplify * keyrange_p));
+        }
+        p_unit.keyrange_keys = keyrange_size_;
+        keyrange_set_.push_back(p_unit);
+        keyrange_start += p_unit.keyrange_access;
+      }
+      keyrange_rand_max_ = keyrange_start;
+
+      // Step 4. Shuffle the key-ranges randomly.
+      // Since the access probability is calculated from small to large, hot
+      // key-ranges would always sit at the end and cold key-ranges at the
+      // beginning of the key space if we did not re-allocate them. Therefore,
+      // the key-ranges are shuffled, with a rand seed decided only by the
+      // key-range hotness distribution. With the same distribution
+      // parameters, the shuffle results are the same.
+      Random64 rand_loca(keyrange_rand_max_);
+      for (int64_t i = 0; i < FLAGS_keyrange_num; i++) {
+        int64_t pos = rand_loca.Next() % FLAGS_keyrange_num;
+        assert(i >= 0 && i < static_cast<int64_t>(keyrange_set_.size()) &&
+               pos >= 0 && pos < static_cast<int64_t>(keyrange_set_.size()));
+        std::swap(keyrange_set_[i], keyrange_set_[pos]);
+      }
+
+      // Step 5. Recalculate the prefix start position after shuffling.
+      int64_t offset = 0;
+      for (auto& p_unit : keyrange_set_) {
+        p_unit.keyrange_start = offset;
+        offset += p_unit.keyrange_access;
+      }
+
+      return Status::OK();
+    }
+
+    // Generate the Key ID according to the input ini_rand and key distribution
+    int64_t DistGetKeyID(int64_t ini_rand, double key_dist_a,
+                         double key_dist_b) {
+      int64_t keyrange_rand = ini_rand % keyrange_rand_max_;
+
+      // Calculate and select one key-range that contains the new key
+      int64_t start = 0, end = static_cast<int64_t>(keyrange_set_.size());
+      while (start + 1 < end) {
+        int64_t mid = start + (end - start) / 2;
+        assert(mid >= 0 && mid < static_cast<int64_t>(keyrange_set_.size()));
+        if (keyrange_rand < keyrange_set_[mid].keyrange_start) {
+          end = mid;
+        } else {
+          start = mid;
+        }
+      }
+      int64_t keyrange_id = start;
+
+      // Select one key in the key-range and compose the keyID
+      int64_t key_offset = 0, key_seed;
+      if (key_dist_a == 0.0 && key_dist_b == 0.0) {
+        key_offset = ini_rand % keyrange_size_;
+      } else {
+        key_seed = static_cast<int64_t>(
+            ceil(std::pow((ini_rand / key_dist_a), (1 / key_dist_b))));
+        Random64 rand_key(key_seed);
+        key_offset = static_cast<int64_t>(rand_key.Next()) % keyrange_size_;
+      }
+      return keyrange_size_ * keyrange_id + key_offset;
+    }
+  };
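A minimal usage sketch of the class above, mirroring how MixGraph drives it further down; the fit parameters are illustrative stand-ins for values obtained by fitting f(x) = a*exp(b*x) + c*exp(d*x) to an observed per-range access histogram:

    // Build the shuffled key-range table once, then map uniform draws to
    // hotness-weighted key IDs (inside a benchmark method, as MixGraph does).
    GenerateTwoTermExpKeys gen_exp;
    gen_exp.InitiateExpDistribution(FLAGS_num, /*a=*/14.18, /*b=*/-2.917,
                                    /*c=*/0.0164, /*d=*/-0.08082);
    int64_t ini_rand = GetRandomKey(&thread->rand);
    int64_t key_id =
        gen_exp.DistGetKeyID(ini_rand, FLAGS_key_dist_a, FLAGS_key_dist_b);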
+
+  // The social graph workload mixes Get, Put, and Iterator queries.
+  // The value size and iterator length follow a Pareto distribution.
+  // The overall key access follows a power distribution. If the user models
+  // the workload based on different key-ranges (or different prefixes), the
+  // two-term-exponential distribution can be used to fit the workload. The
+  // user needs to decide the ratio between Get, Put, and Iterator queries
+  // before starting the benchmark.
   void MixGraph(ThreadState* thread) {
     int64_t read = 0;  // including single gets and Next of iterators
     int64_t gets = 0;
@@ -4721,6 +5386,8 @@ void VerifyDBFromDB(std::string& truth_db_name) {
     int64_t scan_len_max = FLAGS_mix_max_scan_len;
     double write_rate = 1000000.0;
     double read_rate = 1000000.0;
+    bool use_prefix_modeling = false;
+    GenerateTwoTermExpKeys gen_exp;
     std::vector<double> ratio{FLAGS_mix_get_ratio, FLAGS_mix_put_ratio,
                               FLAGS_mix_seek_ratio};
     char value_buffer[default_value_max];
@@ -4740,21 +5407,38 @@ void VerifyDBFromDB(std::string& truth_db_name) {
     // the limit of qps initiation
     if (FLAGS_sine_a != 0 || FLAGS_sine_d != 0) {
       thread->shared->read_rate_limiter.reset(NewGenericRateLimiter(
-          read_rate, 100000 /* refill_period_us */, 10 /* fairness */,
+          static_cast<int64_t>(read_rate), 100000 /* refill_period_us */,
+          10 /* fairness */,
           RateLimiter::Mode::kReadsOnly));
       thread->shared->write_rate_limiter.reset(
-          NewGenericRateLimiter(write_rate));
+          NewGenericRateLimiter(static_cast<int64_t>(write_rate)));
+    }
+
+    // Decide if the user wants to use prefix-based key generation
+    if (FLAGS_keyrange_dist_a != 0.0 || FLAGS_keyrange_dist_b != 0.0 ||
+        FLAGS_keyrange_dist_c != 0.0 || FLAGS_keyrange_dist_d != 0.0) {
+      use_prefix_modeling = true;
+      gen_exp.InitiateExpDistribution(
+          FLAGS_num, FLAGS_keyrange_dist_a, FLAGS_keyrange_dist_b,
+          FLAGS_keyrange_dist_c, FLAGS_keyrange_dist_d);
     }
 
     Duration duration(FLAGS_duration, reads_);
     while (!duration.Done(1)) {
       DBWithColumnFamilies* db_with_cfh = SelectDBWithCfh(thread);
-      int64_t rand_v, key_rand, key_seed;
-      rand_v = GetRandomKey(&thread->rand) % FLAGS_num;
+      int64_t ini_rand, rand_v, key_rand, key_seed;
+      ini_rand = GetRandomKey(&thread->rand);
+      rand_v = ini_rand % FLAGS_num;
       double u = static_cast<double>(rand_v) / FLAGS_num;
-      key_seed = PowerCdfInversion(u, FLAGS_key_dist_a, FLAGS_key_dist_b);
-      Random64 rand(key_seed);
-      key_rand = static_cast<int64_t>(rand.Next()) % FLAGS_num;
+
+      // Generate the keyID based on the key hotness and prefix hotness
+      if (use_prefix_modeling) {
+        key_rand =
+            gen_exp.DistGetKeyID(ini_rand, FLAGS_key_dist_a, FLAGS_key_dist_b);
+      } else {
+        key_seed = PowerCdfInversion(u, FLAGS_key_dist_a, FLAGS_key_dist_b);
+        Random64 rand(key_seed);
+        key_rand = static_cast<int64_t>(rand.Next()) % FLAGS_num;
+      }
       GenerateKeyFromInt(key_rand, FLAGS_num, &key);
       int query_type = query.GetType(rand_v);
 
@@ -4779,9 +5463,9 @@ void VerifyDBFromDB(std::string& truth_db_name) {
             mix_rate_with_noise * query.ratio_[1] * FLAGS_mix_ave_kv_size;
 
         thread->shared->write_rate_limiter.reset(
-            NewGenericRateLimiter(write_rate));
+            NewGenericRateLimiter(static_cast<int64_t>(write_rate)));
         thread->shared->read_rate_limiter.reset(NewGenericRateLimiter(
-            read_rate,
+            static_cast<int64_t>(read_rate),
             FLAGS_sine_mix_rate_interval_milliseconds * uint64_t{1000}, 10,
             RateLimiter::Mode::kReadsOnly));
       }
@@ -4818,16 +5502,16 @@ void VerifyDBFromDB(std::string& truth_db_name) {
       } else if (query_type == 1) {
         // the Put query
         puts++;
-        int64_t value_size = ParetoCdfInversion(
+        int64_t val_size = ParetoCdfInversion(
             u, FLAGS_value_theta, FLAGS_value_k, FLAGS_value_sigma);
-        if (value_size < 0) {
-          value_size = 10;
-        } else if (value_size > value_max) {
-          value_size = value_size % value_max;
+        if (val_size < 0) {
+          val_size = 10;
+        } else if (val_size > value_max) {
+          val_size = val_size % value_max;
         }
         s = db_with_cfh->db->Put(
             write_options_, key,
-            gen.Generate(static_cast<unsigned int>(value_size)));
+            gen.Generate(static_cast<unsigned int>(val_size)));
         if (!s.ok()) {
           fprintf(stderr, "put error: %s\n", s.ToString().c_str());
           exit(1);
@@ -4835,7 +5519,7 @@ void VerifyDBFromDB(std::string& truth_db_name) {
 
         if (thread->shared->write_rate_limiter) {
           thread->shared->write_rate_limiter->Request(
-              key.size() + value_size, Env::IO_HIGH, nullptr /*stats*/,
+              key.size() + val_size, Env::IO_HIGH, nullptr /*stats*/,
               RateLimiter::OpType::kWrite);
         }
         thread->stats.FinishedOps(db_with_cfh, db_with_cfh->db, 1, kWrite);
@@ -4878,7 +5562,7 @@ void VerifyDBFromDB(std::string& truth_db_name) {
     thread->stats.AddBytes(bytes);
     thread->stats.AddMessage(msg);
 
-    if (FLAGS_perf_level > rocksdb::PerfLevel::kDisable) {
+    if (FLAGS_perf_level > ROCKSDB_NAMESPACE::PerfLevel::kDisable) {
       thread->stats.AddMessage(std::string("PERF_CONTEXT:\n") +
                                get_perf_context()->ToString());
     }
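Putting it together, a prefix-modeled mixgraph run might look like the invocation below; the flag spellings mirror the FLAGS_* names used above, and every numeric value is an illustrative fit, not a recommendation:

    db_bench --benchmarks=mixgraph --num=50000000 \
        --mix_get_ratio=0.83 --mix_put_ratio=0.14 --mix_seek_ratio=0.03 \
        --key_dist_a=0.002312 --key_dist_b=0.3467 \
        --keyrange_num=30 --keyrange_dist_a=14.18 --keyrange_dist_b=-2.917 \
        --keyrange_dist_c=0.0164 --keyrange_dist_d=-0.08082 \
        --value_theta=0 --value_k=0.2615 --value_sigma=25.45 \
        --mix_max_scan_len=10000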
@@ -4908,7 +5592,10 @@ void VerifyDBFromDB(std::string& truth_db_name) {
     int64_t found = 0;
     int64_t bytes = 0;
     ReadOptions options(FLAGS_verify_checksum, true);
+    options.total_order_seek = FLAGS_total_order_seek;
+    options.prefix_same_as_start = FLAGS_prefix_same_as_start;
     options.tailing = FLAGS_use_tailing_iterator;
+    options.readahead_size = FLAGS_readahead_size;
 
     Iterator* single_iter = nullptr;
     std::vector<Iterator*> multi_iters;
@@ -4932,7 +5619,8 @@ void VerifyDBFromDB(std::string& truth_db_name) {
     char value_buffer[256];
     while (!duration.Done(1)) {
       int64_t seek_pos = thread->rand.Next() % FLAGS_num;
-      GenerateKeyFromInt((uint64_t)seek_pos, FLAGS_num, &key);
+      GenerateKeyFromIntForSeek(static_cast<uint64_t>(seek_pos), FLAGS_num,
+                                &key);
       if (FLAGS_max_scan_distance != 0) {
         if (FLAGS_reverse_iterator) {
           GenerateKeyFromInt(
@@ -4941,9 +5629,10 @@ void VerifyDBFromDB(std::string& truth_db_name) {
               FLAGS_num, &lower_bound);
           options.iterate_lower_bound = &lower_bound;
         } else {
-          GenerateKeyFromInt(
-              (uint64_t)std::min(FLAGS_num, seek_pos + FLAGS_max_scan_distance),
-              FLAGS_num, &upper_bound);
+          auto min_num =
+              std::min(FLAGS_num, seek_pos + FLAGS_max_scan_distance);
+          GenerateKeyFromInt(static_cast<uint64_t>(min_num), FLAGS_num,
+                             &upper_bound);
           options.iterate_upper_bound = &upper_bound;
         }
       }
@@ -5007,7 +5696,7 @@ void VerifyDBFromDB(std::string& truth_db_name) {
              found, read);
     thread->stats.AddBytes(bytes);
     thread->stats.AddMessage(msg);
-    if (FLAGS_perf_level > rocksdb::PerfLevel::kDisable) {
+    if (FLAGS_perf_level > ROCKSDB_NAMESPACE::PerfLevel::kDisable) {
       thread->stats.AddMessage(std::string("PERF_CONTEXT:\n") +
                                get_perf_context()->ToString());
     }
@@ -5111,7 +5800,7 @@ void VerifyDBFromDB(std::string& truth_db_name) {
             // Wait for the writes to be finished
             if (!hint_printed) {
               fprintf(stderr, "Reads are finished. Have %d more writes to do\n",
-                      (int)writes_ - written);
+                      static_cast<int>(writes_) - written);
               hint_printed = true;
             }
           } else {
@@ -5124,10 +5813,11 @@ void VerifyDBFromDB(std::string& truth_db_name) {
       GenerateKeyFromInt(thread->rand.Next() % FLAGS_num, FLAGS_num, &key);
       Status s;
 
+      Slice val = gen.Generate();
       if (write_merge == kWrite) {
-        s = db->Put(write_options_, key, gen.Generate(value_size_));
+        s = db->Put(write_options_, key, val);
       } else {
-        s = db->Merge(write_options_, key, gen.Generate(value_size_));
+        s = db->Merge(write_options_, key, val);
       }
       written++;
 
@@ -5135,12 +5825,12 @@ void VerifyDBFromDB(std::string& truth_db_name) {
         fprintf(stderr, "put or merge error: %s\n", s.ToString().c_str());
         exit(1);
       }
-      bytes += key.size() + value_size_;
+      bytes += key.size() + val.size();
       thread->stats.FinishedOps(&db_, db_.db, 1, kWrite);
 
       if (FLAGS_benchmark_write_rate_limit > 0) {
         write_rate_limiter->Request(
-            entries_per_batch_ * (value_size_ + key_size_), Env::IO_HIGH,
+            key.size() + val.size(), Env::IO_HIGH,
             nullptr /* stats */, RateLimiter::OpType::kWrite);
       }
     }
@@ -5312,7 +6002,7 @@ void VerifyDBFromDB(std::string& truth_db_name) {
       } else if (put_weight > 0) {
         // then do all the corresponding number of puts
         // for all the gets we have done earlier
-        Status s = PutMany(db, write_options_, key, gen.Generate(value_size_));
+        Status s = PutMany(db, write_options_, key, gen.Generate());
         if (!s.ok()) {
           fprintf(stderr, "putmany error: %s\n", s.ToString().c_str());
           exit(1);
@@ -5380,7 +6070,7 @@ void VerifyDBFromDB(std::string& truth_db_name) {
       } else  if (put_weight > 0) {
         // then do all the corresponding number of puts
         // for all the gets we have done earlier
-        Status s = db->Put(write_options_, key, gen.Generate(value_size_));
+        Status s = db->Put(write_options_, key, gen.Generate());
         if (!s.ok()) {
           fprintf(stderr, "put error: %s\n", s.ToString().c_str());
           exit(1);
@@ -5426,16 +6116,17 @@ void VerifyDBFromDB(std::string& truth_db_name) {
 
       if (thread->shared->write_rate_limiter) {
         thread->shared->write_rate_limiter->Request(
-            key.size() + value_size_, Env::IO_HIGH, nullptr /*stats*/,
+            key.size() + value.size(), Env::IO_HIGH, nullptr /*stats*/,
             RateLimiter::OpType::kWrite);
       }
 
-      Status s = db->Put(write_options_, key, gen.Generate(value_size_));
+      Slice val = gen.Generate();
+      Status s = db->Put(write_options_, key, val);
       if (!s.ok()) {
         fprintf(stderr, "put error: %s\n", s.ToString().c_str());
         exit(1);
       }
-      bytes += key.size() + value_size_;
+      bytes += key.size() + val.size();
       thread->stats.FinishedOps(nullptr, db, 1, kUpdate);
     }
     char msg[100];
@@ -5474,7 +6165,7 @@ void VerifyDBFromDB(std::string& truth_db_name) {
         exit(1);
       }
 
-      Slice value = gen.Generate(value_size_);
+      Slice value =
+          gen.Generate(static_cast<unsigned int>(existing_value.size()));
       std::string new_value;
 
       if (status.ok()) {
@@ -5529,7 +6220,7 @@ void VerifyDBFromDB(std::string& truth_db_name) {
       }
 
       // Update the value (by appending data)
-      Slice operand = gen.Generate(value_size_);
+      Slice operand = gen.Generate();
       if (value.size() > 0) {
         // Use a delimiter to match the semantics for StringAppendOperator
         value.append(1,',');
@@ -5576,21 +6267,22 @@ void VerifyDBFromDB(std::string& truth_db_name) {
       GenerateKeyFromInt(key_rand, merge_keys_, &key);
 
       Status s;
+      Slice val = gen.Generate();
       if (FLAGS_num_column_families > 1) {
         s = db_with_cfh->db->Merge(write_options_,
                                    db_with_cfh->GetCfh(key_rand), key,
-                                   gen.Generate(value_size_));
+                                   val);
       } else {
         s = db_with_cfh->db->Merge(write_options_,
                                    db_with_cfh->db->DefaultColumnFamily(), key,
-                                   gen.Generate(value_size_));
+                                   val);
       }
 
       if (!s.ok()) {
         fprintf(stderr, "merge error: %s\n", s.ToString().c_str());
         exit(1);
       }
-      bytes += key.size() + value_size_;
+      bytes += key.size() + val.size();
       thread->stats.FinishedOps(nullptr, db_with_cfh->db, 1, kMerge);
     }
 
@@ -5628,7 +6320,7 @@ void VerifyDBFromDB(std::string& truth_db_name) {
       bool do_merge = int(thread->rand.Next() % 100) < FLAGS_mergereadpercent;
 
       if (do_merge) {
-        Status s = db->Merge(write_options_, key, gen.Generate(value_size_));
+        Status s = db->Merge(write_options_, key, gen.Generate());
         if (!s.ok()) {
           fprintf(stderr, "merge error: %s\n", s.ToString().c_str());
           exit(1);
@@ -5695,6 +6387,97 @@ void VerifyDBFromDB(std::string& truth_db_name) {
     }
   }
 
+  bool binary_search(std::vector<int>& data, int start, int end, int key) {
+    if (data.empty()) return false;
+    if (start > end) return false;
+    int mid = start + (end - start) / 2;
+    if (mid > static_cast<int>(data.size()) - 1) return false;
+    if (data[mid] == key) {
+      return true;
+    } else if (data[mid] > key) {
+      return binary_search(data, start, mid - 1, key);
+    } else {
+      return binary_search(data, mid + 1, end, key);
+    }
+  }
+
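Since data is a fully sorted std::vector<int>, the recursive helper above is behaviorally equivalent to the standard-library one-liner below; the hand-rolled version mainly keeps the benchmark self-contained:

    #include <algorithm>

    bool found = std::binary_search(data.begin(), data.end(), key);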
+  // Does a bunch of merge operations for a key (key1) where each merge
+  // operand is a sorted list. A performance comparison is then done between
+  // doing a Get for key1 followed by searching for another key (key2) in the
+  // large sorted list, vs. calling GetMergeOperands for key1 and searching
+  // for key2 in each sorted sub-list. The latter case is expected to be a
+  // lot faster.
+  void GetMergeOperands(ThreadState* thread) {
+    DB* db = SelectDB(thread);
+    const int kTotalValues = 100000;
+    const int kListSize = 100;
+    std::string key = "my_key";
+    std::string value;
+
+    for (int i = 1; i < kTotalValues; i++) {
+      if (i % kListSize == 0) {
+        // Remove trailing ','
+        value.pop_back();
+        db->Merge(WriteOptions(), key, value);
+        value.clear();
+      } else {
+        value.append(std::to_string(i)).append(",");
+      }
+    }
+
+    SortList s;
+    std::vector<int> data;
+    // This value can be experimented with; it demonstrates the perf
+    // difference between doing a Get and searching for lookup_key in the
+    // resultant large sorted list vs. doing GetMergeOperands and searching
+    // for lookup_key within the resultant sorted sub-lists.
+    int lookup_key = 1;
+
+    // Get API call
+    std::cout << "--- Get API call --- \n";
+    PinnableSlice p_slice;
+    uint64_t st = FLAGS_env->NowNanos();
+    db->Get(ReadOptions(), db->DefaultColumnFamily(), key, &p_slice);
+    s.MakeVector(data, p_slice);
+    bool found =
+        binary_search(data, 0, static_cast<int>(data.size() - 1), lookup_key);
+    std::cout << "Found key? " << std::to_string(found) << "\n";
+    uint64_t sp = FLAGS_env->NowNanos();
+    std::cout << "Get: " << (sp - st) / 1000000000.0 << " seconds\n";
+    std::string* dat_ = p_slice.GetSelf();
+    std::cout << "Sample data from Get API call: " << dat_->substr(0, 10)
+              << "\n";
+    data.clear();
+
+    // GetMergeOperands API call
+    std::cout << "--- GetMergeOperands API --- \n";
+    std::vector<PinnableSlice> a_slice((kTotalValues / kListSize) + 1);
+    st = FLAGS_env->NowNanos();
+    int number_of_operands = 0;
+    GetMergeOperandsOptions get_merge_operands_options;
+    get_merge_operands_options.expected_max_number_of_operands =
+        (kTotalValues / kListSize) + 1;
+    db->GetMergeOperands(ReadOptions(), db->DefaultColumnFamily(), key,
+                         a_slice.data(), &get_merge_operands_options,
+                         &number_of_operands);
+    for (PinnableSlice& psl : a_slice) {
+      s.MakeVector(data, psl);
+      found =
+          binary_search(data, 0, static_cast<int>(data.size() - 1), lookup_key);
+      data.clear();
+      if (found) break;
+    }
+    std::cout << "Found key? " << std::to_string(found) << "\n";
+    sp = FLAGS_env->NowNanos();
+    std::cout << "Get Merge operands: " << (sp - st) / 1000000000.0
+              << " seconds \n";
+    int to_print = 0;
+    std::cout << "Sample data from GetMergeOperands API call: ";
+    for (PinnableSlice& psl : a_slice) {
+      std::cout << "List: " << to_print << " : " << *psl.GetSelf() << "\n";
+      if (to_print++ > 2) break;
+    }
+  }
+
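GetMergeOperands() above only behaves as described when the DB was opened with the sorted-list merge operator (the SortList helper from utilities/merge_operators/sortlist.h used above). A minimal sketch of wiring that up; the CreateSortOperator() factory name is assumed from the merge_operators utility header, and the path is illustrative:

    #include "rocksdb/db.h"
    #include "utilities/merge_operators.h"

    // Merge() calls then combine operands into one sorted int list, which is
    // what the Get-vs-GetMergeOperands comparison above relies on.
    rocksdb::Status OpenSortListDB(rocksdb::DB** db) {
      rocksdb::Options options;
      options.create_if_missing = true;
      options.merge_operator =
          rocksdb::MergeOperators::CreateSortOperator();  // assumed factory
      return rocksdb::DB::Open(options, "/tmp/getmergeoperands_db", db);
    }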
 #ifndef ROCKSDB_LITE
   // This benchmark stress tests Transactions.  For a given --duration (or
   // total number of --writes, a Transaction will perform a read-modify-write
@@ -5769,7 +6552,7 @@ void VerifyDBFromDB(std::string& truth_db_name) {
     }
     thread->stats.AddMessage(msg);
 
-    if (FLAGS_perf_level > rocksdb::PerfLevel::kDisable) {
+    if (FLAGS_perf_level > ROCKSDB_NAMESPACE::PerfLevel::kDisable) {
       thread->stats.AddMessage(std::string("PERF_CONTEXT:\n") +
                                get_perf_context()->ToString());
     }
@@ -5813,7 +6596,7 @@ void VerifyDBFromDB(std::string& truth_db_name) {
     DB* db = SelectDB(thread);
     for (int64_t i = 0; i < FLAGS_numdistinct; i++) {
       GenerateKeyFromInt(i * max_counter, FLAGS_num, &key);
-      s = db->Put(write_options_, key, gen.Generate(value_size_));
+      s = db->Put(write_options_, key, gen.Generate());
       if (!s.ok()) {
         fprintf(stderr, "Operation failed: %s\n", s.ToString().c_str());
         exit(1);
@@ -5932,7 +6715,7 @@ void VerifyDBFromDB(std::string& truth_db_name) {
              read);
     thread->stats.AddBytes(bytes);
     thread->stats.AddMessage(msg);
-    if (FLAGS_perf_level > rocksdb::PerfLevel::kDisable) {
+    if (FLAGS_perf_level > ROCKSDB_NAMESPACE::PerfLevel::kDisable) {
       thread->stats.AddMessage(std::string("PERF_CONTEXT:\n") +
                                get_perf_context()->ToString());
     }
@@ -5980,20 +6763,20 @@ void VerifyDBFromDB(std::string& truth_db_name) {
       timestamp_emulator_->Inc();
 
       Status s;
-
-      s = db->Put(write_options_, key, gen.Generate(value_size_));
+      Slice val = gen.Generate();
+      s = db->Put(write_options_, key, val);
 
       if (!s.ok()) {
         fprintf(stderr, "put error: %s\n", s.ToString().c_str());
         exit(1);
       }
-      bytes = key.size() + value_size_;
+      bytes = key.size() + val.size();
       thread->stats.FinishedOps(&db_, db_.db, 1, kWrite);
       thread->stats.AddBytes(bytes);
 
       if (FLAGS_benchmark_write_rate_limit > 0) {
         write_rate_limiter->Request(
-            entries_per_batch_ * (value_size_ + key_size_), Env::IO_HIGH,
+            key.size() + val.size(), Env::IO_HIGH,
             nullptr /* stats */, RateLimiter::OpType::kWrite);
       }
     }
@@ -6014,7 +6797,8 @@ void VerifyDBFromDB(std::string& truth_db_name) {
   void Compact(ThreadState* thread) {
     DB* db = SelectDB(thread);
     CompactRangeOptions cro;
-    cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
+    cro.bottommost_level_compaction =
+        BottommostLevelCompaction::kForceOptimized;
     db->CompactRange(cro, nullptr, nullptr);
   }
 
@@ -6036,6 +6820,39 @@ void VerifyDBFromDB(std::string& truth_db_name) {
     }
   }
 
+  void PrintStatsHistory() {
+    if (db_.db != nullptr) {
+      PrintStatsHistoryImpl(db_.db, false);
+    }
+    for (const auto& db_with_cfh : multi_dbs_) {
+      PrintStatsHistoryImpl(db_with_cfh.db, true);
+    }
+  }
+
+  void PrintStatsHistoryImpl(DB* db, bool print_header) {
+    if (print_header) {
+      fprintf(stdout, "\n==== DB: %s ===\n", db->GetName().c_str());
+    }
+
+    std::unique_ptr<StatsHistoryIterator> shi;
+    Status s = db->GetStatsHistory(0, port::kMaxUint64, &shi);
+    if (!s.ok()) {
+      fprintf(stdout, "%s\n", s.ToString().c_str());
+      return;
+    }
+    assert(shi);
+    while (shi->Valid()) {
+      uint64_t stats_time = shi->GetStatsTime();
+      fprintf(stdout, "------ %s ------\n",
+              TimeToHumanString(static_cast<int>(stats_time)).c_str());
+      for (auto& entry : shi->GetStatsMap()) {
+        fprintf(stdout, " %" PRIu64 "   %s  %" PRIu64 "\n", stats_time,
+                entry.first.c_str(), entry.second);
+      }
+      shi->Next();
+    }
+  }
+
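PrintStatsHistoryImpl above only has something to iterate when stats snapshots were collected; the knobs that enable that (mirrored by the FLAGS plumbed in earlier hunks) look like this on a plain Options object, with illustrative periods:

    #include "rocksdb/options.h"
    #include "rocksdb/statistics.h"

    rocksdb::Options WithStatsHistory() {
      rocksdb::Options options;
      options.statistics = rocksdb::CreateDBStatistics();
      options.stats_persist_period_sec = 10;   // snapshot every 10 s
      options.persist_stats_to_disk = true;    // survive restarts
      options.stats_history_buffer_size = 1024 * 1024;  // in-memory cap
      return options;
    }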
   void PrintStats(const char* key) {
     if (db_.db != nullptr) {
       PrintStats(db_.db, key, false);
@@ -6077,7 +6894,10 @@ void VerifyDBFromDB(std::string& truth_db_name) {
     }
     Replayer replayer(db_with_cfh->db, db_with_cfh->cfh,
                       std::move(trace_reader));
-    s = replayer.Replay();
+    replayer.SetFastForward(
+        static_cast<uint32_t>(FLAGS_trace_replay_fast_forward));
+    s = replayer.MultiThreadReplay(
+        static_cast<uint32_t>(FLAGS_trace_replay_threads));
     if (s.ok()) {
       fprintf(stdout, "Replay started from trace_file: %s\n",
               FLAGS_trace_file.c_str());
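For reference, replay assumes a trace captured by an earlier run; a typical two-step invocation, with flag spellings taken from the FLAGS_* names above and workload values purely illustrative:

    db_bench --benchmarks=readrandomwriterandom --num=1000000 \
        --trace_file=/tmp/db_bench_trace
    db_bench --benchmarks=replay --use_existing_db=1 \
        --trace_file=/tmp/db_bench_trace \
        --trace_replay_threads=4 --trace_replay_fast_forward=2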
@@ -6089,7 +6909,7 @@ void VerifyDBFromDB(std::string& truth_db_name) {
 };
 
 int db_bench_tool(int argc, char** argv) {
-  rocksdb::port::InstallStackTraceHandler();
+  ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
   static bool initialized = false;
   if (!initialized) {
     SetUsageMessage(std::string("\nUSAGE:\n") + std::string(argv[0]) +
@@ -6097,7 +6917,8 @@ int db_bench_tool(int argc, char** argv) {
     initialized = true;
   }
   ParseCommandLineFlags(&argc, &argv, true);
-  FLAGS_compaction_style_e = (rocksdb::CompactionStyle) FLAGS_compaction_style;
+  FLAGS_compaction_style_e =
+      (ROCKSDB_NAMESPACE::CompactionStyle)FLAGS_compaction_style;
 #ifndef ROCKSDB_LITE
   if (FLAGS_statistics && !FLAGS_statistics_string.empty()) {
     fprintf(stderr,
@@ -6105,26 +6926,26 @@ int db_bench_tool(int argc, char** argv) {
     exit(1);
   }
   if (!FLAGS_statistics_string.empty()) {
-    std::unique_ptr<Statistics> custom_stats_guard;
-    dbstats.reset(NewCustomObject<Statistics>(FLAGS_statistics_string,
-                                              &custom_stats_guard));
-    custom_stats_guard.release();
+    Status s = ObjectRegistry::NewInstance()->NewSharedObject<Statistics>(
+        FLAGS_statistics_string, &dbstats);
     if (dbstats == nullptr) {
-      fprintf(stderr, "No Statistics registered matching string: %s\n",
-              FLAGS_statistics_string.c_str());
+      fprintf(stderr,
+              "No Statistics registered matching string: %s status=%s\n",
+              FLAGS_statistics_string.c_str(), s.ToString().c_str());
       exit(1);
     }
   }
 #endif  // ROCKSDB_LITE
   if (FLAGS_statistics) {
-    dbstats = rocksdb::CreateDBStatistics();
+    dbstats = ROCKSDB_NAMESPACE::CreateDBStatistics();
   }
   if (dbstats) {
     dbstats->set_stats_level(static_cast<StatsLevel>(FLAGS_stats_level));
   }
-  FLAGS_compaction_pri_e = (rocksdb::CompactionPri)FLAGS_compaction_pri;
+  FLAGS_compaction_pri_e =
+      (ROCKSDB_NAMESPACE::CompactionPri)FLAGS_compaction_pri;
 
-  std::vector<std::string> fanout = rocksdb::StringSplit(
+  std::vector<std::string> fanout = ROCKSDB_NAMESPACE::StringSplit(
       FLAGS_max_bytes_for_level_multiplier_additional, ',');
   for (size_t j = 0; j < fanout.size(); j++) {
     FLAGS_max_bytes_for_level_multiplier_additional_v.push_back(
@@ -6139,12 +6960,14 @@ int db_bench_tool(int argc, char** argv) {
     StringToCompressionType(FLAGS_compression_type.c_str());
 
 #ifndef ROCKSDB_LITE
-  std::unique_ptr<Env> custom_env_guard;
+  FLAGS_blob_db_compression_type_e =
+    StringToCompressionType(FLAGS_blob_db_compression_type.c_str());
+
   if (!FLAGS_hdfs.empty() && !FLAGS_env_uri.empty()) {
     fprintf(stderr, "Cannot provide both --hdfs and --env_uri.\n");
     exit(1);
   } else if (!FLAGS_env_uri.empty()) {
-    FLAGS_env = NewCustomObject<Env>(FLAGS_env_uri, &custom_env_guard);
+    Status s = Env::LoadEnv(FLAGS_env_uri, &FLAGS_env, &env_guard);
     if (FLAGS_env == nullptr) {
       fprintf(stderr, "No Env registered for URI: %s\n", FLAGS_env_uri.c_str());
       exit(1);
@@ -6159,37 +6982,40 @@ int db_bench_tool(int argc, char** argv) {
   }
 
   if (!FLAGS_hdfs.empty()) {
-    FLAGS_env  = new rocksdb::HdfsEnv(FLAGS_hdfs);
+    FLAGS_env = new ROCKSDB_NAMESPACE::HdfsEnv(FLAGS_hdfs);
   }
 
   if (!strcasecmp(FLAGS_compaction_fadvice.c_str(), "NONE"))
-    FLAGS_compaction_fadvice_e = rocksdb::Options::NONE;
+    FLAGS_compaction_fadvice_e = ROCKSDB_NAMESPACE::Options::NONE;
   else if (!strcasecmp(FLAGS_compaction_fadvice.c_str(), "NORMAL"))
-    FLAGS_compaction_fadvice_e = rocksdb::Options::NORMAL;
+    FLAGS_compaction_fadvice_e = ROCKSDB_NAMESPACE::Options::NORMAL;
   else if (!strcasecmp(FLAGS_compaction_fadvice.c_str(), "SEQUENTIAL"))
-    FLAGS_compaction_fadvice_e = rocksdb::Options::SEQUENTIAL;
+    FLAGS_compaction_fadvice_e = ROCKSDB_NAMESPACE::Options::SEQUENTIAL;
   else if (!strcasecmp(FLAGS_compaction_fadvice.c_str(), "WILLNEED"))
-    FLAGS_compaction_fadvice_e = rocksdb::Options::WILLNEED;
+    FLAGS_compaction_fadvice_e = ROCKSDB_NAMESPACE::Options::WILLNEED;
   else {
     fprintf(stdout, "Unknown compaction fadvice:%s\n",
             FLAGS_compaction_fadvice.c_str());
   }
 
+  FLAGS_value_size_distribution_type_e =
+    StringToDistributionType(FLAGS_value_size_distribution_type.c_str());
+
   FLAGS_rep_factory = StringToRepFactory(FLAGS_memtablerep.c_str());
 
   // Note options sanitization may increase thread pool sizes according to
   // max_background_flushes/max_background_compactions/max_background_jobs
   FLAGS_env->SetBackgroundThreads(FLAGS_num_high_pri_threads,
-                                  rocksdb::Env::Priority::HIGH);
+                                  ROCKSDB_NAMESPACE::Env::Priority::HIGH);
   FLAGS_env->SetBackgroundThreads(FLAGS_num_bottom_pri_threads,
-                                  rocksdb::Env::Priority::BOTTOM);
+                                  ROCKSDB_NAMESPACE::Env::Priority::BOTTOM);
   FLAGS_env->SetBackgroundThreads(FLAGS_num_low_pri_threads,
-                                  rocksdb::Env::Priority::LOW);
+                                  ROCKSDB_NAMESPACE::Env::Priority::LOW);
 
   // Choose a location for the test database if none given with --db=<path>
   if (FLAGS_db.empty()) {
     std::string default_db_path;
-    rocksdb::Env::Default()->GetTestDirectory(&default_db_path);
+    FLAGS_env->GetTestDirectory(&default_db_path);
     default_db_path += "/dbbench";
     FLAGS_db = default_db_path;
   }
@@ -6200,18 +7026,23 @@ int db_bench_tool(int argc, char** argv) {
     FLAGS_stats_interval = 1000;
   }
 
-  rocksdb::Benchmark benchmark;
+  if (FLAGS_seek_missing_prefix && FLAGS_prefix_size <= 8) {
+    fprintf(stderr, "prefix_size > 8 required by --seek_missing_prefix\n");
+    exit(1);
+  }
+
+  ROCKSDB_NAMESPACE::Benchmark benchmark;
   benchmark.Run();
 
 #ifndef ROCKSDB_LITE
   if (FLAGS_print_malloc_stats) {
     std::string stats_string;
-    rocksdb::DumpMallocStats(&stats_string);
+    ROCKSDB_NAMESPACE::DumpMallocStats(&stats_string);
     fprintf(stdout, "Malloc stats:\n%s\n", stats_string.c_str());
   }
 #endif  // ROCKSDB_LITE
 
   return 0;
 }
-}  // namespace rocksdb
+}  // namespace ROCKSDB_NAMESPACE
 #endif