// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "db/flush_job.h"

#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif

#include <inttypes.h>

#include "db/builder.h"
#include "db/db_iter.h"
#include "db/dbformat.h"
#include "db/event_helpers.h"
#include "db/log_reader.h"
#include "db/log_writer.h"
#include "db/memtable.h"
#include "db/memtable_list.h"
#include "db/merge_context.h"
#include "db/version_set.h"
#include "monitoring/iostats_context_imp.h"
#include "monitoring/perf_context_imp.h"
#include "monitoring/thread_status_util.h"
#include "port/port.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/statistics.h"
#include "rocksdb/status.h"
#include "rocksdb/table.h"
#include "table/block.h"
#include "table/block_based_table_factory.h"
#include "table/merging_iterator.h"
#include "table/table_builder.h"
#include "table/two_level_iterator.h"
#include "util/coding.h"
#include "util/event_logger.h"
#include "util/file_util.h"
#include "util/filename.h"
#include "util/log_buffer.h"
#include "util/logging.h"
#include "util/mutexlock.h"
#include "util/stop_watch.h"
#include "util/sync_point.h"

namespace rocksdb {
57 const char* GetFlushReasonString (FlushReason flush_reason
) {
58 switch (flush_reason
) {
59 case FlushReason::kOthers
:
60 return "Other Reasons";
61 case FlushReason::kGetLiveFiles
:
62 return "Get Live Files";
63 case FlushReason::kShutDown
:
65 case FlushReason::kExternalFileIngestion
:
66 return "External File Ingestion";
67 case FlushReason::kManualCompaction
:
68 return "Manual Compaction";
69 case FlushReason::kWriteBufferManager
:
70 return "Write Buffer Manager";
71 case FlushReason::kWriteBufferFull
:
72 return "Write Buffer Full";
73 case FlushReason::kTest
:
75 case FlushReason::kDeleteFiles
:
76 return "Delete Files";
77 case FlushReason::kAutoCompaction
:
78 return "Auto Compaction";
79 case FlushReason::kManualFlush
:
80 return "Manual Flush";
81 case FlushReason::kErrorRecovery
:
82 return "Error Recovery";
89 FlushJob::FlushJob(const std::string
& dbname
, ColumnFamilyData
* cfd
,
90 const ImmutableDBOptions
& db_options
,
91 const MutableCFOptions
& mutable_cf_options
,
92 const EnvOptions env_options
, VersionSet
* versions
,
93 InstrumentedMutex
* db_mutex
,
94 std::atomic
<bool>* shutting_down
,
95 std::vector
<SequenceNumber
> existing_snapshots
,
96 SequenceNumber earliest_write_conflict_snapshot
,
97 SnapshotChecker
* snapshot_checker
, JobContext
* job_context
,
98 LogBuffer
* log_buffer
, Directory
* db_directory
,
99 Directory
* output_file_directory
,
100 CompressionType output_compression
, Statistics
* stats
,
101 EventLogger
* event_logger
, bool measure_io_stats
)
104 db_options_(db_options
),
105 mutable_cf_options_(mutable_cf_options
),
106 env_options_(env_options
),
109 shutting_down_(shutting_down
),
110 existing_snapshots_(std::move(existing_snapshots
)),
111 earliest_write_conflict_snapshot_(earliest_write_conflict_snapshot
),
112 snapshot_checker_(snapshot_checker
),
113 job_context_(job_context
),
114 log_buffer_(log_buffer
),
115 db_directory_(db_directory
),
116 output_file_directory_(output_file_directory
),
117 output_compression_(output_compression
),
119 event_logger_(event_logger
),
120 measure_io_stats_(measure_io_stats
),
123 pick_memtable_called(false) {
124 // Update the thread status to indicate flush.
125 ReportStartedFlush();
126 TEST_SYNC_POINT("FlushJob::FlushJob()");
129 FlushJob::~FlushJob() {
130 ThreadStatusUtil::ResetThreadStatus();
133 void FlushJob::ReportStartedFlush() {
134 ThreadStatusUtil::SetColumnFamily(cfd_
, cfd_
->ioptions()->env
,
135 db_options_
.enable_thread_tracking
);
136 ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_FLUSH
);
137 ThreadStatusUtil::SetThreadOperationProperty(
138 ThreadStatus::COMPACTION_JOB_ID
,
139 job_context_
->job_id
);
140 IOSTATS_RESET(bytes_written
);
143 void FlushJob::ReportFlushInputSize(const autovector
<MemTable
*>& mems
) {
144 uint64_t input_size
= 0;
145 for (auto* mem
: mems
) {
146 input_size
+= mem
->ApproximateMemoryUsage();
148 ThreadStatusUtil::IncreaseThreadOperationProperty(
149 ThreadStatus::FLUSH_BYTES_MEMTABLES
,
153 void FlushJob::RecordFlushIOStats() {
154 RecordTick(stats_
, FLUSH_WRITE_BYTES
, IOSTATS(bytes_written
));
155 ThreadStatusUtil::IncreaseThreadOperationProperty(
156 ThreadStatus::FLUSH_BYTES_WRITTEN
, IOSTATS(bytes_written
));
157 IOSTATS_RESET(bytes_written
);
160 void FlushJob::PickMemTable() {
161 db_mutex_
->AssertHeld();
162 assert(!pick_memtable_called
);
163 pick_memtable_called
= true;
164 // Save the contents of the earliest memtable as a new Table
165 cfd_
->imm()->PickMemtablesToFlush(&mems_
);
170 ReportFlushInputSize(mems_
);
172 // entries mems are (implicitly) sorted in ascending order by their created
173 // time. We will use the first memtable's `edit` to keep the meta info for
175 MemTable
* m
= mems_
[0];
176 edit_
= m
->GetEdits();
177 edit_
->SetPrevLogNumber(0);
178 // SetLogNumber(log_num) indicates logs with number smaller than log_num
179 // will no longer be picked up for recovery.
180 edit_
->SetLogNumber(mems_
.back()->GetNextLogNumber());
181 edit_
->SetColumnFamily(cfd_
->GetID());
183 // path 0 for level 0 file.
184 meta_
.fd
= FileDescriptor(versions_
->NewFileNumber(), 0, 0);
186 base_
= cfd_
->current();
187 base_
->Ref(); // it is likely that we do not need this reference
190 Status
FlushJob::Run(LogsWithPrepTracker
* prep_tracker
,
191 FileMetaData
* file_meta
) {
192 TEST_SYNC_POINT("FlushJob::Start");
193 db_mutex_
->AssertHeld();
194 assert(pick_memtable_called
);
195 AutoThreadOperationStageUpdater
stage_run(
196 ThreadStatus::STAGE_FLUSH_RUN
);
198 ROCKS_LOG_BUFFER(log_buffer_
, "[%s] Nothing in memtable to flush",
199 cfd_
->GetName().c_str());
203 // I/O measurement variables
204 PerfLevel prev_perf_level
= PerfLevel::kEnableTime
;
205 uint64_t prev_write_nanos
= 0;
206 uint64_t prev_fsync_nanos
= 0;
207 uint64_t prev_range_sync_nanos
= 0;
208 uint64_t prev_prepare_write_nanos
= 0;
209 if (measure_io_stats_
) {
210 prev_perf_level
= GetPerfLevel();
211 SetPerfLevel(PerfLevel::kEnableTime
);
212 prev_write_nanos
= IOSTATS(write_nanos
);
213 prev_fsync_nanos
= IOSTATS(fsync_nanos
);
214 prev_range_sync_nanos
= IOSTATS(range_sync_nanos
);
215 prev_prepare_write_nanos
= IOSTATS(prepare_write_nanos
);
218 // This will release and re-acquire the mutex.
219 Status s
= WriteLevel0Table();
222 (shutting_down_
->load(std::memory_order_acquire
) || cfd_
->IsDropped())) {
223 s
= Status::ShutdownInProgress(
224 "Database shutdown or Column family drop during flush");
228 cfd_
->imm()->RollbackMemtableFlush(mems_
, meta_
.fd
.GetNumber());
230 TEST_SYNC_POINT("FlushJob::InstallResults");
231 // Replace immutable memtable with the generated Table
232 s
= cfd_
->imm()->InstallMemtableFlushResults(
233 cfd_
, mutable_cf_options_
, mems_
, prep_tracker
, versions_
, db_mutex_
,
234 meta_
.fd
.GetNumber(), &job_context_
->memtables_to_free
, db_directory_
,
238 if (s
.ok() && file_meta
!= nullptr) {
241 RecordFlushIOStats();
243 auto stream
= event_logger_
->LogToBuffer(log_buffer_
);
244 stream
<< "job" << job_context_
->job_id
<< "event"
246 stream
<< "output_compression"
247 << CompressionTypeToString(output_compression_
);
248 stream
<< "lsm_state";
250 auto vstorage
= cfd_
->current()->storage_info();
251 for (int level
= 0; level
< vstorage
->num_levels(); ++level
) {
252 stream
<< vstorage
->NumLevelFiles(level
);
255 stream
<< "immutable_memtables" << cfd_
->imm()->NumNotFlushed();
257 if (measure_io_stats_
) {
258 if (prev_perf_level
!= PerfLevel::kEnableTime
) {
259 SetPerfLevel(prev_perf_level
);
261 stream
<< "file_write_nanos" << (IOSTATS(write_nanos
) - prev_write_nanos
);
262 stream
<< "file_range_sync_nanos"
263 << (IOSTATS(range_sync_nanos
) - prev_range_sync_nanos
);
264 stream
<< "file_fsync_nanos" << (IOSTATS(fsync_nanos
) - prev_fsync_nanos
);
265 stream
<< "file_prepare_write_nanos"
266 << (IOSTATS(prepare_write_nanos
) - prev_prepare_write_nanos
);
272 void FlushJob::Cancel() {
273 db_mutex_
->AssertHeld();
274 assert(base_
!= nullptr);
278 Status
FlushJob::WriteLevel0Table() {
279 AutoThreadOperationStageUpdater
stage_updater(
280 ThreadStatus::STAGE_FLUSH_WRITE_L0
);
281 db_mutex_
->AssertHeld();
282 const uint64_t start_micros
= db_options_
.env
->NowMicros();
285 auto write_hint
= cfd_
->CalculateSSTWriteHint(0);
288 log_buffer_
->FlushBufferToLog();
290 // memtables and range_del_iters store internal iterators over each data
291 // memtable and its associated range deletion memtable, respectively, at
292 // corresponding indexes.
293 std::vector
<InternalIterator
*> memtables
;
294 std::vector
<InternalIterator
*> range_del_iters
;
296 ro
.total_order_seek
= true;
298 uint64_t total_num_entries
= 0, total_num_deletes
= 0;
299 size_t total_memory_usage
= 0;
300 for (MemTable
* m
: mems_
) {
302 db_options_
.info_log
,
303 "[%s] [JOB %d] Flushing memtable with next log file: %" PRIu64
"\n",
304 cfd_
->GetName().c_str(), job_context_
->job_id
, m
->GetNextLogNumber());
305 memtables
.push_back(m
->NewIterator(ro
, &arena
));
306 auto* range_del_iter
= m
->NewRangeTombstoneIterator(ro
);
307 if (range_del_iter
!= nullptr) {
308 range_del_iters
.push_back(range_del_iter
);
310 total_num_entries
+= m
->num_entries();
311 total_num_deletes
+= m
->num_deletes();
312 total_memory_usage
+= m
->ApproximateMemoryUsage();
316 << "job" << job_context_
->job_id
<< "event"
318 << "num_memtables" << mems_
.size() << "num_entries" << total_num_entries
319 << "num_deletes" << total_num_deletes
<< "memory_usage"
320 << total_memory_usage
<< "flush_reason"
321 << GetFlushReasonString(cfd_
->GetFlushReason());
324 ScopedArenaIterator
iter(
325 NewMergingIterator(&cfd_
->internal_comparator(), &memtables
[0],
326 static_cast<int>(memtables
.size()), &arena
));
327 std::unique_ptr
<InternalIterator
> range_del_iter(NewMergingIterator(
328 &cfd_
->internal_comparator(),
329 range_del_iters
.empty() ? nullptr : &range_del_iters
[0],
330 static_cast<int>(range_del_iters
.size())));
331 ROCKS_LOG_INFO(db_options_
.info_log
,
332 "[%s] [JOB %d] Level-0 flush table #%" PRIu64
": started",
333 cfd_
->GetName().c_str(), job_context_
->job_id
,
334 meta_
.fd
.GetNumber());
336 TEST_SYNC_POINT_CALLBACK("FlushJob::WriteLevel0Table:output_compression",
337 &output_compression_
);
338 int64_t _current_time
= 0;
339 auto status
= db_options_
.env
->GetCurrentTime(&_current_time
);
340 // Safe to proceed even if GetCurrentTime fails. So, log and proceed.
343 db_options_
.info_log
,
344 "Failed to get current time to populate creation_time property. "
346 status
.ToString().c_str());
348 const uint64_t current_time
= static_cast<uint64_t>(_current_time
);
350 uint64_t oldest_key_time
=
351 mems_
.front()->ApproximateOldestKeyTime();
354 dbname_
, db_options_
.env
, *cfd_
->ioptions(), mutable_cf_options_
,
355 env_options_
, cfd_
->table_cache(), iter
.get(),
356 std::move(range_del_iter
), &meta_
, cfd_
->internal_comparator(),
357 cfd_
->int_tbl_prop_collector_factories(), cfd_
->GetID(),
358 cfd_
->GetName(), existing_snapshots_
,
359 earliest_write_conflict_snapshot_
, snapshot_checker_
,
360 output_compression_
, cfd_
->ioptions()->compression_opts
,
361 mutable_cf_options_
.paranoid_file_checks
, cfd_
->internal_stats(),
362 TableFileCreationReason::kFlush
, event_logger_
, job_context_
->job_id
,
363 Env::IO_HIGH
, &table_properties_
, 0 /* level */, current_time
,
364 oldest_key_time
, write_hint
);
365 LogFlush(db_options_
.info_log
);
367 ROCKS_LOG_INFO(db_options_
.info_log
,
368 "[%s] [JOB %d] Level-0 flush table #%" PRIu64
": %" PRIu64
371 cfd_
->GetName().c_str(), job_context_
->job_id
,
372 meta_
.fd
.GetNumber(), meta_
.fd
.GetFileSize(),
373 s
.ToString().c_str(),
374 meta_
.marked_for_compaction
? " (needs compaction)" : "");
376 if (s
.ok() && output_file_directory_
!= nullptr) {
377 s
= output_file_directory_
->Fsync();
379 TEST_SYNC_POINT("FlushJob::WriteLevel0Table");
384 // Note that if file_size is zero, the file has been deleted and
385 // should not be added to the manifest.
386 if (s
.ok() && meta_
.fd
.GetFileSize() > 0) {
387 // if we have more than 1 background thread, then we cannot
388 // insert files directly into higher levels because some other
389 // threads could be concurrently producing compacted files for
392 edit_
->AddFile(0 /* level */, meta_
.fd
.GetNumber(), meta_
.fd
.GetPathId(),
393 meta_
.fd
.GetFileSize(), meta_
.smallest
, meta_
.largest
,
394 meta_
.fd
.smallest_seqno
, meta_
.fd
.largest_seqno
,
395 meta_
.marked_for_compaction
);
398 // Note that here we treat flush as level 0 compaction in internal stats
399 InternalStats::CompactionStats
stats(CompactionReason::kFlush
, 1);
400 stats
.micros
= db_options_
.env
->NowMicros() - start_micros
;
401 stats
.bytes_written
= meta_
.fd
.GetFileSize();
402 MeasureTime(stats_
, FLUSH_TIME
, stats
.micros
);
403 cfd_
->internal_stats()->AddCompactionStats(0 /* level */, stats
);
404 cfd_
->internal_stats()->AddCFStats(InternalStats::BYTES_FLUSHED
,
405 meta_
.fd
.GetFileSize());
406 RecordFlushIOStats();
410 } // namespace rocksdb