]>
Commit | Line | Data |
---|---|---|
7c673cae | 1 | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
11fdf7f2 TL |
2 | // This source code is licensed under both the GPLv2 (found in the |
3 | // COPYING file in the root directory) and Apache 2.0 License | |
4 | // (found in the LICENSE.Apache file in the root directory). | |
7c673cae FG |
5 | // |
6 | // Copyright (c) 2011 The LevelDB Authors. All rights reserved. | |
7 | // Use of this source code is governed by a BSD-style license that can be | |
8 | // found in the LICENSE file. See the AUTHORS file for names of contributors. | |
9 | ||
10 | #include "db/flush_job.h" | |
11 | ||
f67539c2 | 12 | #include <cinttypes> |
7c673cae FG |
13 | |
14 | #include <algorithm> | |
15 | #include <vector> | |
16 | ||
17 | #include "db/builder.h" | |
18 | #include "db/db_iter.h" | |
19 | #include "db/dbformat.h" | |
20 | #include "db/event_helpers.h" | |
21 | #include "db/log_reader.h" | |
22 | #include "db/log_writer.h" | |
494da23a | 23 | #include "db/memtable.h" |
7c673cae FG |
24 | #include "db/memtable_list.h" |
25 | #include "db/merge_context.h" | |
494da23a | 26 | #include "db/range_tombstone_fragmenter.h" |
7c673cae | 27 | #include "db/version_set.h" |
f67539c2 TL |
28 | #include "file/file_util.h" |
29 | #include "file/filename.h" | |
30 | #include "logging/event_logger.h" | |
31 | #include "logging/log_buffer.h" | |
32 | #include "logging/logging.h" | |
7c673cae FG |
33 | #include "monitoring/iostats_context_imp.h" |
34 | #include "monitoring/perf_context_imp.h" | |
35 | #include "monitoring/thread_status_util.h" | |
7c673cae | 36 | #include "port/port.h" |
7c673cae FG |
37 | #include "rocksdb/db.h" |
38 | #include "rocksdb/env.h" | |
39 | #include "rocksdb/statistics.h" | |
40 | #include "rocksdb/status.h" | |
41 | #include "rocksdb/table.h" | |
7c673cae FG |
42 | #include "table/merging_iterator.h" |
43 | #include "table/table_builder.h" | |
44 | #include "table/two_level_iterator.h" | |
f67539c2 | 45 | #include "test_util/sync_point.h" |
7c673cae | 46 | #include "util/coding.h" |
7c673cae FG |
47 | #include "util/mutexlock.h" |
48 | #include "util/stop_watch.h" | |
7c673cae | 49 | |
f67539c2 | 50 | namespace ROCKSDB_NAMESPACE { |
7c673cae | 51 | |
11fdf7f2 TL |
52 | const char* GetFlushReasonString (FlushReason flush_reason) { |
53 | switch (flush_reason) { | |
54 | case FlushReason::kOthers: | |
55 | return "Other Reasons"; | |
56 | case FlushReason::kGetLiveFiles: | |
57 | return "Get Live Files"; | |
58 | case FlushReason::kShutDown: | |
59 | return "Shut down"; | |
60 | case FlushReason::kExternalFileIngestion: | |
61 | return "External File Ingestion"; | |
62 | case FlushReason::kManualCompaction: | |
63 | return "Manual Compaction"; | |
64 | case FlushReason::kWriteBufferManager: | |
65 | return "Write Buffer Manager"; | |
66 | case FlushReason::kWriteBufferFull: | |
67 | return "Write Buffer Full"; | |
68 | case FlushReason::kTest: | |
69 | return "Test"; | |
70 | case FlushReason::kDeleteFiles: | |
71 | return "Delete Files"; | |
72 | case FlushReason::kAutoCompaction: | |
73 | return "Auto Compaction"; | |
74 | case FlushReason::kManualFlush: | |
75 | return "Manual Flush"; | |
76 | case FlushReason::kErrorRecovery: | |
77 | return "Error Recovery"; | |
78 | default: | |
79 | return "Invalid"; | |
80 | } | |
81 | } | |
82 | ||
20effc67 TL |
83 | FlushJob::FlushJob( |
84 | const std::string& dbname, ColumnFamilyData* cfd, | |
85 | const ImmutableDBOptions& db_options, | |
86 | const MutableCFOptions& mutable_cf_options, const uint64_t* max_memtable_id, | |
87 | const FileOptions& file_options, VersionSet* versions, | |
88 | InstrumentedMutex* db_mutex, std::atomic<bool>* shutting_down, | |
89 | std::vector<SequenceNumber> existing_snapshots, | |
90 | SequenceNumber earliest_write_conflict_snapshot, | |
91 | SnapshotChecker* snapshot_checker, JobContext* job_context, | |
92 | LogBuffer* log_buffer, FSDirectory* db_directory, | |
93 | FSDirectory* output_file_directory, CompressionType output_compression, | |
94 | Statistics* stats, EventLogger* event_logger, bool measure_io_stats, | |
95 | const bool sync_output_directory, const bool write_manifest, | |
96 | Env::Priority thread_pri, const std::shared_ptr<IOTracer>& io_tracer, | |
97 | const std::string& db_id, const std::string& db_session_id, | |
98 | std::string full_history_ts_low) | |
7c673cae | 99 | : dbname_(dbname), |
20effc67 TL |
100 | db_id_(db_id), |
101 | db_session_id_(db_session_id), | |
7c673cae FG |
102 | cfd_(cfd), |
103 | db_options_(db_options), | |
104 | mutable_cf_options_(mutable_cf_options), | |
494da23a | 105 | max_memtable_id_(max_memtable_id), |
f67539c2 | 106 | file_options_(file_options), |
7c673cae FG |
107 | versions_(versions), |
108 | db_mutex_(db_mutex), | |
109 | shutting_down_(shutting_down), | |
110 | existing_snapshots_(std::move(existing_snapshots)), | |
111 | earliest_write_conflict_snapshot_(earliest_write_conflict_snapshot), | |
11fdf7f2 | 112 | snapshot_checker_(snapshot_checker), |
7c673cae FG |
113 | job_context_(job_context), |
114 | log_buffer_(log_buffer), | |
115 | db_directory_(db_directory), | |
116 | output_file_directory_(output_file_directory), | |
117 | output_compression_(output_compression), | |
118 | stats_(stats), | |
119 | event_logger_(event_logger), | |
120 | measure_io_stats_(measure_io_stats), | |
494da23a TL |
121 | sync_output_directory_(sync_output_directory), |
122 | write_manifest_(write_manifest), | |
11fdf7f2 TL |
123 | edit_(nullptr), |
124 | base_(nullptr), | |
494da23a | 125 | pick_memtable_called(false), |
20effc67 TL |
126 | thread_pri_(thread_pri), |
127 | io_tracer_(io_tracer), | |
128 | full_history_ts_low_(std::move(full_history_ts_low)) { | |
7c673cae FG |
129 | // Update the thread status to indicate flush. |
130 | ReportStartedFlush(); | |
131 | TEST_SYNC_POINT("FlushJob::FlushJob()"); | |
132 | } | |
133 | ||
134 | FlushJob::~FlushJob() { | |
20effc67 | 135 | io_status_.PermitUncheckedError(); |
7c673cae FG |
136 | ThreadStatusUtil::ResetThreadStatus(); |
137 | } | |
138 | ||
139 | void FlushJob::ReportStartedFlush() { | |
140 | ThreadStatusUtil::SetColumnFamily(cfd_, cfd_->ioptions()->env, | |
141 | db_options_.enable_thread_tracking); | |
142 | ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_FLUSH); | |
143 | ThreadStatusUtil::SetThreadOperationProperty( | |
144 | ThreadStatus::COMPACTION_JOB_ID, | |
145 | job_context_->job_id); | |
146 | IOSTATS_RESET(bytes_written); | |
147 | } | |
148 | ||
149 | void FlushJob::ReportFlushInputSize(const autovector<MemTable*>& mems) { | |
150 | uint64_t input_size = 0; | |
151 | for (auto* mem : mems) { | |
152 | input_size += mem->ApproximateMemoryUsage(); | |
153 | } | |
154 | ThreadStatusUtil::IncreaseThreadOperationProperty( | |
155 | ThreadStatus::FLUSH_BYTES_MEMTABLES, | |
156 | input_size); | |
157 | } | |
158 | ||
159 | void FlushJob::RecordFlushIOStats() { | |
160 | RecordTick(stats_, FLUSH_WRITE_BYTES, IOSTATS(bytes_written)); | |
161 | ThreadStatusUtil::IncreaseThreadOperationProperty( | |
162 | ThreadStatus::FLUSH_BYTES_WRITTEN, IOSTATS(bytes_written)); | |
163 | IOSTATS_RESET(bytes_written); | |
164 | } | |
165 | ||
166 | void FlushJob::PickMemTable() { | |
167 | db_mutex_->AssertHeld(); | |
168 | assert(!pick_memtable_called); | |
169 | pick_memtable_called = true; | |
170 | // Save the contents of the earliest memtable as a new Table | |
494da23a | 171 | cfd_->imm()->PickMemtablesToFlush(max_memtable_id_, &mems_); |
7c673cae FG |
172 | if (mems_.empty()) { |
173 | return; | |
174 | } | |
175 | ||
176 | ReportFlushInputSize(mems_); | |
177 | ||
178 | // entries mems are (implicitly) sorted in ascending order by their created | |
179 | // time. We will use the first memtable's `edit` to keep the meta info for | |
180 | // this flush. | |
181 | MemTable* m = mems_[0]; | |
182 | edit_ = m->GetEdits(); | |
183 | edit_->SetPrevLogNumber(0); | |
184 | // SetLogNumber(log_num) indicates logs with number smaller than log_num | |
185 | // will no longer be picked up for recovery. | |
186 | edit_->SetLogNumber(mems_.back()->GetNextLogNumber()); | |
187 | edit_->SetColumnFamily(cfd_->GetID()); | |
188 | ||
189 | // path 0 for level 0 file. | |
190 | meta_.fd = FileDescriptor(versions_->NewFileNumber(), 0, 0); | |
191 | ||
192 | base_ = cfd_->current(); | |
193 | base_->Ref(); // it is likely that we do not need this reference | |
194 | } | |
195 | ||
11fdf7f2 TL |
196 | Status FlushJob::Run(LogsWithPrepTracker* prep_tracker, |
197 | FileMetaData* file_meta) { | |
198 | TEST_SYNC_POINT("FlushJob::Start"); | |
7c673cae FG |
199 | db_mutex_->AssertHeld(); |
200 | assert(pick_memtable_called); | |
201 | AutoThreadOperationStageUpdater stage_run( | |
202 | ThreadStatus::STAGE_FLUSH_RUN); | |
203 | if (mems_.empty()) { | |
204 | ROCKS_LOG_BUFFER(log_buffer_, "[%s] Nothing in memtable to flush", | |
205 | cfd_->GetName().c_str()); | |
206 | return Status::OK(); | |
207 | } | |
208 | ||
209 | // I/O measurement variables | |
210 | PerfLevel prev_perf_level = PerfLevel::kEnableTime; | |
211 | uint64_t prev_write_nanos = 0; | |
212 | uint64_t prev_fsync_nanos = 0; | |
213 | uint64_t prev_range_sync_nanos = 0; | |
214 | uint64_t prev_prepare_write_nanos = 0; | |
494da23a TL |
215 | uint64_t prev_cpu_write_nanos = 0; |
216 | uint64_t prev_cpu_read_nanos = 0; | |
7c673cae FG |
217 | if (measure_io_stats_) { |
218 | prev_perf_level = GetPerfLevel(); | |
219 | SetPerfLevel(PerfLevel::kEnableTime); | |
220 | prev_write_nanos = IOSTATS(write_nanos); | |
221 | prev_fsync_nanos = IOSTATS(fsync_nanos); | |
222 | prev_range_sync_nanos = IOSTATS(range_sync_nanos); | |
223 | prev_prepare_write_nanos = IOSTATS(prepare_write_nanos); | |
494da23a TL |
224 | prev_cpu_write_nanos = IOSTATS(cpu_write_nanos); |
225 | prev_cpu_read_nanos = IOSTATS(cpu_read_nanos); | |
7c673cae FG |
226 | } |
227 | ||
228 | // This will release and re-acquire the mutex. | |
229 | Status s = WriteLevel0Table(); | |
230 | ||
f67539c2 TL |
231 | if (s.ok() && cfd_->IsDropped()) { |
232 | s = Status::ColumnFamilyDropped("Column family dropped during compaction"); | |
233 | } | |
234 | if ((s.ok() || s.IsColumnFamilyDropped()) && | |
235 | shutting_down_->load(std::memory_order_acquire)) { | |
236 | s = Status::ShutdownInProgress("Database shutdown"); | |
7c673cae FG |
237 | } |
238 | ||
239 | if (!s.ok()) { | |
240 | cfd_->imm()->RollbackMemtableFlush(mems_, meta_.fd.GetNumber()); | |
494da23a | 241 | } else if (write_manifest_) { |
7c673cae FG |
242 | TEST_SYNC_POINT("FlushJob::InstallResults"); |
243 | // Replace immutable memtable with the generated Table | |
20effc67 | 244 | IOStatus tmp_io_s; |
494da23a | 245 | s = cfd_->imm()->TryInstallMemtableFlushResults( |
11fdf7f2 | 246 | cfd_, mutable_cf_options_, mems_, prep_tracker, versions_, db_mutex_, |
7c673cae | 247 | meta_.fd.GetNumber(), &job_context_->memtables_to_free, db_directory_, |
20effc67 TL |
248 | log_buffer_, &committed_flush_jobs_info_, &tmp_io_s); |
249 | if (!tmp_io_s.ok()) { | |
250 | io_status_ = tmp_io_s; | |
251 | } | |
7c673cae FG |
252 | } |
253 | ||
254 | if (s.ok() && file_meta != nullptr) { | |
255 | *file_meta = meta_; | |
256 | } | |
257 | RecordFlushIOStats(); | |
258 | ||
f67539c2 TL |
259 | // When measure_io_stats_ is true, the default 512 bytes is not enough. |
260 | auto stream = event_logger_->LogToBuffer(log_buffer_, 1024); | |
7c673cae FG |
261 | stream << "job" << job_context_->job_id << "event" |
262 | << "flush_finished"; | |
11fdf7f2 TL |
263 | stream << "output_compression" |
264 | << CompressionTypeToString(output_compression_); | |
7c673cae FG |
265 | stream << "lsm_state"; |
266 | stream.StartArray(); | |
267 | auto vstorage = cfd_->current()->storage_info(); | |
268 | for (int level = 0; level < vstorage->num_levels(); ++level) { | |
269 | stream << vstorage->NumLevelFiles(level); | |
270 | } | |
271 | stream.EndArray(); | |
20effc67 TL |
272 | |
273 | const auto& blob_files = vstorage->GetBlobFiles(); | |
274 | if (!blob_files.empty()) { | |
275 | stream << "blob_file_head" << blob_files.begin()->first; | |
276 | stream << "blob_file_tail" << blob_files.rbegin()->first; | |
277 | } | |
278 | ||
7c673cae FG |
279 | stream << "immutable_memtables" << cfd_->imm()->NumNotFlushed(); |
280 | ||
281 | if (measure_io_stats_) { | |
282 | if (prev_perf_level != PerfLevel::kEnableTime) { | |
283 | SetPerfLevel(prev_perf_level); | |
284 | } | |
285 | stream << "file_write_nanos" << (IOSTATS(write_nanos) - prev_write_nanos); | |
286 | stream << "file_range_sync_nanos" | |
287 | << (IOSTATS(range_sync_nanos) - prev_range_sync_nanos); | |
288 | stream << "file_fsync_nanos" << (IOSTATS(fsync_nanos) - prev_fsync_nanos); | |
289 | stream << "file_prepare_write_nanos" | |
290 | << (IOSTATS(prepare_write_nanos) - prev_prepare_write_nanos); | |
494da23a TL |
291 | stream << "file_cpu_write_nanos" |
292 | << (IOSTATS(cpu_write_nanos) - prev_cpu_write_nanos); | |
293 | stream << "file_cpu_read_nanos" | |
294 | << (IOSTATS(cpu_read_nanos) - prev_cpu_read_nanos); | |
7c673cae FG |
295 | } |
296 | ||
297 | return s; | |
298 | } | |
299 | ||
300 | void FlushJob::Cancel() { | |
301 | db_mutex_->AssertHeld(); | |
302 | assert(base_ != nullptr); | |
303 | base_->Unref(); | |
304 | } | |
305 | ||
306 | Status FlushJob::WriteLevel0Table() { | |
307 | AutoThreadOperationStageUpdater stage_updater( | |
308 | ThreadStatus::STAGE_FLUSH_WRITE_L0); | |
309 | db_mutex_->AssertHeld(); | |
310 | const uint64_t start_micros = db_options_.env->NowMicros(); | |
494da23a | 311 | const uint64_t start_cpu_micros = db_options_.env->NowCPUNanos() / 1000; |
7c673cae | 312 | Status s; |
20effc67 TL |
313 | |
314 | std::vector<BlobFileAddition> blob_file_additions; | |
315 | ||
7c673cae | 316 | { |
11fdf7f2 | 317 | auto write_hint = cfd_->CalculateSSTWriteHint(0); |
7c673cae FG |
318 | db_mutex_->Unlock(); |
319 | if (log_buffer_) { | |
320 | log_buffer_->FlushBufferToLog(); | |
321 | } | |
322 | // memtables and range_del_iters store internal iterators over each data | |
323 | // memtable and its associated range deletion memtable, respectively, at | |
324 | // corresponding indexes. | |
325 | std::vector<InternalIterator*> memtables; | |
494da23a TL |
326 | std::vector<std::unique_ptr<FragmentedRangeTombstoneIterator>> |
327 | range_del_iters; | |
7c673cae FG |
328 | ReadOptions ro; |
329 | ro.total_order_seek = true; | |
330 | Arena arena; | |
331 | uint64_t total_num_entries = 0, total_num_deletes = 0; | |
494da23a | 332 | uint64_t total_data_size = 0; |
7c673cae FG |
333 | size_t total_memory_usage = 0; |
334 | for (MemTable* m : mems_) { | |
335 | ROCKS_LOG_INFO( | |
336 | db_options_.info_log, | |
337 | "[%s] [JOB %d] Flushing memtable with next log file: %" PRIu64 "\n", | |
338 | cfd_->GetName().c_str(), job_context_->job_id, m->GetNextLogNumber()); | |
339 | memtables.push_back(m->NewIterator(ro, &arena)); | |
494da23a TL |
340 | auto* range_del_iter = |
341 | m->NewRangeTombstoneIterator(ro, kMaxSequenceNumber); | |
7c673cae | 342 | if (range_del_iter != nullptr) { |
494da23a | 343 | range_del_iters.emplace_back(range_del_iter); |
7c673cae FG |
344 | } |
345 | total_num_entries += m->num_entries(); | |
346 | total_num_deletes += m->num_deletes(); | |
494da23a | 347 | total_data_size += m->get_data_size(); |
7c673cae FG |
348 | total_memory_usage += m->ApproximateMemoryUsage(); |
349 | } | |
350 | ||
494da23a TL |
351 | event_logger_->Log() << "job" << job_context_->job_id << "event" |
352 | << "flush_started" | |
353 | << "num_memtables" << mems_.size() << "num_entries" | |
354 | << total_num_entries << "num_deletes" | |
355 | << total_num_deletes << "total_data_size" | |
356 | << total_data_size << "memory_usage" | |
357 | << total_memory_usage << "flush_reason" | |
358 | << GetFlushReasonString(cfd_->GetFlushReason()); | |
7c673cae FG |
359 | |
360 | { | |
361 | ScopedArenaIterator iter( | |
362 | NewMergingIterator(&cfd_->internal_comparator(), &memtables[0], | |
363 | static_cast<int>(memtables.size()), &arena)); | |
7c673cae FG |
364 | ROCKS_LOG_INFO(db_options_.info_log, |
365 | "[%s] [JOB %d] Level-0 flush table #%" PRIu64 ": started", | |
366 | cfd_->GetName().c_str(), job_context_->job_id, | |
367 | meta_.fd.GetNumber()); | |
368 | ||
369 | TEST_SYNC_POINT_CALLBACK("FlushJob::WriteLevel0Table:output_compression", | |
370 | &output_compression_); | |
11fdf7f2 TL |
371 | int64_t _current_time = 0; |
372 | auto status = db_options_.env->GetCurrentTime(&_current_time); | |
373 | // Safe to proceed even if GetCurrentTime fails. So, log and proceed. | |
374 | if (!status.ok()) { | |
375 | ROCKS_LOG_WARN( | |
376 | db_options_.info_log, | |
377 | "Failed to get current time to populate creation_time property. " | |
378 | "Status: %s", | |
379 | status.ToString().c_str()); | |
380 | } | |
381 | const uint64_t current_time = static_cast<uint64_t>(_current_time); | |
382 | ||
383 | uint64_t oldest_key_time = | |
384 | mems_.front()->ApproximateOldestKeyTime(); | |
385 | ||
f67539c2 TL |
386 | // It's not clear whether oldest_key_time is always available. In case |
387 | // it is not available, use current_time. | |
20effc67 TL |
388 | uint64_t oldest_ancester_time = std::min(current_time, oldest_key_time); |
389 | ||
390 | TEST_SYNC_POINT_CALLBACK( | |
391 | "FlushJob::WriteLevel0Table:oldest_ancester_time", | |
392 | &oldest_ancester_time); | |
393 | meta_.oldest_ancester_time = oldest_ancester_time; | |
394 | ||
f67539c2 TL |
395 | meta_.file_creation_time = current_time; |
396 | ||
397 | uint64_t creation_time = (cfd_->ioptions()->compaction_style == | |
398 | CompactionStyle::kCompactionStyleFIFO) | |
399 | ? current_time | |
400 | : meta_.oldest_ancester_time; | |
401 | ||
20effc67 TL |
402 | IOStatus io_s; |
403 | const std::string* const full_history_ts_low = | |
404 | (full_history_ts_low_.empty()) ? nullptr : &full_history_ts_low_; | |
7c673cae | 405 | s = BuildTable( |
20effc67 | 406 | dbname_, versions_, db_options_, *cfd_->ioptions(), |
f67539c2 | 407 | mutable_cf_options_, file_options_, cfd_->table_cache(), iter.get(), |
20effc67 TL |
408 | std::move(range_del_iters), &meta_, &blob_file_additions, |
409 | cfd_->internal_comparator(), cfd_->int_tbl_prop_collector_factories(), | |
410 | cfd_->GetID(), cfd_->GetName(), existing_snapshots_, | |
11fdf7f2 | 411 | earliest_write_conflict_snapshot_, snapshot_checker_, |
494da23a | 412 | output_compression_, mutable_cf_options_.sample_for_compression, |
20effc67 | 413 | mutable_cf_options_.compression_opts, |
7c673cae | 414 | mutable_cf_options_.paranoid_file_checks, cfd_->internal_stats(), |
20effc67 TL |
415 | TableFileCreationReason::kFlush, &io_s, io_tracer_, event_logger_, |
416 | job_context_->job_id, Env::IO_HIGH, &table_properties_, 0 /* level */, | |
417 | creation_time, oldest_key_time, write_hint, current_time, db_id_, | |
418 | db_session_id_, full_history_ts_low); | |
419 | if (!io_s.ok()) { | |
420 | io_status_ = io_s; | |
421 | } | |
7c673cae FG |
422 | LogFlush(db_options_.info_log); |
423 | } | |
424 | ROCKS_LOG_INFO(db_options_.info_log, | |
425 | "[%s] [JOB %d] Level-0 flush table #%" PRIu64 ": %" PRIu64 | |
426 | " bytes %s" | |
427 | "%s", | |
428 | cfd_->GetName().c_str(), job_context_->job_id, | |
429 | meta_.fd.GetNumber(), meta_.fd.GetFileSize(), | |
430 | s.ToString().c_str(), | |
431 | meta_.marked_for_compaction ? " (needs compaction)" : ""); | |
432 | ||
494da23a | 433 | if (s.ok() && output_file_directory_ != nullptr && sync_output_directory_) { |
20effc67 | 434 | s = output_file_directory_->Fsync(IOOptions(), nullptr); |
7c673cae | 435 | } |
f67539c2 | 436 | TEST_SYNC_POINT_CALLBACK("FlushJob::WriteLevel0Table", &mems_); |
7c673cae FG |
437 | db_mutex_->Lock(); |
438 | } | |
439 | base_->Unref(); | |
440 | ||
441 | // Note that if file_size is zero, the file has been deleted and | |
442 | // should not be added to the manifest. | |
20effc67 TL |
443 | const bool has_output = meta_.fd.GetFileSize() > 0; |
444 | ||
445 | if (s.ok() && has_output) { | |
7c673cae FG |
446 | // if we have more than 1 background thread, then we cannot |
447 | // insert files directly into higher levels because some other | |
448 | // threads could be concurrently producing compacted files for | |
449 | // that key range. | |
450 | // Add file to L0 | |
451 | edit_->AddFile(0 /* level */, meta_.fd.GetNumber(), meta_.fd.GetPathId(), | |
452 | meta_.fd.GetFileSize(), meta_.smallest, meta_.largest, | |
11fdf7f2 | 453 | meta_.fd.smallest_seqno, meta_.fd.largest_seqno, |
f67539c2 TL |
454 | meta_.marked_for_compaction, meta_.oldest_blob_file_number, |
455 | meta_.oldest_ancester_time, meta_.file_creation_time, | |
456 | meta_.file_checksum, meta_.file_checksum_func_name); | |
20effc67 TL |
457 | |
458 | edit_->SetBlobFileAdditions(std::move(blob_file_additions)); | |
7c673cae | 459 | } |
f67539c2 TL |
460 | #ifndef ROCKSDB_LITE |
461 | // Piggyback FlushJobInfo on the first first flushed memtable. | |
462 | mems_[0]->SetFlushJobInfo(GetFlushJobInfo()); | |
463 | #endif // !ROCKSDB_LITE | |
7c673cae FG |
464 | |
465 | // Note that here we treat flush as level 0 compaction in internal stats | |
11fdf7f2 | 466 | InternalStats::CompactionStats stats(CompactionReason::kFlush, 1); |
7c673cae | 467 | stats.micros = db_options_.env->NowMicros() - start_micros; |
494da23a | 468 | stats.cpu_micros = db_options_.env->NowCPUNanos() / 1000 - start_cpu_micros; |
20effc67 TL |
469 | |
470 | if (has_output) { | |
471 | stats.bytes_written = meta_.fd.GetFileSize(); | |
472 | stats.num_output_files = 1; | |
473 | } | |
474 | ||
475 | const auto& blobs = edit_->GetBlobFileAdditions(); | |
476 | for (const auto& blob : blobs) { | |
477 | stats.bytes_written += blob.GetTotalBlobBytes(); | |
478 | } | |
479 | ||
480 | stats.num_output_files += static_cast<int>(blobs.size()); | |
481 | ||
494da23a TL |
482 | RecordTimeToHistogram(stats_, FLUSH_TIME, stats.micros); |
483 | cfd_->internal_stats()->AddCompactionStats(0 /* level */, thread_pri_, stats); | |
7c673cae | 484 | cfd_->internal_stats()->AddCFStats(InternalStats::BYTES_FLUSHED, |
20effc67 | 485 | stats.bytes_written); |
7c673cae FG |
486 | RecordFlushIOStats(); |
487 | return s; | |
488 | } | |
489 | ||
f67539c2 TL |
490 | #ifndef ROCKSDB_LITE |
491 | std::unique_ptr<FlushJobInfo> FlushJob::GetFlushJobInfo() const { | |
492 | db_mutex_->AssertHeld(); | |
493 | std::unique_ptr<FlushJobInfo> info(new FlushJobInfo{}); | |
494 | info->cf_id = cfd_->GetID(); | |
495 | info->cf_name = cfd_->GetName(); | |
496 | ||
497 | const uint64_t file_number = meta_.fd.GetNumber(); | |
498 | info->file_path = | |
499 | MakeTableFileName(cfd_->ioptions()->cf_paths[0].path, file_number); | |
500 | info->file_number = file_number; | |
501 | info->oldest_blob_file_number = meta_.oldest_blob_file_number; | |
502 | info->thread_id = db_options_.env->GetThreadID(); | |
503 | info->job_id = job_context_->job_id; | |
504 | info->smallest_seqno = meta_.fd.smallest_seqno; | |
505 | info->largest_seqno = meta_.fd.largest_seqno; | |
506 | info->table_properties = table_properties_; | |
507 | info->flush_reason = cfd_->GetFlushReason(); | |
508 | return info; | |
509 | } | |
510 | #endif // !ROCKSDB_LITE | |
511 | ||
512 | } // namespace ROCKSDB_NAMESPACE |