// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
-// This source code is licensed under the BSD-style license found in the
-// LICENSE file in the root directory of this source tree. An additional grant
-// of patent rights can be found in the PATENTS file in the same directory.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
class Version;
class VersionSet;
+class VersionStorageInfo;
class MemTable;
class MemTableListVersion;
class CompactionPicker;
class LogBuffer;
class InstrumentedMutex;
class InstrumentedMutexLock;
+struct SuperVersionContext;
extern const double kIncSlowdownRatio;
class ColumnFamilyHandleInternal : public ColumnFamilyHandleImpl {
public:
ColumnFamilyHandleInternal()
- : ColumnFamilyHandleImpl(nullptr, nullptr, nullptr) {}
+ : ColumnFamilyHandleImpl(nullptr, nullptr, nullptr), internal_cfd_(nullptr) {}
void SetCFD(ColumnFamilyData* _cfd) { internal_cfd_ = _cfd; }
virtual ColumnFamilyData* cfd() const override { return internal_cfd_; }
MutableCFOptions mutable_cf_options;
// Version number of the current SuperVersion
uint64_t version_number;
+ WriteStallCondition write_stall_condition;
InstrumentedMutex* db_mutex;
extern Status CheckConcurrentWritesSupported(
const ColumnFamilyOptions& cf_options);
+extern Status CheckCFPathsSupported(const DBOptions& db_options,
+ const ColumnFamilyOptions& cf_options);
+
extern ColumnFamilyOptions SanitizeOptions(const ImmutableDBOptions& db_options,
const ColumnFamilyOptions& src);
// Wrap user defined table properties collector factories from cf_options
// *) delete all memory associated with that column family
// *) delete all the files associated with that column family
void SetDropped();
- bool IsDropped() const { return dropped_; }
+ bool IsDropped() const { return dropped_.load(std::memory_order_relaxed); }
// thread-safe
int NumberLevels() const { return ioptions_.num_levels; }
void SetLogNumber(uint64_t log_number) { log_number_ = log_number; }
uint64_t GetLogNumber() const { return log_number_; }
+ void SetFlushReason(FlushReason flush_reason) {
+ flush_reason_ = flush_reason;
+ }
+ FlushReason GetFlushReason() const { return flush_reason_; }
// thread-safe
const EnvOptions* soptions() const;
const ImmutableCFOptions* ioptions() const { return &ioptions_; }
void SetCurrent(Version* _current);
uint64_t GetNumLiveVersions() const; // REQUIRE: DB mutex held
uint64_t GetTotalSstFilesSize() const; // REQUIRE: DB mutex held
- void SetMemtable(MemTable* new_mem) { mem_ = new_mem; }
+ uint64_t GetLiveSstFilesSize() const; // REQUIRE: DB mutex held
+ void SetMemtable(MemTable* new_mem) {
+ uint64_t memtable_id = last_memtable_id_.fetch_add(1) + 1;
+ new_mem->SetID(memtable_id);
+ mem_ = new_mem;
+ }
// calculate the oldest log needed for the durability of this column family
uint64_t OldestLogToKeep();
const Slice& largest_user_key,
int level) const;
+ // Check if the passed ranges overlap with any unflushed memtables
+ // (immutable or mutable).
+ //
+ // @param super_version A referenced SuperVersion that will be held for the
+ // duration of this function.
+ //
+ // Thread-safe
+ Status RangesOverlapWithMemtables(const autovector<Range>& ranges,
+ SuperVersion* super_version, bool* overlap);
+
// A flag to tell a manual compaction is to compact all levels together
- // instad of for specific level.
+ // instead of a specific level.
static const int kCompactAllLevels;
// A flag to tell a manual compaction's output is base level.
static const int kCompactToBaseLevel;
// REQUIRES: DB mutex held
Compaction* CompactRange(const MutableCFOptions& mutable_cf_options,
int input_level, int output_level,
- uint32_t output_path_id, const InternalKey* begin,
- const InternalKey* end, InternalKey** compaction_end,
- bool* manual_conflict);
+ uint32_t output_path_id, uint32_t max_subcompactions,
+ const InternalKey* begin, const InternalKey* end,
+ InternalKey** compaction_end, bool* manual_conflict);
CompactionPicker* compaction_picker() { return compaction_picker_.get(); }
// thread-safe
// As argument takes a pointer to allocated SuperVersion to enable
// the clients to allocate SuperVersion outside of mutex.
// IMPORTANT: Only call this from DBImpl::InstallSuperVersion()
- SuperVersion* InstallSuperVersion(SuperVersion* new_superversion,
- InstrumentedMutex* db_mutex,
- const MutableCFOptions& mutable_cf_options);
- SuperVersion* InstallSuperVersion(SuperVersion* new_superversion,
- InstrumentedMutex* db_mutex);
+ void InstallSuperVersion(SuperVersionContext* sv_context,
+ InstrumentedMutex* db_mutex,
+ const MutableCFOptions& mutable_cf_options);
+ void InstallSuperVersion(SuperVersionContext* sv_context,
+ InstrumentedMutex* db_mutex);
void ResetThreadLocalSuperVersions();
// Protected by DB mutex
- void set_pending_flush(bool value) { pending_flush_ = value; }
- void set_pending_compaction(bool value) { pending_compaction_ = value; }
- bool pending_flush() { return pending_flush_; }
- bool pending_compaction() { return pending_compaction_; }
+ void set_queued_for_flush(bool value) { queued_for_flush_ = value; }
+ void set_queued_for_compaction(bool value) { queued_for_compaction_ = value; }
+ bool queued_for_flush() { return queued_for_flush_; }
+ bool queued_for_compaction() { return queued_for_compaction_; }
+
+ enum class WriteStallCause {
+ kNone,
+ kMemtableLimit,
+ kL0FileCountLimit,
+ kPendingCompactionBytes,
+ };
+ static std::pair<WriteStallCondition, WriteStallCause>
+ GetWriteStallConditionAndCause(int num_unflushed_memtables, int num_l0_files,
+ uint64_t num_compaction_needed_bytes,
+ const MutableCFOptions& mutable_cf_options);
// Recalculate some small conditions, which are changed only during
// compaction, adding new memtable and/or
// recalculation of compaction score. These values are used in
// DBImpl::MakeRoomForWrite function to decide, if it need to make
// a write stall
- void RecalculateWriteStallConditions(
+ WriteStallCondition RecalculateWriteStallConditions(
const MutableCFOptions& mutable_cf_options);
+ void set_initialized() { initialized_.store(true); }
+
+ bool initialized() const { return initialized_.load(); }
+
+ const ColumnFamilyOptions& initial_cf_options() {
+ return initial_cf_options_;
+ }
+
+ Env::WriteLifeTimeHint CalculateSSTWriteHint(int level);
+
+ Status AddDirectories();
+
+ Directory* GetDataDir(size_t path_id) const;
+
private:
friend class ColumnFamilySet;
ColumnFamilyData(uint32_t id, const std::string& name,
Version* current_; // == dummy_versions->prev_
std::atomic<int> refs_; // outstanding references to ColumnFamilyData
- bool dropped_; // true if client dropped it
+ std::atomic<bool> initialized_;
+ std::atomic<bool> dropped_; // true if client dropped it
const InternalKeyComparator internal_comparator_;
std::vector<std::unique_ptr<IntTblPropCollectorFactory>>
// recovered from
uint64_t log_number_;
+ std::atomic<FlushReason> flush_reason_;
+
// An object that keeps all the compaction stats
// and picks the next compaction
std::unique_ptr<CompactionPicker> compaction_picker_;
std::unique_ptr<WriteControllerToken> write_controller_token_;
// If true --> this ColumnFamily is currently present in DBImpl::flush_queue_
- bool pending_flush_;
+ bool queued_for_flush_;
// If true --> this ColumnFamily is currently present in
// DBImpl::compaction_queue_
- bool pending_compaction_;
+ bool queued_for_compaction_;
uint64_t prev_compaction_needed_bytes_;
// if the database was opened with 2pc enabled
bool allow_2pc_;
+
+ // Memtable id to track flush.
+ std::atomic<uint64_t> last_memtable_id_;
+
+ // Directories corresponding to cf_paths.
+ std::vector<std::unique_ptr<Directory>> data_dirs_;
};
// ColumnFamilySet has interesting thread-safety requirements