]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/db/flush_job.cc
b64712fd1f06ac7ab417b695fbe9d01debde121b
[ceph.git] / ceph / src / rocksdb / db / flush_job.cc
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9
10 #include "db/flush_job.h"
11
12 #ifndef __STDC_FORMAT_MACROS
13 #define __STDC_FORMAT_MACROS
14 #endif
15
16 #include <inttypes.h>
17
18 #include <algorithm>
19 #include <vector>
20
21 #include "db/builder.h"
22 #include "db/db_iter.h"
23 #include "db/dbformat.h"
24 #include "db/event_helpers.h"
25 #include "db/log_reader.h"
26 #include "db/log_writer.h"
27 #include "db/memtable_list.h"
28 #include "db/merge_context.h"
29 #include "db/version_set.h"
30 #include "monitoring/iostats_context_imp.h"
31 #include "monitoring/perf_context_imp.h"
32 #include "monitoring/thread_status_util.h"
33 #include "port/port.h"
34 #include "db/memtable.h"
35 #include "rocksdb/db.h"
36 #include "rocksdb/env.h"
37 #include "rocksdb/statistics.h"
38 #include "rocksdb/status.h"
39 #include "rocksdb/table.h"
40 #include "table/block.h"
41 #include "table/block_based_table_factory.h"
42 #include "table/merging_iterator.h"
43 #include "table/table_builder.h"
44 #include "table/two_level_iterator.h"
45 #include "util/coding.h"
46 #include "util/event_logger.h"
47 #include "util/file_util.h"
48 #include "util/filename.h"
49 #include "util/log_buffer.h"
50 #include "util/logging.h"
51 #include "util/mutexlock.h"
52 #include "util/stop_watch.h"
53 #include "util/sync_point.h"
54
55 namespace rocksdb {
56
57 const char* GetFlushReasonString (FlushReason flush_reason) {
58 switch (flush_reason) {
59 case FlushReason::kOthers:
60 return "Other Reasons";
61 case FlushReason::kGetLiveFiles:
62 return "Get Live Files";
63 case FlushReason::kShutDown:
64 return "Shut down";
65 case FlushReason::kExternalFileIngestion:
66 return "External File Ingestion";
67 case FlushReason::kManualCompaction:
68 return "Manual Compaction";
69 case FlushReason::kWriteBufferManager:
70 return "Write Buffer Manager";
71 case FlushReason::kWriteBufferFull:
72 return "Write Buffer Full";
73 case FlushReason::kTest:
74 return "Test";
75 case FlushReason::kDeleteFiles:
76 return "Delete Files";
77 case FlushReason::kAutoCompaction:
78 return "Auto Compaction";
79 case FlushReason::kManualFlush:
80 return "Manual Flush";
81 case FlushReason::kErrorRecovery:
82 return "Error Recovery";
83 default:
84 return "Invalid";
85 }
86 }
87
88
89 FlushJob::FlushJob(const std::string& dbname, ColumnFamilyData* cfd,
90 const ImmutableDBOptions& db_options,
91 const MutableCFOptions& mutable_cf_options,
92 const EnvOptions env_options, VersionSet* versions,
93 InstrumentedMutex* db_mutex,
94 std::atomic<bool>* shutting_down,
95 std::vector<SequenceNumber> existing_snapshots,
96 SequenceNumber earliest_write_conflict_snapshot,
97 SnapshotChecker* snapshot_checker, JobContext* job_context,
98 LogBuffer* log_buffer, Directory* db_directory,
99 Directory* output_file_directory,
100 CompressionType output_compression, Statistics* stats,
101 EventLogger* event_logger, bool measure_io_stats)
102 : dbname_(dbname),
103 cfd_(cfd),
104 db_options_(db_options),
105 mutable_cf_options_(mutable_cf_options),
106 env_options_(env_options),
107 versions_(versions),
108 db_mutex_(db_mutex),
109 shutting_down_(shutting_down),
110 existing_snapshots_(std::move(existing_snapshots)),
111 earliest_write_conflict_snapshot_(earliest_write_conflict_snapshot),
112 snapshot_checker_(snapshot_checker),
113 job_context_(job_context),
114 log_buffer_(log_buffer),
115 db_directory_(db_directory),
116 output_file_directory_(output_file_directory),
117 output_compression_(output_compression),
118 stats_(stats),
119 event_logger_(event_logger),
120 measure_io_stats_(measure_io_stats),
121 edit_(nullptr),
122 base_(nullptr),
123 pick_memtable_called(false) {
124 // Update the thread status to indicate flush.
125 ReportStartedFlush();
126 TEST_SYNC_POINT("FlushJob::FlushJob()");
127 }
128
129 FlushJob::~FlushJob() {
130 ThreadStatusUtil::ResetThreadStatus();
131 }
132
133 void FlushJob::ReportStartedFlush() {
134 ThreadStatusUtil::SetColumnFamily(cfd_, cfd_->ioptions()->env,
135 db_options_.enable_thread_tracking);
136 ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_FLUSH);
137 ThreadStatusUtil::SetThreadOperationProperty(
138 ThreadStatus::COMPACTION_JOB_ID,
139 job_context_->job_id);
140 IOSTATS_RESET(bytes_written);
141 }
142
143 void FlushJob::ReportFlushInputSize(const autovector<MemTable*>& mems) {
144 uint64_t input_size = 0;
145 for (auto* mem : mems) {
146 input_size += mem->ApproximateMemoryUsage();
147 }
148 ThreadStatusUtil::IncreaseThreadOperationProperty(
149 ThreadStatus::FLUSH_BYTES_MEMTABLES,
150 input_size);
151 }
152
153 void FlushJob::RecordFlushIOStats() {
154 RecordTick(stats_, FLUSH_WRITE_BYTES, IOSTATS(bytes_written));
155 ThreadStatusUtil::IncreaseThreadOperationProperty(
156 ThreadStatus::FLUSH_BYTES_WRITTEN, IOSTATS(bytes_written));
157 IOSTATS_RESET(bytes_written);
158 }
159
160 void FlushJob::PickMemTable() {
161 db_mutex_->AssertHeld();
162 assert(!pick_memtable_called);
163 pick_memtable_called = true;
164 // Save the contents of the earliest memtable as a new Table
165 cfd_->imm()->PickMemtablesToFlush(&mems_);
166 if (mems_.empty()) {
167 return;
168 }
169
170 ReportFlushInputSize(mems_);
171
172 // entries mems are (implicitly) sorted in ascending order by their created
173 // time. We will use the first memtable's `edit` to keep the meta info for
174 // this flush.
175 MemTable* m = mems_[0];
176 edit_ = m->GetEdits();
177 edit_->SetPrevLogNumber(0);
178 // SetLogNumber(log_num) indicates logs with number smaller than log_num
179 // will no longer be picked up for recovery.
180 edit_->SetLogNumber(mems_.back()->GetNextLogNumber());
181 edit_->SetColumnFamily(cfd_->GetID());
182
183 // path 0 for level 0 file.
184 meta_.fd = FileDescriptor(versions_->NewFileNumber(), 0, 0);
185
186 base_ = cfd_->current();
187 base_->Ref(); // it is likely that we do not need this reference
188 }
189
190 Status FlushJob::Run(LogsWithPrepTracker* prep_tracker,
191 FileMetaData* file_meta) {
192 TEST_SYNC_POINT("FlushJob::Start");
193 db_mutex_->AssertHeld();
194 assert(pick_memtable_called);
195 AutoThreadOperationStageUpdater stage_run(
196 ThreadStatus::STAGE_FLUSH_RUN);
197 if (mems_.empty()) {
198 ROCKS_LOG_BUFFER(log_buffer_, "[%s] Nothing in memtable to flush",
199 cfd_->GetName().c_str());
200 return Status::OK();
201 }
202
203 // I/O measurement variables
204 PerfLevel prev_perf_level = PerfLevel::kEnableTime;
205 uint64_t prev_write_nanos = 0;
206 uint64_t prev_fsync_nanos = 0;
207 uint64_t prev_range_sync_nanos = 0;
208 uint64_t prev_prepare_write_nanos = 0;
209 if (measure_io_stats_) {
210 prev_perf_level = GetPerfLevel();
211 SetPerfLevel(PerfLevel::kEnableTime);
212 prev_write_nanos = IOSTATS(write_nanos);
213 prev_fsync_nanos = IOSTATS(fsync_nanos);
214 prev_range_sync_nanos = IOSTATS(range_sync_nanos);
215 prev_prepare_write_nanos = IOSTATS(prepare_write_nanos);
216 }
217
218 // This will release and re-acquire the mutex.
219 Status s = WriteLevel0Table();
220
221 if (s.ok() &&
222 (shutting_down_->load(std::memory_order_acquire) || cfd_->IsDropped())) {
223 s = Status::ShutdownInProgress(
224 "Database shutdown or Column family drop during flush");
225 }
226
227 if (!s.ok()) {
228 cfd_->imm()->RollbackMemtableFlush(mems_, meta_.fd.GetNumber());
229 } else {
230 TEST_SYNC_POINT("FlushJob::InstallResults");
231 // Replace immutable memtable with the generated Table
232 s = cfd_->imm()->InstallMemtableFlushResults(
233 cfd_, mutable_cf_options_, mems_, prep_tracker, versions_, db_mutex_,
234 meta_.fd.GetNumber(), &job_context_->memtables_to_free, db_directory_,
235 log_buffer_);
236 }
237
238 if (s.ok() && file_meta != nullptr) {
239 *file_meta = meta_;
240 }
241 RecordFlushIOStats();
242
243 auto stream = event_logger_->LogToBuffer(log_buffer_);
244 stream << "job" << job_context_->job_id << "event"
245 << "flush_finished";
246 stream << "output_compression"
247 << CompressionTypeToString(output_compression_);
248 stream << "lsm_state";
249 stream.StartArray();
250 auto vstorage = cfd_->current()->storage_info();
251 for (int level = 0; level < vstorage->num_levels(); ++level) {
252 stream << vstorage->NumLevelFiles(level);
253 }
254 stream.EndArray();
255 stream << "immutable_memtables" << cfd_->imm()->NumNotFlushed();
256
257 if (measure_io_stats_) {
258 if (prev_perf_level != PerfLevel::kEnableTime) {
259 SetPerfLevel(prev_perf_level);
260 }
261 stream << "file_write_nanos" << (IOSTATS(write_nanos) - prev_write_nanos);
262 stream << "file_range_sync_nanos"
263 << (IOSTATS(range_sync_nanos) - prev_range_sync_nanos);
264 stream << "file_fsync_nanos" << (IOSTATS(fsync_nanos) - prev_fsync_nanos);
265 stream << "file_prepare_write_nanos"
266 << (IOSTATS(prepare_write_nanos) - prev_prepare_write_nanos);
267 }
268
269 return s;
270 }
271
272 void FlushJob::Cancel() {
273 db_mutex_->AssertHeld();
274 assert(base_ != nullptr);
275 base_->Unref();
276 }
277
278 Status FlushJob::WriteLevel0Table() {
279 AutoThreadOperationStageUpdater stage_updater(
280 ThreadStatus::STAGE_FLUSH_WRITE_L0);
281 db_mutex_->AssertHeld();
282 const uint64_t start_micros = db_options_.env->NowMicros();
283 Status s;
284 {
285 auto write_hint = cfd_->CalculateSSTWriteHint(0);
286 db_mutex_->Unlock();
287 if (log_buffer_) {
288 log_buffer_->FlushBufferToLog();
289 }
290 // memtables and range_del_iters store internal iterators over each data
291 // memtable and its associated range deletion memtable, respectively, at
292 // corresponding indexes.
293 std::vector<InternalIterator*> memtables;
294 std::vector<InternalIterator*> range_del_iters;
295 ReadOptions ro;
296 ro.total_order_seek = true;
297 Arena arena;
298 uint64_t total_num_entries = 0, total_num_deletes = 0;
299 size_t total_memory_usage = 0;
300 for (MemTable* m : mems_) {
301 ROCKS_LOG_INFO(
302 db_options_.info_log,
303 "[%s] [JOB %d] Flushing memtable with next log file: %" PRIu64 "\n",
304 cfd_->GetName().c_str(), job_context_->job_id, m->GetNextLogNumber());
305 memtables.push_back(m->NewIterator(ro, &arena));
306 auto* range_del_iter = m->NewRangeTombstoneIterator(ro);
307 if (range_del_iter != nullptr) {
308 range_del_iters.push_back(range_del_iter);
309 }
310 total_num_entries += m->num_entries();
311 total_num_deletes += m->num_deletes();
312 total_memory_usage += m->ApproximateMemoryUsage();
313 }
314
315 event_logger_->Log()
316 << "job" << job_context_->job_id << "event"
317 << "flush_started"
318 << "num_memtables" << mems_.size() << "num_entries" << total_num_entries
319 << "num_deletes" << total_num_deletes << "memory_usage"
320 << total_memory_usage << "flush_reason"
321 << GetFlushReasonString(cfd_->GetFlushReason());
322
323 {
324 ScopedArenaIterator iter(
325 NewMergingIterator(&cfd_->internal_comparator(), &memtables[0],
326 static_cast<int>(memtables.size()), &arena));
327 std::unique_ptr<InternalIterator> range_del_iter(NewMergingIterator(
328 &cfd_->internal_comparator(),
329 range_del_iters.empty() ? nullptr : &range_del_iters[0],
330 static_cast<int>(range_del_iters.size())));
331 ROCKS_LOG_INFO(db_options_.info_log,
332 "[%s] [JOB %d] Level-0 flush table #%" PRIu64 ": started",
333 cfd_->GetName().c_str(), job_context_->job_id,
334 meta_.fd.GetNumber());
335
336 TEST_SYNC_POINT_CALLBACK("FlushJob::WriteLevel0Table:output_compression",
337 &output_compression_);
338 int64_t _current_time = 0;
339 auto status = db_options_.env->GetCurrentTime(&_current_time);
340 // Safe to proceed even if GetCurrentTime fails. So, log and proceed.
341 if (!status.ok()) {
342 ROCKS_LOG_WARN(
343 db_options_.info_log,
344 "Failed to get current time to populate creation_time property. "
345 "Status: %s",
346 status.ToString().c_str());
347 }
348 const uint64_t current_time = static_cast<uint64_t>(_current_time);
349
350 uint64_t oldest_key_time =
351 mems_.front()->ApproximateOldestKeyTime();
352
353 s = BuildTable(
354 dbname_, db_options_.env, *cfd_->ioptions(), mutable_cf_options_,
355 env_options_, cfd_->table_cache(), iter.get(),
356 std::move(range_del_iter), &meta_, cfd_->internal_comparator(),
357 cfd_->int_tbl_prop_collector_factories(), cfd_->GetID(),
358 cfd_->GetName(), existing_snapshots_,
359 earliest_write_conflict_snapshot_, snapshot_checker_,
360 output_compression_, cfd_->ioptions()->compression_opts,
361 mutable_cf_options_.paranoid_file_checks, cfd_->internal_stats(),
362 TableFileCreationReason::kFlush, event_logger_, job_context_->job_id,
363 Env::IO_HIGH, &table_properties_, 0 /* level */, current_time,
364 oldest_key_time, write_hint);
365 LogFlush(db_options_.info_log);
366 }
367 ROCKS_LOG_INFO(db_options_.info_log,
368 "[%s] [JOB %d] Level-0 flush table #%" PRIu64 ": %" PRIu64
369 " bytes %s"
370 "%s",
371 cfd_->GetName().c_str(), job_context_->job_id,
372 meta_.fd.GetNumber(), meta_.fd.GetFileSize(),
373 s.ToString().c_str(),
374 meta_.marked_for_compaction ? " (needs compaction)" : "");
375
376 if (s.ok() && output_file_directory_ != nullptr) {
377 s = output_file_directory_->Fsync();
378 }
379 TEST_SYNC_POINT("FlushJob::WriteLevel0Table");
380 db_mutex_->Lock();
381 }
382 base_->Unref();
383
384 // Note that if file_size is zero, the file has been deleted and
385 // should not be added to the manifest.
386 if (s.ok() && meta_.fd.GetFileSize() > 0) {
387 // if we have more than 1 background thread, then we cannot
388 // insert files directly into higher levels because some other
389 // threads could be concurrently producing compacted files for
390 // that key range.
391 // Add file to L0
392 edit_->AddFile(0 /* level */, meta_.fd.GetNumber(), meta_.fd.GetPathId(),
393 meta_.fd.GetFileSize(), meta_.smallest, meta_.largest,
394 meta_.fd.smallest_seqno, meta_.fd.largest_seqno,
395 meta_.marked_for_compaction);
396 }
397
398 // Note that here we treat flush as level 0 compaction in internal stats
399 InternalStats::CompactionStats stats(CompactionReason::kFlush, 1);
400 stats.micros = db_options_.env->NowMicros() - start_micros;
401 stats.bytes_written = meta_.fd.GetFileSize();
402 MeasureTime(stats_, FLUSH_TIME, stats.micros);
403 cfd_->internal_stats()->AddCompactionStats(0 /* level */, stats);
404 cfd_->internal_stats()->AddCFStats(InternalStats::BYTES_FLUSHED,
405 meta_.fd.GetFileSize());
406 RecordFlushIOStats();
407 return s;
408 }
409
410 } // namespace rocksdb