// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
-// This source code is licensed under the BSD-style license found in the
-// LICENSE file in the root directory of this source tree. An additional grant
-// of patent rights can be found in the PATENTS file in the same directory.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
#include <limits>
#include <list>
#include <map>
-#include <queue>
#include <set>
#include <string>
#include <utility>
#include "db/column_family.h"
#include "db/compaction_job.h"
#include "db/dbformat.h"
+#include "db/error_handler.h"
+#include "db/event_helpers.h"
#include "db/external_sst_file_ingestion_job.h"
#include "db/flush_job.h"
#include "db/flush_scheduler.h"
#include "db/internal_stats.h"
#include "db/log_writer.h"
+#include "db/logs_with_prep_tracker.h"
+#include "db/pre_release_callback.h"
+#include "db/read_callback.h"
+#include "db/snapshot_checker.h"
#include "db/snapshot_impl.h"
#include "db/version_edit.h"
#include "db/wal_manager.h"
#include "rocksdb/env.h"
#include "rocksdb/memtablerep.h"
#include "rocksdb/status.h"
+#include "rocksdb/trace_reader_writer.h"
#include "rocksdb/transaction_log.h"
#include "rocksdb/write_buffer_manager.h"
#include "table/scoped_arena_iterator.h"
#include "util/hash.h"
#include "util/stop_watch.h"
#include "util/thread_local.h"
+#include "util/trace_replay.h"
namespace rocksdb {
+class Arena;
+class ArenaWrappedDBIter;
class MemTable;
class TableCache;
class Version;
class VersionEdit;
class VersionSet;
-class Arena;
class WriteCallback;
struct JobContext;
struct ExternalSstFileInfo;
class DBImpl : public DB {
public:
- DBImpl(const DBOptions& options, const std::string& dbname);
+ DBImpl(const DBOptions& options, const std::string& dbname,
+ const bool seq_per_batch = false, const bool batch_per_txn = true);
virtual ~DBImpl();
+ using DB::Resume;
+ virtual Status Resume() override;
+
// Implementations of the DB interface
using DB::Put;
virtual Status Put(const WriteOptions& options,
virtual Status Get(const ReadOptions& options,
ColumnFamilyHandle* column_family, const Slice& key,
PinnableSlice* value) override;
+
+ // Function that Get and KeyMayExist call with no_io true or false
+ // Note: 'value_found' from KeyMayExist propagates here
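+ // A usage sketch (hypothetical caller): KeyMayExist routes through here with
+ // ReadOptions::read_tier = kBlockCacheTier so that no IO is performed, e.g.
+ //   ReadOptions roptions;
+ //   roptions.read_tier = kBlockCacheTier;  // no_io
+ //   PinnableSlice value;
+ //   bool value_found = false;
+ //   Status s = GetImpl(roptions, cfh, key, &value, &value_found);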
+ Status GetImpl(const ReadOptions& options, ColumnFamilyHandle* column_family,
+ const Slice& key, PinnableSlice* value,
+ bool* value_found = nullptr, ReadCallback* callback = nullptr,
+ bool* is_blob_index = nullptr);
+
using DB::MultiGet;
virtual std::vector<Status> MultiGet(
const ReadOptions& options,
const std::vector<Slice>& keys,
std::vector<std::string>* values) override;
- virtual Status CreateColumnFamily(const ColumnFamilyOptions& options,
+ virtual Status CreateColumnFamily(const ColumnFamilyOptions& cf_options,
const std::string& column_family,
ColumnFamilyHandle** handle) override;
+ virtual Status CreateColumnFamilies(
+ const ColumnFamilyOptions& cf_options,
+ const std::vector<std::string>& column_family_names,
+ std::vector<ColumnFamilyHandle*>* handles) override;
+ virtual Status CreateColumnFamilies(
+ const std::vector<ColumnFamilyDescriptor>& column_families,
+ std::vector<ColumnFamilyHandle*>* handles) override;
virtual Status DropColumnFamily(ColumnFamilyHandle* column_family) override;
+ virtual Status DropColumnFamilies(
+ const std::vector<ColumnFamilyHandle*>& column_families) override;
// Returns false if key doesn't exist in the database and true if it may.
// If value_found is not passed in as null, then return the value if found in
ColumnFamilyHandle* column_family, const Slice& key,
std::string* value,
bool* value_found = nullptr) override;
+
using DB::NewIterator;
virtual Iterator* NewIterator(const ReadOptions& options,
ColumnFamilyHandle* column_family) override;
const ReadOptions& options,
const std::vector<ColumnFamilyHandle*>& column_families,
std::vector<Iterator*>* iterators) override;
+ ArenaWrappedDBIter* NewIteratorImpl(const ReadOptions& options,
+ ColumnFamilyData* cfd,
+ SequenceNumber snapshot,
+ ReadCallback* read_callback,
+ bool allow_blob = false,
+ bool allow_refresh = true);
+
virtual const Snapshot* GetSnapshot() override;
virtual void ReleaseSnapshot(const Snapshot* snapshot) override;
using DB::GetProperty;
virtual bool GetProperty(ColumnFamilyHandle* column_family,
const Slice& property, std::string* value) override;
using DB::GetMapProperty;
- virtual bool GetMapProperty(ColumnFamilyHandle* column_family,
- const Slice& property,
- std::map<std::string, double>* value) override;
+ virtual bool GetMapProperty(
+ ColumnFamilyHandle* column_family, const Slice& property,
+ std::map<std::string, std::string>* value) override;
using DB::GetIntProperty;
virtual bool GetIntProperty(ColumnFamilyHandle* column_family,
const Slice& property, uint64_t* value) override;
ColumnFamilyHandle* column_family,
const std::vector<std::string>& input_file_names,
const int output_level,
- const int output_path_id = -1) override;
+ const int output_path_id = -1,
+ std::vector<std::string>* const output_file_names
+ = nullptr) override;
virtual Status PauseBackgroundWork() override;
virtual Status ContinueBackgroundWork() override;
using DB::Flush;
virtual Status Flush(const FlushOptions& options,
ColumnFamilyHandle* column_family) override;
+ virtual Status FlushWAL(bool sync) override;
+ bool TEST_WALBufferIsEmpty();
virtual Status SyncWAL() override;
virtual SequenceNumber GetLatestSequenceNumber() const override;
+ // REQUIRES: joined the main write queue if two_write_queues is disabled, and
+ // the second write queue otherwise.
+ virtual void SetLastPublishedSequence(SequenceNumber seq);
+ // Returns LastSequence in last_seq_same_as_publish_seq_
+ // mode and LastAllocatedSequence otherwise. This is useful when visibility
+ // depends also on data written to the WAL but not to the memtable.
+ SequenceNumber TEST_GetLastVisibleSequence() const;
+
+ virtual bool SetPreserveDeletesSequenceNumber(SequenceNumber seqnum) override;
#ifndef ROCKSDB_LITE
using DB::ResetStats;
const TransactionLogIterator::ReadOptions&
read_options = TransactionLogIterator::ReadOptions()) override;
virtual Status DeleteFile(std::string name) override;
- Status DeleteFilesInRange(ColumnFamilyHandle* column_family,
- const Slice* begin, const Slice* end);
+ Status DeleteFilesInRanges(ColumnFamilyHandle* column_family,
+ const RangePtr* ranges, size_t n,
+ bool include_end = true);
virtual void GetLiveFilesMetaData(
std::vector<LiveFileMetaData>* metadata) override;
ColumnFamilyHandle* column_family,
ColumnFamilyMetaData* metadata) override;
- // experimental API
Status SuggestCompactRange(ColumnFamilyHandle* column_family,
- const Slice* begin, const Slice* end);
+ const Slice* begin, const Slice* end) override;
- Status PromoteL0(ColumnFamilyHandle* column_family, int target_level);
+ Status PromoteL0(ColumnFamilyHandle* column_family,
+ int target_level) override;
// Similar to Write() but will call the callback once on the single write
// thread to determine whether it is safe to perform the write.
// TODO(andrewkr): this API need to be aware of range deletion operations
Status GetLatestSequenceForKey(SuperVersion* sv, const Slice& key,
bool cache_only, SequenceNumber* seq,
- bool* found_record_for_key);
+ bool* found_record_for_key,
+ bool* is_blob_index = nullptr);
using DB::IngestExternalFile;
virtual Status IngestExternalFile(
const std::vector<std::string>& external_files,
const IngestExternalFileOptions& ingestion_options) override;
+ virtual Status VerifyChecksum() override;
+
+ using DB::StartTrace;
+ virtual Status StartTrace(
+ const TraceOptions& options,
+ std::unique_ptr<TraceWriter>&& trace_writer) override;
+
+ using DB::EndTrace;
+ virtual Status EndTrace() override;
+ Status TraceIteratorSeek(const uint32_t& cf_id, const Slice& key);
+ Status TraceIteratorSeekForPrev(const uint32_t& cf_id, const Slice& key);
#endif // ROCKSDB_LITE
// Similar to GetSnapshot(), but also lets the db know that this snapshot
Status RunManualCompaction(ColumnFamilyData* cfd, int input_level,
int output_level, uint32_t output_path_id,
+ uint32_t max_subcompactions,
const Slice* begin, const Slice* end,
bool exclusive,
bool disallow_trivial_move = false);
Arena* arena, RangeDelAggregator* range_del_agg,
ColumnFamilyHandle* column_family = nullptr);
+ LogsWithPrepTracker* logs_with_prep_tracker() {
+ return &logs_with_prep_tracker_;
+ }
+
#ifndef NDEBUG
// Extra methods (for testing) that are not in the public DB interface
// Implemented in db_impl_debug.cc
ColumnFamilyHandle* column_family = nullptr,
bool disallow_trivial_move = false);
- void TEST_HandleWALFull();
+ void TEST_SwitchWAL();
- bool TEST_UnableToFlushOldestLog() {
- return unable_to_flush_oldest_log_;
- }
+ bool TEST_UnableToReleaseOldestLog() { return unable_to_release_oldest_log_; }
bool TEST_IsLogGettingFlushed() {
return alive_log_files_.begin()->getting_flushed;
}
+ Status TEST_SwitchMemtable(ColumnFamilyData* cfd = nullptr);
+
// Force current memtable contents to be flushed.
- Status TEST_FlushMemTable(bool wait = true,
+ Status TEST_FlushMemTable(bool wait = true, bool allow_write_stall = false,
ColumnFamilyHandle* cfh = nullptr);
// Wait for memtable compaction
Status TEST_WaitForFlushMemTable(ColumnFamilyHandle* column_family = nullptr);
// Wait for any compaction
- Status TEST_WaitForCompact();
+ // If waitUnscheduled is true, also wait for unscheduled_compactions_ == 0;
+ // this is only for the special test of CancelledCompactions.
+ Status TEST_WaitForCompact(bool waitUnscheduled = false);
// Return the maximum overlapping data (in bytes) at next level for any
// file at a level >= 1.
// Return the current manifest file no.
uint64_t TEST_Current_Manifest_FileNo();
+ // Returns the number that'll be assigned to the next file that's created.
+ uint64_t TEST_Current_Next_FileNo();
+
// get total level0 file size. Only for testing.
uint64_t TEST_GetLevel0TotalSize();
Status TEST_GetAllImmutableCFOptions(
std::unordered_map<std::string, const ImmutableCFOptions*>* iopts_map);
- // Return the lastest MutableCFOptions of of a column family
+ // Return the latest MutableCFOptions of a column family
Status TEST_GetLatestMutableCFOptions(ColumnFamilyHandle* column_family,
- MutableCFOptions* mutable_cf_opitons);
+ MutableCFOptions* mutable_cf_options);
Cache* TEST_table_cache() { return table_cache_.get(); }
uint64_t TEST_FindMinLogContainingOutstandingPrep();
uint64_t TEST_FindMinPrepLogReferencedByMemTable();
+ size_t TEST_PreparedSectionCompletedSize();
+ size_t TEST_LogsWithPrepSize();
int TEST_BGCompactionsAllowed() const;
+ int TEST_BGFlushesAllowed() const;
+ size_t TEST_GetWalPreallocateBlockSize(uint64_t write_buffer_size) const;
#endif // NDEBUG
- // Return maximum background compaction allowed to be scheduled based on
- // compaction status.
- int BGCompactionsAllowed() const;
+ struct BGJobLimits {
+ int max_flushes;
+ int max_compactions;
+ };
+ // Returns maximum background flushes and compactions allowed to be scheduled
+ BGJobLimits GetBGJobLimits() const;
+ // Need a static version that can be called during SanitizeOptions().
+ static BGJobLimits GetBGJobLimits(int max_background_flushes,
+ int max_background_compactions,
+ int max_background_jobs,
+ bool parallelize_compactions);
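+ // For illustration (a sketch of the expected split, not a contract): with
+ // max_background_jobs = 8 and the legacy limits unset (-1), roughly a quarter
+ // of the threads go to flushes and the rest to compactions, e.g.
+ //   auto limits = GetBGJobLimits(-1, -1, 8, true /*parallelize*/);
+ //   // limits.max_flushes == 2, limits.max_compactions == 6
+ // With parallelize_compactions == false, max_compactions is clamped to 1.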
// move logs pending closing from job_context to the DB queue and
// schedule a purge
bool no_full_scan = false);
// Diffs the files listed in filenames and those that do not
- // belong to live files are posibly removed. Also, removes all the
+ // belong to live files are possibly removed. Also, removes all the
// files in sst_delete_files and log_delete_files.
// It is not necessary to hold the mutex when invoking this method.
- void PurgeObsoleteFiles(const JobContext& background_contet,
+ // If FindObsoleteFiles() was run, we need to also run
+ // PurgeObsoleteFiles(), even if disable_delete_obsolete_files_ is true
+ void PurgeObsoleteFiles(JobContext& background_context,
bool schedule_only = false);
void SchedulePurge();
// mutex is held.
SuperVersion* GetAndRefSuperVersion(uint32_t column_family_id);
+ // Un-reference the super version and clean it up if it is the last reference.
+ void CleanupSuperVersion(SuperVersion* sv);
+
// Un-reference the super version and return it to thread local cache if
// needed. If it is the last reference of the super version, clean it up
// after un-referencing it.
// mutex is released.
ColumnFamilyHandle* GetColumnFamilyHandle(uint32_t column_family_id);
+ // Same as above, should be called without mutex held and not on write thread.
+ ColumnFamilyHandle* GetColumnFamilyHandleUnlocked(uint32_t column_family_id);
+
// Returns the number of currently running flushes.
// REQUIREMENT: mutex_ must be held when calling this function.
int num_running_flushes() {
const WriteController& write_controller() { return write_controller_; }
+ InternalIterator* NewInternalIterator(const ReadOptions&,
+ ColumnFamilyData* cfd,
+ SuperVersion* super_version,
+ Arena* arena,
+ RangeDelAggregator* range_del_agg);
+
// hollow transactions shell used for recovery.
// these will then be passed to TransactionDB so that
// locks can be reacquired before writing can resume.
struct RecoveredTransaction {
- uint64_t log_number_;
std::string name_;
- WriteBatch* batch_;
+ bool unprepared_;
+
+ struct BatchInfo {
+ uint64_t log_number_;
+ // TODO(lth): The memory usage here can be big for unprepared
+ // transactions. This is only useful for rollbacks, and we
+ // can in theory just keep the keyset for that.
+ WriteBatch* batch_;
+ // Number of sub-batches. A new sub-batch is created if txn attempts to
+ // insert a duplicate key,seq to memtable. This is currently used in
+ // WritePreparedTxn/WriteUnpreparedTxn.
+ size_t batch_cnt_;
+ };
+
+ // This maps the seq of the first key in the batch to BatchInfo, which
+ // contains WriteBatch and other information relevant to the batch.
+ //
+ // For WriteUnprepared, batches_ can have size greater than 1, but for
+ // other write policies, it must be of size 1.
+ std::map<SequenceNumber, BatchInfo> batches_;
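+ // e.g. (sketch) a WriteUnprepared transaction recovered from two unprepared
+ // batches starting at seqs 10 and 25 would end up with
+ //   batches_ = { {10, {log1, wb1, cnt1}}, {25, {log2, wb2, cnt2}} };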
+
explicit RecoveredTransaction(const uint64_t log, const std::string& name,
- WriteBatch* batch)
- : log_number_(log), name_(name), batch_(batch) {}
+ WriteBatch* batch, SequenceNumber seq,
+ size_t batch_cnt, bool unprepared)
+ : name_(name), unprepared_(unprepared) {
+ batches_[seq] = {log, batch, batch_cnt};
+ }
+
+ ~RecoveredTransaction() {
+ for (auto& it : batches_) {
+ delete it.second.batch_;
+ }
+ }
- ~RecoveredTransaction() { delete batch_; }
+ void AddBatch(SequenceNumber seq, uint64_t log_number, WriteBatch* batch,
+ size_t batch_cnt, bool unprepared) {
+ assert(batches_.count(seq) == 0);
+ batches_[seq] = {log_number, batch, batch_cnt};
+ // Prior state must be unprepared, since the prepare batch must be the
+ // last batch.
+ assert(unprepared_);
+ unprepared_ = unprepared;
+ }
};
bool allow_2pc() const { return immutable_db_options_.allow_2pc; }
}
void InsertRecoveredTransaction(const uint64_t log, const std::string& name,
- WriteBatch* batch) {
- recovered_transactions_[name] = new RecoveredTransaction(log, name, batch);
- MarkLogAsContainingPrepSection(log);
+ WriteBatch* batch, SequenceNumber seq,
+ size_t batch_cnt, bool unprepared_batch) {
+ // For WriteUnpreparedTxn, InsertRecoveredTransaction is called multiple
+ // times for every unprepared batch encountered during recovery.
+ //
+ // If the transaction is prepared, then the last call to
+ // InsertRecoveredTransaction will have unprepared_batch = false.
+ auto rtxn = recovered_transactions_.find(name);
+ if (rtxn == recovered_transactions_.end()) {
+ recovered_transactions_[name] = new RecoveredTransaction(
+ log, name, batch, seq, batch_cnt, unprepared_batch);
+ } else {
+ rtxn->second->AddBatch(seq, log, batch, batch_cnt, unprepared_batch);
+ }
+ logs_with_prep_tracker_.MarkLogAsContainingPrepSection(log);
}
void DeleteRecoveredTransaction(const std::string& name) {
assert(it != recovered_transactions_.end());
auto* trx = it->second;
recovered_transactions_.erase(it);
- MarkLogAsHavingPrepSectionFlushed(trx->log_number_);
+ for (const auto& info : trx->batches_) {
+ logs_with_prep_tracker_.MarkLogAsHavingPrepSectionFlushed(
+ info.second.log_number_);
+ }
delete trx;
}
recovered_transactions_.clear();
}
- void MarkLogAsHavingPrepSectionFlushed(uint64_t log);
- void MarkLogAsContainingPrepSection(uint64_t log);
void AddToLogsToFreeQueue(log::Writer* log_writer) {
logs_to_free_queue_.push_back(log_writer);
}
+ void SetSnapshotChecker(SnapshotChecker* snapshot_checker);
+
+ // Not thread-safe.
+ void SetRecoverableStatePreReleaseCallback(PreReleaseCallback* callback);
+
+ InstrumentedMutex* mutex() { return &mutex_; }
+
Status NewDB();
+ // This is to be used only by internal rocksdb classes.
+ static Status Open(const DBOptions& db_options, const std::string& name,
+ const std::vector<ColumnFamilyDescriptor>& column_families,
+ std::vector<ColumnFamilyHandle*>* handles, DB** dbptr,
+ const bool seq_per_batch, const bool batch_per_txn);
+
+ virtual Status Close() override;
+
+ static Status CreateAndNewDirectory(Env* env, const std::string& dirname,
+ std::unique_ptr<Directory>* directory);
+
protected:
Env* const env_;
const std::string dbname_;
unique_ptr<VersionSet> versions_;
+ // Flag to check whether we allocated and own the info log file
+ bool own_info_log_;
const DBOptions initial_db_options_;
const ImmutableDBOptions immutable_db_options_;
MutableDBOptions mutable_db_options_;
Statistics* stats_;
std::unordered_map<std::string, RecoveredTransaction*>
recovered_transactions_;
-
- InternalIterator* NewInternalIterator(const ReadOptions&,
- ColumnFamilyData* cfd,
- SuperVersion* super_version,
- Arena* arena,
- RangeDelAggregator* range_del_agg);
+ std::unique_ptr<Tracer> tracer_;
+ InstrumentedMutex trace_mutex_;
- // Except in DB::Open(), WriteOptionsFile can only be called when:
- // 1. WriteThread::Writer::EnterUnbatched() is used.
- // 2. db_mutex is held
- Status WriteOptionsFile();
+ // Persist options to the options file.
+ // If need_mutex_lock = true, the method locks the DB mutex; otherwise the
+ // caller must already hold it. If need_enter_write_thread = true, the method
+ // enters the write thread; otherwise the caller must already be in it.
+ Status WriteOptionsFile(bool need_mutex_lock, bool need_enter_write_thread);
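+ // e.g. (sketch) a caller already holding mutex_ inside the write thread
+ // would use:
+ //   WriteOptionsFile(false /*need_mutex_lock*/,
+ //                    false /*need_enter_write_thread*/);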
// The following two functions can only be called when:
// 1. WriteThread::Writer::EnterUnbatched() is used.
void EraseThreadStatusDbInfo() const;
+ // If disable_memtable is set the application logic must guarantee that the
+ // batch will still be skipped from memtable during the recovery. An exception
+ // to this is seq_per_batch_ mode, in which since each batch already takes one
+ // seq, it is ok for the batch to write to memtable during recovery as long as
+ // it only takes one sequence number: i.e., no duplicate keys.
+ // In WriteCommitted it is guaranteed since disable_memtable is used for
+ // prepare batch which will be written to memtable later during the commit,
+ // and in WritePrepared it is guaranteed since it will be used only for WAL
+ // markers which will never be written to memtable. If the commit marker is
+ // accompanied with CommitTimeWriteBatch that is not written to memtable as
+ // long as it has no duplicate keys, it does not violate the one-seq-per-batch
+ // policy.
+ // batch_cnt is expected to be non-zero in seq_per_batch mode and
+ // indicates the number of sub-batches. A sub-batch is a subset of the write
+ // batch that does not have duplicate keys.
Status WriteImpl(const WriteOptions& options, WriteBatch* updates,
WriteCallback* callback = nullptr,
uint64_t* log_used = nullptr, uint64_t log_ref = 0,
- bool disable_memtable = false);
-
- uint64_t FindMinLogContainingOutstandingPrep();
- uint64_t FindMinPrepLogReferencedByMemTable();
+ bool disable_memtable = false, uint64_t* seq_used = nullptr,
+ size_t batch_cnt = 0,
+ PreReleaseCallback* pre_release_callback = nullptr);
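+ // For illustration (hypothetical 2PC-style caller): a prepare batch can be
+ // written to the WAL only, skipping the memtable, e.g.
+ //   uint64_t log_used = 0, seq_used = 0;
+ //   Status s = WriteImpl(write_options, &prepare_batch, /*callback=*/nullptr,
+ //                        &log_used, /*log_ref=*/0,
+ //                        /*disable_memtable=*/true, &seq_used);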
+
+ Status PipelinedWriteImpl(const WriteOptions& options, WriteBatch* updates,
+ WriteCallback* callback = nullptr,
+ uint64_t* log_used = nullptr, uint64_t log_ref = 0,
+ bool disable_memtable = false,
+ uint64_t* seq_used = nullptr);
+
+ // batch_cnt is expected to be non-zero in seq_per_batch mode and indicates
+ // the number of sub-batches. A sub-batch is a subset of the write batch that
+ // does not have duplicate keys.
+ Status WriteImplWALOnly(const WriteOptions& options, WriteBatch* updates,
+ WriteCallback* callback = nullptr,
+ uint64_t* log_used = nullptr, uint64_t log_ref = 0,
+ uint64_t* seq_used = nullptr, size_t batch_cnt = 0,
+ PreReleaseCallback* pre_release_callback = nullptr);
+
+ // Write cached_recoverable_state_ to the memtable if it is not empty.
+ // The writer must be the leader in write_thread_ and must hold mutex_.
+ Status WriteRecoverableState();
+
+ // Actual implementation of Close()
+ Status CloseImpl();
private:
friend class DB;
+ friend class ErrorHandler;
friend class InternalStats;
- friend class TransactionImpl;
+ friend class PessimisticTransaction;
+ friend class TransactionBaseImpl;
+ friend class WriteCommittedTxn;
+ friend class WritePreparedTxn;
+ friend class WritePreparedTxnDB;
+ friend class WriteBatchWithIndex;
+ friend class WriteUnpreparedTxnDB;
+ friend class WriteUnpreparedTxn;
+
#ifndef ROCKSDB_LITE
friend class ForwardIterator;
#endif
friend struct SuperVersion;
friend class CompactedDBImpl;
+ friend class DBTest_ConcurrentFlushWAL_Test;
+ friend class DBTest_MixedSlowdownOptionsStop_Test;
#ifndef NDEBUG
+ friend class DBTest2_ReadCallbackTest_Test;
+ friend class WriteCallbackTest_WriteWithCallbackTest_Test;
friend class XFTransactionWriteHandler;
+ friend class DBBlobIndexTest;
+ friend class WriteUnpreparedTransactionTest_RecoveryTest_Test;
#endif
struct CompactionState;
struct WriteContext {
- autovector<SuperVersion*> superversions_to_free_;
+ SuperVersionContext superversion_context;
autovector<MemTable*> memtables_to_free_;
+ explicit WriteContext(bool create_superversion = false)
+ : superversion_context(create_superversion) {}
+
~WriteContext() {
- for (auto& sv : superversions_to_free_) {
- delete sv;
- }
+ superversion_context.Clean();
for (auto& m : memtables_to_free_) {
delete m;
}
}
};
+ struct PrepickedCompaction;
struct PurgeFileInfo;
// Recover the descriptor from persistent storage. May do a significant
bool read_only = false, bool error_if_log_file_exist = false,
bool error_if_data_exists_in_logs = false);
+ Status ResumeImpl();
+
void MaybeIgnoreError(Status* s) const;
const Status CreateArchivalDirectory();
+ Status CreateColumnFamilyImpl(const ColumnFamilyOptions& cf_options,
+ const std::string& cf_name,
+ ColumnFamilyHandle** handle);
+
+ Status DropColumnFamilyImpl(ColumnFamilyHandle* column_family);
+
// Delete any unneeded files and stale in-memory entries.
void DeleteObsoleteFiles();
// Delete obsolete files and log status and information of file deletion
- void DeleteObsoleteFileImpl(Status file_deletion_status, int job_id,
- const std::string& fname, FileType type,
- uint64_t number, uint32_t path_id);
+ void DeleteObsoleteFileImpl(int job_id, const std::string& fname,
+ const std::string& path_to_sync, FileType type,
+ uint64_t number);
// Background process needs to call
// auto x = CaptureCurrentFileNumberInPendingOutputs()
Status SyncClosedLogs(JobContext* job_context);
// Flush the in-memory write buffer to storage. Switches to a new
- // log-file/memtable and writes a new descriptor iff successful.
+ // log-file/memtable and writes a new descriptor iff successful. Then
+ // installs a new super version for the column family.
Status FlushMemTableToOutputFile(ColumnFamilyData* cfd,
const MutableCFOptions& mutable_cf_options,
bool* madeProgress, JobContext* job_context,
+ SuperVersionContext* superversion_context,
LogBuffer* log_buffer);
+ // Argument required by background flush thread.
+ struct BGFlushArg {
+ BGFlushArg()
+ : cfd_(nullptr), memtable_id_(0), superversion_context_(nullptr) {}
+ BGFlushArg(ColumnFamilyData* cfd, uint64_t memtable_id,
+ SuperVersionContext* superversion_context)
+ : cfd_(cfd),
+ memtable_id_(memtable_id),
+ superversion_context_(superversion_context) {}
+
+ // Column family to flush.
+ ColumnFamilyData* cfd_;
+ // Maximum ID of memtable to flush. In this column family, memtables with
+ // IDs smaller than this value must be flushed before this flush completes.
+ uint64_t memtable_id_;
+ // Pointer to a SuperVersionContext object. After flush completes, RocksDB
+ // installs a new superversion for the column family. This operation
+ // requires a SuperVersionContext object (currently embedded in JobContext).
+ SuperVersionContext* superversion_context_;
+ };
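+ // (Sketch) Callers typically build one BGFlushArg per column family picked
+ // for flush, pairing it with a SuperVersionContext owned by the JobContext,
+ // e.g.:
+ //   bg_flush_args.emplace_back(cfd, max_memtable_id,
+ //                              &job_context->superversion_contexts[i]);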
+
+ // Flush the memtables of (multiple) column families to multiple files on
+ // persistent storage.
+ Status FlushMemTablesToOutputFiles(
+ const autovector<BGFlushArg>& bg_flush_args, bool* made_progress,
+ JobContext* job_context, LogBuffer* log_buffer);
+
// REQUIRES: log_numbers are sorted in ascending order
Status RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
SequenceNumber* next_sequence, bool read_only);
Status WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
MemTable* mem, VersionEdit* edit);
+ // Restore alive_log_files_ and total_log_size_ after recovery.
+ // It needs to run only when there's no flush during recovery
+ // (e.g. avoid_flush_during_recovery=true). May also trigger flush
+ // in case total_log_size > max_total_wal_size.
+ Status RestoreAliveLogFiles(const std::vector<uint64_t>& log_numbers);
+
// num_bytes: for slowdown case, delay time is calculated based on
// `num_bytes` going through.
Status DelayWrite(uint64_t num_bytes, const WriteOptions& write_options);
+ Status ThrottleLowPriWritesIfNeeded(const WriteOptions& write_options,
+ WriteBatch* my_batch);
+
Status ScheduleFlushes(WriteContext* context);
Status SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context);
// Force current memtable contents to be flushed.
Status FlushMemTable(ColumnFamilyData* cfd, const FlushOptions& options,
- bool writes_stopped = false);
-
- // Wait for memtable flushed
- Status WaitForFlushMemTable(ColumnFamilyData* cfd);
+ FlushReason flush_reason, bool writes_stopped = false);
+
+ // Wait until flushing this column family won't stall writes
+ Status WaitUntilFlushWouldNotStallWrites(ColumnFamilyData* cfd,
+ bool* flush_needed);
+
+ // Wait for memtable flushed.
+ // If flush_memtable_id is non-null, wait until the memtable with the ID
+ // gets flushed. Otherwise, wait until the column family doesn't have any
+ // memtable pending flush.
+ Status WaitForFlushMemTable(ColumnFamilyData* cfd,
+ const uint64_t* flush_memtable_id = nullptr) {
+ return WaitForFlushMemTables({cfd}, {flush_memtable_id});
+ }
+ // Wait for memtables to be flushed for multiple column families.
+ Status WaitForFlushMemTables(
+ const autovector<ColumnFamilyData*>& cfds,
+ const autovector<const uint64_t*>& flush_memtable_ids);
// REQUIRES: mutex locked
- Status HandleWALFull(WriteContext* write_context);
+ Status SwitchWAL(WriteContext* write_context);
// REQUIRES: mutex locked
Status HandleWriteBufferFull(WriteContext* write_context);
// REQUIRES: mutex locked
- Status PreprocessWrite(const WriteOptions& write_options, bool need_log_sync,
- bool* logs_getting_syned, WriteContext* write_context);
+ Status PreprocessWrite(const WriteOptions& write_options, bool* need_log_sync,
+ WriteContext* write_context);
- Status WriteToWAL(const autovector<WriteThread::Writer*>& write_group,
- log::Writer* log_writer, bool need_log_sync,
- bool need_log_dir_sync, SequenceNumber sequence);
+ WriteBatch* MergeBatch(const WriteThread::WriteGroup& write_group,
+ WriteBatch* tmp_batch, size_t* write_with_wal,
+ WriteBatch** to_be_cached_state);
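+ // (For a single-writer group this can return the writer's own batch
+ // directly; merging into tmp_batch is only needed for multi-writer groups.)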
- // Used by WriteImpl to update bg_error_ when encountering memtable insert
- // error.
- void UpdateBackgroundError(const Status& memtable_insert_status);
+ Status WriteToWAL(const WriteBatch& merged_batch, log::Writer* log_writer,
+ uint64_t* log_used, uint64_t* log_size);
+
+ Status WriteToWAL(const WriteThread::WriteGroup& write_group,
+ log::Writer* log_writer, uint64_t* log_used,
+ bool need_log_sync, bool need_log_dir_sync,
+ SequenceNumber sequence);
+
+ Status ConcurrentWriteToWAL(const WriteThread::WriteGroup& write_group,
+ uint64_t* log_used, SequenceNumber* last_sequence,
+ size_t seq_inc);
+
+ // Used by WriteImpl to update bg_error_ if paranoid check is enabled.
+ void WriteStatusCheck(const Status& status);
+
+ // Used by WriteImpl to update bg_error_ in case of memtable insert error.
+ void MemTableInsertStatusCheck(const Status& memtable_insert_status);
#ifndef ROCKSDB_LITE
Status CompactFilesImpl(const CompactionOptions& compact_options,
ColumnFamilyData* cfd, Version* version,
const std::vector<std::string>& input_file_names,
+ std::vector<std::string>* const output_file_names,
const int output_level, int output_path_id,
JobContext* job_context, LogBuffer* log_buffer);
ColumnFamilyData* GetColumnFamilyDataByName(const std::string& cf_name);
void MaybeScheduleFlushOrCompaction();
- void SchedulePendingFlush(ColumnFamilyData* cfd);
+
+ // A flush request specifies the column families to flush as well as the
+ // largest memtable id to persist for each column family. Once all the
+ // memtables whose IDs are smaller than or equal to this per-column-family
+ // specified value have been flushed, this flush request has completed its
+ // work of flushing this column family. After completing the work for all
+ // column families in this request, this flush is considered complete.
+ typedef std::vector<std::pair<ColumnFamilyData*, uint64_t>> FlushRequest;
+
+ void SchedulePendingFlush(const FlushRequest& req, FlushReason flush_reason);
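+ // e.g. (sketch) requesting that all memtables with IDs <= max_memtable_id
+ // in one column family be persisted:
+ //   FlushRequest req{{cfd, max_memtable_id}};
+ //   SchedulePendingFlush(req, FlushReason::kManualFlush);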
+
void SchedulePendingCompaction(ColumnFamilyData* cfd);
- void SchedulePendingPurge(std::string fname, FileType type, uint64_t number,
- uint32_t path_id, int job_id);
+ void SchedulePendingPurge(std::string fname, std::string dir_to_sync,
+ FileType type, uint64_t number, int job_id);
static void BGWorkCompaction(void* arg);
+ // Runs a pre-chosen universal compaction involving bottom level in a
+ // separate, bottom-pri thread pool.
+ static void BGWorkBottomCompaction(void* arg);
static void BGWorkFlush(void* db);
static void BGWorkPurge(void* arg);
static void UnscheduleCallback(void* arg);
- void BackgroundCallCompaction(void* arg);
+ void BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction,
+ Env::Priority bg_thread_pri);
void BackgroundCallFlush();
void BackgroundCallPurge();
Status BackgroundCompaction(bool* madeProgress, JobContext* job_context,
- LogBuffer* log_buffer, void* m = 0);
+ LogBuffer* log_buffer,
+ PrepickedCompaction* prepicked_compaction);
Status BackgroundFlush(bool* madeProgress, JobContext* job_context,
- LogBuffer* log_buffer);
+ LogBuffer* log_buffer, FlushReason* reason);
+
+ bool EnoughRoomForCompaction(ColumnFamilyData* cfd,
+ const std::vector<CompactionInputFiles>& inputs,
+ bool* sfm_bookkeeping, LogBuffer* log_buffer);
void PrintStatistics();
// helper functions for adding and removing from flush & compaction queues
void AddToCompactionQueue(ColumnFamilyData* cfd);
ColumnFamilyData* PopFirstFromCompactionQueue();
- void AddToFlushQueue(ColumnFamilyData* cfd);
- ColumnFamilyData* PopFirstFromFlushQueue();
+ FlushRequest PopFirstFromFlushQueue();
// helper function to call after some of the logs_ were synced
void MarkLogsSynced(uint64_t up_to, bool synced_dir, const Status& status);
- const Snapshot* GetSnapshotImpl(bool is_write_conflict_boundary);
+ SnapshotImpl* GetSnapshotImpl(bool is_write_conflict_boundary);
uint64_t GetMaxTotalWalSize() const;
+ Directory* GetDataDir(ColumnFamilyData* cfd, size_t path_id) const;
+
+ Status CloseHelper();
+
+ Status FlushAllCFs(FlushReason flush_reason);
+
+ void WaitForBackgroundWork();
+
// table_cache_ provides its own synchronization
std::shared_ptr<Cache> table_cache_;
// Lock over the persistent DB state. Non-nullptr iff successfully acquired.
FileLock* db_lock_;
- // The mutex for options file related operations.
- // NOTE: should never acquire options_file_mutex_ and mutex_ at the
- // same time.
- InstrumentedMutex options_files_mutex_;
+ // In addition to mutex_, log_write_mutex_ protects writes to logs_ and
+ // logfile_number_. With two_write_queues it also protects alive_log_files_,
+ // and log_empty_. Refer to the definition of each variable below for more
+ // details.
+ InstrumentedMutex log_write_mutex_;
// State below is protected by mutex_
+ // With two_write_queues enabled, some variables that are accessed during
+ // WriteToWAL need different synchronization: log_empty_, alive_log_files_,
+ // logs_, logfile_number_. Refer to the definition of each variable below for
+ // more description.
mutable InstrumentedMutex mutex_;
std::atomic<bool> shutting_down_;
// (i.e. whenever a flush is done, even if it didn't make any progress)
// * whenever there is an error in background purge, flush or compaction
// * whenever num_running_ingest_file_ goes to 0.
+ // * whenever pending_purge_obsolete_files_ goes to 0.
+ // * whenever disable_delete_obsolete_files_ goes to 0.
+ // * whenever SetOptions successfully updates options.
+ // * whenever a column family is dropped.
InstrumentedCondVar bg_cv_;
+ // Writes are protected by locking both mutex_ and log_write_mutex_, and reads
+ // must be under either mutex_ or log_write_mutex_. Since after ::Open,
+ // logfile_number_ is currently updated only in write_thread_, it can be read
+ // from the same write_thread_ without any locks.
uint64_t logfile_number_;
std::deque<uint64_t>
- log_recycle_files; // a list of log files that we can recycle
+ log_recycle_files_; // a list of log files that we can recycle
bool log_dir_synced_;
+ // Without two_write_queues, reads and writes to log_empty_ are protected by
+ // mutex_. Since it is currently updated/read only in write_thread_, it can be
+ // accessed from the same write_thread_ without any locks. With
+ // two_write_queues writes, where it can be updated in different threads,
+ // reads and writes are protected by log_write_mutex_ instead. This is to
+ // avoid the expensive mutex_ lock during WAL write, which updates log_empty_.
bool log_empty_;
ColumnFamilyHandleImpl* default_cf_handle_;
InternalStats* default_cf_internal_stats_;
writer = nullptr;
return w;
}
- void ClearWriter() {
+ Status ClearWriter() {
+ Status s = writer->WriteBuffer();
delete writer;
writer = nullptr;
+ return s;
}
uint64_t number;
// true for some prefix of logs_
bool getting_synced = false;
};
+ // Without two_write_queues, reads and writes to alive_log_files_ are
+ // protected by mutex_. However since back() is never popped, and push_back()
+ // is done only from write_thread_, the same thread can access the item
+ // referred to by back() without mutex_. With two_write_queues_, writes
+ // are protected by locking both mutex_ and log_write_mutex_, and reads must
+ // be under either mutex_ or log_write_mutex_.
std::deque<LogFileNumberSize> alive_log_files_;
// Log files that aren't fully synced, and the current log file.
// Synchronization:
- // - push_back() is done from write thread with locked mutex_,
- // - pop_front() is done from any thread with locked mutex_,
+ // - push_back() is done from write_thread_ with locked mutex_ and
+ // log_write_mutex_
+ // - pop_front() is done from any thread with locked mutex_ and
+ // log_write_mutex_
+ // - reads are done with either locked mutex_ or log_write_mutex_
// - back() and items with getting_synced=true are not popped,
- // - it follows that write thread with unlocked mutex_ can safely access
- // back() and items with getting_synced=true.
+ // - The same thread that sets getting_synced=true will reset it.
+ // - it follows that the object referred by back() can be safely read from
+ // the write_thread_ without using mutex
+ // - it follows that the items with getting_synced=true can be safely read
+ // from the same thread that has set getting_synced=true
std::deque<LogWriterNumber> logs_;
// Signaled when getting_synced becomes false for some of the logs_.
InstrumentedCondVar log_sync_cv_;
+ // This is the app-level state that is written to the WAL but will be used
+ // only during recovery. Using this feature enables not writing the state to
+ // memtable on normal writes and hence improving the throughput. Each new
+ // write of the state will replace the previous state entirely even if the
+ // keys in the two consecutive states do not overlap.
+ // It is protected by log_write_mutex_ when two_write_queues_ is enabled.
+ // Otherwise only the head of write_thread_ can access it.
+ WriteBatch cached_recoverable_state_;
+ std::atomic<bool> cached_recoverable_state_empty_ = {true};
std::atomic<uint64_t> total_log_size_;
// only used for dynamically adjusting max_total_wal_size. it is a sum of
// [write_buffer_size * max_write_buffer_number] over all column families
const std::string& wal_dir,
const std::vector<DbPath>& data_paths);
- Directory* GetDataDir(size_t path_id);
+ Directory* GetDataDir(size_t path_id) const;
Directory* GetWalDir() {
if (wal_dir_) {
std::unique_ptr<Directory> db_dir_;
std::vector<std::unique_ptr<Directory>> data_dirs_;
std::unique_ptr<Directory> wal_dir_;
-
- Status CreateAndNewDirectory(Env* env, const std::string& dirname,
- std::unique_ptr<Directory>* directory) const;
};
Directories directories_;
WriteBufferManager* write_buffer_manager_;
WriteThread write_thread_;
-
WriteBatch tmp_batch_;
+ // The write thread for writers that have no memtable write. This will be used
+ // in 2PC to batch the prepares separately from the serial commit.
+ WriteThread nonmem_write_thread_;
WriteController write_controller_;
+ unique_ptr<RateLimiter> low_pri_write_rate_limiter_;
+
// Size of the last batch group. In slowdown mode, next write needs to
// sleep if it uses up the quota.
+ // Note: This is to protect memtable and compaction. If the batch only writes
+ // to the WAL, its size need not be included in this.
uint64_t last_batch_group_size_;
FlushScheduler flush_scheduler_;
// purge_queue_
struct PurgeFileInfo {
std::string fname;
+ std::string dir_to_sync;
FileType type;
uint64_t number;
- uint32_t path_id;
int job_id;
- PurgeFileInfo(std::string fn, FileType t, uint64_t num, uint32_t pid,
+ PurgeFileInfo(std::string fn, std::string d, FileType t, uint64_t num,
int jid)
- : fname(fn), type(t), number(num), path_id(pid), job_id(jid) {}
+ : fname(fn), dir_to_sync(d), type(t), number(num), job_id(jid) {}
};
// flush_queue_ and compaction_queue_ hold column families that we need to
// in MaybeScheduleFlushOrCompaction()
// invariant(column family present in flush_queue_ <==>
// ColumnFamilyData::pending_flush_ == true)
- std::deque<ColumnFamilyData*> flush_queue_;
+ std::deque<FlushRequest> flush_queue_;
// invariant(column family present in compaction_queue_ <==>
// ColumnFamilyData::pending_compaction_ == true)
std::deque<ColumnFamilyData*> compaction_queue_;
// A queue to store filenames of the files to be purged
std::deque<PurgeFileInfo> purge_queue_;
+ // A vector to store the file numbers that have been assigned to a certain
+ // JobContext. Current implementation tracks ssts only.
+ std::vector<uint64_t> files_grabbed_for_purge_;
+
// A queue to store log writers to close
std::deque<log::Writer*> logs_to_free_queue_;
int unscheduled_flushes_;
int unscheduled_compactions_;
+ // count how many background compactions are running or have been scheduled in
+ // the BOTTOM pool
+ int bg_bottom_compaction_scheduled_;
+
// count how many background compactions are running or have been scheduled
int bg_compaction_scheduled_;
int bg_purge_scheduled_;
// Information for a manual compaction
- struct ManualCompaction {
+ struct ManualCompactionState {
ColumnFamilyData* cfd;
int input_level;
int output_level;
InternalKey* manual_end; // how far we are compacting
InternalKey tmp_storage; // Used to keep track of compaction progress
InternalKey tmp_storage1; // Used to keep track of compaction progress
+ };
+ struct PrepickedCompaction {
+ // background compaction takes ownership of `compaction`.
Compaction* compaction;
+ // caller retains ownership of `manual_compaction_state` as it is reused
+ // across background compactions.
+ ManualCompactionState* manual_compaction_state; // nullptr if non-manual
};
- std::deque<ManualCompaction*> manual_compaction_dequeue_;
+ std::deque<ManualCompactionState*> manual_compaction_dequeue_;
struct CompactionArg {
+ // caller retains ownership of `db`.
DBImpl* db;
- ManualCompaction* m;
+ // background compaction takes ownership of `prepicked_compaction`.
+ PrepickedCompaction* prepicked_compaction;
};
- // Have we encountered a background error in paranoid mode?
- Status bg_error_;
-
// shall we disable deletion of obsolete files
// if 0 the deletion is enabled.
// if non-zero, files will not be getting deleted
// without any synchronization
int disable_delete_obsolete_files_;
+ // Number of times FindObsoleteFiles has found deletable files and the
+ // corresponding call to PurgeObsoleteFiles has not yet finished.
+ int pending_purge_obsolete_files_;
+
// last time when DeleteObsoleteFiles with full scan was executed. Originally
// initialized with startup time.
uint64_t delete_obsolete_files_last_run_;
// We must attempt to free the dependent memtables again
// at a later time after the transaction in the oldest
// log is fully committed.
- bool unable_to_flush_oldest_log_;
+ bool unable_to_release_oldest_log_;
static const int KEEP_LOG_FILE_NUM = 1000;
// MSVC version 1800 still does not have constexpr for ::max()
// The options to access storage files
const EnvOptions env_options_;
+ // Additional options for compaction and flush
+ EnvOptions env_options_for_compaction_;
+
// Number of running IngestExternalFile() calls.
// REQUIRES: mutex held
int num_running_ingest_file_;
// Indicate DB was opened successfully
bool opened_successfully_;
- // minmum log number still containing prepared data.
- // this is used by FindObsoleteFiles to determine which
- // flushed logs we must keep around because they still
- // contain prepared data which has not been flushed or rolled back
- std::priority_queue<uint64_t, std::vector<uint64_t>, std::greater<uint64_t>>
- min_log_with_prep_;
-
- // to be used in conjunction with min_log_with_prep_.
- // once a transaction with data in log L is committed or rolled back
- // rather than removing the value from the heap we add that value
- // to prepared_section_completed_ which maps LOG -> instance_count
- // since a log could contain multiple prepared sections
- //
- // when trying to determine the minmum log still active we first
- // consult min_log_with_prep_. while that root value maps to
- // a value > 0 in prepared_section_completed_ we decrement the
- // instance_count for that log and pop the root value in
- // min_log_with_prep_. This will work the same as a min_heap
- // where we are deleteing arbitrary elements and the up heaping.
- std::unordered_map<uint64_t, uint64_t> prepared_section_completed_;
- std::mutex prep_heap_mutex_;
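+ // Tracks WAL logs that still contain prepared (uncommitted) 2PC data, so
+ // FindObsoleteFiles can determine which flushed logs must be kept around.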
+ LogsWithPrepTracker logs_with_prep_tracker_;
+
+ // Callback for compaction to check if a key is visible to a snapshot.
+ // REQUIRES: mutex held
+ std::unique_ptr<SnapshotChecker> snapshot_checker_;
+
+ // Callback for when the cached_recoverable_state_ is written to memtable
+ // Only to be set during initialization
+ std::unique_ptr<PreReleaseCallback> recoverable_state_pre_release_callback_;
// No copying allowed
DBImpl(const DBImpl&);
// Background threads call this function, which is just a wrapper around
// the InstallSuperVersion() function. Background threads carry
- // job_context which can have new_superversion already
+ // sv_context which can have new_superversion already
// allocated.
- void InstallSuperVersionAndScheduleWorkWrapper(
- ColumnFamilyData* cfd, JobContext* job_context,
- const MutableCFOptions& mutable_cf_options);
-
// All ColumnFamily state changes go through this function. Here we analyze
// the new state and we schedule background work if we detect that the new
// state needs flush or compaction.
- SuperVersion* InstallSuperVersionAndScheduleWork(
- ColumnFamilyData* cfd, SuperVersion* new_sv,
- const MutableCFOptions& mutable_cf_options);
+ void InstallSuperVersionAndScheduleWork(
+ ColumnFamilyData* cfd, SuperVersionContext* sv_context,
+ const MutableCFOptions& mutable_cf_options,
+ FlushReason flush_reason = FlushReason::kOthers);
#ifndef ROCKSDB_LITE
using DB::GetPropertiesOfAllTables;
#endif // ROCKSDB_LITE
- // Function that Get and KeyMayExist call with no_io true or false
- // Note: 'value_found' from KeyMayExist propagates here
- Status GetImpl(const ReadOptions& options, ColumnFamilyHandle* column_family,
- const Slice& key, PinnableSlice* value,
- bool* value_found = nullptr);
-
bool GetIntPropertyInternal(ColumnFamilyData* cfd,
const DBPropertyInfo& property_info,
bool is_locked, uint64_t* value);
+ bool GetPropertyHandleOptionsStatistics(std::string* value);
bool HasPendingManualCompaction();
bool HasExclusiveManualCompaction();
- void AddManualCompaction(ManualCompaction* m);
- void RemoveManualCompaction(ManualCompaction* m);
- bool ShouldntRunManualCompaction(ManualCompaction* m);
+ void AddManualCompaction(ManualCompactionState* m);
+ void RemoveManualCompaction(ManualCompactionState* m);
+ bool ShouldntRunManualCompaction(ManualCompactionState* m);
bool HaveManualCompaction(ColumnFamilyData* cfd);
- bool MCOverlap(ManualCompaction* m, ManualCompaction* m1);
+ bool MCOverlap(ManualCompactionState* m, ManualCompactionState* m1);
+
+ bool ShouldPurge(uint64_t file_number) const;
+ void MarkAsGrabbedForPurge(uint64_t file_number);
size_t GetWalPreallocateBlockSize(uint64_t write_buffer_size) const;
+ Env::WriteLifeTimeHint CalculateWALWriteHint() {
+ return Env::WLTH_SHORT;
+ }
+
+ // When set, we use a separate queue for writes that don't write to memtable.
+ // In 2PC these are the writes at Prepare phase.
+ const bool two_write_queues_;
+ const bool manual_wal_flush_;
+ // Increase the sequence number after writing each batch, whether memtable is
+ // disabled for that or not. Otherwise the sequence number is increased after
+ // writing each key into memtable. This implies that when disable_memtable is
+ // set, the seq is not increased at all.
+ //
+ // Default: false
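+ //
+ // e.g. a batch {Put(a), Put(b)} consumes one sequence number in this mode,
+ // whereas otherwise it would consume two.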
+ const bool seq_per_batch_;
+ // This determines during recovery whether we expect one writebatch per
+ // recovered transaction, or potentially multiple writebatches per
+ // transaction. For WriteUnprepared, this is set to false, since multiple
+ // batches can exist per transaction.
+ //
+ // Default: true
+ const bool batch_per_txn_;
+ // When true, LastSequence also indicates the last published sequence visible
+ // to the readers. Otherwise LastPublishedSequence should be used.
+ const bool last_seq_same_as_publish_seq_;
+ // It indicates that a customized gc algorithm must be used for
+ // flush/compaction, and if it is not provided via SnapshotChecker, we should
+ // disable gc to be safe.
+ const bool use_custom_gc_;
+ // Flag to indicate that the DB instance shutdown has been initiated. This is
+ // different from the shutting_down_ atomic in that it is set at the beginning
+ // of the shutdown sequence, specifically in order to prevent any background
+ // error recovery from going on in parallel. The latter, shutting_down_,
+ // is set a little later during the shutdown, after scheduling memtable
+ // flushes.
+ bool shutdown_initiated_;
+ // Flag to indicate whether sst_file_manager object was allocated in
+ // DB::Open() or passed to us
+ bool own_sfm_;
+
+ // Clients must periodically call SetPreserveDeletesSequenceNumber()
+ // to advance this seqnum. Default value is 0 which means ALL deletes are
+ // preserved. Note that this has no effect if DBOptions.preserve_deletes
+ // is set to false.
+ std::atomic<SequenceNumber> preserve_deletes_seqnum_;
+ const bool preserve_deletes_;
+
+ // Flag to check whether Close() has been called on this DB
+ bool closed_;
+
+ ErrorHandler error_handler_;
};
extern Options SanitizeOptions(const std::string& db,
extern CompressionType GetCompressionFlush(
const ImmutableCFOptions& ioptions,
const MutableCFOptions& mutable_cf_options);
-
+
+// Return the earliest log file to keep after the memtable flush is
+// finalized.
+// `cfd_to_flush` is the column family whose memtable (specified in
+// `memtables_to_flush`) will be flushed and thus will not depend on any WAL
+// file.
+// The function is only applicable to 2pc mode.
+extern uint64_t PrecomputeMinLogNumberToKeep(
+ VersionSet* vset, const ColumnFamilyData& cfd_to_flush,
+ autovector<VersionEdit*> edit_list,
+ const autovector<MemTable*>& memtables_to_flush,
+ LogsWithPrepTracker* prep_tracker);
+
+// `cfd_to_flush` is the column family whose memtable will be flushed and thus
+// will not depend on any WAL file. nullptr means no memtable is being flushed.
+// The function is only applicable to 2pc mode.
+extern uint64_t FindMinPrepLogReferencedByMemTable(
+ VersionSet* vset, const ColumnFamilyData* cfd_to_flush,
+ const autovector<MemTable*>& memtables_to_flush);
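+
+// For example (sketch), a 2PC-mode flush might compute the earliest WAL to
+// retain as:
+//   uint64_t min_log = PrecomputeMinLogNumberToKeep(
+//       versions, *cfd_to_flush, edit_list, memtables_to_flush, prep_tracker);
+// after which any WAL numbered below min_log is obsolete.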
+
// Fix user-supplied options to be reasonable
template <class T, class V>
static void ClipToRange(T* ptr, V minvalue, V maxvalue) {