// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "db/flush_job.h"

#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif

#include <inttypes.h>

#include <algorithm>
#include <vector>

#include "db/builder.h"
#include "db/db_iter.h"
#include "db/dbformat.h"
#include "db/event_helpers.h"
#include "db/log_reader.h"
#include "db/log_writer.h"
#include "db/memtable.h"
#include "db/memtable_list.h"
#include "db/merge_context.h"
#include "db/range_tombstone_fragmenter.h"
#include "db/version_set.h"
#include "monitoring/iostats_context_imp.h"
#include "monitoring/perf_context_imp.h"
#include "monitoring/thread_status_util.h"
#include "port/port.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/statistics.h"
#include "rocksdb/status.h"
#include "rocksdb/table.h"
#include "table/block.h"
#include "table/block_based_table_factory.h"
#include "table/merging_iterator.h"
#include "table/table_builder.h"
#include "table/two_level_iterator.h"
#include "util/coding.h"
#include "util/event_logger.h"
#include "util/file_util.h"
#include "util/filename.h"
#include "util/log_buffer.h"
#include "util/logging.h"
#include "util/mutexlock.h"
#include "util/stop_watch.h"
#include "util/sync_point.h"

namespace rocksdb {

const char* GetFlushReasonString(FlushReason flush_reason) {
  switch (flush_reason) {
    case FlushReason::kOthers:
      return "Other Reasons";
    case FlushReason::kGetLiveFiles:
      return "Get Live Files";
    case FlushReason::kShutDown:
      return "Shut down";
    case FlushReason::kExternalFileIngestion:
      return "External File Ingestion";
    case FlushReason::kManualCompaction:
      return "Manual Compaction";
    case FlushReason::kWriteBufferManager:
      return "Write Buffer Manager";
    case FlushReason::kWriteBufferFull:
      return "Write Buffer Full";
    case FlushReason::kTest:
      return "Test";
    case FlushReason::kDeleteFiles:
      return "Delete Files";
    case FlushReason::kAutoCompaction:
      return "Auto Compaction";
    case FlushReason::kManualFlush:
      return "Manual Flush";
    case FlushReason::kErrorRecovery:
      return "Error Recovery";
    default:
      return "Invalid";
  }
}

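// Typical driver sequence (sketch, based on how the DB layer uses this class;
// the real call sites add error handling and shutdown checks):
//
//   FlushJob job(...);
//   job.PickMemTable();                           // under the DB mutex
//   Status s = job.Run(prep_tracker, &file_meta);
//   // ... or job.Cancel() if the flush is abandoned before Run().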
FlushJob::FlushJob(const std::string& dbname, ColumnFamilyData* cfd,
                   const ImmutableDBOptions& db_options,
                   const MutableCFOptions& mutable_cf_options,
                   const uint64_t* max_memtable_id,
                   const EnvOptions& env_options, VersionSet* versions,
                   InstrumentedMutex* db_mutex,
                   std::atomic<bool>* shutting_down,
                   std::vector<SequenceNumber> existing_snapshots,
                   SequenceNumber earliest_write_conflict_snapshot,
                   SnapshotChecker* snapshot_checker, JobContext* job_context,
                   LogBuffer* log_buffer, Directory* db_directory,
                   Directory* output_file_directory,
                   CompressionType output_compression, Statistics* stats,
                   EventLogger* event_logger, bool measure_io_stats,
                   const bool sync_output_directory, const bool write_manifest,
                   Env::Priority thread_pri)
    : dbname_(dbname),
      cfd_(cfd),
      db_options_(db_options),
      mutable_cf_options_(mutable_cf_options),
      max_memtable_id_(max_memtable_id),
      env_options_(env_options),
      versions_(versions),
      db_mutex_(db_mutex),
      shutting_down_(shutting_down),
      existing_snapshots_(std::move(existing_snapshots)),
      earliest_write_conflict_snapshot_(earliest_write_conflict_snapshot),
      snapshot_checker_(snapshot_checker),
      job_context_(job_context),
      log_buffer_(log_buffer),
      db_directory_(db_directory),
      output_file_directory_(output_file_directory),
      output_compression_(output_compression),
      stats_(stats),
      event_logger_(event_logger),
      measure_io_stats_(measure_io_stats),
      sync_output_directory_(sync_output_directory),
      write_manifest_(write_manifest),
      edit_(nullptr),
      base_(nullptr),
      pick_memtable_called(false),
      thread_pri_(thread_pri) {
  // Update the thread status to indicate flush.
  ReportStartedFlush();
  TEST_SYNC_POINT("FlushJob::FlushJob()");
}

FlushJob::~FlushJob() {
  ThreadStatusUtil::ResetThreadStatus();
}

void FlushJob::ReportStartedFlush() {
  ThreadStatusUtil::SetColumnFamily(cfd_, cfd_->ioptions()->env,
                                    db_options_.enable_thread_tracking);
  ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_FLUSH);
  ThreadStatusUtil::SetThreadOperationProperty(
      ThreadStatus::COMPACTION_JOB_ID,
      job_context_->job_id);
  IOSTATS_RESET(bytes_written);
}

void FlushJob::ReportFlushInputSize(const autovector<MemTable*>& mems) {
  uint64_t input_size = 0;
  for (auto* mem : mems) {
    input_size += mem->ApproximateMemoryUsage();
  }
  ThreadStatusUtil::IncreaseThreadOperationProperty(
      ThreadStatus::FLUSH_BYTES_MEMTABLES,
      input_size);
}

void FlushJob::RecordFlushIOStats() {
  RecordTick(stats_, FLUSH_WRITE_BYTES, IOSTATS(bytes_written));
  ThreadStatusUtil::IncreaseThreadOperationProperty(
      ThreadStatus::FLUSH_BYTES_WRITTEN, IOSTATS(bytes_written));
  IOSTATS_RESET(bytes_written);
}

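// PickMemTable() must be called, under the DB mutex, before Run(); it selects
// the immutable memtables this job will flush, records the VersionEdit that
// describes the flush, and pins the current Version.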
void FlushJob::PickMemTable() {
  db_mutex_->AssertHeld();
  assert(!pick_memtable_called);
  pick_memtable_called = true;
  // Save the contents of the earliest memtable as a new Table
  cfd_->imm()->PickMemtablesToFlush(max_memtable_id_, &mems_);
  if (mems_.empty()) {
    return;
  }

  ReportFlushInputSize(mems_);

  // The memtables in mems_ are (implicitly) sorted in ascending order by their
  // creation time. We use the first memtable's `edit` to keep the meta info
  // for this flush.
  MemTable* m = mems_[0];
  edit_ = m->GetEdits();
  edit_->SetPrevLogNumber(0);
  // SetLogNumber(log_num) indicates that logs with numbers smaller than
  // log_num will no longer be picked up for recovery.
  edit_->SetLogNumber(mems_.back()->GetNextLogNumber());
  edit_->SetColumnFamily(cfd_->GetID());

  // path 0 for level 0 file.
  meta_.fd = FileDescriptor(versions_->NewFileNumber(), 0, 0);

  base_ = cfd_->current();
  base_->Ref();  // it is likely that we do not need this reference
}

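// Run() writes the picked memtables out as a level-0 SST file and, when
// write_manifest_ is set, installs the result in the MANIFEST via
// TryInstallMemtableFlushResults(); on failure the memtable flush is rolled
// back so the memtables can be retried later.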
Status FlushJob::Run(LogsWithPrepTracker* prep_tracker,
                     FileMetaData* file_meta) {
  TEST_SYNC_POINT("FlushJob::Start");
  db_mutex_->AssertHeld();
  assert(pick_memtable_called);
  AutoThreadOperationStageUpdater stage_run(
      ThreadStatus::STAGE_FLUSH_RUN);
  if (mems_.empty()) {
    ROCKS_LOG_BUFFER(log_buffer_, "[%s] Nothing in memtable to flush",
                     cfd_->GetName().c_str());
    return Status::OK();
  }

  // I/O measurement variables
  PerfLevel prev_perf_level = PerfLevel::kEnableTime;
  uint64_t prev_write_nanos = 0;
  uint64_t prev_fsync_nanos = 0;
  uint64_t prev_range_sync_nanos = 0;
  uint64_t prev_prepare_write_nanos = 0;
  uint64_t prev_cpu_write_nanos = 0;
  uint64_t prev_cpu_read_nanos = 0;
  if (measure_io_stats_) {
    prev_perf_level = GetPerfLevel();
    SetPerfLevel(PerfLevel::kEnableTime);
    prev_write_nanos = IOSTATS(write_nanos);
    prev_fsync_nanos = IOSTATS(fsync_nanos);
    prev_range_sync_nanos = IOSTATS(range_sync_nanos);
    prev_prepare_write_nanos = IOSTATS(prepare_write_nanos);
    prev_cpu_write_nanos = IOSTATS(cpu_write_nanos);
    prev_cpu_read_nanos = IOSTATS(cpu_read_nanos);
  }

  // This will release and re-acquire the mutex.
  Status s = WriteLevel0Table();

  if (s.ok() &&
      (shutting_down_->load(std::memory_order_acquire) || cfd_->IsDropped())) {
    s = Status::ShutdownInProgress(
        "Database shutdown or Column family drop during flush");
  }

  if (!s.ok()) {
    cfd_->imm()->RollbackMemtableFlush(mems_, meta_.fd.GetNumber());
  } else if (write_manifest_) {
    TEST_SYNC_POINT("FlushJob::InstallResults");
    // Replace immutable memtable with the generated Table
    s = cfd_->imm()->TryInstallMemtableFlushResults(
        cfd_, mutable_cf_options_, mems_, prep_tracker, versions_, db_mutex_,
        meta_.fd.GetNumber(), &job_context_->memtables_to_free, db_directory_,
        log_buffer_);
  }

  if (s.ok() && file_meta != nullptr) {
    *file_meta = meta_;
  }
  RecordFlushIOStats();

  auto stream = event_logger_->LogToBuffer(log_buffer_);
  stream << "job" << job_context_->job_id << "event"
         << "flush_finished";
  stream << "output_compression"
         << CompressionTypeToString(output_compression_);
  stream << "lsm_state";
  stream.StartArray();
  auto vstorage = cfd_->current()->storage_info();
  for (int level = 0; level < vstorage->num_levels(); ++level) {
    stream << vstorage->NumLevelFiles(level);
  }
  stream.EndArray();
  stream << "immutable_memtables" << cfd_->imm()->NumNotFlushed();

  if (measure_io_stats_) {
    if (prev_perf_level != PerfLevel::kEnableTime) {
      SetPerfLevel(prev_perf_level);
    }
    stream << "file_write_nanos" << (IOSTATS(write_nanos) - prev_write_nanos);
    stream << "file_range_sync_nanos"
           << (IOSTATS(range_sync_nanos) - prev_range_sync_nanos);
    stream << "file_fsync_nanos" << (IOSTATS(fsync_nanos) - prev_fsync_nanos);
    stream << "file_prepare_write_nanos"
           << (IOSTATS(prepare_write_nanos) - prev_prepare_write_nanos);
    stream << "file_cpu_write_nanos"
           << (IOSTATS(cpu_write_nanos) - prev_cpu_write_nanos);
    stream << "file_cpu_read_nanos"
           << (IOSTATS(cpu_read_nanos) - prev_cpu_read_nanos);
  }

  return s;
}

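// Cancel() releases the Version reference taken in PickMemTable(); the caller
// uses it when the job is abandoned instead of being Run().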
void FlushJob::Cancel() {
  db_mutex_->AssertHeld();
  assert(base_ != nullptr);
  base_->Unref();
}

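// Builds the level-0 SST file from the picked memtables: merges their
// iterators, writes the table via BuildTable() with the DB mutex released,
// optionally fsyncs the output directory, and records the new file in the
// VersionEdit.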
Status FlushJob::WriteLevel0Table() {
  AutoThreadOperationStageUpdater stage_updater(
      ThreadStatus::STAGE_FLUSH_WRITE_L0);
  db_mutex_->AssertHeld();
  const uint64_t start_micros = db_options_.env->NowMicros();
  const uint64_t start_cpu_micros = db_options_.env->NowCPUNanos() / 1000;
  Status s;
  {
    auto write_hint = cfd_->CalculateSSTWriteHint(0);
    db_mutex_->Unlock();
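    // The DB mutex stays released for the whole table build below and is
    // re-acquired at the end of this scope, so other work that needs the
    // mutex can proceed while the SST file is written.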
    if (log_buffer_) {
      log_buffer_->FlushBufferToLog();
    }
    // memtables and range_del_iters store internal iterators over each data
    // memtable and its associated range deletion memtable, respectively, at
    // corresponding indexes.
    std::vector<InternalIterator*> memtables;
    std::vector<std::unique_ptr<FragmentedRangeTombstoneIterator>>
        range_del_iters;
    ReadOptions ro;
    ro.total_order_seek = true;
    Arena arena;
    uint64_t total_num_entries = 0, total_num_deletes = 0;
    uint64_t total_data_size = 0;
    size_t total_memory_usage = 0;
    for (MemTable* m : mems_) {
      ROCKS_LOG_INFO(
          db_options_.info_log,
          "[%s] [JOB %d] Flushing memtable with next log file: %" PRIu64 "\n",
          cfd_->GetName().c_str(), job_context_->job_id, m->GetNextLogNumber());
      memtables.push_back(m->NewIterator(ro, &arena));
      auto* range_del_iter =
          m->NewRangeTombstoneIterator(ro, kMaxSequenceNumber);
      if (range_del_iter != nullptr) {
        range_del_iters.emplace_back(range_del_iter);
      }
      total_num_entries += m->num_entries();
      total_num_deletes += m->num_deletes();
      total_data_size += m->get_data_size();
      total_memory_usage += m->ApproximateMemoryUsage();
    }

    event_logger_->Log() << "job" << job_context_->job_id << "event"
                         << "flush_started"
                         << "num_memtables" << mems_.size() << "num_entries"
                         << total_num_entries << "num_deletes"
                         << total_num_deletes << "total_data_size"
                         << total_data_size << "memory_usage"
                         << total_memory_usage << "flush_reason"
                         << GetFlushReasonString(cfd_->GetFlushReason());

    {
      ScopedArenaIterator iter(
          NewMergingIterator(&cfd_->internal_comparator(), &memtables[0],
                             static_cast<int>(memtables.size()), &arena));
      ROCKS_LOG_INFO(db_options_.info_log,
                     "[%s] [JOB %d] Level-0 flush table #%" PRIu64 ": started",
                     cfd_->GetName().c_str(), job_context_->job_id,
                     meta_.fd.GetNumber());

      TEST_SYNC_POINT_CALLBACK("FlushJob::WriteLevel0Table:output_compression",
                               &output_compression_);
      int64_t _current_time = 0;
      auto status = db_options_.env->GetCurrentTime(&_current_time);
      // It is safe to proceed even if GetCurrentTime fails, so just log a
      // warning and continue.
      if (!status.ok()) {
        ROCKS_LOG_WARN(
            db_options_.info_log,
            "Failed to get current time to populate creation_time property. "
            "Status: %s",
            status.ToString().c_str());
      }
      const uint64_t current_time = static_cast<uint64_t>(_current_time);

      uint64_t oldest_key_time =
          mems_.front()->ApproximateOldestKeyTime();

      s = BuildTable(
          dbname_, db_options_.env, *cfd_->ioptions(), mutable_cf_options_,
          env_options_, cfd_->table_cache(), iter.get(),
          std::move(range_del_iters), &meta_, cfd_->internal_comparator(),
          cfd_->int_tbl_prop_collector_factories(), cfd_->GetID(),
          cfd_->GetName(), existing_snapshots_,
          earliest_write_conflict_snapshot_, snapshot_checker_,
          output_compression_, mutable_cf_options_.sample_for_compression,
          cfd_->ioptions()->compression_opts,
          mutable_cf_options_.paranoid_file_checks, cfd_->internal_stats(),
          TableFileCreationReason::kFlush, event_logger_, job_context_->job_id,
          Env::IO_HIGH, &table_properties_, 0 /* level */, current_time,
          oldest_key_time, write_hint);
      LogFlush(db_options_.info_log);
    }
    ROCKS_LOG_INFO(db_options_.info_log,
                   "[%s] [JOB %d] Level-0 flush table #%" PRIu64 ": %" PRIu64
                   " bytes %s"
                   "%s",
                   cfd_->GetName().c_str(), job_context_->job_id,
                   meta_.fd.GetNumber(), meta_.fd.GetFileSize(),
                   s.ToString().c_str(),
                   meta_.marked_for_compaction ? " (needs compaction)" : "");

    if (s.ok() && output_file_directory_ != nullptr && sync_output_directory_) {
      s = output_file_directory_->Fsync();
    }
    TEST_SYNC_POINT("FlushJob::WriteLevel0Table");
    db_mutex_->Lock();
  }
  base_->Unref();

  // Note that if file_size is zero, the file has been deleted and
  // should not be added to the manifest.
  if (s.ok() && meta_.fd.GetFileSize() > 0) {
    // If we have more than one background thread, then we cannot
    // insert files directly into higher levels because some other
    // threads could be concurrently producing compacted files for
    // that key range.
    // Add file to L0
    edit_->AddFile(0 /* level */, meta_.fd.GetNumber(), meta_.fd.GetPathId(),
                   meta_.fd.GetFileSize(), meta_.smallest, meta_.largest,
                   meta_.fd.smallest_seqno, meta_.fd.largest_seqno,
                   meta_.marked_for_compaction);
  }

  // Note that here we treat the flush as a level-0 compaction in internal
  // stats.
  InternalStats::CompactionStats stats(CompactionReason::kFlush, 1);
  stats.micros = db_options_.env->NowMicros() - start_micros;
  stats.cpu_micros = db_options_.env->NowCPUNanos() / 1000 - start_cpu_micros;
  stats.bytes_written = meta_.fd.GetFileSize();
  RecordTimeToHistogram(stats_, FLUSH_TIME, stats.micros);
  cfd_->internal_stats()->AddCompactionStats(0 /* level */, thread_pri_, stats);
  cfd_->internal_stats()->AddCFStats(InternalStats::BYTES_FLUSHED,
                                     meta_.fd.GetFileSize());
  RecordFlushIOStats();
  return s;
}

}  // namespace rocksdb