]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rocksdb/db/db_impl.h
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / rocksdb / db / db_impl.h
index c6b4d756cbdf0f3362e15cec94d461b27226694d..2da8eca608f3da8654d6b6115045ead25c4ec2a8 100644 (file)
@@ -1,7 +1,7 @@
 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
-//  This source code is licensed under the BSD-style license found in the
-//  LICENSE file in the root directory of this source tree. An additional grant
-//  of patent rights can be found in the PATENTS file in the same directory.
+//  This source code is licensed under both the GPLv2 (found in the
+//  COPYING file in the root directory) and Apache 2.0 License
+//  (found in the LICENSE.Apache file in the root directory).
 //
 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
 // Use of this source code is governed by a BSD-style license that can be
@@ -14,7 +14,6 @@
 #include <limits>
 #include <list>
 #include <map>
-#include <queue>
 #include <set>
 #include <string>
 #include <utility>
 #include "db/column_family.h"
 #include "db/compaction_job.h"
 #include "db/dbformat.h"
+#include "db/error_handler.h"
+#include "db/event_helpers.h"
 #include "db/external_sst_file_ingestion_job.h"
 #include "db/flush_job.h"
 #include "db/flush_scheduler.h"
 #include "db/internal_stats.h"
 #include "db/log_writer.h"
+#include "db/logs_with_prep_tracker.h"
+#include "db/pre_release_callback.h"
+#include "db/read_callback.h"
+#include "db/snapshot_checker.h"
 #include "db/snapshot_impl.h"
 #include "db/version_edit.h"
 #include "db/wal_manager.h"
@@ -41,6 +46,7 @@
 #include "rocksdb/env.h"
 #include "rocksdb/memtablerep.h"
 #include "rocksdb/status.h"
+#include "rocksdb/trace_reader_writer.h"
 #include "rocksdb/transaction_log.h"
 #include "rocksdb/write_buffer_manager.h"
 #include "table/scoped_arena_iterator.h"
 #include "util/hash.h"
 #include "util/stop_watch.h"
 #include "util/thread_local.h"
