]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rocksdb/tools/db_stress.cc
import 14.2.4 nautilus point release
[ceph.git] / ceph / src / rocksdb / tools / db_stress.cc
index 45a7c9a0d0a0c815380e322cc239fb477c237574..7f8c4b53f7bd43197c111127d2f8ba107848f333 100644 (file)
@@ -100,6 +100,8 @@ DEFINE_uint64(seed, 2341234, "Seed for PRNG");
 static const bool FLAGS_seed_dummy __attribute__((__unused__)) =
     RegisterFlagValidator(&FLAGS_seed, &ValidateUint32Range);
 
+DEFINE_bool(read_only, false, "True if open DB in read-only mode during tests");
+
 DEFINE_int64(max_key, 1 * KB* KB,
              "Max number of key/values to place in database");
 
@@ -133,6 +135,13 @@ DEFINE_bool(test_batches_snapshots, false,
             "\t(b) No long validation at the end (more speed up)\n"
             "\t(c) Test snapshot and atomicity of batch writes");
 
+DEFINE_bool(atomic_flush, false,
+            "If set, enables atomic flush in the options.\n");
+
+DEFINE_bool(test_atomic_flush, false,
+            "If set, runs the stress test dedicated to verifying atomic flush "
+            "functionality. Setting this implies `atomic_flush=true`.\n");
+
 DEFINE_int32(threads, 32, "Number of concurrent threads to run.");
 
 DEFINE_int32(ttl, -1,
@@ -201,6 +210,10 @@ DEFINE_double(memtable_prefix_bloom_size_ratio,
               "creates prefix blooms for memtables, each with size "
               "`write_buffer_size * memtable_prefix_bloom_size_ratio`.");
 
+DEFINE_bool(memtable_whole_key_filtering,
+            rocksdb::Options().memtable_whole_key_filtering,
+            "Enable whole key filtering in memtables.");
+
 DEFINE_int32(open_files, rocksdb::Options().max_open_files,
              "Maximum number of files to keep open at the same time "
              "(use default if == 0)");
@@ -366,6 +379,9 @@ extern std::vector<std::string> rocksdb_kill_prefix_blacklist;
 
 DEFINE_bool(disable_wal, false, "If true, do not write WAL for write.");
 
+DEFINE_uint64(recycle_log_file_num, rocksdb::Options().recycle_log_file_num,
+              "Number of old WAL files to keep around for later recycling");
+
 DEFINE_int64(target_file_size_base, rocksdb::Options().target_file_size_base,
              "Target level-1 file size for compaction");
 
@@ -790,46 +806,36 @@ class Stats {
     }
   }
 
