git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rocksdb/tools/db_bench_tool.cc
commit: import 14.2.4 nautilus point release

diff --git a/ceph/src/rocksdb/tools/db_bench_tool.cc b/ceph/src/rocksdb/tools/db_bench_tool.cc
index e3560d6fac86d027504fd5185f1ea4a3b7907550..0cb4e0eb27ec185d085d9db4f4c8f74e4ec1b6ac 100644
--- a/ceph/src/rocksdb/tools/db_bench_tool.cc
+++ b/ceph/src/rocksdb/tools/db_bench_tool.cc
@@ -21,6 +21,7 @@
 #endif
 #include <fcntl.h>
 #include <inttypes.h>
+#include <math.h>
 #include <stdio.h>
 #include <stdlib.h>
 #include <sys/types.h>
 #include <unordered_map>
 
 #include "db/db_impl.h"
+#include "db/malloc_stats.h"
 #include "db/version_set.h"
 #include "hdfs/env_hdfs.h"
 #include "monitoring/histogram.h"
 #include "monitoring/statistics.h"
+#include "options/cf_options.h"
 #include "port/port.h"
 #include "port/stack_trace.h"
 #include "rocksdb/cache.h"
@@ -45,7 +48,6 @@
 #include "rocksdb/filter_policy.h"
 #include "rocksdb/memtablerep.h"
 #include "rocksdb/options.h"
-#include "options/cf_options.h"
 #include "rocksdb/perf_context.h"
 #include "rocksdb/persistent_cache.h"
 #include "rocksdb/rate_limiter.h"
@@ -101,6 +103,7 @@ DEFINE_string(
     "compact,"
     "compactall,"
     "multireadrandom,"
+    "mixgraph,"
     "readseq,"
     "readtocache,"
     "readreverse,"
@@ -248,6 +251,10 @@ DEFINE_bool(reverse_iterator, false,
             "When true use Prev rather than Next for iterators that do "
             "Seek and then Next");
 
+DEFINE_int64(max_scan_distance, 0,
+             "Used to define iterate_upper_bound (or iterate_lower_bound "
+             "if FLAGS_reverse_iterator is set to true) when value is nonzero");
+
 DEFINE_bool(use_uint64_comparator, false, "use Uint64 user comparator");
 
 DEFINE_int64(batch_size, 1, "Batch size");
@@ -507,6 +514,8 @@ DEFINE_int32(bloom_bits, -1, "Bloom filter bits per key. Negative means"
 DEFINE_double(memtable_bloom_size_ratio, 0,
               "Ratio of memtable size used for bloom filter. 0 means no bloom "
               "filter.");
+DEFINE_bool(memtable_whole_key_filtering, false,
+            "Try to use whole key bloom filter in memtables.");
 DEFINE_bool(memtable_use_huge_page, false,
             "Try to use huge page in memtables.");
 
@@ -514,6 +523,14 @@ DEFINE_bool(use_existing_db, false, "If true, do not destroy the existing"
             " database.  If you set this flag and also specify a benchmark that"
             " wants a fresh database, that benchmark will fail.");
 
+DEFINE_bool(use_existing_keys, false,
+            "If true, uses existing keys in the DB, "
+            "rather than generating new ones. This involves some startup "
+            "latency to load all keys into memory. It is supported for the "
+            "same read/overwrite benchmarks as `-use_existing_db=true`, which "
+            "must also be set for this flag to be enabled. When this flag is "
+            "set, the value for `-num` will be ignored.");
+
 DEFINE_bool(show_table_properties, false,
             "If true, then per-level table"
             " properties will be printed on every stats-interval when"
@@ -551,6 +568,8 @@ DEFINE_bool(verify_checksum, true,
             " from storage");
 
 DEFINE_bool(statistics, false, "Database statistics");
+DEFINE_int32(stats_level, rocksdb::StatsLevel::kExceptDetailedTimers,
+             "stats level for statistics");
 DEFINE_string(statistics_string, "", "Serialized statistics string");
 static class std::shared_ptr<rocksdb::Statistics> dbstats;
 
@@ -640,9 +659,11 @@ DEFINE_bool(optimize_filters_for_hits, false,
 DEFINE_uint64(delete_obsolete_files_period_micros, 0,
               "Ignored. Left here for backward compatibility");
 
+DEFINE_int64(writes_before_delete_range, 0,
+             "Number of writes before DeleteRange starts being issued "
+             "regularly (see writes_per_range_tombstone).");
+
 DEFINE_int64(writes_per_range_tombstone, 0,
-             "Number of writes between range "
-             "tombstones");
+             "Number of writes between range tombstones");
 
 DEFINE_int64(range_tombstone_width, 100, "Number of keys in tombstone's range");
 
@@ -687,6 +708,7 @@ DEFINE_string(
     "RocksDB options related command-line arguments, all other arguments "
     "that are related to RocksDB options will be ignored:\n"
     "\t--use_existing_db\n"
+    "\t--use_existing_keys\n"
     "\t--statistics\n"
     "\t--row_cache_size\n"
     "\t--row_cache_numshardbits\n"
@@ -779,6 +801,8 @@ DEFINE_string(compression_type, "snappy",
 static enum rocksdb::CompressionType FLAGS_compression_type_e =
     rocksdb::kSnappyCompression;
 
+DEFINE_int64(sample_for_compression, 0,
+             "Sample one in every N blocks for compression");
+
 DEFINE_int32(compression_level, rocksdb::CompressionOptions().level,
              "Compression level. The meaning of this value is library-"
              "dependent. If unset, we try to use the default for the library "
@@ -925,6 +949,52 @@ DEFINE_uint64(
     "If non-zero, db_bench will rate-limit the writes going into RocksDB. This "
     "is the global rate in bytes/second.");
 
+// the parameters of the mix_graph workload
+DEFINE_double(key_dist_a, 0.0,
+              "The parameter 'a' of key access distribution model "
+              "f(x)=a*x^b");
+DEFINE_double(key_dist_b, 0.0,
+              "The parameter 'b' of key access distribution model "
+              "f(x)=a*x^b");
+DEFINE_double(value_theta, 0.0,
+              "The parameter 'theta' of Generalized Pareto Distribution "
+              "f(x)=(1/sigma)*(1+k*(x-theta)/sigma)^-(1/k+1)");
+DEFINE_double(value_k, 0.0,
+              "The parameter 'k' of Generalized Pareto Distribution "
+              "f(x)=(1/sigma)*(1+k*(x-theta)/sigma)^-(1/k+1)");
+DEFINE_double(value_sigma, 0.0,
+              "The parameter 'sigma' of Generalized Pareto Distribution "
+              "f(x)=(1/sigma)*(1+k*(x-theta)/sigma)^-(1/k+1)");
+DEFINE_double(iter_theta, 0.0,
+              "The parameter 'theta' of Generalized Pareto Distribution "
+              "f(x)=(1/sigma)*(1+k*(x-theta)/sigma)^-(1/k+1)");
+DEFINE_double(iter_k, 0.0,
+              "The parameter 'k' of Generalized Pareto Distribution "
+              "f(x)=(1/sigma)*(1+k*(x-theta)/sigma)^-(1/k+1)");
+DEFINE_double(iter_sigma, 0.0,
+              "The parameter 'sigma' of Generalized Pareto Distribution "
+              "f(x)=(1/sigma)*(1+k*(x-theta)/sigma)^-(1/k+1)");
+DEFINE_double(mix_get_ratio, 1.0,
+              "The ratio of Get queries in the mix_graph workload");
+DEFINE_double(mix_put_ratio, 0.0,
+              "The ratio of Put queries in the mix_graph workload");
+DEFINE_double(mix_seek_ratio, 0.0,
+              "The ratio of Seek queries in the mix_graph workload");
+DEFINE_int64(mix_max_scan_len, 10000, "The max scan length of Iterator");
+DEFINE_int64(mix_ave_kv_size, 512,
+             "The average key-value size of this workload");
+DEFINE_int64(mix_max_value_size, 1024, "The max value size of this workload");
+DEFINE_double(
+    sine_mix_rate_noise, 0.0,
+    "Noise ratio added to the sine rate; must be between 0.0 and 1.0");
+DEFINE_bool(sine_mix_rate, false,
+            "Enable the sine QPS control on the mix workload");
+DEFINE_uint64(
+    sine_mix_rate_interval_milliseconds, 10000,
+    "Interval at which the sine wave read_rate_limit is recalculated");
+DEFINE_int64(mix_accesses, -1,
+             "The total number of query accesses of the mix_graph workload");
+
 DEFINE_uint64(
     benchmark_read_rate_limit, 0,
     "If non-zero, db_bench will rate-limit the reads from RocksDB. This "
@@ -935,6 +1005,9 @@ DEFINE_uint64(max_compaction_bytes, rocksdb::Options().max_compaction_bytes,
 
 #ifndef ROCKSDB_LITE
 DEFINE_bool(readonly, false, "Run read only benchmarks.");
+
+DEFINE_bool(print_malloc_stats, false,
+            "Print malloc stats to stdout after benchmarks finish.");
 #endif  // ROCKSDB_LITE
 
 DEFINE_bool(disable_auto_compactions, false, "Do not auto trigger compactions");
@@ -1035,13 +1108,18 @@ DEFINE_bool(identity_as_first_hash, false, "the first hash function of cuckoo "
 DEFINE_bool(dump_malloc_stats, true, "Dump malloc stats in LOG ");
 DEFINE_uint64(stats_dump_period_sec, rocksdb::Options().stats_dump_period_sec,
               "Gap between printing stats to log in seconds");
+DEFINE_uint64(stats_persist_period_sec,
+              rocksdb::Options().stats_persist_period_sec,
+              "Gap between persisting stats in seconds");
+DEFINE_uint64(stats_history_buffer_size,
+              rocksdb::Options().stats_history_buffer_size,
+              "Max number of stats snapshots to keep in memory");
 
 enum RepFactory {
   kSkipList,
   kPrefixHash,
   kVectorRep,
   kHashLinkedList,
-  kCuckoo
 };
 
 static enum RepFactory StringToRepFactory(const char* ctype) {
@@ -1055,8 +1133,6 @@ static enum RepFactory StringToRepFactory(const char* ctype) {
     return kVectorRep;
   else if (!strcasecmp(ctype, "hash_linkedlist"))
     return kHashLinkedList;
-  else if (!strcasecmp(ctype, "cuckoo"))
-    return kCuckoo;
 
   fprintf(stdout, "Cannot parse memreptable %s\n", ctype);
   return kSkipList;
@@ -1137,19 +1213,20 @@ class ReportFileOpEnv : public EnvWrapper {
     counters_.bytes_written_ = 0;
   }
 
-  Status NewSequentialFile(const std::string& f, unique_ptr<SequentialFile>* r,
+  Status NewSequentialFile(const std::string& f,
+                           std::unique_ptr<SequentialFile>* r,
                            const EnvOptions& soptions) override {
     class CountingFile : public SequentialFile {
      private:
-      unique_ptr<SequentialFile> target_;
+      std::unique_ptr<SequentialFile> target_;
       ReportFileOpCounters* counters_;
 
      public:
-      CountingFile(unique_ptr<SequentialFile>&& target,
+      CountingFile(std::unique_ptr<SequentialFile>&& target,
                    ReportFileOpCounters* counters)
           : target_(std::move(target)), counters_(counters) {}
 
-      virtual Status Read(size_t n, Slice* result, char* scratch) override {
+      Status Read(size_t n, Slice* result, char* scratch) override {
         counters_->read_counter_.fetch_add(1, std::memory_order_relaxed);
         Status rv = target_->Read(n, result, scratch);
         counters_->bytes_read_.fetch_add(result->size(),
@@ -1157,7 +1234,7 @@ class ReportFileOpEnv : public EnvWrapper {
         return rv;
       }
 
-      virtual Status Skip(uint64_t n) override { return target_->Skip(n); }
+      Status Skip(uint64_t n) override { return target_->Skip(n); }
     };
 
     Status s = target()->NewSequentialFile(f, r, soptions);
@@ -1169,19 +1246,19 @@ class ReportFileOpEnv : public EnvWrapper {
   }
 
   Status NewRandomAccessFile(const std::string& f,
-                             unique_ptr<RandomAccessFile>* r,
+                             std::unique_ptr<RandomAccessFile>* r,
                              const EnvOptions& soptions) override {
     class CountingFile : public RandomAccessFile {
      private:
-      unique_ptr<RandomAccessFile> target_;
+      std::unique_ptr<RandomAccessFile> target_;
       ReportFileOpCounters* counters_;
 
      public:
-      CountingFile(unique_ptr<RandomAccessFile>&& target,
+      CountingFile(std::unique_ptr<RandomAccessFile>&& target,
                    ReportFileOpCounters* counters)
           : target_(std::move(target)), counters_(counters) {}
-      virtual Status Read(uint64_t offset, size_t n, Slice* result,
-                          char* scratch) const override {
+      Status Read(uint64_t offset, size_t n, Slice* result,
+                  char* scratch) const override {
         counters_->read_counter_.fetch_add(1, std::memory_order_relaxed);
         Status rv = target_->Read(offset, n, result, scratch);
         counters_->bytes_read_.fetch_add(result->size(),
@@ -1198,15 +1275,15 @@ class ReportFileOpEnv : public EnvWrapper {
     return s;
   }
 
-  Status NewWritableFile(const std::string& f, unique_ptr<WritableFile>* r,
+  Status NewWritableFile(const std::string& f, std::unique_ptr<WritableFile>* r,
                          const EnvOptions& soptions) override {
     class CountingFile : public WritableFile {
      private:
-      unique_ptr<WritableFile> target_;
+      std::unique_ptr<WritableFile> target_;
       ReportFileOpCounters* counters_;
 
      public:
-      CountingFile(unique_ptr<WritableFile>&& target,
+      CountingFile(std::unique_ptr<WritableFile>&& target,
                    ReportFileOpCounters* counters)
           : target_(std::move(target)), counters_(counters) {}
 
@@ -1772,7 +1849,7 @@ class Stats {
 
     fprintf(stdout, "%-12s : %11.3f micros/op %ld ops/sec;%s%s\n",
             name.ToString().c_str(),
-            elapsed * 1e6 / done_,
+            seconds_ * 1e6 / done_,
             (long)throughput,
             (extra.empty() ? "" : " "),
             extra.c_str());
@@ -1968,12 +2045,15 @@ class Benchmark {
   int prefix_size_;
   int64_t keys_per_prefix_;
   int64_t entries_per_batch_;
+  int64_t writes_before_delete_range_;
   int64_t writes_per_range_tombstone_;
   int64_t range_tombstone_width_;
   int64_t max_num_range_tombstones_;
   WriteOptions write_options_;
   Options open_options_;  // keep options around to properly destroy db later
+#ifndef ROCKSDB_LITE
   TraceOptions trace_options_;
+#endif
   int64_t reads_;
   int64_t deletes_;
   double read_random_exp_range_;
@@ -1982,25 +2062,28 @@ class Benchmark {
   int64_t merge_keys_;
   bool report_file_operations_;
   bool use_blob_db_;
+  std::vector<std::string> keys_;
 
   class ErrorHandlerListener : public EventListener {
    public:
+#ifndef ROCKSDB_LITE
     ErrorHandlerListener()
         : mutex_(),
           cv_(&mutex_),
           no_auto_recovery_(false),
           recovery_complete_(false) {}
 
-    ~ErrorHandlerListener() {}
+    ~ErrorHandlerListener() override {}
 
     void OnErrorRecoveryBegin(BackgroundErrorReason /*reason*/,
-                              Status /*bg_error*/, bool* auto_recovery) {
+                              Status /*bg_error*/,
+                              bool* auto_recovery) override {
       if (*auto_recovery && no_auto_recovery_) {
         *auto_recovery = false;
       }
     }
 
-    void OnErrorRecoveryCompleted(Status /*old_bg_error*/) {
+    void OnErrorRecoveryCompleted(Status /*old_bg_error*/) override {
       InstrumentedMutexLock l(&mutex_);
       recovery_complete_ = true;
       cv_.SignalAll();
@@ -2025,6 +2108,10 @@ class Benchmark {
     InstrumentedCondVar cv_;
     bool no_auto_recovery_;
     bool recovery_complete_;
+#else   // ROCKSDB_LITE
+    bool WaitForRecovery(uint64_t /*abs_time_us*/) { return true; }
+    void EnableAutoRecovery(bool /*enable*/) {}
+#endif  // ROCKSDB_LITE
   };
 
   std::shared_ptr<ErrorHandlerListener> listener_;
@@ -2037,28 +2124,28 @@ class Benchmark {
     return true;
   }
 
-  inline bool CompressSlice(const CompressionContext& compression_ctx,
+  inline bool CompressSlice(const CompressionInfo& compression_info,
                             const Slice& input, std::string* compressed) {
     bool ok = true;
     switch (FLAGS_compression_type_e) {
       case rocksdb::kSnappyCompression:
-        ok = Snappy_Compress(compression_ctx, input.data(), input.size(),
+        ok = Snappy_Compress(compression_info, input.data(), input.size(),
                              compressed);
         break;
       case rocksdb::kZlibCompression:
-        ok = Zlib_Compress(compression_ctx, 2, input.data(), input.size(),
+        ok = Zlib_Compress(compression_info, 2, input.data(), input.size(),
                            compressed);
         break;
       case rocksdb::kBZip2Compression:
-        ok = BZip2_Compress(compression_ctx, 2, input.data(), input.size(),
+        ok = BZip2_Compress(compression_info, 2, input.data(), input.size(),
                             compressed);
         break;
       case rocksdb::kLZ4Compression:
-        ok = LZ4_Compress(compression_ctx, 2, input.data(), input.size(),
+        ok = LZ4_Compress(compression_info, 2, input.data(), input.size(),
                           compressed);
         break;
       case rocksdb::kLZ4HCCompression:
-        ok = LZ4HC_Compress(compression_ctx, 2, input.data(), input.size(),
+        ok = LZ4HC_Compress(compression_info, 2, input.data(), input.size(),
                             compressed);
         break;
       case rocksdb::kXpressCompression:
@@ -2066,7 +2153,7 @@ class Benchmark {
           input.size(), compressed);
         break;
       case rocksdb::kZSTD:
-        ok = ZSTD_Compress(compression_ctx, input.data(), input.size(),
+        ok = ZSTD_Compress(compression_info, input.data(), input.size(),
                            compressed);
         break;
       default:
@@ -2110,6 +2197,8 @@ class Benchmark {
 
     auto compression = CompressionTypeToString(FLAGS_compression_type_e);
     fprintf(stdout, "Compression: %s\n", compression.c_str());
+    fprintf(stdout, "Compression sampling rate: %" PRId64 "\n",
+            FLAGS_sample_for_compression);
 
     switch (FLAGS_rep_factory) {
       case kPrefixHash:
@@ -2124,9 +2213,6 @@ class Benchmark {
       case kHashLinkedList:
         fprintf(stdout, "Memtablerep: hash_linkedlist\n");
         break;
-      case kCuckoo:
-        fprintf(stdout, "Memtablerep: cuckoo\n");
-        break;
     }
     fprintf(stdout, "Perf Level: %d\n", FLAGS_perf_level);
 
@@ -2149,10 +2235,12 @@ class Benchmark {
       const int len = FLAGS_block_size;
       std::string input_str(len, 'y');
       std::string compressed;
-      CompressionContext compression_ctx(FLAGS_compression_type_e,
-                                         Options().compression_opts);
-      bool result =
-          CompressSlice(compression_ctx, Slice(input_str), &compressed);
+      CompressionOptions opts;
+      CompressionContext context(FLAGS_compression_type_e);
+      CompressionInfo info(opts, context, CompressionDict::GetEmptyDict(),
+                           FLAGS_compression_type_e,
+                           FLAGS_sample_for_compression);
+      bool result = CompressSlice(info, Slice(input_str), &compressed);
 
       if (!result) {
         fprintf(stdout, "WARNING: %s compression is not enabled\n",
@@ -2253,13 +2341,13 @@ class Benchmark {
 
   class KeepFilter : public CompactionFilter {
    public:
-    virtual bool Filter(int /*level*/, const Slice& /*key*/,
-                        const Slice& /*value*/, std::string* /*new_value*/,
-                        bool* /*value_changed*/) const override {
+    bool Filter(int /*level*/, const Slice& /*key*/, const Slice& /*value*/,
+                std::string* /*new_value*/,
+                bool* /*value_changed*/) const override {
       return false;
     }
 
-    virtual const char* Name() const override { return "KeepFilter"; }
+    const char* Name() const override { return "KeepFilter"; }
   };
 
   std::shared_ptr<Cache> NewCache(int64_t capacity) {
@@ -2397,6 +2485,13 @@ class Benchmark {
   //   |        key 00000         |
   //   ----------------------------
   void GenerateKeyFromInt(uint64_t v, int64_t num_keys, Slice* key) {
+    if (!keys_.empty()) {
+      assert(FLAGS_use_existing_keys);
+      assert(keys_.size() == static_cast<size_t>(num_keys));
+      assert(v < static_cast<uint64_t>(num_keys));
+      *key = keys_[v];
+      return;
+    }
     char* start = const_cast<char*>(key->data());
     char* pos = start;
     if (keys_per_prefix_ > 0) {
@@ -2495,6 +2590,7 @@ void VerifyDBFromDB(std::string& truth_db_name) {
       value_size_ = FLAGS_value_size;
       key_size_ = FLAGS_key_size;
       entries_per_batch_ = FLAGS_batch_size;
+      writes_before_delete_range_ = FLAGS_writes_before_delete_range;
       writes_per_range_tombstone_ = FLAGS_writes_per_range_tombstone;
       range_tombstone_width_ = FLAGS_range_tombstone_width;
       max_num_range_tombstones_ = FLAGS_max_num_range_tombstones;
@@ -2611,6 +2707,8 @@ void VerifyDBFromDB(std::string& truth_db_name) {
         fprintf(stderr, "entries_per_batch = %" PRIi64 "\n",
                 entries_per_batch_);
         method = &Benchmark::MultiReadRandom;
+      } else if (name == "mixgraph") {
+        method = &Benchmark::MixGraph;
       } else if (name == "readmissing") {
         ++key_size_;
         method = &Benchmark::ReadRandom;
@@ -2849,6 +2947,7 @@ void VerifyDBFromDB(std::string& truth_db_name) {
     }
 
     SetPerfLevel(static_cast<PerfLevel> (shared->perf_level));
+    perf_context.EnablePerLevelPerfContext();
     thread->stats.Start(thread->tid);
     (arg->bm->*(arg->method))(thread);
     thread->stats.Stop();
@@ -3004,13 +3103,15 @@ void VerifyDBFromDB(std::string& truth_db_name) {
     int64_t produced = 0;
     bool ok = true;
     std::string compressed;
-    CompressionContext compression_ctx(FLAGS_compression_type_e,
-                                       Options().compression_opts);
-
+    CompressionOptions opts;
+    CompressionContext context(FLAGS_compression_type_e);
+    CompressionInfo info(opts, context, CompressionDict::GetEmptyDict(),
+                         FLAGS_compression_type_e,
+                         FLAGS_sample_for_compression);
     // Compress 1G
     while (ok && bytes < int64_t(1) << 30) {
       compressed.clear();
-      ok = CompressSlice(compression_ctx, input, &compressed);
+      ok = CompressSlice(info, input, &compressed);
       produced += compressed.size();
       bytes += input.size();
       thread->stats.FinishedOps(nullptr, nullptr, 1, kCompress);
@@ -3032,15 +3133,21 @@ void VerifyDBFromDB(std::string& truth_db_name) {
     Slice input = gen.Generate(FLAGS_block_size);
     std::string compressed;
 
+    CompressionContext compression_ctx(FLAGS_compression_type_e);
+    CompressionOptions compression_opts;
+    CompressionInfo compression_info(
+        compression_opts, compression_ctx, CompressionDict::GetEmptyDict(),
+        FLAGS_compression_type_e, FLAGS_sample_for_compression);
     UncompressionContext uncompression_ctx(FLAGS_compression_type_e);
-    CompressionContext compression_ctx(FLAGS_compression_type_e,
-                                       Options().compression_opts);
+    UncompressionInfo uncompression_info(uncompression_ctx,
+                                         UncompressionDict::GetEmptyDict(),
+                                         FLAGS_compression_type_e);
 
-    bool ok = CompressSlice(compression_ctx, input, &compressed);
+    bool ok = CompressSlice(compression_info, input, &compressed);
     int64_t bytes = 0;
     int decompress_size;
     while (ok && bytes < 1024 * 1048576) {
-      char *uncompressed = nullptr;
+      CacheAllocationPtr uncompressed;
       switch (FLAGS_compression_type_e) {
         case rocksdb::kSnappyCompression: {
           // get size and allocate here to make comparison fair
@@ -3050,45 +3157,44 @@ void VerifyDBFromDB(std::string& truth_db_name) {
             ok = false;
             break;
           }
-          uncompressed = new char[ulength];
+          uncompressed = AllocateBlock(ulength, nullptr);
           ok = Snappy_Uncompress(compressed.data(), compressed.size(),
-                                 uncompressed);
+                                 uncompressed.get());
           break;
         }
       case rocksdb::kZlibCompression:
-        uncompressed = Zlib_Uncompress(uncompression_ctx, compressed.data(),
+        uncompressed = Zlib_Uncompress(uncompression_info, compressed.data(),
                                        compressed.size(), &decompress_size, 2);
-        ok = uncompressed != nullptr;
+        ok = uncompressed.get() != nullptr;
         break;
       case rocksdb::kBZip2Compression:
         uncompressed = BZip2_Uncompress(compressed.data(), compressed.size(),
                                         &decompress_size, 2);
-        ok = uncompressed != nullptr;
+        ok = uncompressed.get() != nullptr;
         break;
       case rocksdb::kLZ4Compression:
-        uncompressed = LZ4_Uncompress(uncompression_ctx, compressed.data(),
+        uncompressed = LZ4_Uncompress(uncompression_info, compressed.data(),
                                       compressed.size(), &decompress_size, 2);
-        ok = uncompressed != nullptr;
+        ok = uncompressed.get() != nullptr;
         break;
       case rocksdb::kLZ4HCCompression:
-        uncompressed = LZ4_Uncompress(uncompression_ctx, compressed.data(),
+        uncompressed = LZ4_Uncompress(uncompression_info, compressed.data(),
                                       compressed.size(), &decompress_size, 2);
-        ok = uncompressed != nullptr;
+        ok = uncompressed.get() != nullptr;
         break;
       case rocksdb::kXpressCompression:
-        uncompressed = XPRESS_Uncompress(compressed.data(), compressed.size(),
-          &decompress_size);
-        ok = uncompressed != nullptr;
+        uncompressed.reset(XPRESS_Uncompress(
+            compressed.data(), compressed.size(), &decompress_size));
+        ok = uncompressed.get() != nullptr;
         break;
       case rocksdb::kZSTD:
-        uncompressed = ZSTD_Uncompress(uncompression_ctx, compressed.data(),
+        uncompressed = ZSTD_Uncompress(uncompression_info, compressed.data(),
                                        compressed.size(), &decompress_size);
-        ok = uncompressed != nullptr;
+        ok = uncompressed.get() != nullptr;
         break;
       default:
         ok = false;
       }
-      delete[] uncompressed;
       bytes += input.size();
       thread->stats.FinishedOps(nullptr, nullptr, 1, kUncompress);
     }
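Context for the change in this hunk: CacheAllocationPtr (the type returned by AllocateBlock) behaves like a std::unique_ptr over the buffer, so each loop iteration now releases its allocation automatically when `uncompressed` is reassigned or goes out of scope. That is why the explicit `delete[] uncompressed;` at the end of the loop is dropped.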
@@ -3153,9 +3259,10 @@ void VerifyDBFromDB(std::string& truth_db_name) {
     options.use_direct_io_for_flush_and_compaction =
         FLAGS_use_direct_io_for_flush_and_compaction;
 #ifndef ROCKSDB_LITE
+    options.ttl = FLAGS_fifo_compaction_ttl;
     options.compaction_options_fifo = CompactionOptionsFIFO(
         FLAGS_fifo_compaction_max_table_files_size_mb * 1024 * 1024,
-        FLAGS_fifo_compaction_allow_compaction, FLAGS_fifo_compaction_ttl);
+        FLAGS_fifo_compaction_allow_compaction);
 #endif  // ROCKSDB_LITE
     if (FLAGS_prefix_size != 0) {
       options.prefix_extractor.reset(
@@ -3173,6 +3280,7 @@ void VerifyDBFromDB(std::string& truth_db_name) {
     }
     options.memtable_huge_page_size = FLAGS_memtable_use_huge_page ? 2048 : 0;
     options.memtable_prefix_bloom_size_ratio = FLAGS_memtable_bloom_size_ratio;
+    options.memtable_whole_key_filtering = FLAGS_memtable_whole_key_filtering;
     if (FLAGS_memtable_insert_with_hint_prefix_size > 0) {
       options.memtable_insert_with_hint_prefix_extractor.reset(
           NewCappedPrefixTransform(
@@ -3219,10 +3327,6 @@ void VerifyDBFromDB(std::string& truth_db_name) {
           new VectorRepFactory
         );
         break;
-      case kCuckoo:
-        options.memtable_factory.reset(NewHashCuckooRepFactory(
-            options.write_buffer_size, FLAGS_key_size + FLAGS_value_size));
-        break;
 #else
       default:
         fprintf(stderr, "Only skip list is supported in lite mode\n");
@@ -3390,6 +3494,7 @@ void VerifyDBFromDB(std::string& truth_db_name) {
     options.level0_slowdown_writes_trigger =
       FLAGS_level0_slowdown_writes_trigger;
     options.compression = FLAGS_compression_type_e;
+    options.sample_for_compression = FLAGS_sample_for_compression;
     options.WAL_ttl_seconds = FLAGS_wal_ttl_seconds;
     options.WAL_size_limit_MB = FLAGS_wal_size_limit_MB;
     options.max_total_wal_size = FLAGS_max_total_wal_size;
@@ -3492,6 +3597,10 @@ void VerifyDBFromDB(std::string& truth_db_name) {
     options.dump_malloc_stats = FLAGS_dump_malloc_stats;
     options.stats_dump_period_sec =
         static_cast<unsigned int>(FLAGS_stats_dump_period_sec);
+    options.stats_persist_period_sec =
+        static_cast<unsigned int>(FLAGS_stats_persist_period_sec);
+    options.stats_history_buffer_size =
+        static_cast<size_t>(FLAGS_stats_history_buffer_size);
 
     options.compression_opts.level = FLAGS_compression_level;
     options.compression_opts.max_dict_bytes = FLAGS_compression_max_dict_bytes;
@@ -3569,6 +3678,19 @@ void VerifyDBFromDB(std::string& truth_db_name) {
       options.compaction_filter = new KeepFilter();
       fprintf(stdout, "A noop compaction filter is used\n");
     }
+
+    if (FLAGS_use_existing_keys) {
+      // Only work on single database
+      assert(db_.db != nullptr);
+      ReadOptions read_opts;
+      read_opts.total_order_seek = true;
+      Iterator* iter = db_.db->NewIterator(read_opts);
+      for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
+        keys_.emplace_back(iter->key().ToString());
+      }
+      delete iter;
+      FLAGS_num = keys_.size();
+    }
   }
 
   void Open(Options* opts) {
@@ -3876,9 +3998,13 @@ void VerifyDBFromDB(std::string& truth_db_name) {
         bytes += value_size_ + key_size_;
         ++num_written;
         if (writes_per_range_tombstone_ > 0 &&
-            num_written / writes_per_range_tombstone_ <=
+            num_written > writes_before_delete_range_ &&
+            (num_written - writes_before_delete_range_) /
+                    writes_per_range_tombstone_ <=
                 max_num_range_tombstones_ &&
-            num_written % writes_per_range_tombstone_ == 0) {
+            (num_written - writes_before_delete_range_) %
+                    writes_per_range_tombstone_ ==
+                0) {
           int64_t begin_num = key_gens[id]->Next();
           if (FLAGS_expand_range_tombstones) {
             for (int64_t offset = 0; offset < range_tombstone_width_;
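The reworked condition above simply shifts the old tombstone cadence by writes_before_delete_range writes. As a sketch of the same arithmetic in a hypothetical standalone predicate (not in the diff): with writes_before_delete_range=1000, writes_per_range_tombstone=100, and max_num_range_tombstones=5, DeleteRange fires after writes 1100, 1200, ..., 1500 and then stops.

// Hypothetical helper extracting the cadence check above, for illustration.
static bool ShouldWriteRangeTombstone(int64_t num_written,
                                      int64_t writes_before_delete_range,
                                      int64_t writes_per_range_tombstone,
                                      int64_t max_num_range_tombstones) {
  if (writes_per_range_tombstone <= 0 ||
      num_written <= writes_before_delete_range) {
    return false;  // tombstones disabled, or still in the warm-up phase
  }
  const int64_t n = num_written - writes_before_delete_range;
  return n / writes_per_range_tombstone <= max_num_range_tombstones &&
         n % writes_per_range_tombstone == 0;
}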
@@ -4229,7 +4355,7 @@ void VerifyDBFromDB(std::string& truth_db_name) {
         }
         if (levelMeta.level == 0) {
           for (auto& fileMeta : levelMeta.files) {
-            fprintf(stdout, "Level[%d]: %s(size: %" PRIu64 " bytes)\n",
+            fprintf(stdout, "Level[%d]: %s(size: %" ROCKSDB_PRIszt " bytes)\n",
                     levelMeta.level, fileMeta.name.c_str(), fileMeta.size);
           }
         } else {
@@ -4509,6 +4635,255 @@ void VerifyDBFromDB(std::string& truth_db_name) {
     thread->stats.AddMessage(msg);
   }
 
+  // The inverse CDF of the Generalized Pareto Distribution
+  int64_t ParetoCdfInversion(double u, double theta, double k, double sigma) {
+    double ret;
+    if (k == 0.0) {
+      ret = theta - sigma * std::log(u);
+    } else {
+      ret = theta + sigma * (std::pow(u, -1 * k) - 1) / k;
+    }
+    return static_cast<int64_t>(ceil(ret));
+  }
+  // inversion of y=ax^b
+  int64_t PowerCdfInversion(double u, double a, double b) {
+    double ret;
+    ret = std::pow((u / a), (1 / b));
+    return static_cast<int64_t>(ceil(ret));
+  }
+
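For reference, the closed forms inverted by the two helpers above, assuming u is uniform on (0,1] (since 1-u is also uniform, ParetoCdfInversion inverts the GPD survival function directly):

    Key access model:  f(x) = a\,x^{b} \;\Rightarrow\; x = (u/a)^{1/b}

    GPD survival:  S(x) = \left(1 + \frac{k(x-\theta)}{\sigma}\right)^{-1/k}
                   \;\Rightarrow\; x = \theta + \frac{\sigma\,(u^{-k}-1)}{k},
                   with the limit  x = \theta - \sigma \ln u  as  k \to 0.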
+  // Add noise to the QPS
+  double AddNoise(double origin, double noise_ratio) {
+    if (noise_ratio < 0.0 || noise_ratio > 1.0) {
+      return origin;
+    }
+    int band_int = static_cast<int>(FLAGS_sine_a);
+    if (band_int <= 0) {
+      return origin;  // avoid modulo-by-zero when the sine amplitude is unset
+    }
+    double delta = (rand() % band_int - band_int / 2) * noise_ratio;
+    if (origin + delta < 0) {
+      return origin;
+    } else {
+      return (origin + delta);
+    }
+  }
+
+  // decide the query type
+  // 0 Get, 1 Put, 2 Seek, 3 SeekForPrev, 4 Delete, 5 SingleDelete, 6 merge
+  class QueryDecider {
+   public:
+    std::vector<int> type_;
+    std::vector<double> ratio_;
+    int range_;
+
+    QueryDecider() {}
+    ~QueryDecider() {}
+
+    Status Initiate(std::vector<double> ratio_input) {
+      int range_max = 1000;
+      double sum = 0.0;
+      for (auto& ratio : ratio_input) {
+        sum += ratio;
+      }
+      range_ = 0;
+      for (auto& ratio : ratio_input) {
+        range_ += static_cast<int>(ceil(range_max * (ratio / sum)));
+        type_.push_back(range_);
+        ratio_.push_back(ratio / sum);
+      }
+      return Status::OK();
+    }
+
+    int GetType(int64_t rand_num) {
+      if (rand_num < 0) {
+        rand_num = rand_num * (-1);
+      }
+      assert(range_ != 0);
+      int pos = static_cast<int>(rand_num % range_);
+      for (int i = 0; i < static_cast<int>(type_.size()); i++) {
+        if (pos < type_[i]) {
+          return i;
+        }
+      }
+      return 0;
+    }
+  };
+
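A quick worked example of the bucketing (assumed ratios, not from the diff): Initiate({0.83, 0.14, 0.03}) produces cumulative buckets type_ = {830, 970, 1000} over range_ = 1000, so GetType maps rand % 1000 values below 830 to Get, below 970 to Put, and below 1000 to Seek:

// Sketch with assumed ratios 0.83 / 0.14 / 0.03.
QueryDecider decider;
decider.Initiate({0.83, 0.14, 0.03});  // type_ = {830, 970, 1000}
int op = decider.GetType(rand_num);    // 0 = Get, 1 = Put, 2 = Seek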
+  // The mix_graph workload: a mix of Get, Put, and Iterator queries
+  void MixGraph(ThreadState* thread) {
+    int64_t read = 0;  // including single gets and Next of iterators
+    int64_t gets = 0;
+    int64_t puts = 0;
+    int64_t found = 0;
+    int64_t seek = 0;
+    int64_t seek_found = 0;
+    int64_t bytes = 0;
+    const int64_t default_value_max = 1 * 1024 * 1024;
+    int64_t value_max = default_value_max;
+    int64_t scan_len_max = FLAGS_mix_max_scan_len;
+    double write_rate = 1000000.0;
+    double read_rate = 1000000.0;
+    std::vector<double> ratio{FLAGS_mix_get_ratio, FLAGS_mix_put_ratio,
+                              FLAGS_mix_seek_ratio};
+    char value_buffer[default_value_max];
+    QueryDecider query;
+    RandomGenerator gen;
+    Status s;
+    if (value_max > FLAGS_mix_max_value_size) {
+      value_max = FLAGS_mix_max_value_size;
+    }
+
+    ReadOptions options(FLAGS_verify_checksum, true);
+    std::unique_ptr<const char[]> key_guard;
+    Slice key = AllocateKey(&key_guard);
+    PinnableSlice pinnable_val;
+    query.Initiate(ratio);
+
+    // the limit of qps initiation
+    if (FLAGS_sine_a != 0 || FLAGS_sine_d != 0) {
+      thread->shared->read_rate_limiter.reset(NewGenericRateLimiter(
+          read_rate, 100000 /* refill_period_us */, 10 /* fairness */,
+          RateLimiter::Mode::kReadsOnly));
+      thread->shared->write_rate_limiter.reset(
+          NewGenericRateLimiter(write_rate));
+    }
+
+    Duration duration(FLAGS_duration, reads_);
+    while (!duration.Done(1)) {
+      DBWithColumnFamilies* db_with_cfh = SelectDBWithCfh(thread);
+      int64_t rand_v, key_rand, key_seed;
+      rand_v = GetRandomKey(&thread->rand) % FLAGS_num;
+      double u = static_cast<double>(rand_v) / FLAGS_num;
+      key_seed = PowerCdfInversion(u, FLAGS_key_dist_a, FLAGS_key_dist_b);
+      Random64 rand(key_seed);
+      key_rand = static_cast<int64_t>(rand.Next()) % FLAGS_num;
+      GenerateKeyFromInt(key_rand, FLAGS_num, &key);
+      int query_type = query.GetType(rand_v);
+
+      // change the qps
+      uint64_t now = FLAGS_env->NowMicros();
+      uint64_t usecs_since_last;
+      if (now > thread->stats.GetSineInterval()) {
+        usecs_since_last = now - thread->stats.GetSineInterval();
+      } else {
+        usecs_since_last = 0;
+      }
+
+      if (usecs_since_last >
+          (FLAGS_sine_mix_rate_interval_milliseconds * uint64_t{1000})) {
+        double usecs_since_start =
+            static_cast<double>(now - thread->stats.GetStart());
+        thread->stats.ResetSineInterval();
+        double mix_rate_with_noise = AddNoise(
+            SineRate(usecs_since_start / 1000000.0), FLAGS_sine_mix_rate_noise);
+        read_rate = mix_rate_with_noise * (query.ratio_[0] + query.ratio_[2]);
+        write_rate =
+            mix_rate_with_noise * query.ratio_[1] * FLAGS_mix_ave_kv_size;
+
+        thread->shared->write_rate_limiter.reset(
+            NewGenericRateLimiter(write_rate));
+        thread->shared->read_rate_limiter.reset(NewGenericRateLimiter(
+            read_rate,
+            FLAGS_sine_mix_rate_interval_milliseconds * uint64_t{1000}, 10,
+            RateLimiter::Mode::kReadsOnly));
+      }
+      // Start the query
+      if (query_type == 0) {
+        // the Get query
+        gets++;
+        read++;
+        if (FLAGS_num_column_families > 1) {
+          s = db_with_cfh->db->Get(options, db_with_cfh->GetCfh(key_rand), key,
+                                   &pinnable_val);
+        } else {
+          pinnable_val.Reset();
+          s = db_with_cfh->db->Get(options,
+                                   db_with_cfh->db->DefaultColumnFamily(), key,
+                                   &pinnable_val);
+        }
+
+        if (s.ok()) {
+          found++;
+          bytes += key.size() + pinnable_val.size();
+        } else if (!s.IsNotFound()) {
+          fprintf(stderr, "Get returned an error: %s\n", s.ToString().c_str());
+          abort();
+        }
+
+        if (thread->shared->read_rate_limiter.get() != nullptr &&
+            read % 256 == 255) {
+          thread->shared->read_rate_limiter->Request(
+              256, Env::IO_HIGH, nullptr /* stats */,
+              RateLimiter::OpType::kRead);
+        }
+        thread->stats.FinishedOps(db_with_cfh, db_with_cfh->db, 1, kRead);
+      } else if (query_type == 1) {
+        // the Put query
+        puts++;
+        int64_t value_size = ParetoCdfInversion(
+            u, FLAGS_value_theta, FLAGS_value_k, FLAGS_value_sigma);
+        if (value_size < 0) {
+          value_size = 10;
+        } else if (value_size > value_max) {
+          value_size = value_size % value_max;
+        }
+        s = db_with_cfh->db->Put(
+            write_options_, key,
+            gen.Generate(static_cast<unsigned int>(value_size)));
+        if (!s.ok()) {
+          fprintf(stderr, "put error: %s\n", s.ToString().c_str());
+          exit(1);
+        }
+
+        if (thread->shared->write_rate_limiter) {
+          thread->shared->write_rate_limiter->Request(
+              key.size() + value_size, Env::IO_HIGH, nullptr /*stats*/,
+              RateLimiter::OpType::kWrite);
+        }
+        thread->stats.FinishedOps(db_with_cfh, db_with_cfh->db, 1, kWrite);
+      } else if (query_type == 2) {
+        // Seek query
+        if (db_with_cfh->db != nullptr) {
+          Iterator* single_iter = nullptr;
+          single_iter = db_with_cfh->db->NewIterator(options);
+          if (single_iter != nullptr) {
+            single_iter->Seek(key);
+            seek++;
+            read++;
+            if (single_iter->Valid() && single_iter->key().compare(key) == 0) {
+              seek_found++;
+            }
+            int64_t scan_length =
+                ParetoCdfInversion(u, FLAGS_iter_theta, FLAGS_iter_k,
+                                   FLAGS_iter_sigma) %
+                scan_len_max;
+            for (int64_t j = 0; j < scan_length && single_iter->Valid(); j++) {
+              Slice value = single_iter->value();
+              memcpy(value_buffer, value.data(),
+                     std::min(value.size(), sizeof(value_buffer)));
+              bytes += single_iter->key().size() + single_iter->value().size();
+              single_iter->Next();
+              assert(single_iter->status().ok());
+            }
+          }
+          delete single_iter;
+        }
+        thread->stats.FinishedOps(db_with_cfh, db_with_cfh->db, 1, kSeek);
+      }
+    }
+    char msg[256];
+    snprintf(msg, sizeof(msg),
+             "( Gets:%" PRId64 " Puts:%" PRId64 " Seek:%" PRId64 " of %" PRId64
+             " in %" PRId64 " found)\n",
+             gets, puts, seek, found, read);
+
+    thread->stats.AddBytes(bytes);
+    thread->stats.AddMessage(msg);
+
+    if (FLAGS_perf_level > rocksdb::PerfLevel::kDisable) {
+      thread->stats.AddMessage(std::string("PERF_CONTEXT:\n") +
+                               get_perf_context()->ToString());
+    }
+  }
+
   void IteratorCreation(ThreadState* thread) {
     Duration duration(FLAGS_duration, reads_);
     ReadOptions options(FLAGS_verify_checksum, true);
@@ -4548,9 +4923,31 @@ void VerifyDBFromDB(std::string& truth_db_name) {
     std::unique_ptr<const char[]> key_guard;
     Slice key = AllocateKey(&key_guard);
 
+    std::unique_ptr<const char[]> upper_bound_key_guard;
+    Slice upper_bound = AllocateKey(&upper_bound_key_guard);
+    std::unique_ptr<const char[]> lower_bound_key_guard;
+    Slice lower_bound = AllocateKey(&lower_bound_key_guard);
+
     Duration duration(FLAGS_duration, reads_);
     char value_buffer[256];
     while (!duration.Done(1)) {
+      int64_t seek_pos = thread->rand.Next() % FLAGS_num;
+      GenerateKeyFromInt((uint64_t)seek_pos, FLAGS_num, &key);
+      if (FLAGS_max_scan_distance != 0) {
+        if (FLAGS_reverse_iterator) {
+          GenerateKeyFromInt(
+              static_cast<uint64_t>(std::max(
+                  static_cast<int64_t>(0), seek_pos - FLAGS_max_scan_distance)),
+              FLAGS_num, &lower_bound);
+          options.iterate_lower_bound = &lower_bound;
+        } else {
+          GenerateKeyFromInt(
+              (uint64_t)std::min(FLAGS_num, seek_pos + FLAGS_max_scan_distance),
+              FLAGS_num, &upper_bound);
+          options.iterate_upper_bound = &upper_bound;
+        }
+      }
+
       if (!FLAGS_use_tailing_iterator) {
         if (db_.db != nullptr) {
           delete single_iter;
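One subtlety in the bound setup above: ReadOptions::iterate_upper_bound and iterate_lower_bound hold a pointer to a Slice that must stay valid for the whole lifetime of any iterator created with those options, which is why the benchmark allocates upper_bound and lower_bound once, outside the loop. The upper bound is exclusive. A minimal sketch (hypothetical keys, assuming an open DB* handle named db):

ReadOptions ro;
Slice upper("key200");            // must outlive the iterator
ro.iterate_upper_bound = &upper;  // exclusive bound
std::unique_ptr<Iterator> it(db->NewIterator(ro));
for (it->Seek("key100"); it->Valid(); it->Next()) {
  // visits only keys in ["key100", "key200")
}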
@@ -4571,7 +4968,6 @@ void VerifyDBFromDB(std::string& truth_db_name) {
         iter_to_use = multi_iters[thread->rand.Next() % multi_iters.size()];
       }
 
-      GenerateKeyFromInt(thread->rand.Next() % FLAGS_num, FLAGS_num, &key);
       iter_to_use->Seek(key);
       read++;
       if (iter_to_use->Valid() && iter_to_use->key().compare(key) == 0) {
@@ -5668,7 +6064,7 @@ void VerifyDBFromDB(std::string& truth_db_name) {
 
   void Replay(ThreadState* /*thread*/, DBWithColumnFamilies* db_with_cfh) {
     Status s;
-    unique_ptr<TraceReader> trace_reader;
+    std::unique_ptr<TraceReader> trace_reader;
     s = NewFileTraceReader(FLAGS_env, EnvOptions(), FLAGS_trace_file,
                            &trace_reader);
     if (!s.ok()) {
@@ -5723,6 +6119,9 @@ int db_bench_tool(int argc, char** argv) {
   if (FLAGS_statistics) {
     dbstats = rocksdb::CreateDBStatistics();
   }
+  if (dbstats) {
+    dbstats->set_stats_level(static_cast<StatsLevel>(FLAGS_stats_level));
+  }
   FLAGS_compaction_pri_e = (rocksdb::CompactionPri)FLAGS_compaction_pri;
 
   std::vector<std::string> fanout = rocksdb::StringSplit(
@@ -5752,6 +6151,13 @@ int db_bench_tool(int argc, char** argv) {
     }
   }
 #endif  // ROCKSDB_LITE
+  if (FLAGS_use_existing_keys && !FLAGS_use_existing_db) {
+    fprintf(stderr,
+            "`-use_existing_db` must be true for `-use_existing_keys` to be "
+            "settable\n");
+    exit(1);
+  }
+
   if (!FLAGS_hdfs.empty()) {
     FLAGS_env  = new rocksdb::HdfsEnv(FLAGS_hdfs);
   }
@@ -5796,6 +6202,15 @@ int db_bench_tool(int argc, char** argv) {
 
   rocksdb::Benchmark benchmark;
   benchmark.Run();
+
+#ifndef ROCKSDB_LITE
+  if (FLAGS_print_malloc_stats) {
+    std::string stats_string;
+    rocksdb::DumpMallocStats(&stats_string);
+    fprintf(stdout, "Malloc stats:\n%s\n", stats_string.c_str());
+  }
+#endif  // ROCKSDB_LITE
+
   return 0;
 }
 }  // namespace rocksdb