1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
10 #include "db/flush_job.h"
17 #include "db/builder.h"
18 #include "db/db_iter.h"
19 #include "db/dbformat.h"
20 #include "db/event_helpers.h"
21 #include "db/log_reader.h"
22 #include "db/log_writer.h"
23 #include "db/memtable.h"
24 #include "db/memtable_list.h"
25 #include "db/merge_context.h"
26 #include "db/range_tombstone_fragmenter.h"
27 #include "db/version_set.h"
28 #include "file/file_util.h"
29 #include "file/filename.h"
30 #include "logging/event_logger.h"
31 #include "logging/log_buffer.h"
32 #include "logging/logging.h"
33 #include "monitoring/iostats_context_imp.h"
34 #include "monitoring/perf_context_imp.h"
35 #include "monitoring/thread_status_util.h"
36 #include "port/port.h"
37 #include "rocksdb/db.h"
38 #include "rocksdb/env.h"
39 #include "rocksdb/statistics.h"
40 #include "rocksdb/status.h"
41 #include "rocksdb/table.h"
42 #include "table/merging_iterator.h"
43 #include "table/table_builder.h"
44 #include "table/two_level_iterator.h"
45 #include "test_util/sync_point.h"
46 #include "util/coding.h"
47 #include "util/mutexlock.h"
48 #include "util/stop_watch.h"
50 namespace ROCKSDB_NAMESPACE
{
52 const char* GetFlushReasonString (FlushReason flush_reason
) {
53 switch (flush_reason
) {
54 case FlushReason::kOthers
:
55 return "Other Reasons";
56 case FlushReason::kGetLiveFiles
:
57 return "Get Live Files";
58 case FlushReason::kShutDown
:
60 case FlushReason::kExternalFileIngestion
:
61 return "External File Ingestion";
62 case FlushReason::kManualCompaction
:
63 return "Manual Compaction";
64 case FlushReason::kWriteBufferManager
:
65 return "Write Buffer Manager";
66 case FlushReason::kWriteBufferFull
:
67 return "Write Buffer Full";
68 case FlushReason::kTest
:
70 case FlushReason::kDeleteFiles
:
71 return "Delete Files";
72 case FlushReason::kAutoCompaction
:
73 return "Auto Compaction";
74 case FlushReason::kManualFlush
:
75 return "Manual Flush";
76 case FlushReason::kErrorRecovery
:
77 return "Error Recovery";
// NOTE(review): the line that opens this definition (function name and opening
// parenthesis) is not visible in this view. The parameter list, member
// initializer list and body below presumably belong to the FlushJob
// constructor -- confirm against the full file.
//
// What the visible lines show: each visible initializer stores a parameter in
// the like-named member with a trailing underscore; `existing_snapshots` and
// `full_history_ts_low` are taken by value and moved into their members;
// `pick_memtable_called` starts out false. Not every parameter has a visible
// initializer here (e.g. dbname, cfd, versions, db_mutex, stats, db_id).
84 const std::string
& dbname
, ColumnFamilyData
* cfd
,
85 const ImmutableDBOptions
& db_options
,
86 const MutableCFOptions
& mutable_cf_options
, const uint64_t* max_memtable_id
,
87 const FileOptions
& file_options
, VersionSet
* versions
,
88 InstrumentedMutex
* db_mutex
, std::atomic
<bool>* shutting_down
,
89 std::vector
<SequenceNumber
> existing_snapshots
,
90 SequenceNumber earliest_write_conflict_snapshot
,
91 SnapshotChecker
* snapshot_checker
, JobContext
* job_context
,
92 LogBuffer
* log_buffer
, FSDirectory
* db_directory
,
93 FSDirectory
* output_file_directory
, CompressionType output_compression
,
94 Statistics
* stats
, EventLogger
* event_logger
, bool measure_io_stats
,
95 const bool sync_output_directory
, const bool write_manifest
,
96 Env::Priority thread_pri
, const std::shared_ptr
<IOTracer
>& io_tracer
,
97 const std::string
& db_id
, const std::string
& db_session_id
,
98 std::string full_history_ts_low
)
// Member initializer list (only the initializers visible in this view).
101 db_session_id_(db_session_id
),
103 db_options_(db_options
),
104 mutable_cf_options_(mutable_cf_options
),
105 max_memtable_id_(max_memtable_id
),
106 file_options_(file_options
),
109 shutting_down_(shutting_down
),
110 existing_snapshots_(std::move(existing_snapshots
)),
111 earliest_write_conflict_snapshot_(earliest_write_conflict_snapshot
),
112 snapshot_checker_(snapshot_checker
),
113 job_context_(job_context
),
114 log_buffer_(log_buffer
),
115 db_directory_(db_directory
),
116 output_file_directory_(output_file_directory
),
117 output_compression_(output_compression
),
119 event_logger_(event_logger
),
120 measure_io_stats_(measure_io_stats
),
121 sync_output_directory_(sync_output_directory
),
122 write_manifest_(write_manifest
),
125 pick_memtable_called(false),
126 thread_pri_(thread_pri
),
127 io_tracer_(io_tracer
),
128 full_history_ts_low_(std::move(full_history_ts_low
)) {
129 // Update the thread status to indicate flush.
130 ReportStartedFlush();
131 TEST_SYNC_POINT("FlushJob::FlushJob()");
134 FlushJob::~FlushJob() {
135 io_status_
.PermitUncheckedError();
136 ThreadStatusUtil::ResetThreadStatus();
139 void FlushJob::ReportStartedFlush() {
140 ThreadStatusUtil::SetColumnFamily(cfd_
, cfd_
->ioptions()->env
,
141 db_options_
.enable_thread_tracking
);
142 ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_FLUSH
);
143 ThreadStatusUtil::SetThreadOperationProperty(
144 ThreadStatus::COMPACTION_JOB_ID
,
145 job_context_
->job_id
);
146 IOSTATS_RESET(bytes_written
);
149 void FlushJob::ReportFlushInputSize(const autovector
<MemTable
*>& mems
) {
150 uint64_t input_size
= 0;
151 for (auto* mem
: mems
) {
152 input_size
+= mem
->ApproximateMemoryUsage();
154 ThreadStatusUtil::IncreaseThreadOperationProperty(
155 ThreadStatus::FLUSH_BYTES_MEMTABLES
,
159 void FlushJob::RecordFlushIOStats() {
160 RecordTick(stats_
, FLUSH_WRITE_BYTES
, IOSTATS(bytes_written
));
161 ThreadStatusUtil::IncreaseThreadOperationProperty(
162 ThreadStatus::FLUSH_BYTES_WRITTEN
, IOSTATS(bytes_written
));
163 IOSTATS_RESET(bytes_written
);
166 void FlushJob::PickMemTable() {
167 db_mutex_
->AssertHeld();
168 assert(!pick_memtable_called
);
169 pick_memtable_called
= true;
170 // Save the contents of the earliest memtable as a new Table
171 cfd_
->imm()->PickMemtablesToFlush(max_memtable_id_
, &mems_
);
176 ReportFlushInputSize(mems_
);
178 // entries mems are (implicitly) sorted in ascending order by their created
179 // time. We will use the first memtable's `edit` to keep the meta info for
181 MemTable
* m
= mems_
[0];
182 edit_
= m
->GetEdits();
183 edit_
->SetPrevLogNumber(0);
184 // SetLogNumber(log_num) indicates logs with number smaller than log_num
185 // will no longer be picked up for recovery.
186 edit_
->SetLogNumber(mems_
.back()->GetNextLogNumber());
187 edit_
->SetColumnFamily(cfd_
->GetID());
189 // path 0 for level 0 file.
190 meta_
.fd
= FileDescriptor(versions_
->NewFileNumber(), 0, 0);
192 base_
= cfd_
->current();
193 base_
->Ref(); // it is likely that we do not need this reference
// Executes the flush job.
//
// Visible behavior: asserts that db_mutex_ is held and that PickMemTable() was
// called; writes the level-0 table through WriteLevel0Table(); turns an OK
// status into ColumnFamilyDropped when the column family is dropped, and an
// OK/ColumnFamilyDropped status into ShutdownInProgress when shutting_down_ is
// set; either rolls the memtable flush back or, when write_manifest_ is set,
// installs the results via TryInstallMemtableFlushResults(); records flush I/O
// stats; and logs an event carrying the output compression, per-level file
// counts, blob file range, number of unflushed immutable memtables and, when
// measure_io_stats_ is set, I/O timing deltas.
//
// prep_tracker: forwarded to TryInstallMemtableFlushResults().
// file_meta:    optional output; it is only consulted when the status is OK
//               and the pointer is non-null.
//
// NOTE(review): several lines appear to be missing from this view (e.g. the
// condition guarding the "Nothing in memtable to flush" log, the condition in
// front of the rollback, the declaration of tmp_io_s, the body of the
// file_meta branch and the final return) -- confirm against the full file.
196 Status
FlushJob::Run(LogsWithPrepTracker
* prep_tracker
,
197 FileMetaData
* file_meta
) {
198 TEST_SYNC_POINT("FlushJob::Start");
199 db_mutex_
->AssertHeld();
200 assert(pick_memtable_called
);
201 AutoThreadOperationStageUpdater
stage_run(
202 ThreadStatus::STAGE_FLUSH_RUN
);
204 ROCKS_LOG_BUFFER(log_buffer_
, "[%s] Nothing in memtable to flush",
205 cfd_
->GetName().c_str());
209 // I/O measurement variables
210 PerfLevel prev_perf_level
= PerfLevel::kEnableTime
;
211 uint64_t prev_write_nanos
= 0;
212 uint64_t prev_fsync_nanos
= 0;
213 uint64_t prev_range_sync_nanos
= 0;
214 uint64_t prev_prepare_write_nanos
= 0;
215 uint64_t prev_cpu_write_nanos
= 0;
216 uint64_t prev_cpu_read_nanos
= 0;
// When I/O measurement is requested, remember the current perf level and the
// current IOSTATS counters so that deltas can be logged at the end.
217 if (measure_io_stats_
) {
218 prev_perf_level
= GetPerfLevel();
219 SetPerfLevel(PerfLevel::kEnableTime
);
220 prev_write_nanos
= IOSTATS(write_nanos
);
221 prev_fsync_nanos
= IOSTATS(fsync_nanos
);
222 prev_range_sync_nanos
= IOSTATS(range_sync_nanos
);
223 prev_prepare_write_nanos
= IOSTATS(prepare_write_nanos
);
224 prev_cpu_write_nanos
= IOSTATS(cpu_write_nanos
);
225 prev_cpu_read_nanos
= IOSTATS(cpu_read_nanos
);
228 // This will release and re-acquire the mutex.
229 Status s
= WriteLevel0Table();
231 if (s
.ok() && cfd_
->IsDropped()) {
232 s
= Status::ColumnFamilyDropped("Column family dropped during compaction");
234 if ((s
.ok() || s
.IsColumnFamilyDropped()) &&
235 shutting_down_
->load(std::memory_order_acquire
)) {
236 s
= Status::ShutdownInProgress("Database shutdown");
// NOTE(review): the condition in front of this rollback is not visible here;
// presumably it runs only when `s` is not OK -- confirm.
240 cfd_
->imm()->RollbackMemtableFlush(mems_
, meta_
.fd
.GetNumber());
241 } else if (write_manifest_
) {
242 TEST_SYNC_POINT("FlushJob::InstallResults");
243 // Replace immutable memtable with the generated Table
245 s
= cfd_
->imm()->TryInstallMemtableFlushResults(
246 cfd_
, mutable_cf_options_
, mems_
, prep_tracker
, versions_
, db_mutex_
,
247 meta_
.fd
.GetNumber(), &job_context_
->memtables_to_free
, db_directory_
,
248 log_buffer_
, &committed_flush_jobs_info_
, &tmp_io_s
);
// Only a failed I/O status from the install step overwrites io_status_.
249 if (!tmp_io_s
.ok()) {
250 io_status_
= tmp_io_s
;
254 if (s
.ok() && file_meta
!= nullptr) {
257 RecordFlushIOStats();
259 // When measure_io_stats_ is true, the default 512 bytes is not enough.
260 auto stream
= event_logger_
->LogToBuffer(log_buffer_
, 1024);
261 stream
<< "job" << job_context_
->job_id
<< "event"
263 stream
<< "output_compression"
264 << CompressionTypeToString(output_compression_
);
265 stream
<< "lsm_state";
// Emit the number of files at every level of the current version.
267 auto vstorage
= cfd_
->current()->storage_info();
268 for (int level
= 0; level
< vstorage
->num_levels(); ++level
) {
269 stream
<< vstorage
->NumLevelFiles(level
);
// Log the first and last keys of the blob file collection, when it is
// non-empty.
273 const auto& blob_files
= vstorage
->GetBlobFiles();
274 if (!blob_files
.empty()) {
275 stream
<< "blob_file_head" << blob_files
.begin()->first
;
276 stream
<< "blob_file_tail" << blob_files
.rbegin()->first
;
279 stream
<< "immutable_memtables" << cfd_
->imm()->NumNotFlushed();
// Restore the perf level saved at the top if it differed from kEnableTime,
// then log how much each IOSTATS counter advanced during this call.
281 if (measure_io_stats_
) {
282 if (prev_perf_level
!= PerfLevel::kEnableTime
) {
283 SetPerfLevel(prev_perf_level
);
285 stream
<< "file_write_nanos" << (IOSTATS(write_nanos
) - prev_write_nanos
);
286 stream
<< "file_range_sync_nanos"
287 << (IOSTATS(range_sync_nanos
) - prev_range_sync_nanos
);
288 stream
<< "file_fsync_nanos" << (IOSTATS(fsync_nanos
) - prev_fsync_nanos
);
289 stream
<< "file_prepare_write_nanos"
290 << (IOSTATS(prepare_write_nanos
) - prev_prepare_write_nanos
);
291 stream
<< "file_cpu_write_nanos"
292 << (IOSTATS(cpu_write_nanos
) - prev_cpu_write_nanos
);
293 stream
<< "file_cpu_read_nanos"
294 << (IOSTATS(cpu_read_nanos
) - prev_cpu_read_nanos
);
300 void FlushJob::Cancel() {
301 db_mutex_
->AssertHeld();
302 assert(base_
!= nullptr);
// Writes the picked memtables out as a level-0 table file.
//
// Visible behavior: asserts db_mutex_ is held; records wall-clock and CPU start
// times; builds one iterator (plus an optional range-tombstone iterator) per
// memtable in mems_ while summing entry/delete counts, data size and memory
// usage; logs those totals together with the flush reason; merges the memtable
// iterators; derives current_time, oldest_key_time and oldest_ancester_time
// for the file metadata; optionally fsyncs the output directory; adds the new
// file and any blob file additions to edit_ when the status is OK and the
// output is non-empty; and finally records flush timing and byte counts in the
// internal stats.
//
// NOTE(review): many lines appear to be missing from this view (e.g. the
// declarations of `ro`, `arena`, `s`, `io_s` and `range_del_iters`, the call
// that receives the long argument list starting with dbname_, the mutex
// release/re-acquire, and the final return) -- confirm against the full file.
306 Status
FlushJob::WriteLevel0Table() {
307 AutoThreadOperationStageUpdater
stage_updater(
308 ThreadStatus::STAGE_FLUSH_WRITE_L0
);
309 db_mutex_
->AssertHeld();
310 const uint64_t start_micros
= db_options_
.env
->NowMicros();
311 const uint64_t start_cpu_micros
= db_options_
.env
->NowCPUNanos() / 1000;
314 std::vector
<BlobFileAddition
> blob_file_additions
;
317 auto write_hint
= cfd_
->CalculateSSTWriteHint(0);
320 log_buffer_
->FlushBufferToLog();
322 // memtables and range_del_iters store internal iterators over each data
323 // memtable and its associated range deletion memtable, respectively, at
324 // corresponding indexes.
325 std::vector
<InternalIterator
*> memtables
;
326 std::vector
<std::unique_ptr
<FragmentedRangeTombstoneIterator
>>
329 ro
.total_order_seek
= true;
331 uint64_t total_num_entries
= 0, total_num_deletes
= 0;
332 uint64_t total_data_size
= 0;
333 size_t total_memory_usage
= 0;
// For every picked memtable: create its point iterator, keep its range
// tombstone iterator when one exists, and accumulate the totals logged below.
334 for (MemTable
* m
: mems_
) {
336 db_options_
.info_log
,
337 "[%s] [JOB %d] Flushing memtable with next log file: %" PRIu64
"\n",
338 cfd_
->GetName().c_str(), job_context_
->job_id
, m
->GetNextLogNumber());
339 memtables
.push_back(m
->NewIterator(ro
, &arena
));
340 auto* range_del_iter
=
341 m
->NewRangeTombstoneIterator(ro
, kMaxSequenceNumber
);
342 if (range_del_iter
!= nullptr) {
343 range_del_iters
.emplace_back(range_del_iter
);
345 total_num_entries
+= m
->num_entries();
346 total_num_deletes
+= m
->num_deletes();
347 total_data_size
+= m
->get_data_size();
348 total_memory_usage
+= m
->ApproximateMemoryUsage();
351 event_logger_
->Log() << "job" << job_context_
->job_id
<< "event"
353 << "num_memtables" << mems_
.size() << "num_entries"
354 << total_num_entries
<< "num_deletes"
355 << total_num_deletes
<< "total_data_size"
356 << total_data_size
<< "memory_usage"
357 << total_memory_usage
<< "flush_reason"
358 << GetFlushReasonString(cfd_
->GetFlushReason());
// Combine the per-memtable iterators into a single merged iterator ordered by
// the column family's internal comparator.
361 ScopedArenaIterator
iter(
362 NewMergingIterator(&cfd_
->internal_comparator(), &memtables
[0],
363 static_cast<int>(memtables
.size()), &arena
));
364 ROCKS_LOG_INFO(db_options_
.info_log
,
365 "[%s] [JOB %d] Level-0 flush table #%" PRIu64
": started",
366 cfd_
->GetName().c_str(), job_context_
->job_id
,
367 meta_
.fd
.GetNumber());
369 TEST_SYNC_POINT_CALLBACK("FlushJob::WriteLevel0Table:output_compression",
370 &output_compression_
);
371 int64_t _current_time
= 0;
372 auto status
= db_options_
.env
->GetCurrentTime(&_current_time
);
373 // Safe to proceed even if GetCurrentTime fails. So, log and proceed.
376 db_options_
.info_log
,
377 "Failed to get current time to populate creation_time property. "
379 status
.ToString().c_str());
381 const uint64_t current_time
= static_cast<uint64_t>(_current_time
);
383 uint64_t oldest_key_time
=
384 mems_
.front()->ApproximateOldestKeyTime();
386 // It's not clear whether oldest_key_time is always available. In case
387 // it is not available, use current_time.
388 uint64_t oldest_ancester_time
= std::min(current_time
, oldest_key_time
);
390 TEST_SYNC_POINT_CALLBACK(
391 "FlushJob::WriteLevel0Table:oldest_ancester_time",
392 &oldest_ancester_time
);
393 meta_
.oldest_ancester_time
= oldest_ancester_time
;
395 meta_
.file_creation_time
= current_time
;
397 uint64_t creation_time
= (cfd_
->ioptions()->compaction_style
==
398 CompactionStyle::kCompactionStyleFIFO
)
400 : meta_
.oldest_ancester_time
;
// A null pointer is passed on when no full_history_ts_low_ value is set.
403 const std::string
* const full_history_ts_low
=
404 (full_history_ts_low_
.empty()) ? nullptr : &full_history_ts_low_
;
// NOTE(review): the callee receiving the argument list below is not visible in
// this view -- confirm against the full file.
406 dbname_
, versions_
, db_options_
, *cfd_
->ioptions(),
407 mutable_cf_options_
, file_options_
, cfd_
->table_cache(), iter
.get(),
408 std::move(range_del_iters
), &meta_
, &blob_file_additions
,
409 cfd_
->internal_comparator(), cfd_
->int_tbl_prop_collector_factories(),
410 cfd_
->GetID(), cfd_
->GetName(), existing_snapshots_
,
411 earliest_write_conflict_snapshot_
, snapshot_checker_
,
412 output_compression_
, mutable_cf_options_
.sample_for_compression
,
413 mutable_cf_options_
.compression_opts
,
414 mutable_cf_options_
.paranoid_file_checks
, cfd_
->internal_stats(),
415 TableFileCreationReason::kFlush
, &io_s
, io_tracer_
, event_logger_
,
416 job_context_
->job_id
, Env::IO_HIGH
, &table_properties_
, 0 /* level */,
417 creation_time
, oldest_key_time
, write_hint
, current_time
, db_id_
,
418 db_session_id_
, full_history_ts_low
);
422 LogFlush(db_options_
.info_log
);
424 ROCKS_LOG_INFO(db_options_
.info_log
,
425 "[%s] [JOB %d] Level-0 flush table #%" PRIu64
": %" PRIu64
428 cfd_
->GetName().c_str(), job_context_
->job_id
,
429 meta_
.fd
.GetNumber(), meta_
.fd
.GetFileSize(),
430 s
.ToString().c_str(),
431 meta_
.marked_for_compaction
? " (needs compaction)" : "");
// Fsync the output directory only when the write succeeded, a directory handle
// was supplied, and directory syncing was requested for this job.
433 if (s
.ok() && output_file_directory_
!= nullptr && sync_output_directory_
) {
434 s
= output_file_directory_
->Fsync(IOOptions(), nullptr);
436 TEST_SYNC_POINT_CALLBACK("FlushJob::WriteLevel0Table", &mems_
);
441 // Note that if file_size is zero, the file has been deleted and
442 // should not be added to the manifest.
443 const bool has_output
= meta_
.fd
.GetFileSize() > 0;
445 if (s
.ok() && has_output
) {
446 // if we have more than 1 background thread, then we cannot
447 // insert files directly into higher levels because some other
448 // threads could be concurrently producing compacted files for
451 edit_
->AddFile(0 /* level */, meta_
.fd
.GetNumber(), meta_
.fd
.GetPathId(),
452 meta_
.fd
.GetFileSize(), meta_
.smallest
, meta_
.largest
,
453 meta_
.fd
.smallest_seqno
, meta_
.fd
.largest_seqno
,
454 meta_
.marked_for_compaction
, meta_
.oldest_blob_file_number
,
455 meta_
.oldest_ancester_time
, meta_
.file_creation_time
,
456 meta_
.file_checksum
, meta_
.file_checksum_func_name
);
458 edit_
->SetBlobFileAdditions(std::move(blob_file_additions
));
461 // Piggyback FlushJobInfo on the first first flushed memtable.
462 mems_
[0]->SetFlushJobInfo(GetFlushJobInfo());
463 #endif // !ROCKSDB_LITE
465 // Note that here we treat flush as level 0 compaction in internal stats
466 InternalStats::CompactionStats
stats(CompactionReason::kFlush
, 1);
467 stats
.micros
= db_options_
.env
->NowMicros() - start_micros
;
468 stats
.cpu_micros
= db_options_
.env
->NowCPUNanos() / 1000 - start_cpu_micros
;
471 stats
.bytes_written
= meta_
.fd
.GetFileSize();
472 stats
.num_output_files
= 1;
// Blob files recorded in the edit count towards bytes written and output
// files as well.
475 const auto& blobs
= edit_
->GetBlobFileAdditions();
476 for (const auto& blob
: blobs
) {
477 stats
.bytes_written
+= blob
.GetTotalBlobBytes();
480 stats
.num_output_files
+= static_cast<int>(blobs
.size());
482 RecordTimeToHistogram(stats_
, FLUSH_TIME
, stats
.micros
);
483 cfd_
->internal_stats()->AddCompactionStats(0 /* level */, thread_pri_
, stats
);
484 cfd_
->internal_stats()->AddCFStats(InternalStats::BYTES_FLUSHED
,
485 stats
.bytes_written
);
486 RecordFlushIOStats();
491 std::unique_ptr
<FlushJobInfo
> FlushJob::GetFlushJobInfo() const {
492 db_mutex_
->AssertHeld();
493 std::unique_ptr
<FlushJobInfo
> info(new FlushJobInfo
{});
494 info
->cf_id
= cfd_
->GetID();
495 info
->cf_name
= cfd_
->GetName();
497 const uint64_t file_number
= meta_
.fd
.GetNumber();
499 MakeTableFileName(cfd_
->ioptions()->cf_paths
[0].path
, file_number
);
500 info
->file_number
= file_number
;
501 info
->oldest_blob_file_number
= meta_
.oldest_blob_file_number
;
502 info
->thread_id
= db_options_
.env
->GetThreadID();
503 info
->job_id
= job_context_
->job_id
;
504 info
->smallest_seqno
= meta_
.fd
.smallest_seqno
;
505 info
->largest_seqno
= meta_
.fd
.largest_seqno
;
506 info
->table_properties
= table_properties_
;
507 info
->flush_reason
= cfd_
->GetFlushReason();
510 #endif // !ROCKSDB_LITE
512 } // namespace ROCKSDB_NAMESPACE