#include "db/event_helpers.h"
#include "db/log_reader.h"
#include "db/log_writer.h"
+#include "db/memtable.h"
#include "db/memtable_list.h"
#include "db/merge_context.h"
+#include "db/range_tombstone_fragmenter.h"
#include "db/version_set.h"
#include "monitoring/iostats_context_imp.h"
#include "monitoring/perf_context_imp.h"
#include "monitoring/thread_status_util.h"
#include "port/port.h"
-#include "db/memtable.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/statistics.h"
}
}
-
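+// max_memtable_id bounds which memtables this job may pick (a null pointer
+// means no bound). sync_output_directory and write_manifest let a caller that
+// coordinates several flush jobs at once (e.g. an atomic flush across column
+// families) batch the directory fsync and the MANIFEST write itself, and
+// thread_pri records the thread-pool priority running the job for stats.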
FlushJob::FlushJob(const std::string& dbname, ColumnFamilyData* cfd,
const ImmutableDBOptions& db_options,
const MutableCFOptions& mutable_cf_options,
- const EnvOptions env_options, VersionSet* versions,
+ const uint64_t* max_memtable_id,
+ const EnvOptions& env_options, VersionSet* versions,
InstrumentedMutex* db_mutex,
std::atomic<bool>* shutting_down,
std::vector<SequenceNumber> existing_snapshots,
LogBuffer* log_buffer, Directory* db_directory,
Directory* output_file_directory,
CompressionType output_compression, Statistics* stats,
- EventLogger* event_logger, bool measure_io_stats)
+ EventLogger* event_logger, bool measure_io_stats,
+ const bool sync_output_directory, const bool write_manifest,
+ Env::Priority thread_pri)
: dbname_(dbname),
cfd_(cfd),
db_options_(db_options),
mutable_cf_options_(mutable_cf_options),
+ max_memtable_id_(max_memtable_id),
env_options_(env_options),
versions_(versions),
db_mutex_(db_mutex),
stats_(stats),
event_logger_(event_logger),
measure_io_stats_(measure_io_stats),
+ sync_output_directory_(sync_output_directory),
+ write_manifest_(write_manifest),
edit_(nullptr),
base_(nullptr),
- pick_memtable_called(false) {
+ pick_memtable_called(false),
+ thread_pri_(thread_pri) {
// Update the thread status to indicate flush.
ReportStartedFlush();
TEST_SYNC_POINT("FlushJob::FlushJob()");
assert(!pick_memtable_called);
pick_memtable_called = true;
// Save the contents of the earliest memtable as a new Table
- cfd_->imm()->PickMemtablesToFlush(&mems_);
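+  // Only memtables whose IDs are <= *max_memtable_id_ are picked; a null
+  // pointer imposes no upper bound.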
+ cfd_->imm()->PickMemtablesToFlush(max_memtable_id_, &mems_);
if (mems_.empty()) {
return;
}
  PerfLevel prev_perf_level = PerfLevel::kEnableTime;
  uint64_t prev_fsync_nanos = 0;
uint64_t prev_range_sync_nanos = 0;
uint64_t prev_prepare_write_nanos = 0;
+ uint64_t prev_cpu_write_nanos = 0;
+ uint64_t prev_cpu_read_nanos = 0;
if (measure_io_stats_) {
prev_perf_level = GetPerfLevel();
SetPerfLevel(PerfLevel::kEnableTime);
prev_fsync_nanos = IOSTATS(fsync_nanos);
prev_range_sync_nanos = IOSTATS(range_sync_nanos);
prev_prepare_write_nanos = IOSTATS(prepare_write_nanos);
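+    // Baseline the CPU-time counters so their deltas can be reported below.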
+ prev_cpu_write_nanos = IOSTATS(cpu_write_nanos);
+ prev_cpu_read_nanos = IOSTATS(cpu_read_nanos);
}
  // This will release and re-acquire the mutex.
  Status s = WriteLevel0Table();
  if (!s.ok()) {
cfd_->imm()->RollbackMemtableFlush(mems_, meta_.fd.GetNumber());
- } else {
+ } else if (write_manifest_) {
TEST_SYNC_POINT("FlushJob::InstallResults");
// Replace immutable memtable with the generated Table
- s = cfd_->imm()->InstallMemtableFlushResults(
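+    // TryInstallMemtableFlushResults commits flushed memtables in the order of
+    // their IDs, so it may return without installing anything if an earlier
+    // memtable's flush is still in progress. When write_manifest_ is false,
+    // the caller is expected to install the results itself.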
+ s = cfd_->imm()->TryInstallMemtableFlushResults(
cfd_, mutable_cf_options_, mems_, prep_tracker, versions_, db_mutex_,
meta_.fd.GetNumber(), &job_context_->memtables_to_free, db_directory_,
log_buffer_);
stream << "file_fsync_nanos" << (IOSTATS(fsync_nanos) - prev_fsync_nanos);
stream << "file_prepare_write_nanos"
<< (IOSTATS(prepare_write_nanos) - prev_prepare_write_nanos);
+ stream << "file_cpu_write_nanos"
+ << (IOSTATS(cpu_write_nanos) - prev_cpu_write_nanos);
+ stream << "file_cpu_read_nanos"
+ << (IOSTATS(cpu_read_nanos) - prev_cpu_read_nanos);
}
return s;
  AutoThreadOperationStageUpdater stage_updater(
      ThreadStatus::STAGE_FLUSH_WRITE_L0);
db_mutex_->AssertHeld();
const uint64_t start_micros = db_options_.env->NowMicros();
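+  // Track this thread's CPU time alongside the wall-clock start_micros.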
+ const uint64_t start_cpu_micros = db_options_.env->NowCPUNanos() / 1000;
Status s;
{
auto write_hint = cfd_->CalculateSSTWriteHint(0);
    // memtables and range_del_iters store internal iterators over each data
    // memtable and its associated range deletion memtable, respectively, at
    // corresponding indexes.
std::vector<InternalIterator*> memtables;
- std::vector<InternalIterator*> range_del_iters;
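+    // Range tombstones are pre-fragmented and handed to BuildTable directly,
+    // so no merging iterator over raw tombstones is needed here.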
+ std::vector<std::unique_ptr<FragmentedRangeTombstoneIterator>>
+ range_del_iters;
ReadOptions ro;
ro.total_order_seek = true;
Arena arena;
uint64_t total_num_entries = 0, total_num_deletes = 0;
+ uint64_t total_data_size = 0;
size_t total_memory_usage = 0;
for (MemTable* m : mems_) {
    ROCKS_LOG_INFO(
        db_options_.info_log,
        "[%s] [JOB %d] Flushing memtable with next log file: %" PRIu64 "\n",
        cfd_->GetName().c_str(), job_context_->job_id, m->GetNextLogNumber());
memtables.push_back(m->NewIterator(ro, &arena));
- auto* range_del_iter = m->NewRangeTombstoneIterator(ro);
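+      // Pass kMaxSequenceNumber so every range tombstone in the memtable is
+      // visible to the flush, regardless of snapshots.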
+ auto* range_del_iter =
+ m->NewRangeTombstoneIterator(ro, kMaxSequenceNumber);
if (range_del_iter != nullptr) {
- range_del_iters.push_back(range_del_iter);
+ range_del_iters.emplace_back(range_del_iter);
}
total_num_entries += m->num_entries();
total_num_deletes += m->num_deletes();
+ total_data_size += m->get_data_size();
total_memory_usage += m->ApproximateMemoryUsage();
}
- event_logger_->Log()
- << "job" << job_context_->job_id << "event"
- << "flush_started"
- << "num_memtables" << mems_.size() << "num_entries" << total_num_entries
- << "num_deletes" << total_num_deletes << "memory_usage"
- << total_memory_usage << "flush_reason"
- << GetFlushReasonString(cfd_->GetFlushReason());
+ event_logger_->Log() << "job" << job_context_->job_id << "event"
+ << "flush_started"
+ << "num_memtables" << mems_.size() << "num_entries"
+ << total_num_entries << "num_deletes"
+ << total_num_deletes << "total_data_size"
+ << total_data_size << "memory_usage"
+ << total_memory_usage << "flush_reason"
+ << GetFlushReasonString(cfd_->GetFlushReason());
{
ScopedArenaIterator iter(
NewMergingIterator(&cfd_->internal_comparator(), &memtables[0],
static_cast<int>(memtables.size()), &arena));
- std::unique_ptr<InternalIterator> range_del_iter(NewMergingIterator(
- &cfd_->internal_comparator(),
- range_del_iters.empty() ? nullptr : &range_del_iters[0],
- static_cast<int>(range_del_iters.size())));
    ROCKS_LOG_INFO(db_options_.info_log,
                   "[%s] [JOB %d] Level-0 flush table #%" PRIu64 ": started",
                   cfd_->GetName().c_str(), job_context_->job_id,
                   meta_.fd.GetNumber());
s = BuildTable(
dbname_, db_options_.env, *cfd_->ioptions(), mutable_cf_options_,
env_options_, cfd_->table_cache(), iter.get(),
- std::move(range_del_iter), &meta_, cfd_->internal_comparator(),
+ std::move(range_del_iters), &meta_, cfd_->internal_comparator(),
cfd_->int_tbl_prop_collector_factories(), cfd_->GetID(),
cfd_->GetName(), existing_snapshots_,
earliest_write_conflict_snapshot_, snapshot_checker_,
- output_compression_, cfd_->ioptions()->compression_opts,
+ output_compression_, mutable_cf_options_.sample_for_compression,
+ cfd_->ioptions()->compression_opts,
mutable_cf_options_.paranoid_file_checks, cfd_->internal_stats(),
TableFileCreationReason::kFlush, event_logger_, job_context_->job_id,
        Env::IO_HIGH, &table_properties_, 0 /* level */, current_time,
        oldest_key_time, write_hint);
    ROCKS_LOG_INFO(db_options_.info_log,
                   "[%s] [JOB %d] Level-0 flush table #%" PRIu64 ": %" PRIu64
                   " bytes %s%s",
                   cfd_->GetName().c_str(), job_context_->job_id,
                   meta_.fd.GetNumber(), meta_.fd.GetFileSize(),
                   s.ToString().c_str(),
                   meta_.marked_for_compaction ? " (needs compaction)" : "");
- if (s.ok() && output_file_directory_ != nullptr) {
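+  // sync_output_directory_ is false when the caller prefers to fsync the
+  // directory once for a whole batch of flushes.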
+ if (s.ok() && output_file_directory_ != nullptr && sync_output_directory_) {
s = output_file_directory_->Fsync();
}
TEST_SYNC_POINT("FlushJob::WriteLevel0Table");
// Note that here we treat flush as level 0 compaction in internal stats
InternalStats::CompactionStats stats(CompactionReason::kFlush, 1);
stats.micros = db_options_.env->NowMicros() - start_micros;
+ stats.cpu_micros = db_options_.env->NowCPUNanos() / 1000 - start_cpu_micros;
stats.bytes_written = meta_.fd.GetFileSize();
- MeasureTime(stats_, FLUSH_TIME, stats.micros);
- cfd_->internal_stats()->AddCompactionStats(0 /* level */, stats);
+ RecordTimeToHistogram(stats_, FLUSH_TIME, stats.micros);
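+  // Attribute the flush's compaction-style stats to the priority of the
+  // thread running this job.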
+ cfd_->internal_stats()->AddCompactionStats(0 /* level */, thread_pri_, stats);
cfd_->internal_stats()->AddCFStats(InternalStats::BYTES_FLUSHED,
meta_.fd.GetFileSize());
RecordFlushIOStats();