#include "db/flush_job.h"
-#ifndef __STDC_FORMAT_MACROS
-#define __STDC_FORMAT_MACROS
-#endif
-
-#include <inttypes.h>
+#include <cinttypes>
#include <algorithm>
#include <vector>
#include "db/merge_context.h"
#include "db/range_tombstone_fragmenter.h"
#include "db/version_set.h"
+#include "file/file_util.h"
+#include "file/filename.h"
+#include "logging/event_logger.h"
+#include "logging/log_buffer.h"
+#include "logging/logging.h"
#include "monitoring/iostats_context_imp.h"
#include "monitoring/perf_context_imp.h"
#include "monitoring/thread_status_util.h"
#include "rocksdb/statistics.h"
#include "rocksdb/status.h"
#include "rocksdb/table.h"
-#include "table/block.h"
-#include "table/block_based_table_factory.h"
+#include "table/block_based/block.h"
+#include "table/block_based/block_based_table_factory.h"
#include "table/merging_iterator.h"
#include "table/table_builder.h"
#include "table/two_level_iterator.h"
+#include "test_util/sync_point.h"
#include "util/coding.h"
-#include "util/event_logger.h"
-#include "util/file_util.h"
-#include "util/filename.h"
-#include "util/log_buffer.h"
-#include "util/logging.h"
#include "util/mutexlock.h"
#include "util/stop_watch.h"
-#include "util/sync_point.h"
-namespace rocksdb {
+namespace ROCKSDB_NAMESPACE {
const char* GetFlushReasonString (FlushReason flush_reason) {
switch (flush_reason) {
const ImmutableDBOptions& db_options,
const MutableCFOptions& mutable_cf_options,
const uint64_t* max_memtable_id,
- const EnvOptions& env_options, VersionSet* versions,
+ const FileOptions& file_options, VersionSet* versions,
InstrumentedMutex* db_mutex,
std::atomic<bool>* shutting_down,
std::vector<SequenceNumber> existing_snapshots,
db_options_(db_options),
mutable_cf_options_(mutable_cf_options),
max_memtable_id_(max_memtable_id),
- env_options_(env_options),
+ file_options_(file_options),
versions_(versions),
db_mutex_(db_mutex),
shutting_down_(shutting_down),
// This will release and re-acquire the mutex.
Status s = WriteLevel0Table();
- if (s.ok() &&
- (shutting_down_->load(std::memory_order_acquire) || cfd_->IsDropped())) {
- s = Status::ShutdownInProgress(
- "Database shutdown or Column family drop during flush");
+ if (s.ok() && cfd_->IsDropped()) {
+ s = Status::ColumnFamilyDropped("Column family dropped during compaction");
+ }
+ if ((s.ok() || s.IsColumnFamilyDropped()) &&
+ shutting_down_->load(std::memory_order_acquire)) {
+ s = Status::ShutdownInProgress("Database shutdown");
}
if (!s.ok()) {
s = cfd_->imm()->TryInstallMemtableFlushResults(
cfd_, mutable_cf_options_, mems_, prep_tracker, versions_, db_mutex_,
meta_.fd.GetNumber(), &job_context_->memtables_to_free, db_directory_,
- log_buffer_);
+ log_buffer_, &committed_flush_jobs_info_);
}
if (s.ok() && file_meta != nullptr) {
}
RecordFlushIOStats();
- auto stream = event_logger_->LogToBuffer(log_buffer_);
+ // When measure_io_stats_ is true, the default 512 bytes is not enough.
+ auto stream = event_logger_->LogToBuffer(log_buffer_, 1024);
stream << "job" << job_context_->job_id << "event"
<< "flush_finished";
stream << "output_compression"
uint64_t oldest_key_time =
mems_.front()->ApproximateOldestKeyTime();
+ // It's not clear whether oldest_key_time is always available. In case
+ // it is not available, use current_time.
+ meta_.oldest_ancester_time = std::min(current_time, oldest_key_time);
+ meta_.file_creation_time = current_time;
+
+ uint64_t creation_time = (cfd_->ioptions()->compaction_style ==
+ CompactionStyle::kCompactionStyleFIFO)
+ ? current_time
+ : meta_.oldest_ancester_time;
+
s = BuildTable(
- dbname_, db_options_.env, *cfd_->ioptions(), mutable_cf_options_,
- env_options_, cfd_->table_cache(), iter.get(),
+ dbname_, db_options_.env, db_options_.fs.get(), *cfd_->ioptions(),
+ mutable_cf_options_, file_options_, cfd_->table_cache(), iter.get(),
std::move(range_del_iters), &meta_, cfd_->internal_comparator(),
cfd_->int_tbl_prop_collector_factories(), cfd_->GetID(),
cfd_->GetName(), existing_snapshots_,
cfd_->ioptions()->compression_opts,
mutable_cf_options_.paranoid_file_checks, cfd_->internal_stats(),
TableFileCreationReason::kFlush, event_logger_, job_context_->job_id,
- Env::IO_HIGH, &table_properties_, 0 /* level */, current_time,
- oldest_key_time, write_hint);
+ Env::IO_HIGH, &table_properties_, 0 /* level */,
+ creation_time, oldest_key_time, write_hint, current_time);
LogFlush(db_options_.info_log);
}
ROCKS_LOG_INFO(db_options_.info_log,
if (s.ok() && output_file_directory_ != nullptr && sync_output_directory_) {
s = output_file_directory_->Fsync();
}
- TEST_SYNC_POINT("FlushJob::WriteLevel0Table");
+ TEST_SYNC_POINT_CALLBACK("FlushJob::WriteLevel0Table", &mems_);
db_mutex_->Lock();
}
base_->Unref();
edit_->AddFile(0 /* level */, meta_.fd.GetNumber(), meta_.fd.GetPathId(),
meta_.fd.GetFileSize(), meta_.smallest, meta_.largest,
meta_.fd.smallest_seqno, meta_.fd.largest_seqno,
- meta_.marked_for_compaction);
+ meta_.marked_for_compaction, meta_.oldest_blob_file_number,
+ meta_.oldest_ancester_time, meta_.file_creation_time,
+ meta_.file_checksum, meta_.file_checksum_func_name);
}
+#ifndef ROCKSDB_LITE
+ // Piggyback FlushJobInfo on the first first flushed memtable.
+ mems_[0]->SetFlushJobInfo(GetFlushJobInfo());
+#endif // !ROCKSDB_LITE
// Note that here we treat flush as level 0 compaction in internal stats
InternalStats::CompactionStats stats(CompactionReason::kFlush, 1);
return s;
}
-} // namespace rocksdb
+#ifndef ROCKSDB_LITE
+std::unique_ptr<FlushJobInfo> FlushJob::GetFlushJobInfo() const {
+ db_mutex_->AssertHeld();
+ std::unique_ptr<FlushJobInfo> info(new FlushJobInfo{});
+ info->cf_id = cfd_->GetID();
+ info->cf_name = cfd_->GetName();
+
+ const uint64_t file_number = meta_.fd.GetNumber();
+ info->file_path =
+ MakeTableFileName(cfd_->ioptions()->cf_paths[0].path, file_number);
+ info->file_number = file_number;
+ info->oldest_blob_file_number = meta_.oldest_blob_file_number;
+ info->thread_id = db_options_.env->GetThreadID();
+ info->job_id = job_context_->job_id;
+ info->smallest_seqno = meta_.fd.smallest_seqno;
+ info->largest_seqno = meta_.fd.largest_seqno;
+ info->table_properties = table_properties_;
+ info->flush_reason = cfd_->GetFlushReason();
+ return info;
+}
+#endif // !ROCKSDB_LITE
+
+} // namespace ROCKSDB_NAMESPACE