// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
-// This source code is licensed under the BSD-style license found in the
-// LICENSE file in the root directory of this source tree. An additional grant
-// of patent rights can be found in the PATENTS file in the same directory.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
#include "monitoring/iostats_context_imp.h"
#include "monitoring/perf_context_imp.h"
#include "monitoring/thread_status_util.h"
-#include "port/likely.h"
#include "port/port.h"
#include "db/memtable.h"
#include "rocksdb/db.h"
namespace rocksdb {
+const char* GetFlushReasonString (FlushReason flush_reason) {
+ switch (flush_reason) {
+ case FlushReason::kOthers:
+ return "Other Reasons";
+ case FlushReason::kGetLiveFiles:
+ return "Get Live Files";
+ case FlushReason::kShutDown:
+ return "Shut down";
+ case FlushReason::kExternalFileIngestion:
+ return "External File Ingestion";
+ case FlushReason::kManualCompaction:
+ return "Manual Compaction";
+ case FlushReason::kWriteBufferManager:
+ return "Write Buffer Manager";
+ case FlushReason::kWriteBufferFull:
+ return "Write Buffer Full";
+ case FlushReason::kTest:
+ return "Test";
+ case FlushReason::kDeleteFiles:
+ return "Delete Files";
+ case FlushReason::kAutoCompaction:
+ return "Auto Compaction";
+ case FlushReason::kManualFlush:
+ return "Manual Flush";
+ case FlushReason::kErrorRecovery:
+ return "Error Recovery";
+ default:
+ return "Invalid";
+ }
+}
+
+
FlushJob::FlushJob(const std::string& dbname, ColumnFamilyData* cfd,
const ImmutableDBOptions& db_options,
const MutableCFOptions& mutable_cf_options,
- const EnvOptions& env_options, VersionSet* versions,
+ const EnvOptions env_options, VersionSet* versions,
InstrumentedMutex* db_mutex,
std::atomic<bool>* shutting_down,
std::vector<SequenceNumber> existing_snapshots,
SequenceNumber earliest_write_conflict_snapshot,
- JobContext* job_context, LogBuffer* log_buffer,
- Directory* db_directory, Directory* output_file_directory,
+ SnapshotChecker* snapshot_checker, JobContext* job_context,
+ LogBuffer* log_buffer, Directory* db_directory,
+ Directory* output_file_directory,
CompressionType output_compression, Statistics* stats,
EventLogger* event_logger, bool measure_io_stats)
: dbname_(dbname),
shutting_down_(shutting_down),
existing_snapshots_(std::move(existing_snapshots)),
earliest_write_conflict_snapshot_(earliest_write_conflict_snapshot),
+ snapshot_checker_(snapshot_checker),
job_context_(job_context),
log_buffer_(log_buffer),
db_directory_(db_directory),
stats_(stats),
event_logger_(event_logger),
measure_io_stats_(measure_io_stats),
+ edit_(nullptr),
+ base_(nullptr),
pick_memtable_called(false) {
// Update the thread status to indicate flush.
ReportStartedFlush();
base_->Ref(); // it is likely that we do not need this reference
}
-Status FlushJob::Run(FileMetaData* file_meta) {
+Status FlushJob::Run(LogsWithPrepTracker* prep_tracker,
+ FileMetaData* file_meta) {
+ TEST_SYNC_POINT("FlushJob::Start");
db_mutex_->AssertHeld();
assert(pick_memtable_called);
AutoThreadOperationStageUpdater stage_run(
TEST_SYNC_POINT("FlushJob::InstallResults");
// Replace immutable memtable with the generated Table
s = cfd_->imm()->InstallMemtableFlushResults(
- cfd_, mutable_cf_options_, mems_, versions_, db_mutex_,
+ cfd_, mutable_cf_options_, mems_, prep_tracker, versions_, db_mutex_,
meta_.fd.GetNumber(), &job_context_->memtables_to_free, db_directory_,
log_buffer_);
}
auto stream = event_logger_->LogToBuffer(log_buffer_);
stream << "job" << job_context_->job_id << "event"
<< "flush_finished";
+ stream << "output_compression"
+ << CompressionTypeToString(output_compression_);
stream << "lsm_state";
stream.StartArray();
auto vstorage = cfd_->current()->storage_info();
const uint64_t start_micros = db_options_.env->NowMicros();
Status s;
{
+ auto write_hint = cfd_->CalculateSSTWriteHint(0);
db_mutex_->Unlock();
if (log_buffer_) {
log_buffer_->FlushBufferToLog();
total_memory_usage += m->ApproximateMemoryUsage();
}
- event_logger_->Log() << "job" << job_context_->job_id << "event"
- << "flush_started"
- << "num_memtables" << mems_.size() << "num_entries"
- << total_num_entries << "num_deletes"
- << total_num_deletes << "memory_usage"
- << total_memory_usage;
+ event_logger_->Log()
+ << "job" << job_context_->job_id << "event"
+ << "flush_started"
+ << "num_memtables" << mems_.size() << "num_entries" << total_num_entries
+ << "num_deletes" << total_num_deletes << "memory_usage"
+ << total_memory_usage << "flush_reason"
+ << GetFlushReasonString(cfd_->GetFlushReason());
{
ScopedArenaIterator iter(
TEST_SYNC_POINT_CALLBACK("FlushJob::WriteLevel0Table:output_compression",
&output_compression_);
- EnvOptions optimized_env_options =
- db_options_.env->OptimizeForCompactionTableWrite(env_options_, db_options_);
+ int64_t _current_time = 0;
+ auto status = db_options_.env->GetCurrentTime(&_current_time);
+ // Safe to proceed even if GetCurrentTime fails. So, log and proceed.
+ if (!status.ok()) {
+ ROCKS_LOG_WARN(
+ db_options_.info_log,
+ "Failed to get current time to populate creation_time property. "
+ "Status: %s",
+ status.ToString().c_str());
+ }
+ const uint64_t current_time = static_cast<uint64_t>(_current_time);
+
+ uint64_t oldest_key_time =
+ mems_.front()->ApproximateOldestKeyTime();
+
s = BuildTable(
dbname_, db_options_.env, *cfd_->ioptions(), mutable_cf_options_,
- optimized_env_options, cfd_->table_cache(), iter.get(),
+ env_options_, cfd_->table_cache(), iter.get(),
std::move(range_del_iter), &meta_, cfd_->internal_comparator(),
cfd_->int_tbl_prop_collector_factories(), cfd_->GetID(),
cfd_->GetName(), existing_snapshots_,
- earliest_write_conflict_snapshot_, output_compression_,
- cfd_->ioptions()->compression_opts,
+ earliest_write_conflict_snapshot_, snapshot_checker_,
+ output_compression_, cfd_->ioptions()->compression_opts,
mutable_cf_options_.paranoid_file_checks, cfd_->internal_stats(),
TableFileCreationReason::kFlush, event_logger_, job_context_->job_id,
- Env::IO_HIGH, &table_properties_, 0 /* level */);
+ Env::IO_HIGH, &table_properties_, 0 /* level */, current_time,
+ oldest_key_time, write_hint);
LogFlush(db_options_.info_log);
}
ROCKS_LOG_INFO(db_options_.info_log,
s.ToString().c_str(),
meta_.marked_for_compaction ? " (needs compaction)" : "");
- if (output_file_directory_ != nullptr) {
- output_file_directory_->Fsync();
+ if (s.ok() && output_file_directory_ != nullptr) {
+ s = output_file_directory_->Fsync();
}
TEST_SYNC_POINT("FlushJob::WriteLevel0Table");
db_mutex_->Lock();
// Add file to L0
edit_->AddFile(0 /* level */, meta_.fd.GetNumber(), meta_.fd.GetPathId(),
meta_.fd.GetFileSize(), meta_.smallest, meta_.largest,
- meta_.smallest_seqno, meta_.largest_seqno,
+ meta_.fd.smallest_seqno, meta_.fd.largest_seqno,
meta_.marked_for_compaction);
}
// Note that here we treat flush as level 0 compaction in internal stats
- InternalStats::CompactionStats stats(1);
+ InternalStats::CompactionStats stats(CompactionReason::kFlush, 1);
stats.micros = db_options_.env->NowMicros() - start_micros;
stats.bytes_written = meta_.fd.GetFileSize();
+ MeasureTime(stats_, FLUSH_TIME, stats.micros);
cfd_->internal_stats()->AddCompactionStats(0 /* level */, stats);
cfd_->internal_stats()->AddCFStats(InternalStats::BYTES_FLUSHED,
meta_.fd.GetFileSize());