// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
-// This source code is licensed under the BSD-style license found in the
-// LICENSE file in the root directory of this source tree. An additional grant
-// of patent rights can be found in the PATENTS file in the same directory.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
#include "db/file_indexer.h"
#include "db/log_reader.h"
#include "db/range_del_aggregator.h"
+#include "db/read_callback.h"
#include "db/table_cache.h"
#include "db/version_builder.h"
#include "db/version_edit.h"
}
class Compaction;
-class InternalIterator;
class LogBuffer;
class LookupKey;
class MemTable;
// Update the accumulated stats from a file-meta.
void UpdateAccumulatedStats(FileMetaData* file_meta);
- // Decrease the current stat form a to-be-delected file-meta
+ // Decrease the current stat from a to-be-deleted file-meta
void RemoveCurrentStats(FileMetaData* file_meta);
void ComputeCompensatedSizes();
// ComputeCompactionScore()
void ComputeFilesMarkedForCompaction();
+ // This computes ttl_expired_files_ and is called by
+ // ComputeCompactionScore()
+ void ComputeExpiredTtlFiles(const ImmutableCFOptions& ioptions,
+ const uint64_t ttl);
+
+ // This computes bottommost_files_marked_for_compaction_ and is called by
+ // ComputeCompactionScore() or UpdateOldestSnapshot().
+ //
+ // Among bottommost files (assumes they've already been computed), marks the
+ // ones that have keys that would be eliminated if recompacted, according to
+ // the seqnum of the oldest existing snapshot. Must be called every time
+ // oldest snapshot changes as that is when bottom-level files can become
+ // eligible for compaction.
+ //
+ // REQUIRES: DB mutex held
+ void ComputeBottommostFilesMarkedForCompaction();
+
// Generate level_files_brief_ from files_
void GenerateLevelFilesBrief();
// Sort all files for this version based on their file size and
return level0_non_overlapping_;
}
+ // Check whether each file in this version is bottommost (i.e., nothing in its
+ // key-range could possibly exist in an older file/level).
+ // REQUIRES: This version has not been saved
+ void GenerateBottommostFiles();
+
+ // Updates the oldest snapshot and related internal state, like the bottommost
+ // files marked for compaction.
+ // REQUIRES: DB mutex held
+ void UpdateOldestSnapshot(SequenceNumber oldest_snapshot_seqnum);
+
int MaxInputLevel() const;
+ int MaxOutputLevel(bool allow_ingest_behind) const;
// Return level number that has idx'th highest score
int CompactionScoreLevel(int idx) const { return compaction_level_[idx]; }
std::vector<FileMetaData*>* inputs,
int hint_index = -1, // index of overlap file
int* file_index = nullptr, // return index of overlap file
- bool expand_range = true) // if set, returns files which overlap the
- const; // range and overlap each other. If false,
+ bool expand_range = true, // if set, returns files which overlap the
+ // range and overlap each other. If false,
// then just files intersecting the range
+ InternalKey** next_smallest = nullptr) // if non-null, returns the
+ const; // smallest key of next file not included
void GetCleanInputsWithinInterval(
int level, const InternalKey* begin, // nullptr means before all keys
const InternalKey* end, // nullptr means after all keys
const;
void GetOverlappingInputsRangeBinarySearch(
- int level, // level > 0
- const Slice& begin, // nullptr means before all keys
- const Slice& end, // nullptr means after all keys
+ int level, // level > 0
+ const InternalKey* begin, // nullptr means before all keys
+ const InternalKey* end, // nullptr means after all keys
std::vector<FileMetaData*>* inputs,
int hint_index, // index of overlap file
int* file_index, // return index of overlap file
- bool within_interval = false) // if set, force the inputs within interval
- const;
+ bool within_interval = false, // if set, force the inputs within interval
+ InternalKey** next_smallest = nullptr) // if non-null, returns the
+ const; // smallest key of next file not included
void ExtendFileRangeOverlappingInterval(
int level,
- const Slice& begin, // nullptr means before all keys
- const Slice& end, // nullptr means after all keys
- unsigned int index, // start extending from this index
- int* startIndex, // return the startIndex of input range
- int* endIndex) // return the endIndex of input range
+ const InternalKey* begin, // nullptr means before all keys
+ const InternalKey* end, // nullptr means after all keys
+ unsigned int index, // start extending from this index
+ int* startIndex, // return the startIndex of input range
+ int* endIndex) // return the endIndex of input range
const;
void ExtendFileRangeWithinInterval(
int level,
- const Slice& begin, // nullptr means before all keys
- const Slice& end, // nullptr means after all keys
- unsigned int index, // start extending from this index
- int* startIndex, // return the startIndex of input range
- int* endIndex) // return the endIndex of input range
+ const InternalKey* begin, // nullptr means before all keys
+ const InternalKey* end, // nullptr means after all keys
+ unsigned int index, // start extending from this index
+ int* startIndex, // return the startIndex of input range
+ int* endIndex) // return the endIndex of input range
const;
// Returns true iff some file in the specified level overlaps
return files_marked_for_compaction_;
}
+ // REQUIRES: This version has been saved (see VersionSet::SaveTo)
+ // REQUIRES: DB mutex held during access
+ const autovector<std::pair<int, FileMetaData*>>& ExpiredTtlFiles() const {
+ assert(finalized_);
+ return expired_ttl_files_;
+ }
+
+ // REQUIRES: This version has been saved (see VersionSet::SaveTo)
+ // REQUIRES: DB mutex held during access
+ const autovector<std::pair<int, FileMetaData*>>&
+ BottommostFilesMarkedForCompaction() const {
+ assert(finalized_);
+ return bottommost_files_marked_for_compaction_;
+ }
+
int base_level() const { return base_level_; }
// REQUIRES: lock is held
bool force_consistency_checks() const { return force_consistency_checks_; }
+ // Returns whether any key in [`smallest_key`, `largest_key`] could appear in
+ // an older L0 file than `last_l0_idx` or in a greater level than `last_level`
+ //
+ // @param last_level Level after which we check for overlap
+ // @param last_l0_idx If `last_level == 0`, index of L0 file after which we
+ // check for overlap; otherwise, must be -1
+ bool RangeMightExistAfterSortedRun(const Slice& smallest_key,
+ const Slice& largest_key, int last_level,
+ int last_l0_idx);
+
private:
const InternalKeyComparator* internal_comparator_;
const Comparator* user_comparator_;
// ComputeCompactionScore()
autovector<std::pair<int, FileMetaData*>> files_marked_for_compaction_;
+ autovector<std::pair<int, FileMetaData*>> expired_ttl_files_;
+
+ // These files are considered bottommost because none of their keys can exist
+ // at lower levels. They are not necessarily all in the same level. The marked
+ // ones are eligible for compaction because they contain duplicate key
+ // versions that are no longer protected by snapshot. These variables are
+ // protected by DB mutex and are calculated in `GenerateBottommostFiles()` and
+ // `ComputeBottommostFilesMarkedForCompaction()`.
+ autovector<std::pair<int, FileMetaData*>> bottommost_files_;
+ autovector<std::pair<int, FileMetaData*>>
+ bottommost_files_marked_for_compaction_;
+
+ // Threshold for needing to mark another bottommost file. Maintain it so we
+ // can quickly check when releasing a snapshot whether more bottommost files
+ // became eligible for compaction. It's defined as the min of the max nonzero
+ // seqnums of unmarked bottommost files.
+ SequenceNumber bottommost_files_mark_threshold_ = kMaxSequenceNumber;
+
+ // Monotonically increases as we release old snapshots. Zero indicates no
+ // snapshots have been released yet. When no snapshots remain we set it to the
+ // current seqnum, which needs to be protected as a snapshot can still be
+ // created that references it.
+ SequenceNumber oldest_snapshot_seqnum_ = 0;
+
// Level that should be compacted next and its compaction score.
// Score < 1 means compaction is not strictly needed. These fields
// are initialized by Finalize().
uint64_t accumulated_num_deletions_;
// current number of non_deletion entries
uint64_t current_num_non_deletions_;
- // current number of delection entries
+ // current number of deletion entries
uint64_t current_num_deletions_;
// current number of file samples
uint64_t current_num_samples_;
MergeIteratorBuilder* merger_iter_builder,
int level, RangeDelAggregator* range_del_agg);
+ Status OverlapWithLevelIterator(const ReadOptions&, const EnvOptions&,
+ const Slice& smallest_user_key,
+ const Slice& largest_user_key,
+ int level, bool* overlap);
+
// Lookup the value for key. If found, store it in *val and
// return OK. Else return a non-OK status.
// Uses *operands to store merge_operator operations to apply later.
void Get(const ReadOptions&, const LookupKey& key, PinnableSlice* value,
Status* status, MergeContext* merge_context,
RangeDelAggregator* range_del_agg, bool* value_found = nullptr,
- bool* key_exists = nullptr, SequenceNumber* seq = nullptr);
+ bool* key_exists = nullptr, SequenceNumber* seq = nullptr,
+ ReadCallback* callback = nullptr, bool* is_blob = nullptr);
// Loads some stats information from files. Call without mutex held. It needs
// to be called before applying the version to the version set.
void AddLiveFiles(std::vector<FileDescriptor>* live);
// Return a human readable string that describes this version's contents.
- std::string DebugString(bool hex = false) const;
+ std::string DebugString(bool hex = false, bool print_stats = false) const;
- // Returns the version nuber of this version
+ // Returns the version number of this version
uint64_t GetVersionNumber() const { return version_number_; }
// REQUIRES: lock is held
// On success, "tp" will contains the table properties of the file
// specified in "file_meta". If the file name of "file_meta" is
- // known ahread, passing it by a non-null "fname" can save a
+ // known ahead, passing it by a non-null "fname" can save a
// file-name conversion.
Status GetTableProperties(std::shared_ptr<const TableProperties>* tp,
const FileMetaData* file_meta,
// REQUIRES: lock is held
// On success, *props will be populated with all SSTables' table properties.
// The keys of `props` are the sst file name, the values of `props` are the
- // tables' propertis, represented as shared_ptr.
+ // tables' properties, represented as shared_ptr.
Status GetPropertiesOfAllTables(TablePropertiesCollection* props);
Status GetPropertiesOfAllTables(TablePropertiesCollection* props, int level);
Status GetPropertiesOfTablesInRange(const Range* range, std::size_t n,
TablePropertiesCollection* props) const;
// REQUIRES: lock is held
- // On success, "tp" will contains the aggregated table property amoug
+ // On success, "tp" will contains the aggregated table property among
// the table properties of all sst files in this version.
Status GetAggregatedTableProperties(
std::shared_ptr<const TableProperties>* tp, int level = -1);
void GetColumnFamilyMetaData(ColumnFamilyMetaData* cf_meta);
+ uint64_t GetSstFilesSize();
+
+ MutableCFOptions GetMutableCFOptions() { return mutable_cf_options_; }
+
private:
Env* env_;
friend class VersionSet;
bool IsFilterSkipped(int level, bool is_file_last_in_level = false);
// The helper function of UpdateAccumulatedStats, which may fill the missing
- // fields of file_mata from its associated TableProperties.
+ // fields of file_meta from its associated TableProperties.
// Returns true if it does initialize FileMetaData.
bool MaybeInitializeFileMetaData(FileMetaData* file_meta);
Version* next_; // Next version in linked list
Version* prev_; // Previous version in linked list
int refs_; // Number of live refs to this version
+ const EnvOptions env_options_;
+ const MutableCFOptions mutable_cf_options_;
// A version number that uniquely represents this version. This is
// used for debugging and logging purposes only.
uint64_t version_number_;
- Version(ColumnFamilyData* cfd, VersionSet* vset, uint64_t version_number = 0);
+ Version(ColumnFamilyData* cfd, VersionSet* vset, const EnvOptions& env_opt,
+ MutableCFOptions mutable_cf_options, uint64_t version_number = 0);
~Version();
void operator=(const Version&);
};
+struct ObsoleteFileInfo {
+ FileMetaData* metadata;
+ std::string path;
+
+ ObsoleteFileInfo() noexcept : metadata(nullptr) {}
+ ObsoleteFileInfo(FileMetaData* f, const std::string& file_path)
+ : metadata(f), path(file_path) {}
+
+ ObsoleteFileInfo(const ObsoleteFileInfo&) = delete;
+ ObsoleteFileInfo& operator=(const ObsoleteFileInfo&) = delete;
+
+ ObsoleteFileInfo(ObsoleteFileInfo&& rhs) noexcept :
+ ObsoleteFileInfo() {
+ *this = std::move(rhs);
+ }
+
+ ObsoleteFileInfo& operator=(ObsoleteFileInfo&& rhs) noexcept {
+ path = std::move(rhs.path);
+ metadata = rhs.metadata;
+ rhs.metadata = nullptr;
+
+ return *this;
+ }
+
+ void DeleteMetadata() {
+ delete metadata;
+ metadata = nullptr;
+ }
+};
+
+namespace {
+class BaseReferencedVersionBuilder;
+}
+
class VersionSet {
public:
VersionSet(const std::string& dbname, const ImmutableDBOptions* db_options,
InstrumentedMutex* mu, Directory* db_directory = nullptr,
bool new_descriptor_log = false,
const ColumnFamilyOptions* column_family_options = nullptr) {
- autovector<VersionEdit*> edit_list;
- edit_list.push_back(edit);
- return LogAndApply(column_family_data, mutable_cf_options, edit_list, mu,
+ std::vector<ColumnFamilyData*> cfds(1, column_family_data);
+ std::vector<MutableCFOptions> mutable_cf_options_list(1,
+ mutable_cf_options);
+ std::vector<autovector<VersionEdit*>> edit_lists(1, {edit});
+ return LogAndApply(cfds, mutable_cf_options_list, edit_lists, mu,
db_directory, new_descriptor_log, column_family_options);
}
// The batch version. If edit_list.size() > 1, caller must ensure that
const MutableCFOptions& mutable_cf_options,
const autovector<VersionEdit*>& edit_list, InstrumentedMutex* mu,
Directory* db_directory = nullptr, bool new_descriptor_log = false,
- const ColumnFamilyOptions* column_family_options = nullptr);
+ const ColumnFamilyOptions* column_family_options = nullptr) {
+ std::vector<ColumnFamilyData*> cfds(1, column_family_data);
+ std::vector<MutableCFOptions> mutable_cf_options_list(1,
+ mutable_cf_options);
+ std::vector<autovector<VersionEdit*>> edit_lists(1, edit_list);
+ return LogAndApply(cfds, mutable_cf_options_list, edit_lists, mu,
+ db_directory, new_descriptor_log, column_family_options);
+ }
+
+ // The across-multi-cf batch version. If edit_lists contain more than
+ // 1 version edits, caller must ensure that no edit in the []list is column
+ // family manipulation.
+ Status LogAndApply(const std::vector<ColumnFamilyData*>& cfds,
+ const std::vector<MutableCFOptions>& mutable_cf_options,
+ const std::vector<autovector<VersionEdit*>>& edit_lists,
+ InstrumentedMutex* mu, Directory* db_directory = nullptr,
+ bool new_descriptor_log = false,
+ const ColumnFamilyOptions* new_cf_options = nullptr);
// Recover the last saved descriptor from persistent storage.
// If read_only == true, Recover() will not complain if some column families
uint64_t current_next_file_number() const { return next_file_number_.load(); }
+ uint64_t min_log_number_to_keep_2pc() const {
+ return min_log_number_to_keep_2pc_.load();
+ }
+
// Allocate and return a new file number
uint64_t NewFileNumber() { return next_file_number_.fetch_add(1); }
+ // Fetch And Add n new file number
+ uint64_t FetchAddFileNumber(uint64_t n) {
+ return next_file_number_.fetch_add(n);
+ }
+
// Return the last sequence number.
uint64_t LastSequence() const {
return last_sequence_.load(std::memory_order_acquire);
}
+ // Note: memory_order_acquire must be sufficient.
+ uint64_t LastAllocatedSequence() const {
+ return last_allocated_sequence_.load(std::memory_order_seq_cst);
+ }
+
+ // Note: memory_order_acquire must be sufficient.
+ uint64_t LastPublishedSequence() const {
+ return last_published_sequence_.load(std::memory_order_seq_cst);
+ }
+
// Set the last sequence number to s.
void SetLastSequence(uint64_t s) {
assert(s >= last_sequence_);
+ // Last visible sequence must always be less than last written seq
+ assert(!db_options_->two_write_queues || s <= last_allocated_sequence_);
last_sequence_.store(s, std::memory_order_release);
}
+ // Note: memory_order_release must be sufficient
+ void SetLastPublishedSequence(uint64_t s) {
+ assert(s >= last_published_sequence_);
+ last_published_sequence_.store(s, std::memory_order_seq_cst);
+ }
+
+ // Note: memory_order_release must be sufficient
+ void SetLastAllocatedSequence(uint64_t s) {
+ assert(s >= last_allocated_sequence_);
+ last_allocated_sequence_.store(s, std::memory_order_seq_cst);
+ }
+
+ // Note: memory_order_release must be sufficient
+ uint64_t FetchAddLastAllocatedSequence(uint64_t s) {
+ return last_allocated_sequence_.fetch_add(s, std::memory_order_seq_cst);
+ }
+
// Mark the specified file number as used.
- // REQUIRED: this is only called during single-threaded recovery
- void MarkFileNumberUsedDuringRecovery(uint64_t number);
+ // REQUIRED: this is only called during single-threaded recovery or repair.
+ void MarkFileNumberUsed(uint64_t number);
+
+ // Mark the specified log number as deleted
+ // REQUIRED: this is only called during single-threaded recovery or repair, or
+ // from ::LogAndApply where the global mutex is held.
+ void MarkMinLogNumberToKeep2PC(uint64_t number);
// Return the log file number for the log file that is currently
// being compacted, or zero if there is no such log file.
uint64_t prev_log_number() const { return prev_log_number_; }
- // Returns the minimum log number such that all
- // log numbers less than or equal to it can be deleted
- uint64_t MinLogNumber() const {
+ // Returns the minimum log number which still has data not flushed to any SST
+ // file.
+ // In non-2PC mode, all the log numbers smaller than this number can be safely
+ // deleted.
+ uint64_t MinLogNumberWithUnflushedData() const {
+ return PreComputeMinLogNumberWithUnflushedData(nullptr);
+ }
+ // Returns the minimum log number which still has data not flushed to any SST
+ // file, except data from `cfd_to_skip`.
+ uint64_t PreComputeMinLogNumberWithUnflushedData(
+ const ColumnFamilyData* cfd_to_skip) const {
uint64_t min_log_num = std::numeric_limits<uint64_t>::max();
for (auto cfd : *column_family_set_) {
+ if (cfd == cfd_to_skip) {
+ continue;
+ }
// It's safe to ignore dropped column families here:
// cfd->IsDropped() becomes true after the drop is persisted in MANIFEST.
if (min_log_num > cfd->GetLogNumber() && !cfd->IsDropped()) {
// Create an iterator that reads over the compaction inputs for "*c".
// The caller should delete the iterator when no longer needed.
- InternalIterator* MakeInputIterator(const Compaction* c,
- RangeDelAggregator* range_del_agg);
+ InternalIterator* MakeInputIterator(
+ const Compaction* c, RangeDelAggregator* range_del_agg,
+ const EnvOptions& env_options_compactions);
// Add all files listed in any live version to *live.
void AddLiveFiles(std::vector<FileDescriptor>* live_list);
// This function doesn't support leveldb SST filenames
void GetLiveFilesMetaData(std::vector<LiveFileMetaData> *metadata);
- void GetObsoleteFiles(std::vector<FileMetaData*>* files,
+ void GetObsoleteFiles(std::vector<ObsoleteFileInfo>* files,
std::vector<std::string>* manifest_filenames,
uint64_t min_pending_output);
ColumnFamilySet* GetColumnFamilySet() { return column_family_set_.get(); }
const EnvOptions& env_options() { return env_options_; }
+ void ChangeEnvOptions(const MutableDBOptions& new_options) {
+ env_options_.writable_file_max_buffer_size =
+ new_options.writable_file_max_buffer_size;
+ }
+
+ const ImmutableDBOptions* db_options() const { return db_options_; }
static uint64_t GetNumLiveVersions(Version* dummy_versions);
struct LogReporter : public log::Reader::Reporter {
Status* status;
- virtual void Corruption(size_t bytes, const Status& s) override {
+ virtual void Corruption(size_t /*bytes*/, const Status& s) override {
if (this->status->ok()) *this->status = s;
}
};
ColumnFamilyData* CreateColumnFamily(const ColumnFamilyOptions& cf_options,
VersionEdit* edit);
+ Status ApplyOneVersionEdit(
+ VersionEdit& edit,
+ const std::unordered_map<std::string, ColumnFamilyOptions>& name_to_opts,
+ std::unordered_map<int, std::string>& column_families_not_found,
+ std::unordered_map<uint32_t, BaseReferencedVersionBuilder*>& builders,
+ bool* have_log_number, uint64_t* log_number, bool* have_prev_log_number,
+ uint64_t* previous_log_number, bool* have_next_file, uint64_t* next_file,
+ bool* have_last_sequence, SequenceNumber* last_sequence,
+ uint64_t* min_log_number_to_keep, uint32_t* max_column_family);
+
+ Status ProcessManifestWrites(std::deque<ManifestWriter>& writers,
+ InstrumentedMutex* mu, Directory* db_directory,
+ bool new_descriptor_log,
+ const ColumnFamilyOptions* new_cf_options);
+
std::unique_ptr<ColumnFamilySet> column_family_set_;
Env* const env_;
const std::string dbname_;
const ImmutableDBOptions* const db_options_;
std::atomic<uint64_t> next_file_number_;
+ // Any log number equal or lower than this should be ignored during recovery,
+ // and is qualified for being deleted in 2PC mode. In non-2PC mode, this
+ // number is ignored.
+ std::atomic<uint64_t> min_log_number_to_keep_2pc_ = {0};
uint64_t manifest_file_number_;
uint64_t options_file_number_;
uint64_t pending_manifest_file_number_;
+ // The last seq visible to reads. It normally indicates the last sequence in
+ // the memtable but when using two write queues it could also indicate the
+ // last sequence in the WAL visible to reads.
std::atomic<uint64_t> last_sequence_;
+ // The last seq that is already allocated. It is applicable only when we have
+ // two write queues. In that case seq might or might not have appreated in
+ // memtable but it is expected to appear in the WAL.
+ // We have last_sequence <= last_allocated_sequence_
+ std::atomic<uint64_t> last_allocated_sequence_;
+ // The last allocated sequence that is also published to the readers. This is
+ // applicable only when last_seq_same_as_publish_seq_ is not set. Otherwise
+ // last_sequence_ also indicates the last published seq.
+ // We have last_sequence <= last_published_sequence_ <=
+ // last_allocated_sequence_
+ std::atomic<uint64_t> last_published_sequence_;
uint64_t prev_log_number_; // 0 or backing store for memtable being compacted
// Opened lazily
// Current size of manifest file
uint64_t manifest_file_size_;
- std::vector<FileMetaData*> obsolete_files_;
+ std::vector<ObsoleteFileInfo> obsolete_files_;
std::vector<std::string> obsolete_manifests_;
// env options for all reads and writes except compactions
- const EnvOptions& env_options_;
-
- // env options used for compactions. This is a copy of
- // env_options_ but with readaheads set to readahead_compactions_.
- const EnvOptions env_options_compactions_;
+ EnvOptions env_options_;
// No copying allowed
VersionSet(const VersionSet&);