+#include "util/trace_replay.h"
 
 namespace rocksdb {
 
+class Arena;
+class ArenaWrappedDBIter;
 class MemTable;
 class TableCache;
 class Version;
 class VersionEdit;
 class VersionSet;
-class Arena;
 class WriteCallback;
 struct JobContext;
 struct ExternalSstFileInfo;
@@ -65,9 +73,13 @@ struct MemTableInfo;
 
 class DBImpl : public DB {
  public:
-  DBImpl(const DBOptions& options, const std::string& dbname);
+  DBImpl(const DBOptions& options, const std::string& dbname,
+         const bool seq_per_batch = false, const bool batch_per_txn = true);
   virtual ~DBImpl();
 
+  using DB::Resume;
+  virtual Status Resume() override;
+
   // Implementations of the DB interface
   using DB::Put;
   virtual Status Put(const WriteOptions& options,
@@ -93,6 +105,14 @@ class DBImpl : public DB {
   virtual Status Get(const ReadOptions& options,
                      ColumnFamilyHandle* column_family, const Slice& key,
                      PinnableSlice* value) override;
+
+  // Function that Get and KeyMayExist call with no_io true or false
+  // Note: 'value_found' from KeyMayExist propagates here
+  Status GetImpl(const ReadOptions& options, ColumnFamilyHandle* column_family,
+                 const Slice& key, PinnableSlice* value,
+                 bool* value_found = nullptr, ReadCallback* callback = nullptr,
+                 bool* is_blob_index = nullptr);
+
   using DB::MultiGet;
   virtual std::vector<Status> MultiGet(
       const ReadOptions& options,
@@ -100,10 +120,19 @@ class DBImpl : public DB {
       const std::vector<Slice>& keys,
       std::vector<std::string>* values) override;
 
-  virtual Status CreateColumnFamily(const ColumnFamilyOptions& options,
+  virtual Status CreateColumnFamily(const ColumnFamilyOptions& cf_options,
                                     const std::string& column_family,
                                     ColumnFamilyHandle** handle) override;
+  virtual Status CreateColumnFamilies(
+      const ColumnFamilyOptions& cf_options,
+      const std::vector<std::string>& column_family_names,
+      std::vector<ColumnFamilyHandle*>* handles) override;
+  virtual Status CreateColumnFamilies(
+      const std::vector<ColumnFamilyDescriptor>& column_families,
+      std::vector<ColumnFamilyHandle*>* handles) override;
   virtual Status DropColumnFamily(ColumnFamilyHandle* column_family) override;
+  virtual Status DropColumnFamilies(
+      const std::vector<ColumnFamilyHandle*>& column_families) override;
 
   // Returns false if key doesn't exist in the database and true if it may.
   // If value_found is not passed in as null, then return the value if found in
@@ -114,6 +143,7 @@ class DBImpl : public DB {
                            ColumnFamilyHandle* column_family, const Slice& key,
                            std::string* value,
                            bool* value_found = nullptr) override;
+
   using DB::NewIterator;
   virtual Iterator* NewIterator(const ReadOptions& options,
                                 ColumnFamilyHandle* column_family) override;
@@ -121,15 +151,22 @@ class DBImpl : public DB {
       const ReadOptions& options,
       const std::vector<ColumnFamilyHandle*>& column_families,
       std::vector<Iterator*>* iterators) override;
+  ArenaWrappedDBIter* NewIteratorImpl(const ReadOptions& options,
+                                      ColumnFamilyData* cfd,
+                                      SequenceNumber snapshot,
+                                      ReadCallback* read_callback,
+                                      bool allow_blob = false,
+                                      bool allow_refresh = true);
+
   virtual const Snapshot* GetSnapshot() override;
   virtual void ReleaseSnapshot(const Snapshot* snapshot) override;
   using DB::GetProperty;
   virtual bool GetProperty(ColumnFamilyHandle* column_family,
                            const Slice& property, std::string* value) override;
   using DB::GetMapProperty;
-  virtual bool GetMapProperty(ColumnFamilyHandle* column_family,
-                              const Slice& property,
-                              std::map<std::string, double>* value) override;
+  virtual bool GetMapProperty(
+      ColumnFamilyHandle* column_family, const Slice& property,
+      std::map<std::string, std::string>* value) override;
   using DB::GetIntProperty;
   virtual bool GetIntProperty(ColumnFamilyHandle* column_family,
                               const Slice& property, uint64_t* value) override;
@@ -156,7 +193,9 @@ class DBImpl : public DB {
                               ColumnFamilyHandle* column_family,
                               const std::vector<std::string>& input_file_names,
                               const int output_level,
-                              const int output_path_id = -1) override;
+                              const int output_path_id = -1,
+                              std::vector<std::string>* const output_file_names
+                              = nullptr) override;
 
   virtual Status PauseBackgroundWork() override;
   virtual Status ContinueBackgroundWork() override;
@@ -188,9 +227,20 @@ class DBImpl : public DB {
   using DB::Flush;
   virtual Status Flush(const FlushOptions& options,
                        ColumnFamilyHandle* column_family) override;
+  virtual Status FlushWAL(bool sync) override;
+  bool TEST_WALBufferIsEmpty();
   virtual Status SyncWAL() override;
 
   virtual SequenceNumber GetLatestSequenceNumber() const override;
+  // REQUIRES: joined the main write queue if two_write_queues is disabled, and
+  // the second write queue otherwise.
+  virtual void SetLastPublishedSequence(SequenceNumber seq);
+  // Returns LastSequence in last_seq_same_as_publish_seq_
+  // mode and LastAllocatedSequence otherwise. This is useful when visiblility
+  // depends also on data written to the WAL but not to the memtable.
+  SequenceNumber TEST_GetLastVisibleSequence() const;
+
+  virtual bool SetPreserveDeletesSequenceNumber(SequenceNumber seqnum) override;
 
 #ifndef ROCKSDB_LITE
   using DB::ResetStats;
@@ -209,8 +259,9 @@ class DBImpl : public DB {
       const TransactionLogIterator::ReadOptions&
           read_options = TransactionLogIterator::ReadOptions()) override;
   virtual Status DeleteFile(std::string name) override;
-  Status DeleteFilesInRange(ColumnFamilyHandle* column_family,
-                            const Slice* begin, const Slice* end);
+  Status DeleteFilesInRanges(ColumnFamilyHandle* column_family,
+                             const RangePtr* ranges, size_t n,
+                             bool include_end = true);
 
   virtual void GetLiveFilesMetaData(
       std::vector<LiveFileMetaData>* metadata) override;
@@ -223,11 +274,11 @@ class DBImpl : public DB {
       ColumnFamilyHandle* column_family,
       ColumnFamilyMetaData* metadata) override;
 
-  // experimental API
   Status SuggestCompactRange(ColumnFamilyHandle* column_family,
-                             const Slice* begin, const Slice* end);
+                             const Slice* begin, const Slice* end) override;
 
-  Status PromoteL0(ColumnFamilyHandle* column_family, int target_level);
+  Status PromoteL0(ColumnFamilyHandle* column_family,
+                   int target_level) override;
 
   // Similar to Write() but will call the callback once on the single write
   // thread to determine whether it is safe to perform the write.
@@ -273,7 +324,8 @@ class DBImpl : public DB {
   // TODO(andrewkr): this API need to be aware of range deletion operations
   Status GetLatestSequenceForKey(SuperVersion* sv, const Slice& key,
                                  bool cache_only, SequenceNumber* seq,
-                                 bool* found_record_for_key);
+                                 bool* found_record_for_key,
+                                 bool* is_blob_index = nullptr);
 
   using DB::IngestExternalFile;
   virtual Status IngestExternalFile(
@@ -281,6 +333,17 @@ class DBImpl : public DB {
       const std::vector<std::string>& external_files,
       const IngestExternalFileOptions& ingestion_options) override;
 
+  virtual Status VerifyChecksum() override;
+
+  using DB::StartTrace;
+  virtual Status StartTrace(
+      const TraceOptions& options,
+      std::unique_ptr<TraceWriter>&& trace_writer) override;
+
+  using DB::EndTrace;
+  virtual Status EndTrace() override;
+  Status TraceIteratorSeek(const uint32_t& cf_id, const Slice& key);
+  Status TraceIteratorSeekForPrev(const uint32_t& cf_id, const Slice& key);
 #endif  // ROCKSDB_LITE
 
   // Similar to GetSnapshot(), but also lets the db know that this snapshot
@@ -297,6 +360,7 @@ class DBImpl : public DB {
 
   Status RunManualCompaction(ColumnFamilyData* cfd, int input_level,
                              int output_level, uint32_t output_path_id,
+                             uint32_t max_subcompactions,
                              const Slice* begin, const Slice* end,
                              bool exclusive,
                              bool disallow_trivial_move = false);
@@ -308,6 +372,10 @@ class DBImpl : public DB {
       Arena* arena, RangeDelAggregator* range_del_agg,
       ColumnFamilyHandle* column_family = nullptr);
 
+  LogsWithPrepTracker* logs_with_prep_tracker() {
+    return &logs_with_prep_tracker_;
+  }
+
 #ifndef NDEBUG
   // Extra methods (for testing) that are not in the public DB interface
   // Implemented in db_impl_debug.cc
@@ -317,25 +385,27 @@ class DBImpl : public DB {
                            ColumnFamilyHandle* column_family = nullptr,
                            bool disallow_trivial_move = false);
 
-  void TEST_HandleWALFull();
+  void TEST_SwitchWAL();
 
-  bool TEST_UnableToFlushOldestLog() {
-    return unable_to_flush_oldest_log_;
-  }
+  bool TEST_UnableToReleaseOldestLog() { return unable_to_release_oldest_log_; }
 
   bool TEST_IsLogGettingFlushed() {
     return alive_log_files_.begin()->getting_flushed;
   }
 
+  Status TEST_SwitchMemtable(ColumnFamilyData* cfd = nullptr);
+
   // Force current memtable contents to be flushed.
-  Status TEST_FlushMemTable(bool wait = true,
+  Status TEST_FlushMemTable(bool wait = true, bool allow_write_stall = false,
                             ColumnFamilyHandle* cfh = nullptr);
 
   // Wait for memtable compaction
   Status TEST_WaitForFlushMemTable(ColumnFamilyHandle* column_family = nullptr);
 
   // Wait for any compaction
-  Status TEST_WaitForCompact();
+  // We add a bool parameter to wait for unscheduledCompactions_ == 0, but this
+  // is only for the special test of CancelledCompactions
+  Status TEST_WaitForCompact(bool waitUnscheduled = false);
 
   // Return the maximum overlapping data (in bytes) at next level for any
   // file at a level >= 1.
@@ -345,6 +415,9 @@ class DBImpl : public DB {
   // Return the current manifest file no.
   uint64_t TEST_Current_Manifest_FileNo();
 
+  // Returns the number that'll be assigned to the next file that's created.
+  uint64_t TEST_Current_Next_FileNo();
+
   // get total level0 file size. Only for testing.
   uint64_t TEST_GetLevel0TotalSize();
 
@@ -376,9 +449,9 @@ class DBImpl : public DB {
   Status TEST_GetAllImmutableCFOptions(
       std::unordered_map<std::string, const ImmutableCFOptions*>* iopts_map);
 
-  // Return the lastest MutableCFOptions of of a column family
+  // Return the lastest MutableCFOptions of a column family
   Status TEST_GetLatestMutableCFOptions(ColumnFamilyHandle* column_family,
-                                        MutableCFOptions* mutable_cf_opitons);
+                                        MutableCFOptions* mutable_cf_options);
 
   Cache* TEST_table_cache() { return table_cache_.get(); }
 
@@ -386,14 +459,26 @@ class DBImpl : public DB {
 
   uint64_t TEST_FindMinLogContainingOutstandingPrep();
   uint64_t TEST_FindMinPrepLogReferencedByMemTable();
+  size_t TEST_PreparedSectionCompletedSize();
+  size_t TEST_LogsWithPrepSize();
 
   int TEST_BGCompactionsAllowed() const;
+  int TEST_BGFlushesAllowed() const;
+  size_t TEST_GetWalPreallocateBlockSize(uint64_t write_buffer_size) const;
 
 #endif  // NDEBUG
 
-  // Return maximum background compaction allowed to be scheduled based on
-  // compaction status.
-  int BGCompactionsAllowed() const;
+  struct BGJobLimits {
+    int max_flushes;
+    int max_compactions;
+  };
+  // Returns maximum background flushes and compactions allowed to be scheduled
+  BGJobLimits GetBGJobLimits() const;
+  // Need a static version that can be called during SanitizeOptions().
+  static BGJobLimits GetBGJobLimits(int max_background_flushes,
+                                    int max_background_compactions,
+                                    int max_background_jobs,
+                                    bool parallelize_compactions);
 
   // move logs pending closing from job_context to the DB queue and
   // schedule a purge
@@ -410,10 +495,12 @@ class DBImpl : public DB {
                          bool no_full_scan = false);
 
   // Diffs the files listed in filenames and those that do not
-  // belong to live files are posibly removed. Also, removes all the
+  // belong to live files are possibly removed. Also, removes all the
   // files in sst_delete_files and log_delete_files.
   // It is not necessary to hold the mutex when invoking this method.
-  void PurgeObsoleteFiles(const JobContext& background_contet,
+  // If FindObsoleteFiles() was run, we need to also run
+  // PurgeObsoleteFiles(), even if disable_delete_obsolete_files_ is true
+  void PurgeObsoleteFiles(JobContext& background_contet,
                           bool schedule_only = false);
 
   void SchedulePurge();
@@ -439,6 +526,9 @@ class DBImpl : public DB {
   // mutex is held.
   SuperVersion* GetAndRefSuperVersion(uint32_t column_family_id);
 
+  // Un-reference the super version and clean it up if it is the last reference.
+  void CleanupSuperVersion(SuperVersion* sv);
+
   // Un-reference the super version and return it to thread local cache if
   // needed. If it is the last reference of the super version. Clean it up
   // after un-referencing it.
@@ -454,6 +544,9 @@ class DBImpl : public DB {
   // mutex is released.
   ColumnFamilyHandle* GetColumnFamilyHandle(uint32_t column_family_id);
 
+  // Same as above, should called without mutex held and not on write thread.
+  ColumnFamilyHandle* GetColumnFamilyHandleUnlocked(uint32_t column_family_id);
+
   // Returns the number of currently running flushes.
   // REQUIREMENT: mutex_ must be held when calling this function.
   int num_running_flushes() {
@@ -470,18 +563,60 @@ class DBImpl : public DB {
 
   const WriteController& write_controller() { return write_controller_; }
 
+  InternalIterator* NewInternalIterator(const ReadOptions&,
+                                        ColumnFamilyData* cfd,
+                                        SuperVersion* super_version,
+                                        Arena* arena,
+                                        RangeDelAggregator* range_del_agg);
+
   // hollow transactions shell used for recovery.
   // these will then be passed to TransactionDB so that
   // locks can be reacquired before writing can resume.
   struct RecoveredTransaction {
-    uint64_t log_number_;
     std::string name_;
-    WriteBatch* batch_;
+    bool unprepared_;
+
+    struct BatchInfo {
+      uint64_t log_number_;
+      // TODO(lth): For unprepared, the memory usage here can be big for
+      // unprepared transactions. This is only useful for rollbacks, and we
+      // can in theory just keep keyset for that.
+      WriteBatch* batch_;
+      // Number of sub-batches. A new sub-batch is created if txn attempts to
+      // insert a duplicate key,seq to memtable. This is currently used in
+      // WritePreparedTxn/WriteUnpreparedTxn.
+      size_t batch_cnt_;
+    };
+
+    // This maps the seq of the first key in the batch to BatchInfo, which
+    // contains WriteBatch and other information relevant to the batch.
+    //
+    // For WriteUnprepared, batches_ can have size greater than 1, but for
+    // other write policies, it must be of size 1.
+    std::map<SequenceNumber, BatchInfo> batches_;
+
     explicit RecoveredTransaction(const uint64_t log, const std::string& name,
-                                  WriteBatch* batch)
-        : log_number_(log), name_(name), batch_(batch) {}
+                                  WriteBatch* batch, SequenceNumber seq,
+                                  size_t batch_cnt, bool unprepared)
+        : name_(name), unprepared_(unprepared) {
+      batches_[seq] = {log, batch, batch_cnt};
+    }
+
+    ~RecoveredTransaction() {
+      for (auto& it : batches_) {
+        delete it.second.batch_;
+      }
+    }
 
-    ~RecoveredTransaction() { delete batch_; }
+    void AddBatch(SequenceNumber seq, uint64_t log_number, WriteBatch* batch,
+                  size_t batch_cnt, bool unprepared) {
+      assert(batches_.count(seq) == 0);
+      batches_[seq] = {log_number, batch, batch_cnt};
+      // Prior state must be unprepared, since the prepare batch must be the
+      // last batch.
+      assert(unprepared_);
+      unprepared_ = unprepared;
+    }
   };
 
   bool allow_2pc() const { return immutable_db_options_.allow_2pc; }
@@ -501,9 +636,21 @@ class DBImpl : public DB {
   }
 
   void InsertRecoveredTransaction(const uint64_t log, const std::string& name,
-                                  WriteBatch* batch) {
-    recovered_transactions_[name] = new RecoveredTransaction(log, name, batch);
-    MarkLogAsContainingPrepSection(log);
+                                  WriteBatch* batch, SequenceNumber seq,
+                                  size_t batch_cnt, bool unprepared_batch) {
+    // For WriteUnpreparedTxn, InsertRecoveredTransaction is called multiple
+    // times for every unprepared batch encountered during recovery.
+    //
+    // If the transaction is prepared, then the last call to
+    // InsertRecoveredTransaction will have unprepared_batch = false.
+    auto rtxn = recovered_transactions_.find(name);
+    if (rtxn == recovered_transactions_.end()) {
+      recovered_transactions_[name] = new RecoveredTransaction(
+          log, name, batch, seq, batch_cnt, unprepared_batch);
+    } else {
+      rtxn->second->AddBatch(seq, log, batch, batch_cnt, unprepared_batch);
+    }
+    logs_with_prep_tracker_.MarkLogAsContainingPrepSection(log);
   }
 
   void DeleteRecoveredTransaction(const std::string& name) {
@@ -511,7 +658,10 @@ class DBImpl : public DB {
     assert(it != recovered_transactions_.end());
     auto* trx = it->second;
     recovered_transactions_.erase(it);
-    MarkLogAsHavingPrepSectionFlushed(trx->log_number_);
+    for (const auto& info : trx->batches_) {
+      logs_with_prep_tracker_.MarkLogAsHavingPrepSectionFlushed(
+          info.second.log_number_);
+    }
     delete trx;
   }
 
@@ -523,35 +673,50 @@ class DBImpl : public DB {
     recovered_transactions_.clear();
   }
 
-  void MarkLogAsHavingPrepSectionFlushed(uint64_t log);
-  void MarkLogAsContainingPrepSection(uint64_t log);
   void AddToLogsToFreeQueue(log::Writer* log_writer) {
     logs_to_free_queue_.push_back(log_writer);
   }
 
+  void SetSnapshotChecker(SnapshotChecker* snapshot_checker);
+
+  // Not thread-safe.
+  void SetRecoverableStatePreReleaseCallback(PreReleaseCallback* callback);
+
+  InstrumentedMutex* mutex() { return &mutex_; }
+
   Status NewDB();
 
+  // This is to be used only by internal rocksdb classes.
+  static Status Open(const DBOptions& db_options, const std::string& name,
+                     const std::vector<ColumnFamilyDescriptor>& column_families,
+                     std::vector<ColumnFamilyHandle*>* handles, DB** dbptr,
+                     const bool seq_per_batch, const bool batch_per_txn);
+
+  virtual Status Close() override;
+
+  static Status CreateAndNewDirectory(Env* env, const std::string& dirname,
+                                      std::unique_ptr<Directory>* directory);
+
  protected:
   Env* const env_;
   const std::string dbname_;
   unique_ptr<VersionSet> versions_;
+  // Flag to check whether we allocated and own the info log file
+  bool own_info_log_;
   const DBOptions initial_db_options_;
   const ImmutableDBOptions immutable_db_options_;
   MutableDBOptions mutable_db_options_;
   Statistics* stats_;
   std::unordered_map<std::string, RecoveredTransaction*>
       recovered_transactions_;
-
-  InternalIterator* NewInternalIterator(const ReadOptions&,
-                                        ColumnFamilyData* cfd,
-                                        SuperVersion* super_version,
-                                        Arena* arena,
-                                        RangeDelAggregator* range_del_agg);
+  std::unique_ptr<Tracer> tracer_;
+  InstrumentedMutex trace_mutex_;
 
   // Except in DB::Open(), WriteOptionsFile can only be called when:
-  // 1. WriteThread::Writer::EnterUnbatched() is used.
-  // 2. db_mutex is held
-  Status WriteOptionsFile();
+  // Persist options to options file.
+  // If need_mutex_lock = false, the method will lock DB mutex.
+  // If need_enter_write_thread = false, the method will enter write thread.
+  Status WriteOptionsFile(bool need_mutex_lock, bool need_enter_write_thread);
 
   // The following two functions can only be called when:
   // 1. WriteThread::Writer::EnterUnbatched() is used.
@@ -585,42 +750,95 @@ class DBImpl : public DB {
 
   void EraseThreadStatusDbInfo() const;
 
+  // If disable_memtable is set the application logic must guarantee that the
+  // batch will still be skipped from memtable during the recovery. An excption
+  // to this is seq_per_batch_ mode, in which since each batch already takes one
+  // seq, it is ok for the batch to write to memtable during recovery as long as
+  // it only takes one sequence number: i.e., no duplicate keys.
+  // In WriteCommitted it is guarnateed since disable_memtable is used for
+  // prepare batch which will be written to memtable later during the commit,
+  // and in WritePrepared it is guaranteed since it will be used only for WAL
+  // markers which will never be written to memtable. If the commit marker is
+  // accompanied with CommitTimeWriteBatch that is not written to memtable as
+  // long as it has no duplicate keys, it does not violate the one-seq-per-batch
+  // policy.
+  // batch_cnt is expected to be non-zero in seq_per_batch mode and
+  // indicates the number of sub-patches. A sub-patch is a subset of the write
+  // batch that does not have duplicate keys.
   Status WriteImpl(const WriteOptions& options, WriteBatch* updates,
                    WriteCallback* callback = nullptr,
                    uint64_t* log_used = nullptr, uint64_t log_ref = 0,
-                   bool disable_memtable = false);
-
-  uint64_t FindMinLogContainingOutstandingPrep();
-  uint64_t FindMinPrepLogReferencedByMemTable();
+                   bool disable_memtable = false, uint64_t* seq_used = nullptr,
+                   size_t batch_cnt = 0,
+                   PreReleaseCallback* pre_release_callback = nullptr);
+
+  Status PipelinedWriteImpl(const WriteOptions& options, WriteBatch* updates,
+                            WriteCallback* callback = nullptr,
+                            uint64_t* log_used = nullptr, uint64_t log_ref = 0,
+                            bool disable_memtable = false,
+                            uint64_t* seq_used = nullptr);
+
+  // batch_cnt is expected to be non-zero in seq_per_batch mode and indicates
+  // the number of sub-patches. A sub-patch is a subset of the write batch that
+  // does not have duplicate keys.
+  Status WriteImplWALOnly(const WriteOptions& options, WriteBatch* updates,
+                          WriteCallback* callback = nullptr,
+                          uint64_t* log_used = nullptr, uint64_t log_ref = 0,
+                          uint64_t* seq_used = nullptr, size_t batch_cnt = 0,
+                          PreReleaseCallback* pre_release_callback = nullptr);
+
+  // write cached_recoverable_state_ to memtable if it is not empty
+  // The writer must be the leader in write_thread_ and holding mutex_
+  Status WriteRecoverableState();
+
+  // Actual implementation of Close()
+  Status CloseImpl();
 
  private:
   friend class DB;
+  friend class ErrorHandler;
   friend class InternalStats;
-  friend class TransactionImpl;
+  friend class PessimisticTransaction;
+  friend class TransactionBaseImpl;
+  friend class WriteCommittedTxn;
+  friend class WritePreparedTxn;
+  friend class WritePreparedTxnDB;
+  friend class WriteBatchWithIndex;
+  friend class WriteUnpreparedTxnDB;
+  friend class WriteUnpreparedTxn;
+
 #ifndef ROCKSDB_LITE
   friend class ForwardIterator;
 #endif
   friend struct SuperVersion;
   friend class CompactedDBImpl;
+  friend class DBTest_ConcurrentFlushWAL_Test;
+  friend class DBTest_MixedSlowdownOptionsStop_Test;
 #ifndef NDEBUG
+  friend class DBTest2_ReadCallbackTest_Test;
+  friend class WriteCallbackTest_WriteWithCallbackTest_Test;
   friend class XFTransactionWriteHandler;
+  friend class DBBlobIndexTest;
+  friend class WriteUnpreparedTransactionTest_RecoveryTest_Test;
 #endif
   struct CompactionState;
 
   struct WriteContext {
-    autovector<SuperVersion*> superversions_to_free_;
+    SuperVersionContext superversion_context;
     autovector<MemTable*> memtables_to_free_;
 
+    explicit WriteContext(bool create_superversion = false)
+        : superversion_context(create_superversion) {}
+
     ~WriteContext() {
-      for (auto& sv : superversions_to_free_) {
-        delete sv;
-      }
+      superversion_context.Clean();
       for (auto& m : memtables_to_free_) {
         delete m;
       }
     }
   };
 
+  struct PrepickedCompaction;
   struct PurgeFileInfo;
 
   // Recover the descriptor from persistent storage.  May do a significant
@@ -630,16 +848,24 @@ class DBImpl : public DB {
                  bool read_only = false, bool error_if_log_file_exist = false,
                  bool error_if_data_exists_in_logs = false);
 
+  Status ResumeImpl();
+
   void MaybeIgnoreError(Status* s) const;
 
   const Status CreateArchivalDirectory();
 
+  Status CreateColumnFamilyImpl(const ColumnFamilyOptions& cf_options,
+                                const std::string& cf_name,
+                                ColumnFamilyHandle** handle);
+
+  Status DropColumnFamilyImpl(ColumnFamilyHandle* column_family);
+
   // Delete any unneeded files and stale in-memory entries.
   void DeleteObsoleteFiles();
   // Delete obsolete files and log status and information of file deletion
-  void DeleteObsoleteFileImpl(Status file_deletion_status, int job_id,
-                              const std::string& fname, FileType type,
-                              uint64_t number, uint32_t path_id);
+  void DeleteObsoleteFileImpl(int job_id, const std::string& fname,
+                              const std::string& path_to_sync, FileType type,
+                              uint64_t number);
 
   // Background process needs to call
   //     auto x = CaptureCurrentFileNumberInPendingOutputs()
@@ -663,12 +889,41 @@ class DBImpl : public DB {
   Status SyncClosedLogs(JobContext* job_context);
 
   // Flush the in-memory write buffer to storage.  Switches to a new
-  // log-file/memtable and writes a new descriptor iff successful.
+  // log-file/memtable and writes a new descriptor iff successful. Then
+  // installs a new super version for the column family.
   Status FlushMemTableToOutputFile(ColumnFamilyData* cfd,
                                    const MutableCFOptions& mutable_cf_options,
                                    bool* madeProgress, JobContext* job_context,
+                                   SuperVersionContext* superversion_context,
                                    LogBuffer* log_buffer);
 
+  // Argument required by background flush thread.
+  struct BGFlushArg {
+    BGFlushArg()
+        : cfd_(nullptr), memtable_id_(0), superversion_context_(nullptr) {}
+    BGFlushArg(ColumnFamilyData* cfd, uint64_t memtable_id,
+               SuperVersionContext* superversion_context)
+        : cfd_(cfd),
+          memtable_id_(memtable_id),
+          superversion_context_(superversion_context) {}
+
+    // Column family to flush.
+    ColumnFamilyData* cfd_;
+    // Maximum ID of memtable to flush. In this column family, memtables with
+    // IDs smaller than this value must be flushed before this flush completes.
+    uint64_t memtable_id_;
+    // Pointer to a SuperVersionContext object. After flush completes, RocksDB
+    // installs a new superversion for the column family. This operation
+    // requires a SuperVersionContext object (currently embedded in JobContext).
+    SuperVersionContext* superversion_context_;
+  };
+
+  // Flush the memtables of (multiple) column families to multiple files on
+  // persistent storage.
+  Status FlushMemTablesToOutputFiles(
+      const autovector<BGFlushArg>& bg_flush_args, bool* made_progress,
+      JobContext* job_context, LogBuffer* log_buffer);
+
   // REQUIRES: log_numbers are sorted in ascending order
   Status RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
                          SequenceNumber* next_sequence, bool read_only);
@@ -681,44 +936,82 @@ class DBImpl : public DB {
   Status WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
                                      MemTable* mem, VersionEdit* edit);
 
+  // Restore alive_log_files_ and total_log_size_ after recovery.
+  // It needs to run only when there's no flush during recovery
+  // (e.g. avoid_flush_during_recovery=true). May also trigger flush
+  // in case total_log_size > max_total_wal_size.
+  Status RestoreAliveLogFiles(const std::vector<uint64_t>& log_numbers);
+
   // num_bytes: for slowdown case, delay time is calculated based on
   //            `num_bytes` going through.
   Status DelayWrite(uint64_t num_bytes, const WriteOptions& write_options);
 
+  Status ThrottleLowPriWritesIfNeeded(const WriteOptions& write_options,
+                                      WriteBatch* my_batch);
+
   Status ScheduleFlushes(WriteContext* context);
 
   Status SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context);
 
   // Force current memtable contents to be flushed.
   Status FlushMemTable(ColumnFamilyData* cfd, const FlushOptions& options,
-                       bool writes_stopped = false);
-
-  // Wait for memtable flushed
-  Status WaitForFlushMemTable(ColumnFamilyData* cfd);
+                       FlushReason flush_reason, bool writes_stopped = false);
+
+  // Wait until flushing this column family won't stall writes
+  Status WaitUntilFlushWouldNotStallWrites(ColumnFamilyData* cfd,
+                                           bool* flush_needed);
+
+  // Wait for memtable flushed.
+  // If flush_memtable_id is non-null, wait until the memtable with the ID
+  // gets flush. Otherwise, wait until the column family don't have any
+  // memtable pending flush.
+  Status WaitForFlushMemTable(ColumnFamilyData* cfd,
+                              const uint64_t* flush_memtable_id = nullptr) {
+    return WaitForFlushMemTables({cfd}, {flush_memtable_id});
+  }
+  // Wait for memtables to be flushed for multiple column families.
+  Status WaitForFlushMemTables(
+      const autovector<ColumnFamilyData*>& cfds,
+      const autovector<const uint64_t*>& flush_memtable_ids);
 
   // REQUIRES: mutex locked
-  Status HandleWALFull(WriteContext* write_context);
+  Status SwitchWAL(WriteContext* write_context);
 
   // REQUIRES: mutex locked
   Status HandleWriteBufferFull(WriteContext* write_context);
 
   // REQUIRES: mutex locked
-  Status PreprocessWrite(const WriteOptions& write_options, bool need_log_sync,
-                         bool* logs_getting_syned, WriteContext* write_context);
+  Status PreprocessWrite(const WriteOptions& write_options, bool* need_log_sync,
+                         WriteContext* write_context);
 
-  Status WriteToWAL(const autovector<WriteThread::Writer*>& write_group,
-                    log::Writer* log_writer, bool need_log_sync,
-                    bool need_log_dir_sync, SequenceNumber sequence);
+  WriteBatch* MergeBatch(const WriteThread::WriteGroup& write_group,
+                         WriteBatch* tmp_batch, size_t* write_with_wal,
+                         WriteBatch** to_be_cached_state);
 
-  // Used by WriteImpl to update bg_error_ when encountering memtable insert
-  // error.
-  void UpdateBackgroundError(const Status& memtable_insert_status);
+  Status WriteToWAL(const WriteBatch& merged_batch, log::Writer* log_writer,
+                    uint64_t* log_used, uint64_t* log_size);
+
+  Status WriteToWAL(const WriteThread::WriteGroup& write_group,
+                    log::Writer* log_writer, uint64_t* log_used,
+                    bool need_log_sync, bool need_log_dir_sync,
+                    SequenceNumber sequence);
+
+  Status ConcurrentWriteToWAL(const WriteThread::WriteGroup& write_group,
+                              uint64_t* log_used, SequenceNumber* last_sequence,
+                              size_t seq_inc);
+
+  // Used by WriteImpl to update bg_error_ if paranoid check is enabled.
+  void WriteStatusCheck(const Status& status);
+
+  // Used by WriteImpl to update bg_error_ in case of memtable insert error.
+  void MemTableInsertStatusCheck(const Status& memtable_insert_status);
 
 #ifndef ROCKSDB_LITE
 
   Status CompactFilesImpl(const CompactionOptions& compact_options,
                           ColumnFamilyData* cfd, Version* version,
                           const std::vector<std::string>& input_file_names,
+                          std::vector<std::string>* const output_file_names,
                           const int output_level, int output_path_id,
                           JobContext* job_context, LogBuffer* log_buffer);
 
@@ -735,21 +1028,40 @@ class DBImpl : public DB {
   ColumnFamilyData* GetColumnFamilyDataByName(const std::string& cf_name);
 
   void MaybeScheduleFlushOrCompaction();
-  void SchedulePendingFlush(ColumnFamilyData* cfd);
+
+  // A flush request specifies the column families to flush as well as the
+  // largest memtable id to persist for each column family. Once all the
+  // memtables whose IDs are smaller than or equal to this per-column-family
+  // specified value, this flush request is considered to have completed its
+  // work of flushing this column family. After completing the work for all
+  // column families in this request, this flush is considered complete.
+  typedef std::vector<std::pair<ColumnFamilyData*, uint64_t>> FlushRequest;
+
+  void SchedulePendingFlush(const FlushRequest& req, FlushReason flush_reason);
+
   void SchedulePendingCompaction(ColumnFamilyData* cfd);
-  void SchedulePendingPurge(std::string fname, FileType type, uint64_t number,
-                            uint32_t path_id, int job_id);
+  void SchedulePendingPurge(std::string fname, std::string dir_to_sync,
+                            FileType type, uint64_t number, int job_id);
   static void BGWorkCompaction(void* arg);
+  // Runs a pre-chosen universal compaction involving bottom level in a
+  // separate, bottom-pri thread pool.
+  static void BGWorkBottomCompaction(void* arg);
   static void BGWorkFlush(void* db);
   static void BGWorkPurge(void* arg);
   static void UnscheduleCallback(void* arg);
-  void BackgroundCallCompaction(void* arg);
+  void BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction,
+                                Env::Priority bg_thread_pri);
   void BackgroundCallFlush();
   void BackgroundCallPurge();
   Status BackgroundCompaction(bool* madeProgress, JobContext* job_context,
-                              LogBuffer* log_buffer, void* m = 0);
+                              LogBuffer* log_buffer,
+                              PrepickedCompaction* prepicked_compaction);
   Status BackgroundFlush(bool* madeProgress, JobContext* job_context,
-                         LogBuffer* log_buffer);
+                         LogBuffer* log_buffer, FlushReason* reason);
+
+  bool EnoughRoomForCompaction(ColumnFamilyData* cfd,
+                               const std::vector<CompactionInputFiles>& inputs,
+                               bool* sfm_bookkeeping, LogBuffer* log_buffer);
 
   void PrintStatistics();
 
@@ -769,27 +1081,39 @@ class DBImpl : public DB {
   // helper functions for adding and removing from flush & compaction queues
   void AddToCompactionQueue(ColumnFamilyData* cfd);
   ColumnFamilyData* PopFirstFromCompactionQueue();
-  void AddToFlushQueue(ColumnFamilyData* cfd);
-  ColumnFamilyData* PopFirstFromFlushQueue();
+  FlushRequest PopFirstFromFlushQueue();
 
   // helper function to call after some of the logs_ were synced
   void MarkLogsSynced(uint64_t up_to, bool synced_dir, const Status& status);
 
-  const Snapshot* GetSnapshotImpl(bool is_write_conflict_boundary);
+  SnapshotImpl* GetSnapshotImpl(bool is_write_conflict_boundary);
 
   uint64_t GetMaxTotalWalSize() const;
 
+  Directory* GetDataDir(ColumnFamilyData* cfd, size_t path_id) const;
+
+  Status CloseHelper();
+
+  Status FlushAllCFs(FlushReason flush_reason);
+
+  void WaitForBackgroundWork();
+
   // table_cache_ provides its own synchronization
   std::shared_ptr<Cache> table_cache_;
 
   // Lock over the persistent DB state.  Non-nullptr iff successfully acquired.
   FileLock* db_lock_;
 
-  // The mutex for options file related operations.
-  // NOTE: should never acquire options_file_mutex_ and mutex_ at the
-  //       same time.
-  InstrumentedMutex options_files_mutex_;
+  // In addition to mutex_, log_write_mutex_ protected writes to logs_ and
+  // logfile_number_. With two_write_queues it also protects alive_log_files_,
+  // and log_empty_. Refer to the definition of each variable below for more
+  // details.
+  InstrumentedMutex log_write_mutex_;
   // State below is protected by mutex_
+  // With two_write_queues enabled, some of the variables that accessed during
+  // WriteToWAL need different synchronization: log_empty_, alive_log_files_,
+  // logs_, logfile_number_. Refer to the definition of each variable below for
+  // more description.
   mutable InstrumentedMutex mutex_;
 
   std::atomic<bool> shutting_down_;
@@ -802,11 +1126,25 @@ class DBImpl : public DB {
   // (i.e. whenever a flush is done, even if it didn't make any progress)
   // * whenever there is an error in background purge, flush or compaction
   // * whenever num_running_ingest_file_ goes to 0.
+  // * whenever pending_purge_obsolete_files_ goes to 0.
+  // * whenever disable_delete_obsolete_files_ goes to 0.
+  // * whenever SetOptions successfully updates options.
+  // * whenever a column family is dropped.
   InstrumentedCondVar bg_cv_;
+  // Writes are protected by locking both mutex_ and log_write_mutex_, and reads
+  // must be under either mutex_ or log_write_mutex_. Since after ::Open,
+  // logfile_number_ is currently updated only in write_thread_, it can be read
+  // from the same write_thread_ without any locks.
   uint64_t logfile_number_;
   std::deque<uint64_t>
-      log_recycle_files;  // a list of log files that we can recycle
+      log_recycle_files_;  // a list of log files that we can recycle
   bool log_dir_synced_;
+  // Without two_write_queues, read and writes to log_empty_ are protected by
+  // mutex_. Since it is currently updated/read only in write_thread_, it can be
+  // accessed from the same write_thread_ without any locks. With
+  // two_write_queues writes, where it can be updated in different threads,
+  // read and writes are protected by log_write_mutex_ instead. This is to avoid
+  // expesnive mutex_ lock during WAL write, which update log_empty_.
   bool log_empty_;
   ColumnFamilyHandleImpl* default_cf_handle_;
   InternalStats* default_cf_internal_stats_;
@@ -829,9 +1167,11 @@ class DBImpl : public DB {
       writer = nullptr;
       return w;
     }
-    void ClearWriter() {
+    Status ClearWriter() {
+      Status s = writer->WriteBuffer();
       delete writer;
       writer = nullptr;
+      return s;
     }
 
     uint64_t number;
@@ -841,17 +1181,38 @@ class DBImpl : public DB {
     // true for some prefix of logs_
     bool getting_synced = false;
   };
+  // Without two_write_queues, read and writes to alive_log_files_ are
+  // protected by mutex_. However since back() is never popped, and push_back()
+  // is done only from write_thread_, the same thread can access the item
+  // reffered by back() without mutex_. With two_write_queues_, writes
+  // are protected by locking both mutex_ and log_write_mutex_, and reads must
+  // be under either mutex_ or log_write_mutex_.
   std::deque<LogFileNumberSize> alive_log_files_;
   // Log files that aren't fully synced, and the current log file.
   // Synchronization:
-  //  - push_back() is done from write thread with locked mutex_,
-  //  - pop_front() is done from any thread with locked mutex_,
+  //  - push_back() is done from write_thread_ with locked mutex_ and
+  //  log_write_mutex_
+  //  - pop_front() is done from any thread with locked mutex_ and
+  //  log_write_mutex_
+  //  - reads are done with either locked mutex_ or log_write_mutex_
   //  - back() and items with getting_synced=true are not popped,
-  //  - it follows that write thread with unlocked mutex_ can safely access
-  //    back() and items with getting_synced=true.
+  //  - The same thread that sets getting_synced=true will reset it.
+  //  - it follows that the object referred by back() can be safely read from
+  //  the write_thread_ without using mutex
+  //  - it follows that the items with getting_synced=true can be safely read
+  //  from the same thread that has set getting_synced=true
   std::deque<LogWriterNumber> logs_;
   // Signaled when getting_synced becomes false for some of the logs_.
   InstrumentedCondVar log_sync_cv_;
+  // This is the app-level state that is written to the WAL but will be used
+  // only during recovery. Using this feature enables not writing the state to
+  // memtable on normal writes and hence improving the throughput. Each new
+  // write of the state will replace the previous state entirely even if the
+  // keys in the two consecuitive states do not overlap.
+  // It is protected by log_write_mutex_ when two_write_queues_ is enabled.
+  // Otherwise only the heaad of write_thread_ can access it.
+  WriteBatch cached_recoverable_state_;
+  std::atomic<bool> cached_recoverable_state_empty_ = {true};
   std::atomic<uint64_t> total_log_size_;
   // only used for dynamically adjusting max_total_wal_size. it is a sum of
   // [write_buffer_size * max_write_buffer_number] over all column families
@@ -872,7 +1233,7 @@ class DBImpl : public DB {
                           const std::string& wal_dir,
                           const std::vector<DbPath>& data_paths);
 
-    Directory* GetDataDir(size_t path_id);
+    Directory* GetDataDir(size_t path_id) const;
 
     Directory* GetWalDir() {
       if (wal_dir_) {
@@ -887,9 +1248,6 @@ class DBImpl : public DB {
     std::unique_ptr<Directory> db_dir_;
     std::vector<std::unique_ptr<Directory>> data_dirs_;
     std::unique_ptr<Directory> wal_dir_;
-
-    Status CreateAndNewDirectory(Env* env, const std::string& dirname,
-                                 std::unique_ptr<Directory>* directory) const;
   };
 
   Directories directories_;
@@ -897,13 +1255,19 @@ class DBImpl : public DB {
   WriteBufferManager* write_buffer_manager_;
 
   WriteThread write_thread_;
-
   WriteBatch tmp_batch_;
+  // The write thread when the writers have no memtable write. This will be used
+  // in 2PC to batch the prepares separately from the serial commit.
+  WriteThread nonmem_write_thread_;
 
   WriteController write_controller_;
 
+  unique_ptr<RateLimiter> low_pri_write_rate_limiter_;
+
   // Size of the last batch group. In slowdown mode, next write needs to
   // sleep if it uses up the quota.
+  // Note: This is to protect memtable and compaction. If the batch only writes
+  // to the WAL its size need not to be included in this.
   uint64_t last_batch_group_size_;
 
   FlushScheduler flush_scheduler_;
@@ -925,13 +1289,13 @@ class DBImpl : public DB {
   // purge_queue_
   struct PurgeFileInfo {
     std::string fname;
+    std::string dir_to_sync;
     FileType type;
     uint64_t number;
-    uint32_t path_id;
     int job_id;
-    PurgeFileInfo(std::string fn, FileType t, uint64_t num, uint32_t pid,
+    PurgeFileInfo(std::string fn, std::string d, FileType t, uint64_t num,
                   int jid)
-        : fname(fn), type(t), number(num), path_id(pid), job_id(jid) {}
+        : fname(fn), dir_to_sync(d), type(t), number(num), job_id(jid) {}
   };
 
   // flush_queue_ and compaction_queue_ hold column families that we need to
@@ -954,7 +1318,7 @@ class DBImpl : public DB {
   // in MaybeScheduleFlushOrCompaction()
   // invariant(column family present in flush_queue_ <==>
   // ColumnFamilyData::pending_flush_ == true)
-  std::deque<ColumnFamilyData*> flush_queue_;
+  std::deque<FlushRequest> flush_queue_;
   // invariant(column family present in compaction_queue_ <==>
   // ColumnFamilyData::pending_compaction_ == true)
   std::deque<ColumnFamilyData*> compaction_queue_;
@@ -962,11 +1326,19 @@ class DBImpl : public DB {
   // A queue to store filenames of the files to be purged
   std::deque<PurgeFileInfo> purge_queue_;
 
+  // A vector to store the file numbers that have been assigned to certain
+  // JobContext. Current implementation tracks ssts only.
+  std::vector<uint64_t> files_grabbed_for_purge_;
+
   // A queue to store log writers to close
   std::deque<log::Writer*> logs_to_free_queue_;
   int unscheduled_flushes_;
   int unscheduled_compactions_;
 
+  // count how many background compactions are running or have been scheduled in
+  // the BOTTOM pool
+  int bg_bottom_compaction_scheduled_;
+
   // count how many background compactions are running or have been scheduled
   int bg_compaction_scheduled_;
 
@@ -983,7 +1355,7 @@ class DBImpl : public DB {
   int bg_purge_scheduled_;
 
   // Information for a manual compaction
-  struct ManualCompaction {
+  struct ManualCompactionState {
     ColumnFamilyData* cfd;
     int input_level;
     int output_level;
@@ -999,18 +1371,23 @@ class DBImpl : public DB {
     InternalKey* manual_end;      // how far we are compacting
     InternalKey tmp_storage;      // Used to keep track of compaction progress
     InternalKey tmp_storage1;     // Used to keep track of compaction progress
+  };
+  struct PrepickedCompaction {
+    // background compaction takes ownership of `compaction`.
     Compaction* compaction;
+    // caller retains ownership of `manual_compaction_state` as it is reused
+    // across background compactions.
+    ManualCompactionState* manual_compaction_state;  // nullptr if non-manual
   };
-  std::deque<ManualCompaction*> manual_compaction_dequeue_;
+  std::deque<ManualCompactionState*> manual_compaction_dequeue_;
 
   struct CompactionArg {
+    // caller retains ownership of `db`.
     DBImpl* db;
-    ManualCompaction* m;
+    // background compaction takes ownership of `prepicked_compaction`.
+    PrepickedCompaction* prepicked_compaction;
   };
 
-  // Have we encountered a background error in paranoid mode?
-  Status bg_error_;
-
   // shall we disable deletion of obsolete files
   // if 0 the deletion is enabled.
   // if non-zero, files will not be getting deleted
@@ -1019,6 +1396,10 @@ class DBImpl : public DB {
   // without any synchronization
   int disable_delete_obsolete_files_;
 
+  // Number of times FindObsoleteFiles has found deletable files and the
+  // corresponding call to PurgeObsoleteFiles has not yet finished.
+  int pending_purge_obsolete_files_;
+
   // last time when DeleteObsoleteFiles with full scan was executed. Originaly
   // initialized with startup time.
   uint64_t delete_obsolete_files_last_run_;
@@ -1041,7 +1422,7 @@ class DBImpl : public DB {
   // We must attempt to free the dependent memtables again
   // at a later time after the transaction in the oldest
   // log is fully commited.
-  bool unable_to_flush_oldest_log_;
+  bool unable_to_release_oldest_log_;
 
   static const int KEEP_LOG_FILE_NUM = 1000;
   // MSVC version 1800 still does not have constexpr for ::max()
@@ -1052,6 +1433,9 @@ class DBImpl : public DB {
   // The options to access storage files
   const EnvOptions env_options_;
 
+  // Additonal options for compaction and flush
+  EnvOptions env_options_for_compaction_;
+
   // Number of running IngestExternalFile() calls.
   // REQUIRES: mutex held
   int num_running_ingest_file_;
@@ -1075,27 +1459,15 @@ class DBImpl : public DB {
   // Indicate DB was opened successfully
   bool opened_successfully_;
 
-  // minmum log number still containing prepared data.
-  // this is used by FindObsoleteFiles to determine which
-  // flushed logs we must keep around because they still
-  // contain prepared data which has not been flushed or rolled back
-  std::priority_queue<uint64_t, std::vector<uint64_t>, std::greater<uint64_t>>
-      min_log_with_prep_;
-
-  // to be used in conjunction with min_log_with_prep_.
-  // once a transaction with data in log L is committed or rolled back
-  // rather than removing the value from the heap we add that value
-  // to prepared_section_completed_ which maps LOG -> instance_count
-  // since a log could contain multiple prepared sections
-  //
-  // when trying to determine the minmum log still active we first
-  // consult min_log_with_prep_. while that root value maps to
-  // a value > 0 in prepared_section_completed_ we decrement the
-  // instance_count for that log and pop the root value in
-  // min_log_with_prep_. This will work the same as a min_heap
-  // where we are deleteing arbitrary elements and the up heaping.
-  std::unordered_map<uint64_t, uint64_t> prepared_section_completed_;
-  std::mutex prep_heap_mutex_;
+  LogsWithPrepTracker logs_with_prep_tracker_;
+
+  // Callback for compaction to check if a key is visible to a snapshot.
+  // REQUIRES: mutex held
+  std::unique_ptr<SnapshotChecker> snapshot_checker_;
+
+  // Callback for when the cached_recoverable_state_ is written to memtable
+  // Only to be set during initialization
+  std::unique_ptr<PreReleaseCallback> recoverable_state_pre_release_callback_;
 
   // No copying allowed
   DBImpl(const DBImpl&);
@@ -1103,18 +1475,15 @@ class DBImpl : public DB {
 
   // Background threads call this function, which is just a wrapper around
   // the InstallSuperVersion() function. Background threads carry
-  // job_context which can have new_superversion already
+  // sv_context which can have new_superversion already
   // allocated.
-  void InstallSuperVersionAndScheduleWorkWrapper(
-      ColumnFamilyData* cfd, JobContext* job_context,
-      const MutableCFOptions& mutable_cf_options);
-
   // All ColumnFamily state changes go through this function. Here we analyze
   // the new state and we schedule background work if we detect that the new
   // state needs flush or compaction.
-  SuperVersion* InstallSuperVersionAndScheduleWork(
-      ColumnFamilyData* cfd, SuperVersion* new_sv,
-      const MutableCFOptions& mutable_cf_options);
+  void InstallSuperVersionAndScheduleWork(
+      ColumnFamilyData* cfd, SuperVersionContext* sv_context,
+      const MutableCFOptions& mutable_cf_options,
+      FlushReason flush_reason = FlushReason::kOthers);
 
 #ifndef ROCKSDB_LITE
   using DB::GetPropertiesOfAllTables;
@@ -1127,25 +1496,74 @@ class DBImpl : public DB {
 
 #endif  // ROCKSDB_LITE
 
-  // Function that Get and KeyMayExist call with no_io true or false
-  // Note: 'value_found' from KeyMayExist propagates here
-  Status GetImpl(const ReadOptions& options, ColumnFamilyHandle* column_family,
-                 const Slice& key, PinnableSlice* value,
-                 bool* value_found = nullptr);
-
   bool GetIntPropertyInternal(ColumnFamilyData* cfd,
                               const DBPropertyInfo& property_info,
                               bool is_locked, uint64_t* value);
+  bool GetPropertyHandleOptionsStatistics(std::string* value);
 
   bool HasPendingManualCompaction();
   bool HasExclusiveManualCompaction();
-  void AddManualCompaction(ManualCompaction* m);
-  void RemoveManualCompaction(ManualCompaction* m);
-  bool ShouldntRunManualCompaction(ManualCompaction* m);
+  void AddManualCompaction(ManualCompactionState* m);
+  void RemoveManualCompaction(ManualCompactionState* m);
+  bool ShouldntRunManualCompaction(ManualCompactionState* m);
   bool HaveManualCompaction(ColumnFamilyData* cfd);
-  bool MCOverlap(ManualCompaction* m, ManualCompaction* m1);
+  bool MCOverlap(ManualCompactionState* m, ManualCompactionState* m1);
+
+  bool ShouldPurge(uint64_t file_number) const;
+  void MarkAsGrabbedForPurge(uint64_t file_number);
 
   size_t GetWalPreallocateBlockSize(uint64_t write_buffer_size) const;
+  Env::WriteLifeTimeHint CalculateWALWriteHint() {
+    return Env::WLTH_SHORT;
+  }
+
+  // When set, we use a separate queue for writes that dont write to memtable.
+  // In 2PC these are the writes at Prepare phase.
+  const bool two_write_queues_;
+  const bool manual_wal_flush_;
+  // Increase the sequence number after writing each batch, whether memtable is
+  // disabled for that or not. Otherwise the sequence number is increased after
+  // writing each key into memtable. This implies that when disable_memtable is
+  // set, the seq is not increased at all.
+  //
+  // Default: false
+  const bool seq_per_batch_;
+  // This determines during recovery whether we expect one writebatch per
+  // recovered transaction, or potentially multiple writebatches per
+  // transaction. For WriteUnprepared, this is set to false, since multiple
+  // batches can exist per transaction.
+  //
+  // Default: true
+  const bool batch_per_txn_;
+  // LastSequence also indicates last published sequence visibile to the
+  // readers. Otherwise LastPublishedSequence should be used.
+  const bool last_seq_same_as_publish_seq_;
+  // It indicates that a customized gc algorithm must be used for
+  // flush/compaction and if it is not provided vis SnapshotChecker, we should
+  // disable gc to be safe.
+  const bool use_custom_gc_;
+  // Flag to indicate that the DB instance shutdown has been initiated. This
+  // different from shutting_down_ atomic in that it is set at the beginning
+  // of shutdown sequence, specifically in order to prevent any background
+  // error recovery from going on in parallel. The latter, shutting_down_,
+  // is set a little later during the shutdown after scheduling memtable
+  // flushes
+  bool shutdown_initiated_;
+  // Flag to indicate whether sst_file_manager object was allocated in
+  // DB::Open() or passed to us
+  bool own_sfm_;
+
+  // Clients must periodically call SetPreserveDeletesSequenceNumber()
+  // to advance this seqnum. Default value is 0 which means ALL deletes are
+  // preserved. Note that this has no effect if DBOptions.preserve_deletes
+  // is set to false.
+  std::atomic<SequenceNumber> preserve_deletes_seqnum_;
+  const bool preserve_deletes_;
+
+  // Flag to check whether Close() has been called on this DB
+  bool closed_;
+
+  ErrorHandler error_handler_;
 };
 
 extern Options SanitizeOptions(const std::string& db,
@@ -1156,7 +1574,26 @@ extern DBOptions SanitizeOptions(const std::string& db, const DBOptions& src);
 extern CompressionType GetCompressionFlush(
     const ImmutableCFOptions& ioptions,
     const MutableCFOptions& mutable_cf_options);
+
+// Return the earliest log file to keep after the memtable flush is
+// finalized.
+// `cfd_to_flush` is the column family whose memtable (specified in
+// `memtables_to_flush`) will be flushed and thus will not depend on any WAL
+// file.
+// The function is only applicable to 2pc mode.
+extern uint64_t PrecomputeMinLogNumberToKeep(
+    VersionSet* vset, const ColumnFamilyData& cfd_to_flush,
+    autovector<VersionEdit*> edit_list,
+    const autovector<MemTable*>& memtables_to_flush,
+    LogsWithPrepTracker* prep_tracker);
+
+// `cfd_to_flush` is the column family whose memtable will be flushed and thus
+// will not depend on any WAL file. nullptr means no memtable is being flushed.
+// The function is only applicable to 2pc mode.
+extern uint64_t FindMinPrepLogReferencedByMemTable(
+    VersionSet* vset, const ColumnFamilyData* cfd_to_flush,
+    const autovector<MemTable*>& memtables_to_flush);
+
 // Fix user-supplied options to be reasonable
 template <class T, class V>
 static void ClipToRange(T* ptr, V minvalue, V maxvalue) {