-  void AddBytesForWrites(int nwrites, size_t nbytes) {
+  void AddBytesForWrites(long nwrites, size_t nbytes) {
     writes_ += nwrites;
     bytes_ += nbytes;
   }
 
-  void AddGets(int ngets, int nfounds) {
+  void AddGets(long ngets, long nfounds) {
     founds_ += nfounds;
     gets_ += ngets;
   }
 
-  void AddPrefixes(int nprefixes, int count) {
+  void AddPrefixes(long nprefixes, long count) {
     prefixes_ += nprefixes;
     iterator_size_sums_ += count;
   }
 
-  void AddIterations(int n) {
-    iterations_ += n;
-  }
+  void AddIterations(long n) { iterations_ += n; }
 
-  void AddDeletes(int n) {
-    deletes_ += n;
-  }
+  void AddDeletes(long n) { deletes_ += n; }
 
   void AddSingleDeletes(size_t n) { single_deletes_ += n; }
 
-  void AddRangeDeletions(int n) {
-    range_deletions_ += n;
-  }
+  void AddRangeDeletions(long n) { range_deletions_ += n; }
 
-  void AddCoveredByRangeDeletions(int n) {
-    covered_by_range_deletions_ += n;
-  }
+  void AddCoveredByRangeDeletions(long n) { covered_by_range_deletions_ += n; }
 
-  void AddErrors(int n) {
-    errors_ += n;
-  }
+  void AddErrors(long n) { errors_ += n; }
 
-  void AddNumCompactFilesSucceed(int n) { num_compact_files_succeed_ += n; }
+  void AddNumCompactFilesSucceed(long n) { num_compact_files_succeed_ += n; }
 
-  void AddNumCompactFilesFailed(int n) { num_compact_files_failed_ += n; }
+  void AddNumCompactFilesFailed(long n) { num_compact_files_failed_ += n; }
 
   void Report(const char* name) {
     std::string extra;
@@ -948,7 +954,7 @@ class SharedState {
       if (status.ok()) {
         status = FLAGS_env->GetFileSize(FLAGS_expected_values_path, &size);
       }
-      unique_ptr<WritableFile> wfile;
+      std::unique_ptr<WritableFile> wfile;
       if (status.ok() && size == 0) {
         const EnvOptions soptions;
         status = FLAGS_env->NewWritableFile(FLAGS_expected_values_path, &wfile,
@@ -1289,7 +1295,8 @@ class DbStressListener : public EventListener {
     }
     assert(info.job_id > 0 || FLAGS_compact_files_one_in > 0);
     if (info.status.ok() && info.file_size > 0) {
-      assert(info.table_properties.data_size > 0);
+      assert(info.table_properties.data_size > 0 ||
+             info.table_properties.num_range_deletions > 0);
       assert(info.table_properties.raw_key_size > 0);
       assert(info.table_properties.num_entries > 0);
     }
@@ -1386,7 +1393,8 @@ class StressTest {
         txn_db_(nullptr),
 #endif
         new_column_family_name_(1),
-        num_times_reopened_(0) {
+        num_times_reopened_(0),
+        db_preload_finished_(false) {
     if (FLAGS_destroy_db_initially) {
       std::vector<std::string> files;
       FLAGS_env->GetChildren(FLAGS_db, &files);
@@ -1395,7 +1403,14 @@ class StressTest {
           FLAGS_env->DeleteFile(FLAGS_db + "/" + files[i]);
         }
       }
-      DestroyDB(FLAGS_db, Options());
+      Options options;
+      options.env = FLAGS_env;
+      Status s = DestroyDB(FLAGS_db, options);
+      if (!s.ok()) {
+        fprintf(stderr, "Cannot destroy original db: %s\n",
+                s.ToString().c_str());
+        exit(1);
+      }
     }
   }
 
@@ -1513,6 +1528,13 @@ class StressTest {
     Open();
     BuildOptionsTable();
     SharedState shared(this);
+
+    if (FLAGS_read_only) {
+      now = FLAGS_env->NowMicros();
+      fprintf(stdout, "%s Preloading db with %" PRIu64 " KVs\n",
+              FLAGS_env->TimeToString(now / 1000000).c_str(), FLAGS_max_key);
+      PreloadDbAndReopenAsReadOnly(FLAGS_max_key, &shared);
+    }
     uint32_t n = shared.GetNumThreads();
 
     now = FLAGS_env->NowMicros();
@@ -1743,6 +1765,9 @@ class StressTest {
       }
     }
     if (snap_state.key_vec != nullptr) {
+      // When `prefix_extractor` is set, seeking to beginning and scanning
+      // across prefixes are only supported with `total_order_seek` set.
+      ropt.total_order_seek = true;
       std::unique_ptr<Iterator> iterator(db->NewIterator(ropt));
       std::unique_ptr<std::vector<bool>> tmp_bitvec(new std::vector<bool>(FLAGS_max_key));
       for (iterator->SeekToFirst(); iterator->Valid(); iterator->Next()) {
@@ -1760,6 +1785,93 @@ class StressTest {
     return Status::OK();
   }
 
+  // Currently PreloadDb has to be single-threaded.
+  void PreloadDbAndReopenAsReadOnly(int64_t number_of_keys,
+                                    SharedState* shared) {
+    WriteOptions write_opts;
+    write_opts.disableWAL = FLAGS_disable_wal;
+    if (FLAGS_sync) {
+      write_opts.sync = true;
+    }
+    char value[100];
+    int cf_idx = 0;
+    Status s;
+    for (auto cfh : column_families_) {
+      for (int64_t k = 0; k != number_of_keys; ++k) {
+        std::string key_str = Key(k);
+        Slice key = key_str;
+        size_t sz = GenerateValue(0 /*value_base*/, value, sizeof(value));
+        Slice v(value, sz);
+        shared->Put(cf_idx, k, 0, true /* pending */);
+
+        if (FLAGS_use_merge) {
+          if (!FLAGS_use_txn) {
+            s = db_->Merge(write_opts, cfh, key, v);
+          } else {
+#ifndef ROCKSDB_LITE
+            Transaction* txn;
+            s = NewTxn(write_opts, &txn);
+            if (s.ok()) {
+              s = txn->Merge(cfh, key, v);
+              if (s.ok()) {
+                s = CommitTxn(txn);
+              }
+            }
+#endif
+          }
+        } else {
+          if (!FLAGS_use_txn) {
+            s = db_->Put(write_opts, cfh, key, v);
+          } else {
+#ifndef ROCKSDB_LITE
+            Transaction* txn;
+            s = NewTxn(write_opts, &txn);
+            if (s.ok()) {
+              s = txn->Put(cfh, key, v);
+              if (s.ok()) {
+                s = CommitTxn(txn);
+              }
+            }
+#endif
+          }
+        }
+
+        shared->Put(cf_idx, k, 0, false /* pending */);
+        if (!s.ok()) {
+          break;
+        }
+      }
+      if (!s.ok()) {
+        break;
+      }
+      ++cf_idx;
+    }
+    if (s.ok()) {
+      s = db_->Flush(FlushOptions(), column_families_);
+    }
+    if (s.ok()) {
+      for (auto cf : column_families_) {
+        delete cf;
+      }
+      column_families_.clear();
+      delete db_;
+      db_ = nullptr;
+#ifndef ROCKSDB_LITE
+      txn_db_ = nullptr;
+#endif
+
+      db_preload_finished_.store(true);
+      auto now = FLAGS_env->NowMicros();
+      fprintf(stdout, "%s Reopening database in read-only\n",
+              FLAGS_env->TimeToString(now / 1000000).c_str());
+      // Reopen as read-only, can ignore all options related to updates
+      Open();
+    } else {
+      fprintf(stderr, "Failed to preload db");
+      exit(1);
+    }
+  }
+
   Status SetOptions(ThreadState* thread) {
     assert(FLAGS_set_options_one_in > 0);
     std::unordered_map<std::string, std::string> opts;
@@ -1847,8 +1959,7 @@ class StressTest {
           if (thread->shared->AllVotedReopen()) {
             thread->shared->GetStressTest()->Reopen();
             thread->shared->GetCondVar()->SignalAll();
-          }
-          else {
+          } else {
             thread->shared->GetCondVar()->Wait();
           }
           // Commenting this out as we don't want to reset stats on each open.
@@ -1870,49 +1981,6 @@ class StressTest {
       MaybeClearOneColumnFamily(thread);
 
 #ifndef ROCKSDB_LITE
-      if (FLAGS_checkpoint_one_in > 0 &&
-          thread->rand.Uniform(FLAGS_checkpoint_one_in) == 0) {
-        std::string checkpoint_dir =
-            FLAGS_db + "/.checkpoint" + ToString(thread->tid);
-        DestroyDB(checkpoint_dir, Options());
-        Checkpoint* checkpoint;
-        Status s = Checkpoint::Create(db_, &checkpoint);
-        if (s.ok()) {
-          s = checkpoint->CreateCheckpoint(checkpoint_dir);
-        }
-        std::vector<std::string> files;
-        if (s.ok()) {
-          s = FLAGS_env->GetChildren(checkpoint_dir, &files);
-        }
-        DestroyDB(checkpoint_dir, Options());
-        delete checkpoint;
-        if (!s.ok()) {
-          printf("A checkpoint operation failed with: %s\n",
-                 s.ToString().c_str());
-        }
-      }
-
-      if (FLAGS_backup_one_in > 0 &&
-          thread->rand.Uniform(FLAGS_backup_one_in) == 0) {
-        std::string backup_dir = FLAGS_db + "/.backup" + ToString(thread->tid);
-        BackupableDBOptions backup_opts(backup_dir);
-        BackupEngine* backup_engine = nullptr;
-        Status s = BackupEngine::Open(FLAGS_env, backup_opts, &backup_engine);
-        if (s.ok()) {
-          s = backup_engine->CreateNewBackup(db_);
-        }
-        if (s.ok()) {
-          s = backup_engine->PurgeOldBackups(0 /* num_backups_to_keep */);
-        }
-        if (!s.ok()) {
-          printf("A BackupEngine operation failed with: %s\n",
-                 s.ToString().c_str());
-        }
-        if (backup_engine != nullptr) {
-          delete backup_engine;
-        }
-      }
-
       if (FLAGS_compact_files_one_in > 0 &&
           thread->rand.Uniform(FLAGS_compact_files_one_in) == 0) {
         auto* random_cf =
@@ -1975,15 +2043,6 @@ class StressTest {
 
       auto column_family = column_families_[rand_column_family];
 
-      if (FLAGS_flush_one_in > 0 &&
-          thread->rand.Uniform(FLAGS_flush_one_in) == 0) {
-        FlushOptions flush_opts;
-        Status status = db_->Flush(flush_opts, column_family);
-        if (!status.ok()) {
-          fprintf(stdout, "Unable to perform Flush(): %s\n", status.ToString().c_str());
-        }
-      }
-
       if (FLAGS_compact_range_one_in > 0 &&
           thread->rand.Uniform(FLAGS_compact_range_one_in) == 0) {
         int64_t end_key_num;
@@ -2007,6 +2066,21 @@ class StressTest {
 
       std::vector<int> rand_column_families =
           GenerateColumnFamilies(FLAGS_column_families, rand_column_family);
+
+      if (FLAGS_flush_one_in > 0 &&
+          thread->rand.Uniform(FLAGS_flush_one_in) == 0) {
+        FlushOptions flush_opts;
+        std::vector<ColumnFamilyHandle*> cfhs;
+        std::for_each(
+            rand_column_families.begin(), rand_column_families.end(),
+            [this, &cfhs](int k) { cfhs.push_back(column_families_[k]); });
+        Status status = db_->Flush(flush_opts, cfhs);
+        if (!status.ok()) {
+          fprintf(stdout, "Unable to perform Flush(): %s\n",
+                  status.ToString().c_str());
+        }
+      }
+
       std::vector<int64_t> rand_keys = GenerateKeys(rand_key);
 
       if (FLAGS_ingest_external_file_one_in > 0 &&
@@ -2014,6 +2088,23 @@ class StressTest {
         TestIngestExternalFile(thread, rand_column_families, rand_keys, lock);
       }
 
+      if (FLAGS_backup_one_in > 0 &&
+          thread->rand.Uniform(FLAGS_backup_one_in) == 0) {
+        Status s = TestBackupRestore(thread, rand_column_families, rand_keys);
+        if (!s.ok()) {
+          VerificationAbort(shared, "Backup/restore gave inconsistent state",
+                            s);
+        }
+      }
+
+      if (FLAGS_checkpoint_one_in > 0 &&
+          thread->rand.Uniform(FLAGS_checkpoint_one_in) == 0) {
+        Status s = TestCheckpoint(thread, rand_column_families, rand_keys);
+        if (!s.ok()) {
+          VerificationAbort(shared, "Checkpoint gave inconsistent state", s);
+        }
+      }
+
       if (FLAGS_acquire_snapshot_one_in > 0 &&
           thread->rand.Uniform(FLAGS_acquire_snapshot_one_in) == 0) {
         auto snapshot = db_->GetSnapshot();
@@ -2029,6 +2120,9 @@ class StressTest {
         if (FLAGS_compare_full_db_state_snapshot &&
             (thread->tid == 0)) {
           key_vec = new std::vector<bool>(FLAGS_max_key);
+          // When `prefix_extractor` is set, seeking to beginning and scanning
+          // across prefixes are only supported with `total_order_seek` set.
+          ropt.total_order_seek = true;
           std::unique_ptr<Iterator> iterator(db_->NewIterator(ropt));
           for (iterator->SeekToFirst(); iterator->Valid(); iterator->Next()) {
             uint64_t key_val;
@@ -2199,6 +2293,190 @@ class StressTest {
     return s;
   }
 
+#ifdef ROCKSDB_LITE
+  virtual Status TestBackupRestore(
+      ThreadState* /* thread */,
+      const std::vector<int>& /* rand_column_families */,
+      const std::vector<int64_t>& /* rand_keys */) {
+    assert(false);
+    fprintf(stderr,
+            "RocksDB lite does not support "
+            "TestBackupRestore\n");
+    std::terminate();
+  }
+
+  virtual Status TestCheckpoint(
+      ThreadState* /* thread */,
+      const std::vector<int>& /* rand_column_families */,
+      const std::vector<int64_t>& /* rand_keys */) {
+    assert(false);
+    fprintf(stderr,
+            "RocksDB lite does not support "
+            "TestCheckpoint\n");
+    std::terminate();
+  }
+#else  // ROCKSDB_LITE
+  virtual Status TestBackupRestore(ThreadState* thread,
+                                   const std::vector<int>& rand_column_families,
+                                   const std::vector<int64_t>& rand_keys) {
+    // Note the column families chosen by `rand_column_families` cannot be
+    // dropped while the locks for `rand_keys` are held. So we should not have
+    // to worry about accessing those column families throughout this function.
+    assert(rand_column_families.size() == rand_keys.size());
+    std::string backup_dir = FLAGS_db + "/.backup" + ToString(thread->tid);
+    std::string restore_dir = FLAGS_db + "/.restore" + ToString(thread->tid);
+    BackupableDBOptions backup_opts(backup_dir);
+    BackupEngine* backup_engine = nullptr;
+    Status s = BackupEngine::Open(FLAGS_env, backup_opts, &backup_engine);
+    if (s.ok()) {
+      s = backup_engine->CreateNewBackup(db_);
+    }
+    if (s.ok()) {
+      delete backup_engine;
+      backup_engine = nullptr;
+      s = BackupEngine::Open(FLAGS_env, backup_opts, &backup_engine);
+    }
+    if (s.ok()) {
+      s = backup_engine->RestoreDBFromLatestBackup(restore_dir /* db_dir */,
+                                                   restore_dir /* wal_dir */);
+    }
+    if (s.ok()) {
+      s = backup_engine->PurgeOldBackups(0 /* num_backups_to_keep */);
+    }
+    DB* restored_db = nullptr;
+    std::vector<ColumnFamilyHandle*> restored_cf_handles;
+    if (s.ok()) {
+      Options restore_options(options_);
+      restore_options.listeners.clear();
+      std::vector<ColumnFamilyDescriptor> cf_descriptors;
+      // TODO(ajkr): `column_family_names_` is not safe to access here when
+      // `clear_column_family_one_in != 0`. But we can't easily switch to
+      // `ListColumnFamilies` to get names because it won't necessarily give
+      // the same order as `column_family_names_`.
+      assert(FLAGS_clear_column_family_one_in == 0);
+      for (auto name : column_family_names_) {
+        cf_descriptors.emplace_back(name, ColumnFamilyOptions(restore_options));
+      }
+      s = DB::Open(DBOptions(restore_options), restore_dir, cf_descriptors,
+                   &restored_cf_handles, &restored_db);
+    }
+    // for simplicity, currently only verifies existence/non-existence of a few
+    // keys
+    for (size_t i = 0; s.ok() && i < rand_column_families.size(); ++i) {
+      std::string key_str = Key(rand_keys[i]);
+      Slice key = key_str;
+      std::string restored_value;
+      Status get_status = restored_db->Get(
+          ReadOptions(), restored_cf_handles[rand_column_families[i]], key,
+          &restored_value);
+      bool exists =
+          thread->shared->Exists(rand_column_families[i], rand_keys[i]);
+      if (get_status.ok()) {
+        if (!exists) {
+          s = Status::Corruption(
+              "key exists in restore but not in original db");
+        }
+      } else if (get_status.IsNotFound()) {
+        if (exists) {
+          s = Status::Corruption(
+              "key exists in original db but not in restore");
+        }
+      } else {
+        s = get_status;
+      }
+    }
+    if (backup_engine != nullptr) {
+      delete backup_engine;
+      backup_engine = nullptr;
+    }
+    if (restored_db != nullptr) {
+      for (auto* cf_handle : restored_cf_handles) {
+        restored_db->DestroyColumnFamilyHandle(cf_handle);
+      }
+      delete restored_db;
+      restored_db = nullptr;
+    }
+    if (!s.ok()) {
+      printf("A backup/restore operation failed with: %s\n",
+             s.ToString().c_str());
+    }
+    return s;
+  }
+
+  virtual Status TestCheckpoint(ThreadState* thread,
+                                const std::vector<int>& rand_column_families,
+                                const std::vector<int64_t>& rand_keys) {
+    // Note the column families chosen by `rand_column_families` cannot be
+    // dropped while the locks for `rand_keys` are held. So we should not have
+    // to worry about accessing those column families throughout this function.
+    assert(rand_column_families.size() == rand_keys.size());
+    std::string checkpoint_dir =
+        FLAGS_db + "/.checkpoint" + ToString(thread->tid);
+    DestroyDB(checkpoint_dir, Options());
+    Checkpoint* checkpoint = nullptr;
+    Status s = Checkpoint::Create(db_, &checkpoint);
+    if (s.ok()) {
+      s = checkpoint->CreateCheckpoint(checkpoint_dir);
+    }
+    std::vector<ColumnFamilyHandle*> cf_handles;
+    DB* checkpoint_db = nullptr;
+    if (s.ok()) {
+      delete checkpoint;
+      checkpoint = nullptr;
+      Options options(options_);
+      options.listeners.clear();
+      std::vector<ColumnFamilyDescriptor> cf_descs;
+      // TODO(ajkr): `column_family_names_` is not safe to access here when
+      // `clear_column_family_one_in != 0`. But we can't easily switch to
+      // `ListColumnFamilies` to get names because it won't necessarily give
+      // the same order as `column_family_names_`.
+      if (FLAGS_clear_column_family_one_in == 0) {
+        for (const auto& name : column_family_names_) {
+          cf_descs.emplace_back(name, ColumnFamilyOptions(options));
+        }
+        s = DB::OpenForReadOnly(DBOptions(options), checkpoint_dir, cf_descs,
+                                &cf_handles, &checkpoint_db);
+      }
+    }
+    if (checkpoint_db != nullptr) {
+      for (size_t i = 0; s.ok() && i < rand_column_families.size(); ++i) {
+        std::string key_str = Key(rand_keys[i]);
+        Slice key = key_str;
+        std::string value;
+        Status get_status = checkpoint_db->Get(
+            ReadOptions(), cf_handles[rand_column_families[i]], key, &value);
+        bool exists =
+            thread->shared->Exists(rand_column_families[i], rand_keys[i]);
+        if (get_status.ok()) {
+          if (!exists) {
+            s = Status::Corruption(
+                "key exists in checkpoint but not in original db");
+          }
+        } else if (get_status.IsNotFound()) {
+          if (exists) {
+            s = Status::Corruption(
+                "key exists in original db but not in checkpoint");
+          }
+        } else {
+          s = get_status;
+        }
+      }
+      for (auto cfh : cf_handles) {
+        delete cfh;
+      }
+      cf_handles.clear();
+      delete checkpoint_db;
+      checkpoint_db = nullptr;
+    }
+    DestroyDB(checkpoint_dir, Options());
+    if (!s.ok()) {
+      fprintf(stderr, "A checkpoint operation failed with: %s\n",
+              s.ToString().c_str());
+    }
+    return s;
+  }
+#endif  // ROCKSDB_LITE
+
   void VerificationAbort(SharedState* shared, std::string msg, Status s) const {
     printf("Verification failed: %s. Status is %s\n", msg.c_str(),
            s.ToString().c_str());
@@ -2218,6 +2496,10 @@ class StressTest {
     fprintf(stdout, "Format version            : %d\n", FLAGS_format_version);
     fprintf(stdout, "TransactionDB             : %s\n",
             FLAGS_use_txn ? "true" : "false");
+    fprintf(stdout, "Read only mode            : %s\n",
+            FLAGS_read_only ? "true" : "false");
+    fprintf(stdout, "Atomic flush              : %s\n",
+            FLAGS_atomic_flush ? "true" : "false");
     fprintf(stdout, "Column families           : %d\n", FLAGS_column_families);
     if (!FLAGS_test_batches_snapshots) {
       fprintf(stdout, "Clear CFs one in          : %d\n",
@@ -2315,6 +2597,8 @@ class StressTest {
           FLAGS_max_write_buffer_number_to_maintain;
       options_.memtable_prefix_bloom_size_ratio =
           FLAGS_memtable_prefix_bloom_size_ratio;
+      options_.memtable_whole_key_filtering =
+          FLAGS_memtable_whole_key_filtering;
       options_.max_background_compactions = FLAGS_max_background_compactions;
       options_.max_background_flushes = FLAGS_max_background_flushes;
       options_.compaction_style =
@@ -2331,6 +2615,8 @@ class StressTest {
       options_.use_direct_reads = FLAGS_use_direct_reads;
       options_.use_direct_io_for_flush_and_compaction =
           FLAGS_use_direct_io_for_flush_and_compaction;
+      options_.recycle_log_file_num =
+          static_cast<size_t>(FLAGS_recycle_log_file_num);
       options_.target_file_size_base = FLAGS_target_file_size_base;
       options_.target_file_size_multiplier = FLAGS_target_file_size_multiplier;
       options_.max_bytes_for_level_base = FLAGS_max_bytes_for_level_base;
@@ -2363,6 +2649,7 @@ class StressTest {
           FLAGS_universal_max_merge_width;
       options_.compaction_options_universal.max_size_amplification_percent =
           FLAGS_universal_max_size_amplification_percent;
+      options_.atomic_flush = FLAGS_atomic_flush;
     } else {
 #ifdef ROCKSDB_LITE
       fprintf(stderr, "--options_file not supported in lite mode\n");
@@ -2484,8 +2771,13 @@ class StressTest {
           new DbStressListener(FLAGS_db, options_.db_paths, cf_descriptors));
       options_.create_missing_column_families = true;
       if (!FLAGS_use_txn) {
-        s = DB::Open(DBOptions(options_), FLAGS_db, cf_descriptors,
-                     &column_families_, &db_);
+        if (db_preload_finished_.load() && FLAGS_read_only) {
+          s = DB::OpenForReadOnly(DBOptions(options_), FLAGS_db, cf_descriptors,
+                                  &column_families_, &db_);
+        } else {
+          s = DB::Open(DBOptions(options_), FLAGS_db, cf_descriptors,
+                       &column_families_, &db_);
+        }
       } else {
 #ifndef ROCKSDB_LITE
         TransactionDBOptions txn_db_options;
@@ -2570,6 +2862,7 @@ class StressTest {
   int num_times_reopened_;
   std::unordered_map<std::string, std::vector<std::string>> options_table_;
   std::vector<std::string> options_index_;
+  std::atomic<bool> db_preload_finished_;
 };
 
 class NonBatchedOpsStressTest : public StressTest {
@@ -2594,7 +2887,7 @@ class NonBatchedOpsStressTest : public StressTest {
       }
       if (!thread->rand.OneIn(2)) {
         // Use iterator to verify this range
-        unique_ptr<Iterator> iter(
+        std::unique_ptr<Iterator> iter(
             db_->NewIterator(options, column_families_[cf]));
         iter->Seek(Key(start));
         for (auto i = start; i < end; i++) {
@@ -2733,16 +3026,15 @@ class NonBatchedOpsStressTest : public StressTest {
     }
 
     Iterator* iter = db_->NewIterator(ro_copy, cfh);
-    int64_t count = 0;
+    long count = 0;
     for (iter->Seek(prefix);
         iter->Valid() && iter->key().starts_with(prefix); iter->Next()) {
       ++count;
     }
-    assert(count <=
-        (static_cast<int64_t>(1) << ((8 - FLAGS_prefix_size) * 8)));
+    assert(count <= (static_cast<long>(1) << ((8 - FLAGS_prefix_size) * 8)));
     Status s = iter->status();
     if (iter->status().ok()) {
-      thread->stats.AddPrefixes(1, static_cast<int>(count));
+      thread->stats.AddPrefixes(1, count);
     } else {
       thread->stats.AddErrors(1);
     }
@@ -3272,7 +3564,7 @@ class BatchedOpsStressTest : public StressTest {
       iters[i]->Seek(prefix_slices[i]);
     }
 
-    int count = 0;
+    long count = 0;
     while (iters[0]->Valid() && iters[0]->key().starts_with(prefix_slices[0])) {
       count++;
       std::string values[10];
@@ -3327,6 +3619,334 @@ class BatchedOpsStressTest : public StressTest {
   virtual void VerifyDb(ThreadState* /* thread */) const {}
 };
 
+class AtomicFlushStressTest : public StressTest {
+ public:
+  AtomicFlushStressTest() : batch_id_(0) {}
+
+  virtual ~AtomicFlushStressTest() {}
+
+  virtual Status TestPut(ThreadState* thread, WriteOptions& write_opts,
+                         const ReadOptions& /* read_opts */,
+                         const std::vector<int>& rand_column_families,
+                         const std::vector<int64_t>& rand_keys,
+                         char (&value)[100],
+                         std::unique_ptr<MutexLock>& /* lock */) {
+    std::string key_str = Key(rand_keys[0]);
+    Slice key = key_str;
+    uint64_t value_base = batch_id_.fetch_add(1);
+    size_t sz =
+        GenerateValue(static_cast<uint32_t>(value_base), value, sizeof(value));
+    Slice v(value, sz);
+    WriteBatch batch;
+    for (auto cf : rand_column_families) {
+      ColumnFamilyHandle* cfh = column_families_[cf];
+      if (FLAGS_use_merge) {
+        batch.Merge(cfh, key, v);
+      } else { /* !FLAGS_use_merge */
+        batch.Put(cfh, key, v);
+      }
+    }
+    Status s = db_->Write(write_opts, &batch);
+    if (!s.ok()) {
+      fprintf(stderr, "multi put or merge error: %s\n", s.ToString().c_str());
+      thread->stats.AddErrors(1);
+    } else {
+      auto num = static_cast<long>(rand_column_families.size());
+      thread->stats.AddBytesForWrites(num, (sz + 1) * num);
+    }
+
+    return s;
+  }
+
+  virtual Status TestDelete(ThreadState* thread, WriteOptions& write_opts,
+                            const std::vector<int>& rand_column_families,
+                            const std::vector<int64_t>& rand_keys,
+                            std::unique_ptr<MutexLock>& /* lock */) {
+    std::string key_str = Key(rand_keys[0]);
+    Slice key = key_str;
+    WriteBatch batch;
+    for (auto cf : rand_column_families) {
+      ColumnFamilyHandle* cfh = column_families_[cf];
+      batch.Delete(cfh, key);
+    }
+    Status s = db_->Write(write_opts, &batch);
+    if (!s.ok()) {
+      fprintf(stderr, "multidel error: %s\n", s.ToString().c_str());
+      thread->stats.AddErrors(1);
+    } else {
+      thread->stats.AddDeletes(static_cast<long>(rand_column_families.size()));
+    }
+    return s;
+  }
+
+  virtual Status TestDeleteRange(ThreadState* thread, WriteOptions& write_opts,
+                                 const std::vector<int>& rand_column_families,
+                                 const std::vector<int64_t>& rand_keys,
+                                 std::unique_ptr<MutexLock>& /* lock */) {
+    int64_t rand_key = rand_keys[0];
+    auto shared = thread->shared;
+    int64_t max_key = shared->GetMaxKey();
+    if (rand_key > max_key - FLAGS_range_deletion_width) {
+      rand_key =
+          thread->rand.Next() % (max_key - FLAGS_range_deletion_width + 1);
+    }
+    std::string key_str = Key(rand_key);
+    Slice key = key_str;
+    std::string end_key_str = Key(rand_key + FLAGS_range_deletion_width);
+    Slice end_key = end_key_str;
+    WriteBatch batch;
+    for (auto cf : rand_column_families) {
+      ColumnFamilyHandle* cfh = column_families_[rand_column_families[cf]];
+      batch.DeleteRange(cfh, key, end_key);
+    }
+    Status s = db_->Write(write_opts, &batch);
+    if (!s.ok()) {
+      fprintf(stderr, "multi del range error: %s\n", s.ToString().c_str());
+      thread->stats.AddErrors(1);
+    } else {
+      thread->stats.AddRangeDeletions(
+          static_cast<long>(rand_column_families.size()));
+    }
+    return s;
+  }
+
+  virtual void TestIngestExternalFile(
+      ThreadState* /* thread */,
+      const std::vector<int>& /* rand_column_families */,
+      const std::vector<int64_t>& /* rand_keys */,
+      std::unique_ptr<MutexLock>& /* lock */) {
+    assert(false);
+    fprintf(stderr,
+            "AtomicFlushStressTest does not support TestIngestExternalFile "
+            "because it's not possible to verify the result\n");
+    std::terminate();
+  }
+
+  virtual Status TestGet(ThreadState* thread, const ReadOptions& readoptions,
+                         const std::vector<int>& rand_column_families,
+                         const std::vector<int64_t>& rand_keys) {
+    std::string key_str = Key(rand_keys[0]);
+    Slice key = key_str;
+    auto cfh =
+        column_families_[rand_column_families[thread->rand.Next() %
+                                              rand_column_families.size()]];
+    std::string from_db;
+    Status s = db_->Get(readoptions, cfh, key, &from_db);
+    if (s.ok()) {
+      thread->stats.AddGets(1, 1);
+    } else if (s.IsNotFound()) {
+      thread->stats.AddGets(1, 0);
+    } else {
+      thread->stats.AddErrors(1);
+    }
+    return s;
+  }
+
+  virtual Status TestPrefixScan(ThreadState* thread,
+                                const ReadOptions& readoptions,
+                                const std::vector<int>& rand_column_families,
+                                const std::vector<int64_t>& rand_keys) {
+    std::string key_str = Key(rand_keys[0]);
+    Slice key = key_str;
+    Slice prefix = Slice(key.data(), FLAGS_prefix_size);
+
+    std::string upper_bound;
+    Slice ub_slice;
+    ReadOptions ro_copy = readoptions;
+    if (thread->rand.OneIn(2) && GetNextPrefix(prefix, &upper_bound)) {
+      ub_slice = Slice(upper_bound);
+      ro_copy.iterate_upper_bound = &ub_slice;
+    }
+    auto cfh =
+        column_families_[rand_column_families[thread->rand.Next() %
+                                              rand_column_families.size()]];
+    Iterator* iter = db_->NewIterator(ro_copy, cfh);
+    long count = 0;
+    for (iter->Seek(prefix); iter->Valid() && iter->key().starts_with(prefix);
+         iter->Next()) {
+      ++count;
+    }
+    assert(count <= (static_cast<long>(1) << ((8 - FLAGS_prefix_size) * 8)));
+    Status s = iter->status();
+    if (s.ok()) {
+      thread->stats.AddPrefixes(1, count);
+    } else {
+      thread->stats.AddErrors(1);
+    }
+    delete iter;
+    return s;
+  }
+
+#ifdef ROCKSDB_LITE
+  virtual Status TestCheckpoint(
+      ThreadState* /* thread */,
+      const std::vector<int>& /* rand_column_families */,
+      const std::vector<int64_t>& /* rand_keys */) {
+    assert(false);
+    fprintf(stderr,
+            "RocksDB lite does not support "
+            "TestCheckpoint\n");
+    std::terminate();
+  }
+#else
+  virtual Status TestCheckpoint(
+      ThreadState* thread, const std::vector<int>& /* rand_column_families */,
+      const std::vector<int64_t>& /* rand_keys */) {
+    std::string checkpoint_dir =
+        FLAGS_db + "/.checkpoint" + ToString(thread->tid);
+    DestroyDB(checkpoint_dir, Options());
+    Checkpoint* checkpoint = nullptr;
+    Status s = Checkpoint::Create(db_, &checkpoint);
+    if (s.ok()) {
+      s = checkpoint->CreateCheckpoint(checkpoint_dir);
+    }
+    std::vector<ColumnFamilyHandle*> cf_handles;
+    DB* checkpoint_db = nullptr;
+    if (s.ok()) {
+      delete checkpoint;
+      checkpoint = nullptr;
+      Options options(options_);
+      options.listeners.clear();
+      std::vector<ColumnFamilyDescriptor> cf_descs;
+      // TODO(ajkr): `column_family_names_` is not safe to access here when
+      // `clear_column_family_one_in != 0`. But we can't easily switch to
+      // `ListColumnFamilies` to get names because it won't necessarily give
+      // the same order as `column_family_names_`.
+      if (FLAGS_clear_column_family_one_in == 0) {
+        for (const auto& name : column_family_names_) {
+          cf_descs.emplace_back(name, ColumnFamilyOptions(options));
+        }
+        s = DB::OpenForReadOnly(DBOptions(options), checkpoint_dir, cf_descs,
+                                &cf_handles, &checkpoint_db);
+      }
+    }
+    if (checkpoint_db != nullptr) {
+      for (auto cfh : cf_handles) {
+        delete cfh;
+      }
+      cf_handles.clear();
+      delete checkpoint_db;
+      checkpoint_db = nullptr;
+    }
+    DestroyDB(checkpoint_dir, Options());
+    if (!s.ok()) {
+      fprintf(stderr, "A checkpoint operation failed with: %s\n",
+              s.ToString().c_str());
+    }
+    return s;
+  }
+#endif  // !ROCKSDB_LITE
+
+  virtual void VerifyDb(ThreadState* thread) const {
+    ReadOptions options(FLAGS_verify_checksum, true);
+    // We must set total_order_seek to true because we are doing a SeekToFirst
+    // on a column family whose memtables may support (by default) prefix-based
+    // iterator. In this case, NewIterator with options.total_order_seek being
+    // false returns a prefix-based iterator. Calling SeekToFirst using this
+    // iterator causes the iterator to become invalid. That means we cannot
+    // iterate the memtable using this iterator any more, although the memtable
+    // contains the most up-to-date key-values.
+    options.total_order_seek = true;
+    assert(thread != nullptr);
+    auto shared = thread->shared;
+    std::vector<std::unique_ptr<Iterator> > iters(column_families_.size());
+    for (size_t i = 0; i != column_families_.size(); ++i) {
+      iters[i].reset(db_->NewIterator(options, column_families_[i]));
+    }
+    for (auto& iter : iters) {
+      iter->SeekToFirst();
+    }
+    size_t num = column_families_.size();
+    assert(num == iters.size());
+    std::vector<Status> statuses(num, Status::OK());
+    do {
+      size_t valid_cnt = 0;
+      size_t idx = 0;
+      for (auto& iter : iters) {
+        if (iter->Valid()) {
+          ++valid_cnt;
+        } else {
+          statuses[idx] = iter->status();
+        }
+        ++idx;
+      }
+      if (valid_cnt == 0) {
+        Status status;
+        for (size_t i = 0; i != num; ++i) {
+          const auto& s = statuses[i];
+          if (!s.ok()) {
+            status = s;
+            fprintf(stderr, "Iterator on cf %s has error: %s\n",
+                    column_families_[i]->GetName().c_str(),
+                    s.ToString().c_str());
+            shared->SetVerificationFailure();
+          }
+        }
+        if (status.ok()) {
+          fprintf(stdout, "Finished scanning all column families.\n");
+        }
+        break;
+      } else if (valid_cnt != iters.size()) {
+        for (size_t i = 0; i != num; ++i) {
+          if (!iters[i]->Valid()) {
+            if (statuses[i].ok()) {
+              fprintf(stderr, "Finished scanning cf %s\n",
+                      column_families_[i]->GetName().c_str());
+            } else {
+              fprintf(stderr, "Iterator on cf %s has error: %s\n",
+                      column_families_[i]->GetName().c_str(),
+                      statuses[i].ToString().c_str());
+            }
+          } else {
+            fprintf(stderr, "cf %s has remaining data to scan\n",
+                    column_families_[i]->GetName().c_str());
+          }
+        }
+        shared->SetVerificationFailure();
+        break;
+      }
+      // If the program reaches here, then all column families' iterators are
+      // still valid.
+      Slice key;
+      Slice value;
+      for (size_t i = 0; i != num; ++i) {
+        if (i == 0) {
+          key = iters[i]->key();
+          value = iters[i]->value();
+        } else {
+          if (key.compare(iters[i]->key()) != 0) {
+            fprintf(stderr, "Verification failed\n");
+            fprintf(stderr, "cf%s: %s => %s\n",
+                    column_families_[0]->GetName().c_str(),
+                    key.ToString(true /* hex */).c_str(),
+                    value.ToString(/* hex */).c_str());
+            fprintf(stderr, "cf%s: %s => %s\n",
+                    column_families_[i]->GetName().c_str(),
+                    iters[i]->key().ToString(true /* hex */).c_str(),
+                    iters[i]->value().ToString(true /* hex */).c_str());
+            shared->SetVerificationFailure();
+          }
+        }
+      }
+      for (auto& iter : iters) {
+        iter->Next();
+      }
+    } while (true);
+  }
+
+  virtual std::vector<int> GenerateColumnFamilies(
+      const int /* num_column_families */, int /* rand_column_family */) const {
+    std::vector<int> ret;
+    int num = static_cast<int>(column_families_.size());
+    int k = 0;
+    std::generate_n(back_inserter(ret), num, [&k]() -> int { return k++; });
+    return ret;
+  }
+
+ private:
+  std::atomic<int64_t> batch_id_;
+};
+
 }  // namespace rocksdb
 
 int main(int argc, char** argv) {
@@ -3415,6 +4035,26 @@ int main(int argc, char** argv) {
             "Error: nooverwritepercent must be 0 when using file ingestion\n");
     exit(1);
   }
+  if (FLAGS_clear_column_family_one_in > 0 && FLAGS_backup_one_in > 0) {
+    fprintf(stderr,
+            "Error: clear_column_family_one_in must be 0 when using backup\n");
+    exit(1);
+  }
+  if (FLAGS_test_atomic_flush) {
+    FLAGS_atomic_flush = true;
+  }
+  if (FLAGS_read_only) {
+    if (FLAGS_writepercent != 0 || FLAGS_delpercent != 0 ||
+        FLAGS_delrangepercent != 0) {
+      fprintf(stderr, "Error: updates are not supported in read only mode\n");
+      exit(1);
+    } else if (FLAGS_checkpoint_one_in > 0 &&
+               FLAGS_clear_column_family_one_in > 0) {
+      fprintf(stdout,
+              "Warn: checkpoint won't be validated since column families may "
+              "be dropped.\n");
+    }
+  }
 
   // Choose a location for the test database if none given with --db=<path>
   if (FLAGS_db.empty()) {
@@ -3428,7 +4068,9 @@ int main(int argc, char** argv) {
   rocksdb_kill_prefix_blacklist = SplitString(FLAGS_kill_prefix_blacklist);
 
   std::unique_ptr<rocksdb::StressTest> stress;
-  if (FLAGS_test_batches_snapshots) {
+  if (FLAGS_test_atomic_flush) {
+    stress.reset(new rocksdb::AtomicFlushStressTest());
+  } else if (FLAGS_test_batches_snapshots) {
     stress.reset(new rocksdb::BatchedOpsStressTest());
   } else {
     stress.reset(new rocksdb::NonBatchedOpsStressTest());