// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "db/db_impl.h"

#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif
#include <inttypes.h>

#include "db/builder.h"
#include "db/error_handler.h"
#include "db/event_helpers.h"
#include "monitoring/iostats_context_imp.h"
#include "monitoring/perf_context_imp.h"
#include "monitoring/thread_status_updater.h"
#include "monitoring/thread_status_util.h"
#include "util/concurrent_task_limiter_impl.h"
#include "util/sst_file_manager_impl.h"
#include "util/sync_point.h"

namespace rocksdb {

bool DBImpl::EnoughRoomForCompaction(
    ColumnFamilyData* cfd, const std::vector<CompactionInputFiles>& inputs,
    bool* sfm_reserved_compact_space, LogBuffer* log_buffer) {
  // Check if we have enough room to do the compaction
  bool enough_room = true;
#ifndef ROCKSDB_LITE
  auto sfm = static_cast<SstFileManagerImpl*>(
      immutable_db_options_.sst_file_manager.get());
  if (sfm) {
    // Pass the current bg_error_ to SFM so it can decide what checks to
    // perform. If this DB instance hasn't seen any error yet, the SFM can be
    // optimistic and not do disk space checks
    enough_room =
        sfm->EnoughRoomForCompaction(cfd, inputs, error_handler_.GetBGError());
    if (enough_room) {
      *sfm_reserved_compact_space = true;
    }
  }
#else
  (void)cfd;
  (void)inputs;
  (void)sfm_reserved_compact_space;
#endif  // ROCKSDB_LITE
  if (!enough_room) {
    // Just in case tests want to change the value of enough_room
    TEST_SYNC_POINT_CALLBACK(
        "DBImpl::BackgroundCompaction():CancelledCompaction", &enough_room);
    ROCKS_LOG_BUFFER(log_buffer,
                     "Cancelled compaction because not enough room");
    RecordTick(stats_, COMPACTION_CANCELLED, 1);
  }
  return enough_room;
}

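// RequestCompactionToken() consults the optional per-column-family
// ConcurrentTaskLimiter. A minimal usage sketch (illustrative only, not part
// of this translation unit), assuming the public NewConcurrentTaskLimiter()
// factory from rocksdb/concurrent_task_limiter.h:
//
//   std::shared_ptr<ConcurrentTaskLimiter> limiter(
//       NewConcurrentTaskLimiter("compaction_limiter", 4 /* max tasks */));
//   ColumnFamilyOptions cf_opts;
//   cf_opts.compaction_thread_limiter = limiter;  // may be shared across CFs
//
// With a limiter installed, automatic compactions for the column family are
// throttled to the configured number of outstanding tasks; manual compactions
// pass force=true below and are only counted, never rejected.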
bool DBImpl::RequestCompactionToken(ColumnFamilyData* cfd, bool force,
                                    std::unique_ptr<TaskLimiterToken>* token,
                                    LogBuffer* log_buffer) {
  assert(*token == nullptr);
  auto limiter = static_cast<ConcurrentTaskLimiterImpl*>(
      cfd->ioptions()->compaction_thread_limiter.get());
  if (limiter == nullptr) {
    return true;
  }
  *token = limiter->GetToken(force);
  if (*token != nullptr) {
    ROCKS_LOG_BUFFER(log_buffer,
                     "Thread limiter [%s] increase [%s] compaction task, "
                     "force: %s, tasks after: %d",
                     limiter->GetName().c_str(), cfd->GetName().c_str(),
                     force ? "true" : "false", limiter->GetOutstandingTask());
    return true;
  }
  return false;
}

Status DBImpl::SyncClosedLogs(JobContext* job_context) {
  TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Start");
  mutex_.AssertHeld();
  autovector<log::Writer*, 1> logs_to_sync;
  uint64_t current_log_number = logfile_number_;
  while (logs_.front().number < current_log_number &&
         logs_.front().getting_synced) {
    log_sync_cv_.Wait();
  }
  for (auto it = logs_.begin();
       it != logs_.end() && it->number < current_log_number; ++it) {
    auto& log = *it;
    assert(!log.getting_synced);
    log.getting_synced = true;
    logs_to_sync.push_back(log.writer);
  }

  Status s;
  if (!logs_to_sync.empty()) {
    mutex_.Unlock();

    for (log::Writer* log : logs_to_sync) {
      ROCKS_LOG_INFO(immutable_db_options_.info_log,
                     "[JOB %d] Syncing log #%" PRIu64, job_context->job_id,
                     log->get_log_number());
      s = log->file()->Sync(immutable_db_options_.use_fsync);
      if (!s.ok()) {
        break;
      }
    }
    if (s.ok()) {
      s = directories_.GetWalDir()->Fsync();
    }

    mutex_.Lock();

    // "number <= current_log_number - 1" is equivalent to
    // "number < current_log_number".
    MarkLogsSynced(current_log_number - 1, true, s);
    if (!s.ok()) {
      error_handler_.SetBGError(s, BackgroundErrorReason::kFlush);
      TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Failed");
      return s;
    }
  }
  return s;
}

Status DBImpl::FlushMemTableToOutputFile(
    ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options,
    bool* made_progress, JobContext* job_context,
    SuperVersionContext* superversion_context,
    std::vector<SequenceNumber>& snapshot_seqs,
    SequenceNumber earliest_write_conflict_snapshot,
    SnapshotChecker* snapshot_checker, LogBuffer* log_buffer,
    Env::Priority thread_pri) {
  mutex_.AssertHeld();
  assert(cfd->imm()->NumNotFlushed() != 0);
  assert(cfd->imm()->IsFlushPending());

  FlushJob flush_job(
      dbname_, cfd, immutable_db_options_, mutable_cf_options,
      nullptr /* memtable_id */, env_options_for_compaction_, versions_.get(),
      &mutex_, &shutting_down_, snapshot_seqs, earliest_write_conflict_snapshot,
      snapshot_checker, job_context, log_buffer, directories_.GetDbDir(),
      GetDataDir(cfd, 0U),
      GetCompressionFlush(*cfd->ioptions(), mutable_cf_options), stats_,
      &event_logger_, mutable_cf_options.report_bg_io_stats,
      true /* sync_output_directory */, true /* write_manifest */, thread_pri);

  FileMetaData file_meta;

  TEST_SYNC_POINT("DBImpl::FlushMemTableToOutputFile:BeforePickMemtables");
  flush_job.PickMemTable();
  TEST_SYNC_POINT("DBImpl::FlushMemTableToOutputFile:AfterPickMemtables");

#ifndef ROCKSDB_LITE
  // may temporarily unlock and lock the mutex.
  NotifyOnFlushBegin(cfd, &file_meta, mutable_cf_options, job_context->job_id,
                     flush_job.GetTableProperties());
#endif  // ROCKSDB_LITE

  Status s;
  if (logfile_number_ > 0 &&
      versions_->GetColumnFamilySet()->NumberOfColumnFamilies() > 1) {
    // If there is more than one column family, we need to make sure that
    // all the log files except the most recent one are synced. Otherwise, if
    // the host crashes after flushing and before the WAL is persistent, the
    // flushed SST may contain data from write batches whose updates to
    // other column families are missing.
    // SyncClosedLogs() may unlock and re-lock the db_mutex.
    s = SyncClosedLogs(job_context);
  } else {
    TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Skip");
  }

  // Within flush_job.Run, rocksdb may call event listener to notify
  // file creation and deletion.
  //
  // Note that flush_job.Run will unlock and lock the db_mutex,
  // and EventListener callback will be called when the db_mutex
  // is unlocked by the current thread.
  if (s.ok()) {
    s = flush_job.Run(&logs_with_prep_tracker_, &file_meta);
  } else {
    flush_job.Cancel();
  }

  if (s.ok()) {
    InstallSuperVersionAndScheduleWork(cfd, superversion_context,
                                       mutable_cf_options);
    if (made_progress) {
      *made_progress = true;
    }
    VersionStorageInfo::LevelSummaryStorage tmp;
    ROCKS_LOG_BUFFER(log_buffer, "[%s] Level summary: %s\n",
                     cfd->GetName().c_str(),
                     cfd->current()->storage_info()->LevelSummary(&tmp));
  }

  if (!s.ok() && !s.IsShutdownInProgress()) {
    Status new_bg_error = s;
    error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush);
  }
  if (s.ok()) {
#ifndef ROCKSDB_LITE
    // may temporarily unlock and lock the mutex.
    NotifyOnFlushCompleted(cfd, &file_meta, mutable_cf_options,
                           job_context->job_id, flush_job.GetTableProperties());
    auto sfm = static_cast<SstFileManagerImpl*>(
        immutable_db_options_.sst_file_manager.get());
    if (sfm) {
      // Notify sst_file_manager that a new file was added
      std::string file_path = MakeTableFileName(
          cfd->ioptions()->cf_paths[0].path, file_meta.fd.GetNumber());
      sfm->OnAddFile(file_path);
      if (sfm->IsMaxAllowedSpaceReached()) {
        Status new_bg_error =
            Status::SpaceLimit("Max allowed space was reached");
        TEST_SYNC_POINT_CALLBACK(
            "DBImpl::FlushMemTableToOutputFile:MaxAllowedSpaceReached",
            &new_bg_error);
        error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush);
      }
    }
#endif  // ROCKSDB_LITE
  }
  return s;
}

Status DBImpl::FlushMemTablesToOutputFiles(
    const autovector<BGFlushArg>& bg_flush_args, bool* made_progress,
    JobContext* job_context, LogBuffer* log_buffer, Env::Priority thread_pri) {
  if (immutable_db_options_.atomic_flush) {
    return AtomicFlushMemTablesToOutputFiles(
        bg_flush_args, made_progress, job_context, log_buffer, thread_pri);
  }
  std::vector<SequenceNumber> snapshot_seqs;
  SequenceNumber earliest_write_conflict_snapshot;
  SnapshotChecker* snapshot_checker;
  GetSnapshotContext(job_context, &snapshot_seqs,
                     &earliest_write_conflict_snapshot, &snapshot_checker);
  Status status;
  for (auto& arg : bg_flush_args) {
    ColumnFamilyData* cfd = arg.cfd_;
    MutableCFOptions mutable_cf_options = *cfd->GetLatestMutableCFOptions();
    SuperVersionContext* superversion_context = arg.superversion_context_;
    Status s = FlushMemTableToOutputFile(
        cfd, mutable_cf_options, made_progress, job_context,
        superversion_context, snapshot_seqs, earliest_write_conflict_snapshot,
        snapshot_checker, log_buffer, thread_pri);
    if (!s.ok()) {
      status = s;
      if (!s.IsShutdownInProgress()) {
        // At this point, DB is not shutting down, nor is cfd dropped.
        // Something is wrong, thus we break out of the loop.
        break;
      }
    }
  }
  return status;
}

/*
 * Atomically flushes multiple column families.
 *
 * For each column family, all memtables with ID smaller than or equal to the
 * ID specified in bg_flush_args will be flushed. Only after all column
 * families finish flush will this function commit to MANIFEST. If any of the
 * column families are not flushed successfully, this function does not have
 * any side-effect on the state of the database.
 */
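// Illustrative sketch (not part of this translation unit) of how a user
// triggers the atomic path below; it assumes only the public DBOptions flag
// and the DB::Flush() overload that takes multiple column family handles:
//
//   Options options;
//   options.atomic_flush = true;  // route flushes through the atomic path
//   // ... open the DB with column family handles cf1 and cf2 ...
//   FlushOptions fo;
//   fo.wait = true;
//   db->Flush(fo, {cf1, cf2});  // either both CFs are flushed, or neither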
Status DBImpl::AtomicFlushMemTablesToOutputFiles(
    const autovector<BGFlushArg>& bg_flush_args, bool* made_progress,
    JobContext* job_context, LogBuffer* log_buffer, Env::Priority thread_pri) {
  mutex_.AssertHeld();

  autovector<ColumnFamilyData*> cfds;
  for (const auto& arg : bg_flush_args) {
    cfds.emplace_back(arg.cfd_);
  }

#ifndef NDEBUG
  for (const auto cfd : cfds) {
    assert(cfd->imm()->NumNotFlushed() != 0);
    assert(cfd->imm()->IsFlushPending());
  }
#endif /* !NDEBUG */

  std::vector<SequenceNumber> snapshot_seqs;
  SequenceNumber earliest_write_conflict_snapshot;
  SnapshotChecker* snapshot_checker;
  GetSnapshotContext(job_context, &snapshot_seqs,
                     &earliest_write_conflict_snapshot, &snapshot_checker);

  autovector<Directory*> distinct_output_dirs;
  autovector<std::string> distinct_output_dir_paths;
  std::vector<FlushJob> jobs;
  std::vector<MutableCFOptions> all_mutable_cf_options;
  int num_cfs = static_cast<int>(cfds.size());
  all_mutable_cf_options.reserve(num_cfs);
  for (int i = 0; i < num_cfs; ++i) {
    auto cfd = cfds[i];
    Directory* data_dir = GetDataDir(cfd, 0U);
    const std::string& curr_path = cfd->ioptions()->cf_paths[0].path;

    // Add to distinct output directories if eligible. Use linear search. Since
    // the number of elements in the vector is not large, performance should be
    // tolerable.
    bool found = false;
    for (const auto& path : distinct_output_dir_paths) {
      if (path == curr_path) {
        found = true;
        break;
      }
    }
    if (!found) {
      distinct_output_dir_paths.emplace_back(curr_path);
      distinct_output_dirs.emplace_back(data_dir);
    }

    all_mutable_cf_options.emplace_back(*cfd->GetLatestMutableCFOptions());
    const MutableCFOptions& mutable_cf_options = all_mutable_cf_options.back();
    const uint64_t* max_memtable_id = &(bg_flush_args[i].max_memtable_id_);
    jobs.emplace_back(
        dbname_, cfd, immutable_db_options_, mutable_cf_options,
        max_memtable_id, env_options_for_compaction_, versions_.get(), &mutex_,
        &shutting_down_, snapshot_seqs, earliest_write_conflict_snapshot,
        snapshot_checker, job_context, log_buffer, directories_.GetDbDir(),
        data_dir, GetCompressionFlush(*cfd->ioptions(), mutable_cf_options),
        stats_, &event_logger_, mutable_cf_options.report_bg_io_stats,
        false /* sync_output_directory */, false /* write_manifest */,
        thread_pri);
    jobs.back().PickMemTable();
  }

  std::vector<FileMetaData> file_meta(num_cfs);
  Status s;
  assert(num_cfs == static_cast<int>(jobs.size()));

#ifndef ROCKSDB_LITE
  for (int i = 0; i != num_cfs; ++i) {
    const MutableCFOptions& mutable_cf_options = all_mutable_cf_options.at(i);
    // may temporarily unlock and lock the mutex.
    NotifyOnFlushBegin(cfds[i], &file_meta[i], mutable_cf_options,
                       job_context->job_id, jobs[i].GetTableProperties());
  }
#endif /* !ROCKSDB_LITE */

  if (logfile_number_ > 0) {
    // TODO (yanqin) investigate whether we should sync the closed logs for
    // single column family case.
    s = SyncClosedLogs(job_context);
  }

  // exec_status stores the execution status of flush_jobs as
  // <bool /* executed */, Status /* status code */>
  autovector<std::pair<bool, Status>> exec_status;
  for (int i = 0; i != num_cfs; ++i) {
    // Initially all jobs are not executed, with status OK.
    exec_status.emplace_back(false, Status::OK());
  }

  if (s.ok()) {
    // TODO (yanqin): parallelize jobs with threads.
    for (int i = 1; i != num_cfs; ++i) {
      exec_status[i].second =
          jobs[i].Run(&logs_with_prep_tracker_, &file_meta[i]);
      exec_status[i].first = true;
    }
    if (num_cfs > 1) {
      TEST_SYNC_POINT(
          "DBImpl::AtomicFlushMemTablesToOutputFiles:SomeFlushJobsComplete:1");
      TEST_SYNC_POINT(
          "DBImpl::AtomicFlushMemTablesToOutputFiles:SomeFlushJobsComplete:2");
    }
    exec_status[0].second =
        jobs[0].Run(&logs_with_prep_tracker_, &file_meta[0]);
    exec_status[0].first = true;

    Status error_status;
    for (const auto& e : exec_status) {
      if (!e.second.ok()) {
        s = e.second;
        if (!e.second.IsShutdownInProgress()) {
          // If a flush job did not return OK, and the CF is not dropped, and
          // the DB is not shutting down, then we have to return this result to
          // the caller later.
          error_status = e.second;
        }
      }
    }

    s = error_status.ok() ? s : error_status;
  }

  if (s.ok() || s.IsShutdownInProgress()) {
    // Sync on all distinct output directories.
    for (auto dir : distinct_output_dirs) {
      if (dir != nullptr) {
        s = dir->Fsync();
        if (!s.ok()) {
          break;
        }
      }
    }
  }

  if (s.ok()) {
    auto wait_to_install_func = [&]() {
      bool ready = true;
      for (size_t i = 0; i != cfds.size(); ++i) {
        const auto& mems = jobs[i].GetMemTables();
        if (cfds[i]->IsDropped()) {
          // If the column family is dropped, then do not wait.
          continue;
        } else if (!mems.empty() &&
                   cfds[i]->imm()->GetEarliestMemTableID() < mems[0]->GetID()) {
          // If a flush job needs to install the flush result for mems and
          // mems[0] is not the earliest memtable, it means another thread must
          // be installing flush results for the same column family, then the
          // current thread needs to wait.
          ready = false;
          break;
        } else if (mems.empty() && cfds[i]->imm()->GetEarliestMemTableID() <=
                                       bg_flush_args[i].max_memtable_id_) {
          // If a flush job does not need to install flush results, then it has
          // to wait until all memtables up to max_memtable_id_ (inclusive) are
          // installed.
          ready = false;
          break;
        }
      }
      return ready;
    };

    bool resuming_from_bg_err = error_handler_.IsDBStopped();
    while ((!error_handler_.IsDBStopped() ||
            error_handler_.GetRecoveryError().ok()) &&
           !wait_to_install_func()) {
      atomic_flush_install_cv_.Wait();
    }

    s = resuming_from_bg_err ? error_handler_.GetRecoveryError()
                             : error_handler_.GetBGError();
  }

  if (s.ok()) {
    autovector<ColumnFamilyData*> tmp_cfds;
    autovector<const autovector<MemTable*>*> mems_list;
    autovector<const MutableCFOptions*> mutable_cf_options_list;
    autovector<FileMetaData*> tmp_file_meta;
    for (int i = 0; i != num_cfs; ++i) {
      const auto& mems = jobs[i].GetMemTables();
      if (!cfds[i]->IsDropped() && !mems.empty()) {
        tmp_cfds.emplace_back(cfds[i]);
        mems_list.emplace_back(&mems);
        mutable_cf_options_list.emplace_back(&all_mutable_cf_options[i]);
        tmp_file_meta.emplace_back(&file_meta[i]);
      }
    }

    s = InstallMemtableAtomicFlushResults(
        nullptr /* imm_lists */, tmp_cfds, mutable_cf_options_list, mems_list,
        versions_.get(), &mutex_, tmp_file_meta,
        &job_context->memtables_to_free, directories_.GetDbDir(), log_buffer);
  }

  if (s.ok() || s.IsShutdownInProgress()) {
    assert(num_cfs ==
           static_cast<int>(job_context->superversion_contexts.size()));
    for (int i = 0; i != num_cfs; ++i) {
      if (cfds[i]->IsDropped()) {
        continue;
      }
      InstallSuperVersionAndScheduleWork(cfds[i],
                                         &job_context->superversion_contexts[i],
                                         all_mutable_cf_options[i]);
      VersionStorageInfo::LevelSummaryStorage tmp;
      ROCKS_LOG_BUFFER(log_buffer, "[%s] Level summary: %s\n",
                       cfds[i]->GetName().c_str(),
                       cfds[i]->current()->storage_info()->LevelSummary(&tmp));
    }
    if (made_progress) {
      *made_progress = true;
    }
#ifndef ROCKSDB_LITE
    auto sfm = static_cast<SstFileManagerImpl*>(
        immutable_db_options_.sst_file_manager.get());
    for (int i = 0; i != num_cfs; ++i) {
      if (cfds[i]->IsDropped()) {
        continue;
      }
      NotifyOnFlushCompleted(cfds[i], &file_meta[i], all_mutable_cf_options[i],
                             job_context->job_id, jobs[i].GetTableProperties());
      if (sfm) {
        std::string file_path = MakeTableFileName(
            cfds[i]->ioptions()->cf_paths[0].path, file_meta[i].fd.GetNumber());
        sfm->OnAddFile(file_path);
        if (sfm->IsMaxAllowedSpaceReached() &&
            error_handler_.GetBGError().ok()) {
          Status new_bg_error =
              Status::SpaceLimit("Max allowed space was reached");
          error_handler_.SetBGError(new_bg_error,
                                    BackgroundErrorReason::kFlush);
        }
      }
    }
#endif  // ROCKSDB_LITE
  }

  // Need to undo atomic flush if something went wrong, i.e. s is not OK and
  // it is not because of CF drop.
  if (!s.ok() && !s.IsShutdownInProgress()) {
    // Have to cancel the flush jobs that have NOT executed because we need to
    // unref the versions.
    for (int i = 0; i != num_cfs; ++i) {
      if (!exec_status[i].first) {
        jobs[i].Cancel();
      }
    }
    for (int i = 0; i != num_cfs; ++i) {
      if (exec_status[i].first && exec_status[i].second.ok()) {
        auto& mems = jobs[i].GetMemTables();
        cfds[i]->imm()->RollbackMemtableFlush(mems,
                                              file_meta[i].fd.GetNumber());
      }
    }
    Status new_bg_error = s;
    error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush);
  }

  return s;
}

void DBImpl::NotifyOnFlushBegin(ColumnFamilyData* cfd, FileMetaData* file_meta,
                                const MutableCFOptions& mutable_cf_options,
                                int job_id, TableProperties prop) {
#ifndef ROCKSDB_LITE
  if (immutable_db_options_.listeners.size() == 0U) {
    return;
  }
  mutex_.AssertHeld();
  if (shutting_down_.load(std::memory_order_acquire)) {
    return;
  }
  bool triggered_writes_slowdown =
      (cfd->current()->storage_info()->NumLevelFiles(0) >=
       mutable_cf_options.level0_slowdown_writes_trigger);
  bool triggered_writes_stop =
      (cfd->current()->storage_info()->NumLevelFiles(0) >=
       mutable_cf_options.level0_stop_writes_trigger);
  // release lock while notifying events
  mutex_.Unlock();
  {
    FlushJobInfo info;
    info.cf_id = cfd->GetID();
    info.cf_name = cfd->GetName();
    // TODO(yhchiang): make db_paths dynamic in case flush does not
    // go to L0 in the future.
    info.file_path = MakeTableFileName(cfd->ioptions()->cf_paths[0].path,
                                       file_meta->fd.GetNumber());
    info.thread_id = env_->GetThreadID();
    info.job_id = job_id;
    info.triggered_writes_slowdown = triggered_writes_slowdown;
    info.triggered_writes_stop = triggered_writes_stop;
    info.smallest_seqno = file_meta->fd.smallest_seqno;
    info.largest_seqno = file_meta->fd.largest_seqno;
    info.table_properties = prop;
    info.flush_reason = cfd->GetFlushReason();
    for (auto listener : immutable_db_options_.listeners) {
      listener->OnFlushBegin(this, info);
    }
  }
  mutex_.Lock();
  // no need to signal bg_cv_ as it will be signaled at the end of the
  // flush process.
#else
  (void)cfd;
  (void)file_meta;
  (void)mutable_cf_options;
  (void)job_id;
  (void)prop;
#endif  // ROCKSDB_LITE
}

void DBImpl::NotifyOnFlushCompleted(ColumnFamilyData* cfd,
                                    FileMetaData* file_meta,
                                    const MutableCFOptions& mutable_cf_options,
                                    int job_id, TableProperties prop) {
#ifndef ROCKSDB_LITE
  if (immutable_db_options_.listeners.size() == 0U) {
    return;
  }
  mutex_.AssertHeld();
  if (shutting_down_.load(std::memory_order_acquire)) {
    return;
  }
  bool triggered_writes_slowdown =
      (cfd->current()->storage_info()->NumLevelFiles(0) >=
       mutable_cf_options.level0_slowdown_writes_trigger);
  bool triggered_writes_stop =
      (cfd->current()->storage_info()->NumLevelFiles(0) >=
       mutable_cf_options.level0_stop_writes_trigger);
  // release lock while notifying events
  mutex_.Unlock();
  {
    FlushJobInfo info;
    info.cf_id = cfd->GetID();
    info.cf_name = cfd->GetName();
    // TODO(yhchiang): make db_paths dynamic in case flush does not
    // go to L0 in the future.
    info.file_path = MakeTableFileName(cfd->ioptions()->cf_paths[0].path,
                                       file_meta->fd.GetNumber());
    info.thread_id = env_->GetThreadID();
    info.job_id = job_id;
    info.triggered_writes_slowdown = triggered_writes_slowdown;
    info.triggered_writes_stop = triggered_writes_stop;
    info.smallest_seqno = file_meta->fd.smallest_seqno;
    info.largest_seqno = file_meta->fd.largest_seqno;
    info.table_properties = prop;
    info.flush_reason = cfd->GetFlushReason();
    for (auto listener : immutable_db_options_.listeners) {
      listener->OnFlushCompleted(this, info);
    }
  }
  mutex_.Lock();
  // no need to signal bg_cv_ as it will be signaled at the end of the
  // flush process.
#else
  (void)cfd;
  (void)file_meta;
  (void)mutable_cf_options;
  (void)job_id;
  (void)prop;
#endif  // ROCKSDB_LITE
}

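// Illustrative sketch (not part of this translation unit) of a typical manual
// compaction request that exercises the options handled below; the field
// names come from the public CompactRangeOptions struct:
//
//   CompactRangeOptions cro;
//   cro.exclusive_manual_compaction = true;   // wait for other compactions
//   cro.change_level = true;                  // refit output via ReFitLevel()
//   cro.target_level = -1;                    // pick the minimum fitting level
//   cro.bottommost_level_compaction =
//       BottommostLevelCompaction::kIfHaveCompactionFilter;
//   db->CompactRange(cro, cf_handle, nullptr /* begin */, nullptr /* end */);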
Status DBImpl::CompactRange(const CompactRangeOptions& options,
                            ColumnFamilyHandle* column_family,
                            const Slice* begin, const Slice* end) {
  auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
  auto cfd = cfh->cfd();

  if (options.target_path_id >= cfd->ioptions()->cf_paths.size()) {
    return Status::InvalidArgument("Invalid target path ID");
  }

  bool exclusive = options.exclusive_manual_compaction;

  bool flush_needed = true;
  if (begin != nullptr && end != nullptr) {
    // TODO(ajkr): We could also optimize away the flush in certain cases where
    // one/both sides of the interval are unbounded. But it requires more
    // changes to RangesOverlapWithMemtables.
    Range range(*begin, *end);
    SuperVersion* super_version = cfd->GetReferencedSuperVersion(&mutex_);
    cfd->RangesOverlapWithMemtables({range}, super_version, &flush_needed);
    CleanupSuperVersion(super_version);
  }

  Status s;
  if (flush_needed) {
    FlushOptions fo;
    fo.allow_write_stall = options.allow_write_stall;
    if (immutable_db_options_.atomic_flush) {
      autovector<ColumnFamilyData*> cfds;
      mutex_.Lock();
      SelectColumnFamiliesForAtomicFlush(&cfds);
      mutex_.Unlock();
      s = AtomicFlushMemTables(cfds, fo, FlushReason::kManualCompaction,
                               false /* writes_stopped */);
    } else {
      s = FlushMemTable(cfd, fo, FlushReason::kManualCompaction,
                        false /* writes_stopped */);
    }
    if (!s.ok()) {
      LogFlush(immutable_db_options_.info_log);
      return s;
    }
  }

  int max_level_with_files = 0;
  {
    InstrumentedMutexLock l(&mutex_);
    Version* base = cfd->current();
    for (int level = 1; level < base->storage_info()->num_non_empty_levels();
         level++) {
      if (base->storage_info()->OverlapInLevel(level, begin, end)) {
        max_level_with_files = level;
      }
    }
  }

  int final_output_level = 0;
  if (cfd->ioptions()->compaction_style == kCompactionStyleUniversal &&
      cfd->NumberLevels() > 1) {
    // Always compact all files together.
    final_output_level = cfd->NumberLevels() - 1;
    // if the bottommost level is reserved for ingest-behind, compact into the
    // level above it
    if (immutable_db_options_.allow_ingest_behind) {
      final_output_level--;
    }
    s = RunManualCompaction(cfd, ColumnFamilyData::kCompactAllLevels,
                            final_output_level, options.target_path_id,
                            options.max_subcompactions, begin, end, exclusive);
  } else {
    for (int level = 0; level <= max_level_with_files; level++) {
      int output_level;
      // in case the compaction is universal or if we're compacting the
      // bottom-most level, the output level will be the same as input one.
      // level 0 can never be the bottommost level (i.e. if all files are in
      // level 0, we will compact to level 1)
      if (cfd->ioptions()->compaction_style == kCompactionStyleUniversal ||
          cfd->ioptions()->compaction_style == kCompactionStyleFIFO) {
        output_level = level;
      } else if (level == max_level_with_files && level > 0) {
        if (options.bottommost_level_compaction ==
            BottommostLevelCompaction::kSkip) {
          // Skip bottommost level compaction
          continue;
        } else if (options.bottommost_level_compaction ==
                       BottommostLevelCompaction::kIfHaveCompactionFilter &&
                   cfd->ioptions()->compaction_filter == nullptr &&
                   cfd->ioptions()->compaction_filter_factory == nullptr) {
          // Skip bottommost level compaction since we don't have a compaction
          // filter
          continue;
        }
        output_level = level;
      } else {
        output_level = level + 1;
        if (cfd->ioptions()->compaction_style == kCompactionStyleLevel &&
            cfd->ioptions()->level_compaction_dynamic_level_bytes &&
            level == 0) {
          output_level = ColumnFamilyData::kCompactToBaseLevel;
        }
      }
      s = RunManualCompaction(cfd, level, output_level, options.target_path_id,
                              options.max_subcompactions, begin, end,
                              exclusive);
      if (!s.ok()) {
        break;
      }
      if (output_level == ColumnFamilyData::kCompactToBaseLevel) {
        final_output_level = cfd->NumberLevels() - 1;
      } else if (output_level > final_output_level) {
        final_output_level = output_level;
      }
      TEST_SYNC_POINT("DBImpl::RunManualCompaction()::1");
      TEST_SYNC_POINT("DBImpl::RunManualCompaction()::2");
    }
  }
  if (!s.ok()) {
    LogFlush(immutable_db_options_.info_log);
    return s;
  }

  if (options.change_level) {
    ROCKS_LOG_INFO(immutable_db_options_.info_log,
                   "[RefitLevel] waiting for background threads to stop");
    s = PauseBackgroundWork();
    if (s.ok()) {
      s = ReFitLevel(cfd, final_output_level, options.target_level);
    }
    ContinueBackgroundWork();
  }
  LogFlush(immutable_db_options_.info_log);

  {
    InstrumentedMutexLock l(&mutex_);
    // an automatic compaction that has been scheduled might have been
    // preempted by the manual compactions. Need to schedule it back.
    MaybeScheduleFlushOrCompaction();
  }

  return s;
}

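// Illustrative sketch (not part of this translation unit) of how a caller
// typically selects inputs for CompactFiles(); it assumes the public
// DB::GetColumnFamilyMetaData() API:
//
//   ColumnFamilyMetaData meta;
//   db->GetColumnFamilyMetaData(cf_handle, &meta);
//   std::vector<std::string> input_files;
//   for (const auto& file : meta.levels[0].files) {
//     if (!file.being_compacted) {
//       input_files.push_back(file.name);
//     }
//   }
//   std::vector<std::string> output_files;
//   db->CompactFiles(CompactionOptions(), cf_handle, input_files,
//                    1 /* output_level */, -1 /* output_path_id */,
//                    &output_files);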
Status DBImpl::CompactFiles(const CompactionOptions& compact_options,
                            ColumnFamilyHandle* column_family,
                            const std::vector<std::string>& input_file_names,
                            const int output_level, const int output_path_id,
                            std::vector<std::string>* const output_file_names,
                            CompactionJobInfo* compaction_job_info) {
#ifdef ROCKSDB_LITE
  (void)compact_options;
  (void)column_family;
  (void)input_file_names;
  (void)output_level;
  (void)output_path_id;
  (void)output_file_names;
  (void)compaction_job_info;
  // not supported in lite version
  return Status::NotSupported("Not supported in ROCKSDB LITE");
#else
  if (column_family == nullptr) {
    return Status::InvalidArgument("ColumnFamilyHandle must be non-null.");
  }

  auto cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
  assert(cfd);

  Status s;
  JobContext job_context(0, true);
  LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL,
                       immutable_db_options_.info_log.get());

  // Perform CompactFiles
  TEST_SYNC_POINT("TestCompactFiles::IngestExternalFile2");
  {
    InstrumentedMutexLock l(&mutex_);

    // This call will unlock/lock the mutex to wait for current running
    // IngestExternalFile() calls to finish.
    WaitForIngestFile();

    // We need to get current after `WaitForIngestFile`, because
    // `IngestExternalFile` may add files that overlap with `input_file_names`
    auto* current = cfd->current();
    current->Ref();

    s = CompactFilesImpl(compact_options, cfd, current, input_file_names,
                         output_file_names, output_level, output_path_id,
                         &job_context, &log_buffer, compaction_job_info);

    current->Unref();
  }

  // Find and delete obsolete files
  {
    InstrumentedMutexLock l(&mutex_);
    // If !s.ok(), this means that Compaction failed. In that case, we want
    // to delete all obsolete files we might have created and we force
    // FindObsoleteFiles(). This is because job_context does not
    // catch all created files if compaction failed.
    FindObsoleteFiles(&job_context, !s.ok());
  }  // release the mutex

  // delete unnecessary files if any, this is done outside the mutex
  if (job_context.HaveSomethingToClean() ||
      job_context.HaveSomethingToDelete() || !log_buffer.IsEmpty()) {
    // Have to flush the info logs before bg_compaction_scheduled_--
    // because if bg_flush_scheduled_ becomes 0 and the lock is
    // released, the destructor of DB can kick in and destroy all the
    // state of the DB, so info_log might not be available after that point.
    // The same applies to access of any other state that the DB owns.
    log_buffer.FlushBufferToLog();
    if (job_context.HaveSomethingToDelete()) {
      // no mutex is locked here. No need to Unlock() and Lock() here.
      PurgeObsoleteFiles(job_context);
    }
    job_context.Clean();
  }

  return s;
#endif  // ROCKSDB_LITE
}

#ifndef ROCKSDB_LITE
Status DBImpl::CompactFilesImpl(
    const CompactionOptions& compact_options, ColumnFamilyData* cfd,
    Version* version, const std::vector<std::string>& input_file_names,
    std::vector<std::string>* const output_file_names, const int output_level,
    int output_path_id, JobContext* job_context, LogBuffer* log_buffer,
    CompactionJobInfo* compaction_job_info) {
  mutex_.AssertHeld();

  if (shutting_down_.load(std::memory_order_acquire)) {
    return Status::ShutdownInProgress();
  }

  std::unordered_set<uint64_t> input_set;
  for (const auto& file_name : input_file_names) {
    input_set.insert(TableFileNameToNumber(file_name));
  }

  ColumnFamilyMetaData cf_meta;
  // TODO(yhchiang): can directly use version here if none of the
  // following function calls is pluggable to external developers.
  version->GetColumnFamilyMetaData(&cf_meta);

  if (output_path_id < 0) {
    if (cfd->ioptions()->cf_paths.size() == 1U) {
      output_path_id = 0;
    } else {
      return Status::NotSupported(
          "Automatic output path selection is not "
          "yet supported in CompactFiles()");
    }
  }

  Status s = cfd->compaction_picker()->SanitizeCompactionInputFiles(
      &input_set, cf_meta, output_level);
  if (!s.ok()) {
    return s;
  }

  std::vector<CompactionInputFiles> input_files;
  s = cfd->compaction_picker()->GetCompactionInputsFromFileNumbers(
      &input_files, &input_set, version->storage_info(), compact_options);
  if (!s.ok()) {
    return s;
  }

  for (const auto& inputs : input_files) {
    if (cfd->compaction_picker()->AreFilesInCompaction(inputs.files)) {
      return Status::Aborted(
          "Some of the necessary compaction input "
          "files are already being compacted");
    }
  }
  bool sfm_reserved_compact_space = false;
  // First check if we have enough room to do the compaction
  bool enough_room = EnoughRoomForCompaction(
      cfd, input_files, &sfm_reserved_compact_space, log_buffer);

  if (!enough_room) {
    // m's vars will get set properly at the end of this function,
    // as long as status == CompactionTooLarge
    return Status::CompactionTooLarge();
  }

  // At this point, CompactFiles will be run.
  bg_compaction_scheduled_++;

  std::unique_ptr<Compaction> c;
  assert(cfd->compaction_picker());
  c.reset(cfd->compaction_picker()->CompactFiles(
      compact_options, input_files, output_level, version->storage_info(),
      *cfd->GetLatestMutableCFOptions(), output_path_id));
  // we already sanitized the set of input files and checked for conflicts
  // without releasing the lock, so we're guaranteed a compaction can be formed.
  assert(c != nullptr);

  c->SetInputVersion(version);
  // deletion compaction currently not allowed in CompactFiles.
  assert(!c->deletion_compaction());

  std::vector<SequenceNumber> snapshot_seqs;
  SequenceNumber earliest_write_conflict_snapshot;
  SnapshotChecker* snapshot_checker;
  GetSnapshotContext(job_context, &snapshot_seqs,
                     &earliest_write_conflict_snapshot, &snapshot_checker);

  auto pending_outputs_inserted_elem =
      CaptureCurrentFileNumberInPendingOutputs();

  assert(is_snapshot_supported_ || snapshots_.empty());
  CompactionJobStats compaction_job_stats;
  CompactionJob compaction_job(
      job_context->job_id, c.get(), immutable_db_options_,
      env_options_for_compaction_, versions_.get(), &shutting_down_,
      preserve_deletes_seqnum_.load(), log_buffer, directories_.GetDbDir(),
      GetDataDir(c->column_family_data(), c->output_path_id()), stats_, &mutex_,
      &error_handler_, snapshot_seqs, earliest_write_conflict_snapshot,
      snapshot_checker, table_cache_, &event_logger_,
      c->mutable_cf_options()->paranoid_file_checks,
      c->mutable_cf_options()->report_bg_io_stats, dbname_,
      &compaction_job_stats, Env::Priority::USER);

  // Creating a compaction influences the compaction score because the score
  // takes running compactions into account (by skipping files that are already
  // being compacted). Since we just changed the compaction score, we
  // recalculate it here.
  version->storage_info()->ComputeCompactionScore(*cfd->ioptions(),
                                                  *c->mutable_cf_options());

  compaction_job.Prepare();

  mutex_.Unlock();
  TEST_SYNC_POINT("CompactFilesImpl:0");
  TEST_SYNC_POINT("CompactFilesImpl:1");
  compaction_job.Run();
  TEST_SYNC_POINT("CompactFilesImpl:2");
  TEST_SYNC_POINT("CompactFilesImpl:3");
  mutex_.Lock();

  Status status = compaction_job.Install(*c->mutable_cf_options());
  if (status.ok()) {
    InstallSuperVersionAndScheduleWork(c->column_family_data(),
                                       &job_context->superversion_contexts[0],
                                       *c->mutable_cf_options());
  }
  c->ReleaseCompactionFiles(s);
#ifndef ROCKSDB_LITE
  // Need to make sure SstFileManager does its bookkeeping
  auto sfm = static_cast<SstFileManagerImpl*>(
      immutable_db_options_.sst_file_manager.get());
  if (sfm && sfm_reserved_compact_space) {
    sfm->OnCompactionCompletion(c.get());
  }
#endif  // ROCKSDB_LITE

  ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem);

  if (compaction_job_info != nullptr) {
    BuildCompactionJobInfo(cfd, c.get(), s, compaction_job_stats,
                           job_context->job_id, version, compaction_job_info);
  }

  if (status.ok()) {
    // Done
  } else if (status.IsShutdownInProgress()) {
    // Ignore compaction errors found during shutting down
  } else {
    ROCKS_LOG_WARN(immutable_db_options_.info_log,
                   "[%s] [JOB %d] Compaction error: %s",
                   c->column_family_data()->GetName().c_str(),
                   job_context->job_id, status.ToString().c_str());
    error_handler_.SetBGError(status, BackgroundErrorReason::kCompaction);
  }

  if (output_file_names != nullptr) {
    for (const auto newf : c->edit()->GetNewFiles()) {
      (*output_file_names)
          .push_back(TableFileName(c->immutable_cf_options()->cf_paths,
                                   newf.second.fd.GetNumber(),
                                   newf.second.fd.GetPathId()));
    }
  }

  c.reset();

  bg_compaction_scheduled_--;
  if (bg_compaction_scheduled_ == 0) {
    bg_cv_.SignalAll();
  }
  MaybeScheduleFlushOrCompaction();
  TEST_SYNC_POINT("CompactFilesImpl:End");

  return status;
}
#endif  // ROCKSDB_LITE

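// PauseBackgroundWork()/ContinueBackgroundWork() are counted and must be used
// in pairs. A minimal sketch (illustrative only) of the pattern CompactRange()
// above uses when options.change_level is set:
//
//   Status s = db->PauseBackgroundWork();   // waits for running bg jobs
//   if (s.ok()) {
//     // ... perform work that requires no concurrent flush/compaction ...
//   }
//   db->ContinueBackgroundWork();           // reschedules pending work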
Status DBImpl::PauseBackgroundWork() {
  InstrumentedMutexLock guard_lock(&mutex_);
  bg_compaction_paused_++;
  while (bg_bottom_compaction_scheduled_ > 0 || bg_compaction_scheduled_ > 0 ||
         bg_flush_scheduled_ > 0) {
    bg_cv_.Wait();
  }
  bg_work_paused_++;
  return Status::OK();
}

Status DBImpl::ContinueBackgroundWork() {
  InstrumentedMutexLock guard_lock(&mutex_);
  if (bg_work_paused_ == 0) {
    return Status::InvalidArgument();
  }
  assert(bg_work_paused_ > 0);
  assert(bg_compaction_paused_ > 0);
  bg_compaction_paused_--;
  bg_work_paused_--;
  // It's sufficient to check just bg_work_paused_ here since
  // bg_work_paused_ is always no greater than bg_compaction_paused_
  if (bg_work_paused_ == 0) {
    MaybeScheduleFlushOrCompaction();
  }
  return Status::OK();
}

void DBImpl::NotifyOnCompactionBegin(ColumnFamilyData* cfd, Compaction* c,
                                     const Status& st,
                                     const CompactionJobStats& job_stats,
                                     int job_id) {
#ifndef ROCKSDB_LITE
  if (immutable_db_options_.listeners.empty()) {
    return;
  }
  mutex_.AssertHeld();
  if (shutting_down_.load(std::memory_order_acquire)) {
    return;
  }
  Version* current = cfd->current();
  current->Ref();
  // release lock while notifying events
  mutex_.Unlock();
  TEST_SYNC_POINT("DBImpl::NotifyOnCompactionBegin::UnlockMutex");
  {
    CompactionJobInfo info;
    info.cf_name = cfd->GetName();
    info.status = st;
    info.thread_id = env_->GetThreadID();
    info.job_id = job_id;
    info.base_input_level = c->start_level();
    info.output_level = c->output_level();
    info.stats = job_stats;
    info.table_properties = c->GetOutputTableProperties();
    info.compaction_reason = c->compaction_reason();
    info.compression = c->output_compression();
    for (size_t i = 0; i < c->num_input_levels(); ++i) {
      for (const auto fmd : *c->inputs(i)) {
        auto fn = TableFileName(c->immutable_cf_options()->cf_paths,
                                fmd->fd.GetNumber(), fmd->fd.GetPathId());
        info.input_files.push_back(fn);
        if (info.table_properties.count(fn) == 0) {
          std::shared_ptr<const TableProperties> tp;
          auto s = current->GetTableProperties(&tp, fmd, &fn);
          if (s.ok()) {
            info.table_properties[fn] = tp;
          }
        }
      }
    }
    for (const auto newf : c->edit()->GetNewFiles()) {
      info.output_files.push_back(TableFileName(
          c->immutable_cf_options()->cf_paths, newf.second.fd.GetNumber(),
          newf.second.fd.GetPathId()));
    }
    for (auto listener : immutable_db_options_.listeners) {
      listener->OnCompactionBegin(this, info);
    }
  }
  mutex_.Lock();
  current->Unref();
#else
  (void)cfd;
  (void)c;
  (void)st;
  (void)job_stats;
  (void)job_id;
#endif  // ROCKSDB_LITE
}

void DBImpl::NotifyOnCompactionCompleted(
    ColumnFamilyData* cfd, Compaction* c, const Status& st,
    const CompactionJobStats& compaction_job_stats, const int job_id) {
#ifndef ROCKSDB_LITE
  if (immutable_db_options_.listeners.size() == 0U) {
    return;
  }
  mutex_.AssertHeld();
  if (shutting_down_.load(std::memory_order_acquire)) {
    return;
  }
  Version* current = cfd->current();
  current->Ref();
  // release lock while notifying events
  mutex_.Unlock();
  TEST_SYNC_POINT("DBImpl::NotifyOnCompactionCompleted::UnlockMutex");
  {
    CompactionJobInfo info;
    BuildCompactionJobInfo(cfd, c, st, compaction_job_stats, job_id, current,
                           &info);
    for (auto listener : immutable_db_options_.listeners) {
      listener->OnCompactionCompleted(this, info);
    }
  }
  mutex_.Lock();
  current->Unref();
  // no need to signal bg_cv_ as it will be signaled at the end of the
  // flush process.
#else
  (void)cfd;
  (void)c;
  (void)st;
  (void)compaction_job_stats;
  (void)job_id;
#endif  // ROCKSDB_LITE
}

// REQUIREMENT: block all background work by calling PauseBackgroundWork()
// before calling this function
Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) {
  assert(level < cfd->NumberLevels());
  if (target_level >= cfd->NumberLevels()) {
    return Status::InvalidArgument("Target level exceeds number of levels");
  }

  SuperVersionContext sv_context(/* create_superversion */ true);

  Status status;

  InstrumentedMutexLock guard_lock(&mutex_);

  // only allow one thread refitting
  if (refitting_level_) {
    ROCKS_LOG_INFO(immutable_db_options_.info_log,
                   "[ReFitLevel] another thread is refitting");
    return Status::NotSupported("another thread is refitting");
  }
  refitting_level_ = true;

  const MutableCFOptions mutable_cf_options = *cfd->GetLatestMutableCFOptions();
  // move to a smaller level
  int to_level = target_level;
  if (target_level < 0) {
    to_level = FindMinimumEmptyLevelFitting(cfd, mutable_cf_options, level);
  }

  auto* vstorage = cfd->current()->storage_info();
  if (to_level > level) {
    if (level == 0) {
      return Status::NotSupported(
          "Cannot change from level 0 to other levels.");
    }
    // Check levels are empty for a trivial move
    for (int l = level + 1; l <= to_level; l++) {
      if (vstorage->NumLevelFiles(l) > 0) {
        return Status::NotSupported(
            "Levels between source and target are not empty for a move.");
      }
    }
  }
  if (to_level != level) {
    ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
                    "[%s] Before refitting:\n%s", cfd->GetName().c_str(),
                    cfd->current()->DebugString().data());

    VersionEdit edit;
    edit.SetColumnFamily(cfd->GetID());
    for (const auto& f : vstorage->LevelFiles(level)) {
      edit.DeleteFile(level, f->fd.GetNumber());
      edit.AddFile(to_level, f->fd.GetNumber(), f->fd.GetPathId(),
                   f->fd.GetFileSize(), f->smallest, f->largest,
                   f->fd.smallest_seqno, f->fd.largest_seqno,
                   f->marked_for_compaction);
    }
    ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
                    "[%s] Apply version edit:\n%s", cfd->GetName().c_str(),
                    edit.DebugString().data());

    status = versions_->LogAndApply(cfd, mutable_cf_options, &edit, &mutex_,
                                    directories_.GetDbDir());
    InstallSuperVersionAndScheduleWork(cfd, &sv_context, mutable_cf_options);

    ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "[%s] LogAndApply: %s\n",
                    cfd->GetName().c_str(), status.ToString().data());

    if (status.ok()) {
      ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
                      "[%s] After refitting:\n%s", cfd->GetName().c_str(),
                      cfd->current()->DebugString().data());
    }
  }

  sv_context.Clean();
  refitting_level_ = false;

  return status;
}

int DBImpl::NumberLevels(ColumnFamilyHandle* column_family) {
  auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
  return cfh->cfd()->NumberLevels();
}

int DBImpl::MaxMemCompactionLevel(ColumnFamilyHandle* /*column_family*/) {
  return 0;
}

int DBImpl::Level0StopWriteTrigger(ColumnFamilyHandle* column_family) {
  auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
  InstrumentedMutexLock l(&mutex_);
  return cfh->cfd()
      ->GetSuperVersion()
      ->mutable_cf_options.level0_stop_writes_trigger;
}

Status DBImpl::Flush(const FlushOptions& flush_options,
                     ColumnFamilyHandle* column_family) {
  auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
  ROCKS_LOG_INFO(immutable_db_options_.info_log, "[%s] Manual flush start.",
                 cfh->GetName().c_str());
  Status s;
  if (immutable_db_options_.atomic_flush) {
    s = AtomicFlushMemTables({cfh->cfd()}, flush_options,
                             FlushReason::kManualFlush);
  } else {
    s = FlushMemTable(cfh->cfd(), flush_options, FlushReason::kManualFlush);
  }

  ROCKS_LOG_INFO(immutable_db_options_.info_log,
                 "[%s] Manual flush finished, status: %s\n",
                 cfh->GetName().c_str(), s.ToString().c_str());
  return s;
}

Status DBImpl::Flush(const FlushOptions& flush_options,
                     const std::vector<ColumnFamilyHandle*>& column_families) {
  Status s;
  if (!immutable_db_options_.atomic_flush) {
    for (auto cfh : column_families) {
      s = Flush(flush_options, cfh);
      if (!s.ok()) {
        break;
      }
    }
  } else {
    ROCKS_LOG_INFO(immutable_db_options_.info_log,
                   "Manual atomic flush start.\n"
                   "=====Column families:=====");
    for (auto cfh : column_families) {
      auto cfhi = static_cast<ColumnFamilyHandleImpl*>(cfh);
      ROCKS_LOG_INFO(immutable_db_options_.info_log, "%s",
                     cfhi->GetName().c_str());
    }
    ROCKS_LOG_INFO(immutable_db_options_.info_log,
                   "=====End of column families list=====");
    autovector<ColumnFamilyData*> cfds;
    std::for_each(column_families.begin(), column_families.end(),
                  [&cfds](ColumnFamilyHandle* elem) {
                    auto cfh = static_cast<ColumnFamilyHandleImpl*>(elem);
                    cfds.emplace_back(cfh->cfd());
                  });
    s = AtomicFlushMemTables(cfds, flush_options, FlushReason::kManualFlush);
    ROCKS_LOG_INFO(immutable_db_options_.info_log,
                   "Manual atomic flush finished, status: %s\n"
                   "=====Column families:=====",
                   s.ToString().c_str());
    for (auto cfh : column_families) {
      auto cfhi = static_cast<ColumnFamilyHandleImpl*>(cfh);
      ROCKS_LOG_INFO(immutable_db_options_.info_log, "%s",
                     cfhi->GetName().c_str());
    }
    ROCKS_LOG_INFO(immutable_db_options_.info_log,
                   "=====End of column families list=====");
  }
  return s;
}

Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level,
                                   int output_level, uint32_t output_path_id,
                                   uint32_t max_subcompactions,
                                   const Slice* begin, const Slice* end,
                                   bool exclusive, bool disallow_trivial_move) {
  assert(input_level == ColumnFamilyData::kCompactAllLevels ||
         input_level >= 0);

  InternalKey begin_storage, end_storage;
  CompactionArg* ca;

  bool scheduled = false;
  bool manual_conflict = false;
  ManualCompactionState manual;
  manual.cfd = cfd;
  manual.input_level = input_level;
  manual.output_level = output_level;
  manual.output_path_id = output_path_id;
  manual.done = false;
  manual.in_progress = false;
  manual.incomplete = false;
  manual.exclusive = exclusive;
  manual.disallow_trivial_move = disallow_trivial_move;
  // For universal compaction, we enforce every manual compaction to compact
  // all files.
  if (begin == nullptr ||
      cfd->ioptions()->compaction_style == kCompactionStyleUniversal ||
      cfd->ioptions()->compaction_style == kCompactionStyleFIFO) {
    manual.begin = nullptr;
  } else {
    begin_storage.SetMinPossibleForUserKey(*begin);
    manual.begin = &begin_storage;
  }
  if (end == nullptr ||
      cfd->ioptions()->compaction_style == kCompactionStyleUniversal ||
      cfd->ioptions()->compaction_style == kCompactionStyleFIFO) {
    manual.end = nullptr;
  } else {
    end_storage.SetMaxPossibleForUserKey(*end);
    manual.end = &end_storage;
  }

  TEST_SYNC_POINT("DBImpl::RunManualCompaction:0");
  TEST_SYNC_POINT("DBImpl::RunManualCompaction:1");
  InstrumentedMutexLock l(&mutex_);

  // When a manual compaction arrives, temporarily disable scheduling of
  // non-manual compactions and wait until the number of scheduled compaction
  // jobs drops to zero. This is needed to ensure that this manual compaction
  // can compact any range of keys/files.
  //
  // HasPendingManualCompaction() is true when at least one thread is inside
  // RunManualCompaction(), i.e. during that time no other compaction will
  // get scheduled (see MaybeScheduleFlushOrCompaction).
  //
  // Note that the following loop doesn't stop more than one thread calling
  // RunManualCompaction() from getting to the second while loop below.
  // However, only one of them will actually schedule compaction, while
  // others will wait on a condition variable until it completes.

  AddManualCompaction(&manual);
  TEST_SYNC_POINT_CALLBACK("DBImpl::RunManualCompaction:NotScheduled", &mutex_);
  if (exclusive) {
    while (bg_bottom_compaction_scheduled_ > 0 ||
           bg_compaction_scheduled_ > 0) {
      TEST_SYNC_POINT("DBImpl::RunManualCompaction:WaitScheduled");
      ROCKS_LOG_INFO(
          immutable_db_options_.info_log,
          "[%s] Manual compaction waiting for all other scheduled background "
          "compactions to finish",
          cfd->GetName().c_str());
      bg_cv_.Wait();
    }
  }

  ROCKS_LOG_INFO(immutable_db_options_.info_log,
                 "[%s] Manual compaction starting", cfd->GetName().c_str());

  LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL,
                       immutable_db_options_.info_log.get());
  // We don't check bg_error_ here, because if we get the error in compaction,
  // the compaction will set manual.status to bg_error_ and set manual.done to
  // true.
  while (!manual.done) {
    assert(HasPendingManualCompaction());
    manual_conflict = false;
    Compaction* compaction = nullptr;
    if (ShouldntRunManualCompaction(&manual) || (manual.in_progress == true) ||
        scheduled ||
        (((manual.manual_end = &manual.tmp_storage1) != nullptr) &&
         ((compaction = manual.cfd->CompactRange(
               *manual.cfd->GetLatestMutableCFOptions(), manual.input_level,
               manual.output_level, manual.output_path_id, max_subcompactions,
               manual.begin, manual.end, &manual.manual_end,
               &manual_conflict)) == nullptr &&
          manual_conflict))) {
      // exclusive manual compactions should not see a conflict during
      // CompactRange
      assert(!exclusive || !manual_conflict);
      // Running either this or some other manual compaction
      bg_cv_.Wait();
      if (scheduled && manual.incomplete == true) {
        assert(!manual.in_progress);
        scheduled = false;
        manual.incomplete = false;
      }
    } else if (!scheduled) {
      if (compaction == nullptr) {
        manual.done = true;
        bg_cv_.SignalAll();
        continue;
      }
      ca = new CompactionArg;
      ca->db = this;
      ca->prepicked_compaction = new PrepickedCompaction;
      ca->prepicked_compaction->manual_compaction_state = &manual;
      ca->prepicked_compaction->compaction = compaction;
      if (!RequestCompactionToken(
              cfd, true, &ca->prepicked_compaction->task_token, &log_buffer)) {
        // Don't throttle manual compaction, only count outstanding tasks.
        assert(false);
      }
      manual.incomplete = false;
      bg_compaction_scheduled_++;
      env_->Schedule(&DBImpl::BGWorkCompaction, ca, Env::Priority::LOW, this,
                     &DBImpl::UnscheduleCompactionCallback);
      scheduled = true;
    }
  }

  log_buffer.FlushBufferToLog();
  assert(!manual.in_progress);
  assert(HasPendingManualCompaction());
  RemoveManualCompaction(&manual);
  bg_cv_.SignalAll();
  return manual.status;
}

void DBImpl::GenerateFlushRequest(const autovector<ColumnFamilyData*>& cfds,
                                  FlushRequest* req) {
  assert(req != nullptr);
  req->reserve(cfds.size());
  for (const auto cfd : cfds) {
    if (nullptr == cfd) {
      // cfd may be null, see DBImpl::ScheduleFlushes
      continue;
    }
    uint64_t max_memtable_id = cfd->imm()->GetLatestMemTableID();
    req->emplace_back(cfd, max_memtable_id);
  }
}

7c673cae
FG
1479Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
1480 const FlushOptions& flush_options,
11fdf7f2 1481 FlushReason flush_reason, bool writes_stopped) {
7c673cae 1482 Status s;
11fdf7f2
TL
1483 uint64_t flush_memtable_id = 0;
1484 if (!flush_options.allow_write_stall) {
1485 bool flush_needed = true;
1486 s = WaitUntilFlushWouldNotStallWrites(cfd, &flush_needed);
1487 TEST_SYNC_POINT("DBImpl::FlushMemTable:StallWaitDone");
1488 if (!s.ok() || !flush_needed) {
1489 return s;
1490 }
1491 }
1492 FlushRequest flush_req;
7c673cae
FG
1493 {
1494 WriteContext context;
1495 InstrumentedMutexLock guard_lock(&mutex_);
1496
7c673cae
FG
1497 WriteThread::Writer w;
1498 if (!writes_stopped) {
1499 write_thread_.EnterUnbatched(&w, &mutex_);
1500 }
1501
494da23a 1502 if (!cfd->mem()->IsEmpty() || !cached_recoverable_state_empty_.load()) {
11fdf7f2 1503 s = SwitchMemtable(cfd, &context);
494da23a
TL
1504 }
1505
1506 if (s.ok()) {
1507 if (cfd->imm()->NumNotFlushed() != 0 || !cfd->mem()->IsEmpty() ||
1508 !cached_recoverable_state_empty_.load()) {
1509 flush_memtable_id = cfd->imm()->GetLatestMemTableID();
1510 flush_req.emplace_back(cfd, flush_memtable_id);
1511 }
11fdf7f2
TL
1512 }
1513
1514 if (s.ok() && !flush_req.empty()) {
1515 for (auto& elem : flush_req) {
1516 ColumnFamilyData* loop_cfd = elem.first;
1517 loop_cfd->imm()->FlushRequested();
1518 }
1519 SchedulePendingFlush(flush_req, flush_reason);
1520 MaybeScheduleFlushOrCompaction();
1521 }
7c673cae
FG
1522
1523 if (!writes_stopped) {
1524 write_thread_.ExitUnbatched(&w);
1525 }
7c673cae
FG
1526 }
1527
1528 if (s.ok() && flush_options.wait) {
11fdf7f2
TL
1529 autovector<ColumnFamilyData*> cfds;
1530 autovector<const uint64_t*> flush_memtable_ids;
1531 for (auto& iter : flush_req) {
1532 cfds.push_back(iter.first);
1533 flush_memtable_ids.push_back(&(iter.second));
1534 }
494da23a
TL
1535 s = WaitForFlushMemTables(cfds, flush_memtable_ids,
1536 (flush_reason == FlushReason::kErrorRecovery));
7c673cae 1537 }
11fdf7f2 1538 TEST_SYNC_POINT("FlushMemTableFinished");
7c673cae
FG
1539 return s;
1540}
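//
// A minimal caller-side sketch of the path above (illustrative only; it
// assumes an already open rocksdb::DB* `db` and uses the public API that
// funnels into FlushMemTable() for the default column family):
//
//   rocksdb::FlushOptions fo;
//   fo.wait = true;                // block in WaitForFlushMemTables()
//   fo.allow_write_stall = false;  // go through WaitUntilFlushWouldNotStallWrites()
//   rocksdb::Status s = db->Flush(fo);
//   assert(s.ok());
//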
1541
494da23a
TL
1542// Flush all elements in 'column_family_datas'
1543// and atomically record the result to the MANIFEST.
1544Status DBImpl::AtomicFlushMemTables(
1545 const autovector<ColumnFamilyData*>& column_family_datas,
1546 const FlushOptions& flush_options, FlushReason flush_reason,
1547 bool writes_stopped) {
1548 Status s;
1549 if (!flush_options.allow_write_stall) {
1550 int num_cfs_to_flush = 0;
1551 for (auto cfd : column_family_datas) {
1552 bool flush_needed = true;
1553 s = WaitUntilFlushWouldNotStallWrites(cfd, &flush_needed);
1554 if (!s.ok()) {
1555 return s;
1556 } else if (flush_needed) {
1557 ++num_cfs_to_flush;
1558 }
1559 }
1560 if (0 == num_cfs_to_flush) {
1561 return s;
1562 }
1563 }
1564 FlushRequest flush_req;
1565 autovector<ColumnFamilyData*> cfds;
1566 {
1567 WriteContext context;
1568 InstrumentedMutexLock guard_lock(&mutex_);
1569
1570 WriteThread::Writer w;
1571 if (!writes_stopped) {
1572 write_thread_.EnterUnbatched(&w, &mutex_);
1573 }
1574
1575 for (auto cfd : column_family_datas) {
1576 if (cfd->IsDropped()) {
1577 continue;
1578 }
1579 if (cfd->imm()->NumNotFlushed() != 0 || !cfd->mem()->IsEmpty() ||
1580 !cached_recoverable_state_empty_.load()) {
1581 cfds.emplace_back(cfd);
1582 }
1583 }
1584 for (auto cfd : cfds) {
1585 if (cfd->mem()->IsEmpty() && cached_recoverable_state_empty_.load()) {
1586 continue;
1587 }
1588 cfd->Ref();
1589 s = SwitchMemtable(cfd, &context);
1590 cfd->Unref();
1591 if (!s.ok()) {
1592 break;
1593 }
1594 }
1595 if (s.ok()) {
1596 AssignAtomicFlushSeq(cfds);
1597 for (auto cfd : cfds) {
1598 cfd->imm()->FlushRequested();
1599 }
1600 GenerateFlushRequest(cfds, &flush_req);
1601 SchedulePendingFlush(flush_req, flush_reason);
1602 MaybeScheduleFlushOrCompaction();
1603 }
1604
1605 if (!writes_stopped) {
1606 write_thread_.ExitUnbatched(&w);
1607 }
1608 }
1609 TEST_SYNC_POINT("DBImpl::AtomicFlushMemTables:AfterScheduleFlush");
1610
1611 if (s.ok() && flush_options.wait) {
1612 autovector<const uint64_t*> flush_memtable_ids;
1613 for (auto& iter : flush_req) {
1614 flush_memtable_ids.push_back(&(iter.second));
1615 }
1616 s = WaitForFlushMemTables(cfds, flush_memtable_ids,
1617 (flush_reason == FlushReason::kErrorRecovery));
1618 }
1619 return s;
1620}
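//
// Caller-side sketch (illustrative): the atomic multi-CF path above is
// normally reached through the DB::Flush() overload that takes several
// column family handles, assuming the DB was opened with atomic_flush
// enabled:
//
//   rocksdb::DBOptions db_opts;
//   db_opts.atomic_flush = true;
//   // ... open the DB with column families cf_a and cf_b ...
//   rocksdb::FlushOptions fo;
//   fo.wait = true;
//   rocksdb::Status s = db->Flush(fo, {cf_a, cf_b});
//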
1621
11fdf7f2
TL
1622// Calling FlushMemTable(), whether from DB::Flush() or from the Backup Engine,
1623// can cause a write stall, for example if one memtable is already being flushed.
1624// This method tries to avoid such stalls (similar to CompactRange() behavior):
1625// it emulates how the SuperVersion / LSM would change if the flush happened,
1626// checks that against the write stall conditions, and delays the flush if it
1627// would cause a stall. Callers should check status and flush_needed to see whether the flush already happened.
1628Status DBImpl::WaitUntilFlushWouldNotStallWrites(ColumnFamilyData* cfd,
494da23a 1629 bool* flush_needed) {
11fdf7f2
TL
1630 {
1631 *flush_needed = true;
1632 InstrumentedMutexLock l(&mutex_);
1633 uint64_t orig_active_memtable_id = cfd->mem()->GetID();
1634 WriteStallCondition write_stall_condition = WriteStallCondition::kNormal;
1635 do {
1636 if (write_stall_condition != WriteStallCondition::kNormal) {
494da23a
TL
1637 // Same error handling as user writes: Don't wait if there's a
1638 // background error, even if it's a soft error. We might wait here
1639 // indefinitely as the pending flushes/compactions may never finish
1640 // successfully, resulting in the stall condition lasting indefinitely
1641 if (error_handler_.IsBGWorkStopped()) {
1642 return error_handler_.GetBGError();
1643 }
1644
11fdf7f2
TL
1645 TEST_SYNC_POINT("DBImpl::WaitUntilFlushWouldNotStallWrites:StallWait");
1646 ROCKS_LOG_INFO(immutable_db_options_.info_log,
1647 "[%s] WaitUntilFlushWouldNotStallWrites"
1648 " waiting on stall conditions to clear",
1649 cfd->GetName().c_str());
1650 bg_cv_.Wait();
1651 }
1652 if (cfd->IsDropped() || shutting_down_.load(std::memory_order_acquire)) {
1653 return Status::ShutdownInProgress();
1654 }
1655
1656 uint64_t earliest_memtable_id =
1657 std::min(cfd->mem()->GetID(), cfd->imm()->GetEarliestMemTableID());
1658 if (earliest_memtable_id > orig_active_memtable_id) {
1659 // We waited so long that the memtable we were originally waiting on was
1660 // flushed.
1661 *flush_needed = false;
1662 return Status::OK();
1663 }
1664
1665 const auto& mutable_cf_options = *cfd->GetLatestMutableCFOptions();
1666 const auto* vstorage = cfd->current()->storage_info();
1667
1668 // Skip stalling check if we're below auto-flush and auto-compaction
1669 // triggers. If it stalled in these conditions, that'd mean the stall
1670 // triggers are so low that stalling is needed for any background work. In
1671 // that case we shouldn't wait since background work won't be scheduled.
1672 if (cfd->imm()->NumNotFlushed() <
1673 cfd->ioptions()->min_write_buffer_number_to_merge &&
1674 vstorage->l0_delay_trigger_count() <
1675 mutable_cf_options.level0_file_num_compaction_trigger) {
1676 break;
1677 }
1678
1679 // check whether one extra immutable memtable or an extra L0 file would
1680 // cause write stalling mode to be entered. It could still enter stall
1681 // mode due to pending compaction bytes, but that's less common
1682 write_stall_condition =
1683 ColumnFamilyData::GetWriteStallConditionAndCause(
1684 cfd->imm()->NumNotFlushed() + 1,
1685 vstorage->l0_delay_trigger_count() + 1,
1686 vstorage->estimated_compaction_needed_bytes(), mutable_cf_options)
1687 .first;
1688 } while (write_stall_condition != WriteStallCondition::kNormal);
1689 }
1690 return Status::OK();
1691}
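//
// Worked example of the projection above (values are illustrative): with
// level0_slowdown_writes_trigger == 20 and 19 files already in L0,
// GetWriteStallConditionAndCause(num_unflushed + 1, 19 + 1, ...) reports a
// non-kNormal condition, so the loop keeps waiting; once compaction brings
// the L0 count back under the trigger, the projected condition becomes
// kNormal and the flush is allowed to proceed.
//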
1692
1693// Wait for memtables to be flushed for multiple column families.
1694// let N = cfds.size()
1695// for i in [0, N),
1696// 1) if flush_memtable_ids[i] is not null, then the memtables with lower IDs
1697// have to be flushed for THIS column family;
1698// 2) if flush_memtable_ids[i] is null, then all memtables in THIS column
1699// family have to be flushed.
1700// Finish waiting when ALL column families finish flushing memtables.
494da23a
TL
1701// resuming_from_bg_err indicates whether the caller is trying to resume from
1702// background error or in normal processing.
11fdf7f2
TL
1703Status DBImpl::WaitForFlushMemTables(
1704 const autovector<ColumnFamilyData*>& cfds,
494da23a
TL
1705 const autovector<const uint64_t*>& flush_memtable_ids,
1706 bool resuming_from_bg_err) {
11fdf7f2 1707 int num = static_cast<int>(cfds.size());
7c673cae
FG
1708 // Wait until the compaction completes
1709 InstrumentedMutexLock l(&mutex_);
494da23a
TL
1710 // If the caller is trying to resume from bg error, then
1711 // error_handler_.IsDBStopped() is true.
1712 while (resuming_from_bg_err || !error_handler_.IsDBStopped()) {
7c673cae
FG
1713 if (shutting_down_.load(std::memory_order_acquire)) {
1714 return Status::ShutdownInProgress();
1715 }
494da23a
TL
1716 // If an error has occurred during resumption, then no need to wait.
1717 if (!error_handler_.GetRecoveryError().ok()) {
1718 break;
1719 }
11fdf7f2
TL
1720 // Number of column families that have been dropped.
1721 int num_dropped = 0;
1722 // Number of column families that have finished flush.
1723 int num_finished = 0;
1724 for (int i = 0; i < num; ++i) {
1725 if (cfds[i]->IsDropped()) {
1726 ++num_dropped;
1727 } else if (cfds[i]->imm()->NumNotFlushed() == 0 ||
1728 (flush_memtable_ids[i] != nullptr &&
1729 cfds[i]->imm()->GetEarliestMemTableID() >
1730 *flush_memtable_ids[i])) {
1731 ++num_finished;
1732 }
1733 }
1734 if (1 == num_dropped && 1 == num) {
7c673cae
FG
1735 return Status::InvalidArgument("Cannot flush a dropped CF");
1736 }
11fdf7f2
TL
1737 // Column families involved in this flush request have either been dropped
1738 // or finished flush. Then it's time to finish waiting.
1739 if (num_dropped + num_finished == num) {
1740 break;
1741 }
7c673cae
FG
1742 bg_cv_.Wait();
1743 }
11fdf7f2 1744 Status s;
494da23a
TL
1745 // If not resuming from bg error, and an error has caused the DB to stop,
1746 // then report the bg error to caller.
1747 if (!resuming_from_bg_err && error_handler_.IsDBStopped()) {
11fdf7f2 1748 s = error_handler_.GetBGError();
7c673cae
FG
1749 }
1750 return s;
1751}
1752
1753Status DBImpl::EnableAutoCompaction(
1754 const std::vector<ColumnFamilyHandle*>& column_family_handles) {
1755 Status s;
1756 for (auto cf_ptr : column_family_handles) {
1757 Status status =
1758 this->SetOptions(cf_ptr, {{"disable_auto_compactions", "false"}});
1759 if (!status.ok()) {
1760 s = status;
1761 }
1762 }
1763
1764 return s;
1765}
1766
1767void DBImpl::MaybeScheduleFlushOrCompaction() {
1768 mutex_.AssertHeld();
1769 if (!opened_successfully_) {
1770 // Compaction may introduce a data race with DB open
1771 return;
1772 }
1773 if (bg_work_paused_ > 0) {
1774 // we paused the background work
1775 return;
11fdf7f2 1776 } else if (error_handler_.IsBGWorkStopped() &&
494da23a 1777 !error_handler_.IsRecoveryInProgress()) {
11fdf7f2
TL
1778 // There has been a hard error and this call is not part of the recovery
1779 // sequence. Bail out here so we don't get into an endless loop of
1780 // scheduling BG work which will again call this function
1781 return;
7c673cae
FG
1782 } else if (shutting_down_.load(std::memory_order_acquire)) {
1783 // DB is being deleted; no more background compactions
1784 return;
1785 }
11fdf7f2
TL
1786 auto bg_job_limits = GetBGJobLimits();
1787 bool is_flush_pool_empty =
1788 env_->GetBackgroundThreads(Env::Priority::HIGH) == 0;
1789 while (!is_flush_pool_empty && unscheduled_flushes_ > 0 &&
1790 bg_flush_scheduled_ < bg_job_limits.max_flushes) {
7c673cae 1791 bg_flush_scheduled_++;
494da23a
TL
1792 FlushThreadArg* fta = new FlushThreadArg;
1793 fta->db_ = this;
1794 fta->thread_pri_ = Env::Priority::HIGH;
1795 env_->Schedule(&DBImpl::BGWorkFlush, fta, Env::Priority::HIGH, this,
1796 &DBImpl::UnscheduleFlushCallback);
7c673cae
FG
1797 }
1798
11fdf7f2
TL
1799 // special case -- if high-pri (flush) thread pool is empty, then schedule
1800 // flushes in low-pri (compaction) thread pool.
1801 if (is_flush_pool_empty) {
7c673cae
FG
1802 while (unscheduled_flushes_ > 0 &&
1803 bg_flush_scheduled_ + bg_compaction_scheduled_ <
11fdf7f2 1804 bg_job_limits.max_flushes) {
7c673cae 1805 bg_flush_scheduled_++;
494da23a
TL
1806 FlushThreadArg* fta = new FlushThreadArg;
1807 fta->db_ = this;
1808 fta->thread_pri_ = Env::Priority::LOW;
1809 env_->Schedule(&DBImpl::BGWorkFlush, fta, Env::Priority::LOW, this,
1810 &DBImpl::UnscheduleFlushCallback);
7c673cae
FG
1811 }
1812 }
1813
1814 if (bg_compaction_paused_ > 0) {
1815 // we paused the background compaction
1816 return;
11fdf7f2
TL
1817 } else if (error_handler_.IsBGWorkStopped()) {
1818 // Compaction is not part of the recovery sequence from a hard error. We
1819 // might get here because recovery might do a flush and install a new
1820 // super version, which will try to schedule pending compactions. Bail
1821 // out here and let the higher level recovery handle compactions
1822 return;
7c673cae
FG
1823 }
1824
1825 if (HasExclusiveManualCompaction()) {
1826 // only manual compactions are allowed to run. don't schedule automatic
1827 // compactions
11fdf7f2 1828 TEST_SYNC_POINT("DBImpl::MaybeScheduleFlushOrCompaction:Conflict");
7c673cae
FG
1829 return;
1830 }
1831
11fdf7f2 1832 while (bg_compaction_scheduled_ < bg_job_limits.max_compactions &&
7c673cae
FG
1833 unscheduled_compactions_ > 0) {
1834 CompactionArg* ca = new CompactionArg;
1835 ca->db = this;
11fdf7f2 1836 ca->prepicked_compaction = nullptr;
7c673cae
FG
1837 bg_compaction_scheduled_++;
1838 unscheduled_compactions_--;
1839 env_->Schedule(&DBImpl::BGWorkCompaction, ca, Env::Priority::LOW, this,
494da23a 1840 &DBImpl::UnscheduleCompactionCallback);
7c673cae
FG
1841 }
1842}
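//
// Configuration sketch (illustrative, using the public Options/Env API) for
// the thread pools this function schedules into; the sizes below are
// examples, not recommendations:
//
//   rocksdb::Options options;
//   options.max_background_jobs = 8;  // split up by GetBGJobLimits() below
//   options.env->SetBackgroundThreads(2, rocksdb::Env::Priority::HIGH);  // flushes
//   options.env->SetBackgroundThreads(6, rocksdb::Env::Priority::LOW);   // compactions
//
// If the HIGH pool has zero threads, the special case above reroutes flush
// jobs into the LOW pool.
//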
1843
11fdf7f2 1844DBImpl::BGJobLimits DBImpl::GetBGJobLimits() const {
7c673cae 1845 mutex_.AssertHeld();
11fdf7f2
TL
1846 return GetBGJobLimits(immutable_db_options_.max_background_flushes,
1847 mutable_db_options_.max_background_compactions,
1848 mutable_db_options_.max_background_jobs,
1849 write_controller_.NeedSpeedupCompaction());
1850}
1851
1852DBImpl::BGJobLimits DBImpl::GetBGJobLimits(int max_background_flushes,
1853 int max_background_compactions,
1854 int max_background_jobs,
1855 bool parallelize_compactions) {
1856 BGJobLimits res;
1857 if (max_background_flushes == -1 && max_background_compactions == -1) {
1858 // for our first stab implementing max_background_jobs, simply allocate a
1859 // quarter of the threads to flushes.
1860 res.max_flushes = std::max(1, max_background_jobs / 4);
1861 res.max_compactions = std::max(1, max_background_jobs - res.max_flushes);
7c673cae 1862 } else {
11fdf7f2
TL
1863 // compatibility code in case users haven't migrated to max_background_jobs,
1864 // which automatically computes flush/compaction limits
1865 res.max_flushes = std::max(1, max_background_flushes);
1866 res.max_compactions = std::max(1, max_background_compactions);
7c673cae 1867 }
11fdf7f2
TL
1868 if (!parallelize_compactions) {
1869 // throttle background compactions until we deem it necessary
1870 res.max_compactions = 1;
1871 }
1872 return res;
7c673cae
FG
1873}
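//
// Worked example (illustrative): when both max_background_flushes and
// max_background_compactions are -1, setting max_background_jobs = 8 yields
// max_flushes = std::max(1, 8 / 4) = 2 and
// max_compactions = std::max(1, 8 - 2) = 6. If the write controller does not
// request a compaction speedup, parallelize_compactions is false and
// max_compactions is clamped back down to 1.
//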
1874
1875void DBImpl::AddToCompactionQueue(ColumnFamilyData* cfd) {
11fdf7f2 1876 assert(!cfd->queued_for_compaction());
7c673cae
FG
1877 cfd->Ref();
1878 compaction_queue_.push_back(cfd);
11fdf7f2 1879 cfd->set_queued_for_compaction(true);
7c673cae
FG
1880}
1881
1882ColumnFamilyData* DBImpl::PopFirstFromCompactionQueue() {
1883 assert(!compaction_queue_.empty());
1884 auto cfd = *compaction_queue_.begin();
1885 compaction_queue_.pop_front();
11fdf7f2
TL
1886 assert(cfd->queued_for_compaction());
1887 cfd->set_queued_for_compaction(false);
7c673cae
FG
1888 return cfd;
1889}
1890
11fdf7f2 1891DBImpl::FlushRequest DBImpl::PopFirstFromFlushQueue() {
7c673cae 1892 assert(!flush_queue_.empty());
11fdf7f2
TL
1893 FlushRequest flush_req = flush_queue_.front();
1894 assert(unscheduled_flushes_ >= static_cast<int>(flush_req.size()));
1895 unscheduled_flushes_ -= static_cast<int>(flush_req.size());
7c673cae 1896 flush_queue_.pop_front();
11fdf7f2
TL
1897 // TODO: need to unset flush reason?
1898 return flush_req;
7c673cae
FG
1899}
1900
494da23a
TL
1901ColumnFamilyData* DBImpl::PickCompactionFromQueue(
1902 std::unique_ptr<TaskLimiterToken>* token, LogBuffer* log_buffer) {
1903 assert(!compaction_queue_.empty());
1904 assert(*token == nullptr);
1905 autovector<ColumnFamilyData*> throttled_candidates;
1906 ColumnFamilyData* cfd = nullptr;
1907 while (!compaction_queue_.empty()) {
1908 auto first_cfd = *compaction_queue_.begin();
1909 compaction_queue_.pop_front();
1910 assert(first_cfd->queued_for_compaction());
1911 if (!RequestCompactionToken(first_cfd, false, token, log_buffer)) {
1912 throttled_candidates.push_back(first_cfd);
1913 continue;
1914 }
1915 cfd = first_cfd;
1916 cfd->set_queued_for_compaction(false);
1917 break;
1918 }
1919 // Add throttled compaction candidates back to queue in the original order.
1920 for (auto iter = throttled_candidates.rbegin();
1921 iter != throttled_candidates.rend(); ++iter) {
1922 compaction_queue_.push_front(*iter);
1923 }
1924 return cfd;
1925}
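//
// Configuration sketch (illustrative): the per-CF limiter consulted above via
// RequestCompactionToken() is supplied through ColumnFamilyOptions, e.g. one
// limiter shared by two column families so that at most four of their
// compactions run at once (the name and limit below are examples):
//
//   std::shared_ptr<rocksdb::ConcurrentTaskLimiter> limiter(
//       rocksdb::NewConcurrentTaskLimiter("shared_compaction_limit", 4));
//   cf_opts_a.compaction_thread_limiter = limiter;
//   cf_opts_b.compaction_thread_limiter = limiter;
//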
1926
11fdf7f2
TL
1927void DBImpl::SchedulePendingFlush(const FlushRequest& flush_req,
1928 FlushReason flush_reason) {
1929 if (flush_req.empty()) {
1930 return;
1931 }
1932 for (auto& iter : flush_req) {
1933 ColumnFamilyData* cfd = iter.first;
1934 cfd->Ref();
1935 cfd->SetFlushReason(flush_reason);
7c673cae 1936 }
11fdf7f2
TL
1937 unscheduled_flushes_ += static_cast<int>(flush_req.size());
1938 flush_queue_.push_back(flush_req);
7c673cae
FG
1939}
1940
1941void DBImpl::SchedulePendingCompaction(ColumnFamilyData* cfd) {
11fdf7f2 1942 if (!cfd->queued_for_compaction() && cfd->NeedsCompaction()) {
7c673cae
FG
1943 AddToCompactionQueue(cfd);
1944 ++unscheduled_compactions_;
1945 }
1946}
1947
11fdf7f2
TL
1948void DBImpl::SchedulePendingPurge(std::string fname, std::string dir_to_sync,
1949 FileType type, uint64_t number, int job_id) {
7c673cae 1950 mutex_.AssertHeld();
11fdf7f2 1951 PurgeFileInfo file_info(fname, dir_to_sync, type, number, job_id);
7c673cae
FG
1952 purge_queue_.push_back(std::move(file_info));
1953}
1954
494da23a
TL
1955void DBImpl::BGWorkFlush(void* arg) {
1956 FlushThreadArg fta = *(reinterpret_cast<FlushThreadArg*>(arg));
1957 delete reinterpret_cast<FlushThreadArg*>(arg);
1958
1959 IOSTATS_SET_THREAD_POOL_ID(fta.thread_pri_);
7c673cae 1960 TEST_SYNC_POINT("DBImpl::BGWorkFlush");
494da23a 1961 reinterpret_cast<DBImpl*>(fta.db_)->BackgroundCallFlush(fta.thread_pri_);
7c673cae
FG
1962 TEST_SYNC_POINT("DBImpl::BGWorkFlush:done");
1963}
1964
1965void DBImpl::BGWorkCompaction(void* arg) {
1966 CompactionArg ca = *(reinterpret_cast<CompactionArg*>(arg));
1967 delete reinterpret_cast<CompactionArg*>(arg);
1968 IOSTATS_SET_THREAD_POOL_ID(Env::Priority::LOW);
1969 TEST_SYNC_POINT("DBImpl::BGWorkCompaction");
11fdf7f2
TL
1970 auto prepicked_compaction =
1971 static_cast<PrepickedCompaction*>(ca.prepicked_compaction);
1972 reinterpret_cast<DBImpl*>(ca.db)->BackgroundCallCompaction(
1973 prepicked_compaction, Env::Priority::LOW);
1974 delete prepicked_compaction;
1975}
1976
1977void DBImpl::BGWorkBottomCompaction(void* arg) {
1978 CompactionArg ca = *(static_cast<CompactionArg*>(arg));
1979 delete static_cast<CompactionArg*>(arg);
1980 IOSTATS_SET_THREAD_POOL_ID(Env::Priority::BOTTOM);
1981 TEST_SYNC_POINT("DBImpl::BGWorkBottomCompaction");
1982 auto* prepicked_compaction = ca.prepicked_compaction;
1983 assert(prepicked_compaction && prepicked_compaction->compaction &&
1984 !prepicked_compaction->manual_compaction_state);
1985 ca.db->BackgroundCallCompaction(prepicked_compaction, Env::Priority::BOTTOM);
1986 delete prepicked_compaction;
7c673cae
FG
1987}
1988
1989void DBImpl::BGWorkPurge(void* db) {
1990 IOSTATS_SET_THREAD_POOL_ID(Env::Priority::HIGH);
1991 TEST_SYNC_POINT("DBImpl::BGWorkPurge:start");
1992 reinterpret_cast<DBImpl*>(db)->BackgroundCallPurge();
1993 TEST_SYNC_POINT("DBImpl::BGWorkPurge:end");
1994}
1995
494da23a 1996void DBImpl::UnscheduleCompactionCallback(void* arg) {
7c673cae
FG
1997 CompactionArg ca = *(reinterpret_cast<CompactionArg*>(arg));
1998 delete reinterpret_cast<CompactionArg*>(arg);
11fdf7f2
TL
1999 if (ca.prepicked_compaction != nullptr) {
2000 if (ca.prepicked_compaction->compaction != nullptr) {
2001 delete ca.prepicked_compaction->compaction;
2002 }
2003 delete ca.prepicked_compaction;
7c673cae 2004 }
494da23a
TL
2005 TEST_SYNC_POINT("DBImpl::UnscheduleCompactionCallback");
2006}
2007
2008void DBImpl::UnscheduleFlushCallback(void* arg) {
2009 delete reinterpret_cast<FlushThreadArg*>(arg);
2010 TEST_SYNC_POINT("DBImpl::UnscheduleFlushCallback");
7c673cae
FG
2011}
2012
2013Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context,
494da23a
TL
2014 LogBuffer* log_buffer, FlushReason* reason,
2015 Env::Priority thread_pri) {
7c673cae
FG
2016 mutex_.AssertHeld();
2017
11fdf7f2
TL
2018 Status status;
2019 *reason = FlushReason::kOthers;
2020 // If BG work is stopped due to an error, but a recovery is in progress,
2021 // that means this flush is part of the recovery. So allow it to go through
2022 if (!error_handler_.IsBGWorkStopped()) {
2023 if (shutting_down_.load(std::memory_order_acquire)) {
2024 status = Status::ShutdownInProgress();
2025 }
2026 } else if (!error_handler_.IsRecoveryInProgress()) {
2027 status = error_handler_.GetBGError();
7c673cae
FG
2028 }
2029
2030 if (!status.ok()) {
2031 return status;
2032 }
2033
11fdf7f2
TL
2034 autovector<BGFlushArg> bg_flush_args;
2035 std::vector<SuperVersionContext>& superversion_contexts =
2036 job_context->superversion_contexts;
7c673cae
FG
2037 while (!flush_queue_.empty()) {
2038 // This cfd is already referenced
11fdf7f2
TL
2039 const FlushRequest& flush_req = PopFirstFromFlushQueue();
2040 superversion_contexts.clear();
2041 superversion_contexts.reserve(flush_req.size());
2042
2043 for (const auto& iter : flush_req) {
2044 ColumnFamilyData* cfd = iter.first;
2045 if (cfd->IsDropped() || !cfd->imm()->IsFlushPending()) {
2046 // can't flush this CF, try next one
2047 if (cfd->Unref()) {
2048 delete cfd;
2049 }
2050 continue;
7c673cae 2051 }
11fdf7f2
TL
2052 superversion_contexts.emplace_back(SuperVersionContext(true));
2053 bg_flush_args.emplace_back(cfd, iter.second,
2054 &(superversion_contexts.back()));
2055 }
2056 if (!bg_flush_args.empty()) {
2057 break;
7c673cae 2058 }
7c673cae
FG
2059 }
2060
11fdf7f2
TL
2061 if (!bg_flush_args.empty()) {
2062 auto bg_job_limits = GetBGJobLimits();
2063 for (const auto& arg : bg_flush_args) {
2064 ColumnFamilyData* cfd = arg.cfd_;
2065 ROCKS_LOG_BUFFER(
2066 log_buffer,
2067 "Calling FlushMemTableToOutputFile with column "
2068 "family [%s], flush slots available %d, compaction slots available "
2069 "%d, "
2070 "flush slots scheduled %d, compaction slots scheduled %d",
2071 cfd->GetName().c_str(), bg_job_limits.max_flushes,
2072 bg_job_limits.max_compactions, bg_flush_scheduled_,
2073 bg_compaction_scheduled_);
2074 }
2075 status = FlushMemTablesToOutputFiles(bg_flush_args, made_progress,
494da23a 2076 job_context, log_buffer, thread_pri);
11fdf7f2
TL
2077 // All the CFDs in the FlushReq must have the same flush reason, so just
2078 // grab the first one
2079 *reason = bg_flush_args[0].cfd_->GetFlushReason();
2080 for (auto& arg : bg_flush_args) {
2081 ColumnFamilyData* cfd = arg.cfd_;
2082 if (cfd->Unref()) {
2083 delete cfd;
2084 arg.cfd_ = nullptr;
2085 }
7c673cae
FG
2086 }
2087 }
2088 return status;
2089}
2090
494da23a 2091void DBImpl::BackgroundCallFlush(Env::Priority thread_pri) {
7c673cae
FG
2092 bool made_progress = false;
2093 JobContext job_context(next_job_id_.fetch_add(1), true);
7c673cae
FG
2094
2095 TEST_SYNC_POINT("DBImpl::BackgroundCallFlush:start");
2096
2097 LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL,
2098 immutable_db_options_.info_log.get());
2099 {
2100 InstrumentedMutexLock l(&mutex_);
11fdf7f2 2101 assert(bg_flush_scheduled_);
7c673cae
FG
2102 num_running_flushes_++;
2103
2104 auto pending_outputs_inserted_elem =
2105 CaptureCurrentFileNumberInPendingOutputs();
11fdf7f2 2106 FlushReason reason;
7c673cae 2107
494da23a
TL
2108 Status s = BackgroundFlush(&made_progress, &job_context, &log_buffer,
2109 &reason, thread_pri);
11fdf7f2
TL
2110 if (!s.ok() && !s.IsShutdownInProgress() &&
2111 reason != FlushReason::kErrorRecovery) {
7c673cae
FG
2112 // Wait a little bit before retrying background flush in
2113 // case this is an environmental problem and we do not want to
2114 // chew up resources for failed flushes for the duration of
2115 // the problem.
2116 uint64_t error_cnt =
11fdf7f2 2117 default_cf_internal_stats_->BumpAndGetBackgroundErrorCount();
7c673cae
FG
2118 bg_cv_.SignalAll(); // In case a waiter can proceed despite the error
2119 mutex_.Unlock();
2120 ROCKS_LOG_ERROR(immutable_db_options_.info_log,
2121 "Waiting after background flush error: %s"
2122 "Accumulated background error counts: %" PRIu64,
2123 s.ToString().c_str(), error_cnt);
2124 log_buffer.FlushBufferToLog();
2125 LogFlush(immutable_db_options_.info_log);
2126 env_->SleepForMicroseconds(1000000);
2127 mutex_.Lock();
2128 }
2129
494da23a 2130 TEST_SYNC_POINT("DBImpl::BackgroundCallFlush:FlushFinish:0");
7c673cae
FG
2131 ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem);
2132
2133 // If flush failed, we want to delete all temporary files that we might have
2134 // created. Thus, we force full scan in FindObsoleteFiles()
2135 FindObsoleteFiles(&job_context, !s.ok() && !s.IsShutdownInProgress());
2136 // delete unnecessary files if any, this is done outside the mutex
11fdf7f2
TL
2137 if (job_context.HaveSomethingToClean() ||
2138 job_context.HaveSomethingToDelete() || !log_buffer.IsEmpty()) {
7c673cae 2139 mutex_.Unlock();
11fdf7f2 2140 TEST_SYNC_POINT("DBImpl::BackgroundCallFlush:FilesFound");
7c673cae
FG
2141 // Have to flush the info logs before bg_flush_scheduled_--
2142 // because if bg_flush_scheduled_ becomes 0 and the lock is
2143 // released, the destructor of DB can kick in and destroy all the
2144 // state of the DB, so info_log might not be available after that point.
2145 // The same applies to accessing other state that the DB owns.
2146 log_buffer.FlushBufferToLog();
2147 if (job_context.HaveSomethingToDelete()) {
2148 PurgeObsoleteFiles(job_context);
2149 }
2150 job_context.Clean();
2151 mutex_.Lock();
2152 }
494da23a 2153 TEST_SYNC_POINT("DBImpl::BackgroundCallFlush:ContextCleanedUp");
7c673cae
FG
2154
2155 assert(num_running_flushes_ > 0);
2156 num_running_flushes_--;
2157 bg_flush_scheduled_--;
2158 // See if there's more work to be done
2159 MaybeScheduleFlushOrCompaction();
494da23a 2160 atomic_flush_install_cv_.SignalAll();
7c673cae
FG
2161 bg_cv_.SignalAll();
2162 // IMPORTANT: there should be no code after calling SignalAll. This call may
2163 // signal the DB destructor that it's OK to proceed with destruction. In
2164 // that case, all DB variables will be deallocated and referencing them
2165 // will cause trouble.
2166 }
2167}
2168
11fdf7f2
TL
2169void DBImpl::BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction,
2170 Env::Priority bg_thread_pri) {
7c673cae 2171 bool made_progress = false;
7c673cae
FG
2172 JobContext job_context(next_job_id_.fetch_add(1), true);
2173 TEST_SYNC_POINT("BackgroundCallCompaction:0");
7c673cae
FG
2174 LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL,
2175 immutable_db_options_.info_log.get());
2176 {
2177 InstrumentedMutexLock l(&mutex_);
2178
2179 // This call will unlock/lock the mutex to wait for current running
2180 // IngestExternalFile() calls to finish.
2181 WaitForIngestFile();
2182
2183 num_running_compactions_++;
2184
2185 auto pending_outputs_inserted_elem =
2186 CaptureCurrentFileNumberInPendingOutputs();
2187
11fdf7f2
TL
2188 assert((bg_thread_pri == Env::Priority::BOTTOM &&
2189 bg_bottom_compaction_scheduled_) ||
2190 (bg_thread_pri == Env::Priority::LOW && bg_compaction_scheduled_));
2191 Status s = BackgroundCompaction(&made_progress, &job_context, &log_buffer,
494da23a 2192 prepicked_compaction, bg_thread_pri);
7c673cae 2193 TEST_SYNC_POINT("BackgroundCallCompaction:1");
494da23a
TL
2194 if (s.IsBusy()) {
2195 bg_cv_.SignalAll(); // In case a waiter can proceed despite the error
2196 mutex_.Unlock();
2197 env_->SleepForMicroseconds(10000); // prevent hot loop
2198 mutex_.Lock();
2199 } else if (!s.ok() && !s.IsShutdownInProgress()) {
7c673cae
FG
2200 // Wait a little bit before retrying background compaction in
2201 // case this is an environmental problem and we do not want to
2202 // chew up resources for failed compactions for the duration of
2203 // the problem.
2204 uint64_t error_cnt =
2205 default_cf_internal_stats_->BumpAndGetBackgroundErrorCount();
2206 bg_cv_.SignalAll(); // In case a waiter can proceed despite the error
2207 mutex_.Unlock();
2208 log_buffer.FlushBufferToLog();
2209 ROCKS_LOG_ERROR(immutable_db_options_.info_log,
2210 "Waiting after background compaction error: %s, "
2211 "Accumulated background error counts: %" PRIu64,
2212 s.ToString().c_str(), error_cnt);
2213 LogFlush(immutable_db_options_.info_log);
2214 env_->SleepForMicroseconds(1000000);
2215 mutex_.Lock();
2216 }
2217
2218 ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem);
2219
2220 // If compaction failed, we want to delete all temporary files that we might
2221 // have created (they might not be all recorded in job_context in case of a
2222 // failure). Thus, we force full scan in FindObsoleteFiles()
2223 FindObsoleteFiles(&job_context, !s.ok() && !s.IsShutdownInProgress());
11fdf7f2 2224 TEST_SYNC_POINT("DBImpl::BackgroundCallCompaction:FoundObsoleteFiles");
7c673cae
FG
2225
2226 // delete unnecessary files if any, this is done outside the mutex
11fdf7f2
TL
2227 if (job_context.HaveSomethingToClean() ||
2228 job_context.HaveSomethingToDelete() || !log_buffer.IsEmpty()) {
7c673cae
FG
2229 mutex_.Unlock();
2230 // Have to flush the info logs before bg_compaction_scheduled_--
2231 // because if bg_compaction_scheduled_ becomes 0 and the lock is
2232 // released, the destructor of DB can kick in and destroy all the
2233 // state of the DB, so info_log might not be available after that point.
2234 // The same applies to accessing other state that the DB owns.
2235 log_buffer.FlushBufferToLog();
2236 if (job_context.HaveSomethingToDelete()) {
2237 PurgeObsoleteFiles(job_context);
11fdf7f2 2238 TEST_SYNC_POINT("DBImpl::BackgroundCallCompaction:PurgedObsoleteFiles");
7c673cae
FG
2239 }
2240 job_context.Clean();
2241 mutex_.Lock();
2242 }
2243
2244 assert(num_running_compactions_ > 0);
2245 num_running_compactions_--;
11fdf7f2
TL
2246 if (bg_thread_pri == Env::Priority::LOW) {
2247 bg_compaction_scheduled_--;
2248 } else {
2249 assert(bg_thread_pri == Env::Priority::BOTTOM);
2250 bg_bottom_compaction_scheduled_--;
2251 }
7c673cae
FG
2252
2253 versions_->GetColumnFamilySet()->FreeDeadColumnFamilies();
2254
2255 // See if there's more work to be done
2256 MaybeScheduleFlushOrCompaction();
11fdf7f2
TL
2257 if (made_progress ||
2258 (bg_compaction_scheduled_ == 0 &&
2259 bg_bottom_compaction_scheduled_ == 0) ||
2260 HasPendingManualCompaction() || unscheduled_compactions_ == 0) {
7c673cae
FG
2261 // signal if
2262 // * made_progress -- need to wakeup DelayWrite
11fdf7f2 2263 // * bg_{bottom,}_compaction_scheduled_ == 0 -- need to wakeup ~DBImpl
7c673cae
FG
2264 // * HasPendingManualCompaction -- need to wakeup RunManualCompaction
2265 // If none of this is true, there is no need to signal since nobody is
2266 // waiting for it
2267 bg_cv_.SignalAll();
2268 }
2269 // IMPORTANT: there should be no code after calling SignalAll. This call may
2270 // signal the DB destructor that it's OK to proceed with destruction. In
2271 // that case, all DB variables will be deallocated and referencing them
2272 // will cause trouble.
2273 }
2274}
2275
2276Status DBImpl::BackgroundCompaction(bool* made_progress,
2277 JobContext* job_context,
11fdf7f2 2278 LogBuffer* log_buffer,
494da23a
TL
2279 PrepickedCompaction* prepicked_compaction,
2280 Env::Priority thread_pri) {
11fdf7f2
TL
2281 ManualCompactionState* manual_compaction =
2282 prepicked_compaction == nullptr
2283 ? nullptr
2284 : prepicked_compaction->manual_compaction_state;
7c673cae
FG
2285 *made_progress = false;
2286 mutex_.AssertHeld();
2287 TEST_SYNC_POINT("DBImpl::BackgroundCompaction:Start");
2288
2289 bool is_manual = (manual_compaction != nullptr);
494da23a 2290 std::unique_ptr<Compaction> c;
11fdf7f2
TL
2291 if (prepicked_compaction != nullptr &&
2292 prepicked_compaction->compaction != nullptr) {
2293 c.reset(prepicked_compaction->compaction);
2294 }
2295 bool is_prepicked = is_manual || c;
7c673cae
FG
2296
2297 // (manual_compaction->in_progress == false);
2298 bool trivial_move_disallowed =
2299 is_manual && manual_compaction->disallow_trivial_move;
2300
2301 CompactionJobStats compaction_job_stats;
11fdf7f2
TL
2302 Status status;
2303 if (!error_handler_.IsBGWorkStopped()) {
2304 if (shutting_down_.load(std::memory_order_acquire)) {
2305 status = Status::ShutdownInProgress();
2306 }
2307 } else {
2308 status = error_handler_.GetBGError();
2309 // If we get here, it means a hard error happened after this compaction
2310 // was scheduled by MaybeScheduleFlushOrCompaction(), but before it got
2311 // a chance to execute. Since we didn't pop a cfd from the compaction
2312 // queue, increment unscheduled_compactions_
2313 unscheduled_compactions_++;
7c673cae
FG
2314 }
2315
2316 if (!status.ok()) {
2317 if (is_manual) {
2318 manual_compaction->status = status;
2319 manual_compaction->done = true;
2320 manual_compaction->in_progress = false;
7c673cae
FG
2321 manual_compaction = nullptr;
2322 }
494da23a
TL
2323 if (c) {
2324 c->ReleaseCompactionFiles(status);
2325 c.reset();
2326 }
7c673cae
FG
2327 return status;
2328 }
2329
2330 if (is_manual) {
2331 // another thread cannot pick up the same work
2332 manual_compaction->in_progress = true;
2333 }
2334
494da23a
TL
2335 std::unique_ptr<TaskLimiterToken> task_token;
2336
7c673cae
FG
2337 // InternalKey manual_end_storage;
2338 // InternalKey* manual_end = &manual_end_storage;
11fdf7f2 2339 bool sfm_reserved_compact_space = false;
7c673cae 2340 if (is_manual) {
11fdf7f2 2341 ManualCompactionState* m = manual_compaction;
7c673cae 2342 assert(m->in_progress);
7c673cae
FG
2343 if (!c) {
2344 m->done = true;
2345 m->manual_end = nullptr;
2346 ROCKS_LOG_BUFFER(log_buffer,
2347 "[%s] Manual compaction from level-%d from %s .. "
2348 "%s; nothing to do\n",
2349 m->cfd->GetName().c_str(), m->input_level,
2350 (m->begin ? m->begin->DebugString().c_str() : "(begin)"),
2351 (m->end ? m->end->DebugString().c_str() : "(end)"));
2352 } else {
11fdf7f2
TL
2353 // First check if we have enough room to do the compaction
2354 bool enough_room = EnoughRoomForCompaction(
2355 m->cfd, *(c->inputs()), &sfm_reserved_compact_space, log_buffer);
2356
2357 if (!enough_room) {
2358 // Then don't do the compaction
2359 c->ReleaseCompactionFiles(status);
2360 c.reset();
2361 // m's vars will get set properly at the end of this function,
2362 // as long as status == CompactionTooLarge
2363 status = Status::CompactionTooLarge();
2364 } else {
2365 ROCKS_LOG_BUFFER(
2366 log_buffer,
2367 "[%s] Manual compaction from level-%d to level-%d from %s .. "
2368 "%s; will stop at %s\n",
2369 m->cfd->GetName().c_str(), m->input_level, c->output_level(),
2370 (m->begin ? m->begin->DebugString().c_str() : "(begin)"),
2371 (m->end ? m->end->DebugString().c_str() : "(end)"),
2372 ((m->done || m->manual_end == nullptr)
2373 ? "(end)"
2374 : m->manual_end->DebugString().c_str()));
2375 }
2376 }
2377 } else if (!is_prepicked && !compaction_queue_.empty()) {
2378 if (HasExclusiveManualCompaction()) {
2379 // Can't compact right now, but try again later
2380 TEST_SYNC_POINT("DBImpl::BackgroundCompaction()::Conflict");
2381
2382 // Stay in the compaction queue.
2383 unscheduled_compactions_++;
2384
2385 return Status::OK();
2386 }
2387
494da23a
TL
2388 auto cfd = PickCompactionFromQueue(&task_token, log_buffer);
2389 if (cfd == nullptr) {
2390 // Can't find any executable task from the compaction queue.
2391 // All tasks have been throttled by compaction thread limiter.
2392 ++unscheduled_compactions_;
2393 return Status::Busy();
2394 }
2395
7c673cae
FG
2396 // We unreference here because the following code will take a Ref() on
2397 // this cfd if it is going to use it (Compaction class holds a
2398 // reference).
2399 // This will all happen under a mutex so we don't have to be afraid of
2400 // somebody else deleting it.
2401 if (cfd->Unref()) {
7c673cae
FG
2402 // This was the last reference of the column family, so no need to
2403 // compact.
494da23a 2404 delete cfd;
7c673cae
FG
2405 return Status::OK();
2406 }
2407
7c673cae
FG
2408 // Pick up latest mutable CF Options and use it throughout the
2409 // compaction job
2410 // Compaction makes a copy of the latest MutableCFOptions. It should be used
2411 // throughout the compaction procedure to ensure consistency. It will
2412 // eventually be installed into SuperVersion
2413 auto* mutable_cf_options = cfd->GetLatestMutableCFOptions();
2414 if (!mutable_cf_options->disable_auto_compactions && !cfd->IsDropped()) {
2415 // NOTE: try to avoid unnecessary copy of MutableCFOptions if
2416 // compaction is not necessary. Need to make sure mutex is held
2417 // until we make a copy in the following code
2418 TEST_SYNC_POINT("DBImpl::BackgroundCompaction():BeforePickCompaction");
2419 c.reset(cfd->PickCompaction(*mutable_cf_options, log_buffer));
2420 TEST_SYNC_POINT("DBImpl::BackgroundCompaction():AfterPickCompaction");
11fdf7f2 2421
7c673cae 2422 if (c != nullptr) {
11fdf7f2
TL
2423 bool enough_room = EnoughRoomForCompaction(
2424 cfd, *(c->inputs()), &sfm_reserved_compact_space, log_buffer);
2425
2426 if (!enough_room) {
2427 // Then don't do the compaction
2428 c->ReleaseCompactionFiles(status);
2429 c->column_family_data()
2430 ->current()
2431 ->storage_info()
2432 ->ComputeCompactionScore(*(c->immutable_cf_options()),
2433 *(c->mutable_cf_options()));
7c673cae
FG
2434 AddToCompactionQueue(cfd);
2435 ++unscheduled_compactions_;
11fdf7f2
TL
2436
2437 c.reset();
2438 // Don't need to sleep here, because BackgroundCallCompaction
2439 // will sleep if !s.ok()
2440 status = Status::CompactionTooLarge();
2441 } else {
2442 // update statistics
494da23a
TL
2443 RecordInHistogram(stats_, NUM_FILES_IN_SINGLE_COMPACTION,
2444 c->inputs(0)->size());
11fdf7f2
TL
2445 // There are three things that can change compaction score:
2446 // 1) When flush or compaction finish. This case is covered by
2447 // InstallSuperVersionAndScheduleWork
2448 // 2) When MutableCFOptions changes. This case is also covered by
2449 // InstallSuperVersionAndScheduleWork, because this is when the new
2450 // options take effect.
2451 // 3) When we Pick a new compaction, we "remove" those files being
2452 // compacted from the calculation, which then influences compaction
2453 // score. Here we check if we need the new compaction even without the
2454 // files that are currently being compacted. If we need another
2455 // compaction, we might be able to execute it in parallel, so we add
2456 // it to the queue and schedule a new thread.
2457 if (cfd->NeedsCompaction()) {
2458 // Yes, we need more compactions!
2459 AddToCompactionQueue(cfd);
2460 ++unscheduled_compactions_;
2461 MaybeScheduleFlushOrCompaction();
2462 }
7c673cae
FG
2463 }
2464 }
2465 }
2466 }
2467
2468 if (!c) {
2469 // Nothing to do
2470 ROCKS_LOG_BUFFER(log_buffer, "Compaction nothing to do");
2471 } else if (c->deletion_compaction()) {
2472 // TODO(icanadi) Do we want to honor snapshots here? i.e. not delete old
2473 // file if there is alive snapshot pointing to it
494da23a
TL
2474 TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:BeforeCompaction",
2475 c->column_family_data());
7c673cae
FG
2476 assert(c->num_input_files(1) == 0);
2477 assert(c->level() == 0);
2478 assert(c->column_family_data()->ioptions()->compaction_style ==
2479 kCompactionStyleFIFO);
2480
2481 compaction_job_stats.num_input_files = c->num_input_files(0);
2482
494da23a
TL
2483 NotifyOnCompactionBegin(c->column_family_data(), c.get(), status,
2484 compaction_job_stats, job_context->job_id);
2485
7c673cae
FG
2486 for (const auto& f : *c->inputs(0)) {
2487 c->edit()->DeleteFile(c->level(), f->fd.GetNumber());
2488 }
2489 status = versions_->LogAndApply(c->column_family_data(),
2490 *c->mutable_cf_options(), c->edit(),
2491 &mutex_, directories_.GetDbDir());
494da23a
TL
2492 InstallSuperVersionAndScheduleWork(c->column_family_data(),
2493 &job_context->superversion_contexts[0],
2494 *c->mutable_cf_options());
7c673cae
FG
2495 ROCKS_LOG_BUFFER(log_buffer, "[%s] Deleted %d files\n",
2496 c->column_family_data()->GetName().c_str(),
2497 c->num_input_files(0));
2498 *made_progress = true;
494da23a
TL
2499 TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:AfterCompaction",
2500 c->column_family_data());
7c673cae
FG
2501 } else if (!trivial_move_disallowed && c->IsTrivialMove()) {
2502 TEST_SYNC_POINT("DBImpl::BackgroundCompaction:TrivialMove");
494da23a
TL
2503 TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:BeforeCompaction",
2504 c->column_family_data());
7c673cae
FG
2505 // Instrument for event update
2506 // TODO(yhchiang): add op details for showing trivial-move.
2507 ThreadStatusUtil::SetColumnFamily(
2508 c->column_family_data(), c->column_family_data()->ioptions()->env,
2509 immutable_db_options_.enable_thread_tracking);
2510 ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_COMPACTION);
2511
2512 compaction_job_stats.num_input_files = c->num_input_files(0);
2513
494da23a
TL
2514 NotifyOnCompactionBegin(c->column_family_data(), c.get(), status,
2515 compaction_job_stats, job_context->job_id);
2516
7c673cae
FG
2517 // Move files to next level
2518 int32_t moved_files = 0;
2519 int64_t moved_bytes = 0;
2520 for (unsigned int l = 0; l < c->num_input_levels(); l++) {
2521 if (c->level(l) == c->output_level()) {
2522 continue;
2523 }
2524 for (size_t i = 0; i < c->num_input_files(l); i++) {
2525 FileMetaData* f = c->input(l, i);
2526 c->edit()->DeleteFile(c->level(l), f->fd.GetNumber());
2527 c->edit()->AddFile(c->output_level(), f->fd.GetNumber(),
2528 f->fd.GetPathId(), f->fd.GetFileSize(), f->smallest,
11fdf7f2
TL
2529 f->largest, f->fd.smallest_seqno,
2530 f->fd.largest_seqno, f->marked_for_compaction);
2531
2532 ROCKS_LOG_BUFFER(
2533 log_buffer,
2534 "[%s] Moving #%" PRIu64 " to level-%d %" PRIu64 " bytes\n",
2535 c->column_family_data()->GetName().c_str(), f->fd.GetNumber(),
2536 c->output_level(), f->fd.GetFileSize());
7c673cae
FG
2537 ++moved_files;
2538 moved_bytes += f->fd.GetFileSize();
2539 }
2540 }
2541
2542 status = versions_->LogAndApply(c->column_family_data(),
2543 *c->mutable_cf_options(), c->edit(),
2544 &mutex_, directories_.GetDbDir());
2545 // Use latest MutableCFOptions
494da23a
TL
2546 InstallSuperVersionAndScheduleWork(c->column_family_data(),
2547 &job_context->superversion_contexts[0],
2548 *c->mutable_cf_options());
7c673cae
FG
2549
2550 VersionStorageInfo::LevelSummaryStorage tmp;
2551 c->column_family_data()->internal_stats()->IncBytesMoved(c->output_level(),
2552 moved_bytes);
2553 {
2554 event_logger_.LogToBuffer(log_buffer)
2555 << "job" << job_context->job_id << "event"
2556 << "trivial_move"
2557 << "destination_level" << c->output_level() << "files" << moved_files
2558 << "total_files_size" << moved_bytes;
2559 }
2560 ROCKS_LOG_BUFFER(
2561 log_buffer,
2562 "[%s] Moved #%d files to level-%d %" PRIu64 " bytes %s: %s\n",
2563 c->column_family_data()->GetName().c_str(), moved_files,
2564 c->output_level(), moved_bytes, status.ToString().c_str(),
2565 c->column_family_data()->current()->storage_info()->LevelSummary(&tmp));
2566 *made_progress = true;
2567
2568 // Clear Instrument
2569 ThreadStatusUtil::ResetThreadStatus();
494da23a
TL
2570 TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:AfterCompaction",
2571 c->column_family_data());
11fdf7f2
TL
2572 } else if (!is_prepicked && c->output_level() > 0 &&
2573 c->output_level() ==
2574 c->column_family_data()
2575 ->current()
2576 ->storage_info()
2577 ->MaxOutputLevel(
2578 immutable_db_options_.allow_ingest_behind) &&
2579 env_->GetBackgroundThreads(Env::Priority::BOTTOM) > 0) {
2580 // Forward compactions involving last level to the bottom pool if it exists,
2581 // such that compactions unlikely to contribute to write stalls can be
2582 // delayed or deprioritized.
2583 TEST_SYNC_POINT("DBImpl::BackgroundCompaction:ForwardToBottomPriPool");
2584 CompactionArg* ca = new CompactionArg;
2585 ca->db = this;
2586 ca->prepicked_compaction = new PrepickedCompaction;
2587 ca->prepicked_compaction->compaction = c.release();
2588 ca->prepicked_compaction->manual_compaction_state = nullptr;
494da23a
TL
2589 // Transfer requested token, so it doesn't need to do it again.
2590 ca->prepicked_compaction->task_token = std::move(task_token);
11fdf7f2
TL
2591 ++bg_bottom_compaction_scheduled_;
2592 env_->Schedule(&DBImpl::BGWorkBottomCompaction, ca, Env::Priority::BOTTOM,
494da23a 2593 this, &DBImpl::UnscheduleCompactionCallback);
7c673cae 2594 } else {
494da23a
TL
2595 TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:BeforeCompaction",
2596 c->column_family_data());
11fdf7f2
TL
2597 int output_level __attribute__((__unused__));
2598 output_level = c->output_level();
7c673cae
FG
2599 TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:NonTrivial",
2600 &output_level);
494da23a 2601 std::vector<SequenceNumber> snapshot_seqs;
7c673cae 2602 SequenceNumber earliest_write_conflict_snapshot;
494da23a
TL
2603 SnapshotChecker* snapshot_checker;
2604 GetSnapshotContext(job_context, &snapshot_seqs,
2605 &earliest_write_conflict_snapshot, &snapshot_checker);
7c673cae
FG
2606 assert(is_snapshot_supported_ || snapshots_.empty());
2607 CompactionJob compaction_job(
11fdf7f2
TL
2608 job_context->job_id, c.get(), immutable_db_options_,
2609 env_options_for_compaction_, versions_.get(), &shutting_down_,
2610 preserve_deletes_seqnum_.load(), log_buffer, directories_.GetDbDir(),
2611 GetDataDir(c->column_family_data(), c->output_path_id()), stats_,
494da23a
TL
2612 &mutex_, &error_handler_, snapshot_seqs,
2613 earliest_write_conflict_snapshot, snapshot_checker, table_cache_,
2614 &event_logger_, c->mutable_cf_options()->paranoid_file_checks,
7c673cae 2615 c->mutable_cf_options()->report_bg_io_stats, dbname_,
494da23a 2616 &compaction_job_stats, thread_pri);
7c673cae
FG
2617 compaction_job.Prepare();
2618
494da23a
TL
2619 NotifyOnCompactionBegin(c->column_family_data(), c.get(), status,
2620 compaction_job_stats, job_context->job_id);
2621
7c673cae
FG
2622 mutex_.Unlock();
2623 compaction_job.Run();
2624 TEST_SYNC_POINT("DBImpl::BackgroundCompaction:NonTrivial:AfterRun");
2625 mutex_.Lock();
2626
2627 status = compaction_job.Install(*c->mutable_cf_options());
2628 if (status.ok()) {
494da23a
TL
2629 InstallSuperVersionAndScheduleWork(c->column_family_data(),
2630 &job_context->superversion_contexts[0],
2631 *c->mutable_cf_options());
7c673cae
FG
2632 }
2633 *made_progress = true;
494da23a
TL
2634 TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:AfterCompaction",
2635 c->column_family_data());
7c673cae
FG
2636 }
2637 if (c != nullptr) {
2638 c->ReleaseCompactionFiles(status);
2639 *made_progress = true;
11fdf7f2
TL
2640
2641#ifndef ROCKSDB_LITE
2642 // Need to make sure SstFileManager does its bookkeeping
2643 auto sfm = static_cast<SstFileManagerImpl*>(
2644 immutable_db_options_.sst_file_manager.get());
2645 if (sfm && sfm_reserved_compact_space) {
2646 sfm->OnCompactionCompletion(c.get());
2647 }
2648#endif // ROCKSDB_LITE
2649
2650 NotifyOnCompactionCompleted(c->column_family_data(), c.get(), status,
2651 compaction_job_stats, job_context->job_id);
7c673cae 2652 }
7c673cae 2653
11fdf7f2 2654 if (status.ok() || status.IsCompactionTooLarge()) {
7c673cae
FG
2655 // Done
2656 } else if (status.IsShutdownInProgress()) {
2657 // Ignore compaction errors found during shutting down
2658 } else {
2659 ROCKS_LOG_WARN(immutable_db_options_.info_log, "Compaction error: %s",
2660 status.ToString().c_str());
11fdf7f2
TL
2661 error_handler_.SetBGError(status, BackgroundErrorReason::kCompaction);
2662 if (c != nullptr && !is_manual && !error_handler_.IsBGWorkStopped()) {
2663 // Put this cfd back in the compaction queue so we can retry after some
2664 // time
2665 auto cfd = c->column_family_data();
2666 assert(cfd != nullptr);
2667 // Since this compaction failed, we need to recompute the score so it
2668 // takes the original input files into account
2669 c->column_family_data()
2670 ->current()
2671 ->storage_info()
2672 ->ComputeCompactionScore(*(c->immutable_cf_options()),
2673 *(c->mutable_cf_options()));
2674 if (!cfd->queued_for_compaction()) {
2675 AddToCompactionQueue(cfd);
2676 ++unscheduled_compactions_;
2677 }
7c673cae
FG
2678 }
2679 }
11fdf7f2
TL
2680 // this will unref its input_version and column_family_data
2681 c.reset();
7c673cae
FG
2682
2683 if (is_manual) {
11fdf7f2 2684 ManualCompactionState* m = manual_compaction;
7c673cae
FG
2685 if (!status.ok()) {
2686 m->status = status;
2687 m->done = true;
2688 }
2689 // For universal compaction:
2690 // Universal compaction always happens at level 0, so one
2691 // compaction will pick up all overlapped files. No files will be
2692 // filtered out due to size limit and left for a successive compaction.
2693 // So we can safely conclude the current compaction.
2694 //
2695 // Also note that, if we don't stop here, then the current compaction
2696 // writes a new file back to level 0, which will be used in successive
2697 // compaction. Hence the manual compaction will never finish.
2698 //
2699 // Stop the compaction if manual_end points to nullptr -- this means
2700 // that we compacted the whole range. manual_end should always point
2701 // to nullptr in case of universal compaction
2702 if (m->manual_end == nullptr) {
2703 m->done = true;
2704 }
2705 if (!m->done) {
2706 // We only compacted part of the requested range. Update *m
2707 // to the range that is left to be compacted.
2708 // Universal and FIFO compactions should always compact the whole range
2709 assert(m->cfd->ioptions()->compaction_style !=
2710 kCompactionStyleUniversal ||
2711 m->cfd->ioptions()->num_levels > 1);
2712 assert(m->cfd->ioptions()->compaction_style != kCompactionStyleFIFO);
2713 m->tmp_storage = *m->manual_end;
2714 m->begin = &m->tmp_storage;
2715 m->incomplete = true;
2716 }
11fdf7f2 2717 m->in_progress = false; // not being processed anymore
7c673cae
FG
2718 }
2719 TEST_SYNC_POINT("DBImpl::BackgroundCompaction:Finish");
2720 return status;
2721}
2722
2723bool DBImpl::HasPendingManualCompaction() {
2724 return (!manual_compaction_dequeue_.empty());
2725}
2726
11fdf7f2 2727void DBImpl::AddManualCompaction(DBImpl::ManualCompactionState* m) {
7c673cae
FG
2728 manual_compaction_dequeue_.push_back(m);
2729}
2730
11fdf7f2 2731void DBImpl::RemoveManualCompaction(DBImpl::ManualCompactionState* m) {
7c673cae 2732 // Remove from queue
11fdf7f2 2733 std::deque<ManualCompactionState*>::iterator it =
7c673cae
FG
2734 manual_compaction_dequeue_.begin();
2735 while (it != manual_compaction_dequeue_.end()) {
2736 if (m == (*it)) {
2737 it = manual_compaction_dequeue_.erase(it);
2738 return;
2739 }
2740 it++;
2741 }
2742 assert(false);
2743 return;
2744}
2745
11fdf7f2 2746bool DBImpl::ShouldntRunManualCompaction(ManualCompactionState* m) {
7c673cae
FG
2747 if (num_running_ingest_file_ > 0) {
2748 // We need to wait for other IngestExternalFile() calls to finish
2749 // before running a manual compaction.
2750 return true;
2751 }
2752 if (m->exclusive) {
11fdf7f2
TL
2753 return (bg_bottom_compaction_scheduled_ > 0 ||
2754 bg_compaction_scheduled_ > 0);
7c673cae 2755 }
11fdf7f2 2756 std::deque<ManualCompactionState*>::iterator it =
7c673cae
FG
2757 manual_compaction_dequeue_.begin();
2758 bool seen = false;
2759 while (it != manual_compaction_dequeue_.end()) {
2760 if (m == (*it)) {
2761 it++;
2762 seen = true;
2763 continue;
2764 } else if (MCOverlap(m, (*it)) && (!seen && !(*it)->in_progress)) {
2765 // Consider the other manual compaction *it, conflicts if:
2766 // overlaps with m
2767 // and (*it) is ahead in the queue and is not yet in progress
2768 return true;
2769 }
2770 it++;
2771 }
2772 return false;
2773}
2774
2775bool DBImpl::HaveManualCompaction(ColumnFamilyData* cfd) {
2776 // Remove from priority queue
11fdf7f2 2777 std::deque<ManualCompactionState*>::iterator it =
7c673cae
FG
2778 manual_compaction_dequeue_.begin();
2779 while (it != manual_compaction_dequeue_.end()) {
2780 if ((*it)->exclusive) {
2781 return true;
2782 }
2783 if ((cfd == (*it)->cfd) && (!((*it)->in_progress || (*it)->done))) {
2784 // Allow automatic compaction if manual compaction is
11fdf7f2 2785 // in progress
7c673cae
FG
2786 return true;
2787 }
2788 it++;
2789 }
2790 return false;
2791}
2792
2793bool DBImpl::HasExclusiveManualCompaction() {
2794 // Remove from priority queue
11fdf7f2 2795 std::deque<ManualCompactionState*>::iterator it =
7c673cae
FG
2796 manual_compaction_dequeue_.begin();
2797 while (it != manual_compaction_dequeue_.end()) {
2798 if ((*it)->exclusive) {
2799 return true;
2800 }
2801 it++;
2802 }
2803 return false;
2804}
2805
11fdf7f2 2806bool DBImpl::MCOverlap(ManualCompactionState* m, ManualCompactionState* m1) {
7c673cae
FG
2807 if ((m->exclusive) || (m1->exclusive)) {
2808 return true;
2809 }
2810 if (m->cfd != m1->cfd) {
2811 return false;
2812 }
2813 return true;
2814}
2815
494da23a
TL
2816#ifndef ROCKSDB_LITE
2817void DBImpl::BuildCompactionJobInfo(
2818 const ColumnFamilyData* cfd, Compaction* c, const Status& st,
2819 const CompactionJobStats& compaction_job_stats, const int job_id,
2820 const Version* current, CompactionJobInfo* compaction_job_info) const {
2821 assert(compaction_job_info != nullptr);
2822 compaction_job_info->cf_id = cfd->GetID();
2823 compaction_job_info->cf_name = cfd->GetName();
2824 compaction_job_info->status = st;
2825 compaction_job_info->thread_id = env_->GetThreadID();
2826 compaction_job_info->job_id = job_id;
2827 compaction_job_info->base_input_level = c->start_level();
2828 compaction_job_info->output_level = c->output_level();
2829 compaction_job_info->stats = compaction_job_stats;
2830 compaction_job_info->table_properties = c->GetOutputTableProperties();
2831 compaction_job_info->compaction_reason = c->compaction_reason();
2832 compaction_job_info->compression = c->output_compression();
2833 for (size_t i = 0; i < c->num_input_levels(); ++i) {
2834 for (const auto fmd : *c->inputs(i)) {
2835 auto fn = TableFileName(c->immutable_cf_options()->cf_paths,
2836 fmd->fd.GetNumber(), fmd->fd.GetPathId());
2837 compaction_job_info->input_files.push_back(fn);
2838 if (compaction_job_info->table_properties.count(fn) == 0) {
2839 std::shared_ptr<const TableProperties> tp;
2840 auto s = current->GetTableProperties(&tp, fmd, &fn);
2841 if (s.ok()) {
2842 compaction_job_info->table_properties[fn] = tp;
2843 }
2844 }
2845 }
2846 }
2847 for (const auto& newf : c->edit()->GetNewFiles()) {
2848 compaction_job_info->output_files.push_back(
2849 TableFileName(c->immutable_cf_options()->cf_paths,
2850 newf.second.fd.GetNumber(), newf.second.fd.GetPathId()));
2851 }
2852}
2853#endif
2854
11fdf7f2
TL
2855// SuperVersionContext gets created and destructed outside of the lock --
2856// we use this conveniently to:
7c673cae
FG
2857// * malloc one SuperVersion() outside of the lock -- new_superversion
2858// * delete SuperVersion()s outside of the lock -- superversions_to_free
2859//
2860// However, if InstallSuperVersionAndScheduleWork() gets called twice with the
11fdf7f2 2861// same sv_context, we can't reuse the SuperVersion() that got
7c673cae
FG
2862// malloced, because the
2863// first call already used it. In that rare case, we take a hit and create a
2864// new SuperVersion() inside of the mutex. We do a similar thing
2865// for superversions_to_free.
7c673cae 2866
11fdf7f2
TL
2867void DBImpl::InstallSuperVersionAndScheduleWork(
2868 ColumnFamilyData* cfd, SuperVersionContext* sv_context,
494da23a 2869 const MutableCFOptions& mutable_cf_options) {
7c673cae
FG
2870 mutex_.AssertHeld();
2871
2872 // Update max_total_in_memory_state_
2873 size_t old_memtable_size = 0;
2874 auto* old_sv = cfd->GetSuperVersion();
2875 if (old_sv) {
2876 old_memtable_size = old_sv->mutable_cf_options.write_buffer_size *
2877 old_sv->mutable_cf_options.max_write_buffer_number;
2878 }
2879
11fdf7f2
TL
2880 // this branch is unlikely to be taken
2881 if (UNLIKELY(sv_context->new_superversion == nullptr)) {
2882 sv_context->NewSuperVersion();
2883 }
2884 cfd->InstallSuperVersion(sv_context, &mutex_, mutable_cf_options);
7c673cae 2885
494da23a
TL
2886 // There may be a small data race here. The snapshot that triggers bottommost
2887 // compaction may already have been released here. But assuming newer
2888 // snapshots are created and released frequently, the compaction will be
2889 // triggered soon anyway.
2890 bottommost_files_mark_threshold_ = kMaxSequenceNumber;
2891 for (auto* my_cfd : *versions_->GetColumnFamilySet()) {
2892 bottommost_files_mark_threshold_ = std::min(
2893 bottommost_files_mark_threshold_,
2894 my_cfd->current()->storage_info()->bottommost_files_mark_threshold());
2895 }
2896
7c673cae
FG
2897 // Whenever we install new SuperVersion, we might need to issue new flushes or
2898 // compactions.
7c673cae
FG
2899 SchedulePendingCompaction(cfd);
2900 MaybeScheduleFlushOrCompaction();
2901
2902 // Update max_total_in_memory_state_
11fdf7f2
TL
2903 max_total_in_memory_state_ = max_total_in_memory_state_ - old_memtable_size +
2904 mutable_cf_options.write_buffer_size *
2905 mutable_cf_options.max_write_buffer_number;
2906}
2907
2908// ShouldPurge is called by FindObsoleteFiles when doing a full scan,
2909// and db mutex (mutex_) should already be held. This function performs a
2910// linear scan of a vector (files_grabbed_for_purge_) in search of a
2911// certain element. We expect FindObsoleteFiles with full scan to occur once
2912// every 10 hours by default, and the size of the vector is small.
2913// Therefore, the cost is affordable even if the mutex is held.
2914// Actually, the current implementation of FindObsoleteFiles with
2915// full_scan=true can issue I/O requests to obtain the list of files in
2916// directories, e.g. env_->GetChildren(), while holding the db mutex.
2917// In the future, if we want to reduce the cost of search, we may try to keep
2918// the vector sorted.
2919bool DBImpl::ShouldPurge(uint64_t file_number) const {
2920 for (auto fn : files_grabbed_for_purge_) {
2921 if (file_number == fn) {
2922 return false;
2923 }
2924 }
2925 for (const auto& purge_file_info : purge_queue_) {
2926 if (purge_file_info.number == file_number) {
2927 return false;
2928 }
2929 }
2930 return true;
2931}
2932
2933// MarkAsGrabbedForPurge is called by FindObsoleteFiles, and db mutex
2934// (mutex_) should already be held.
2935void DBImpl::MarkAsGrabbedForPurge(uint64_t file_number) {
2936 files_grabbed_for_purge_.emplace_back(file_number);
2937}
2938
2939void DBImpl::SetSnapshotChecker(SnapshotChecker* snapshot_checker) {
2940 InstrumentedMutexLock l(&mutex_);
2941 // snapshot_checker_ should only be set once. If we need to set it multiple
2942 // times, we need to make sure the old one is not deleted while it is still
2943 // in use by a compaction job.
2944 assert(!snapshot_checker_);
2945 snapshot_checker_.reset(snapshot_checker);
7c673cae 2946}
494da23a
TL
2947
2948void DBImpl::GetSnapshotContext(
2949 JobContext* job_context, std::vector<SequenceNumber>* snapshot_seqs,
2950 SequenceNumber* earliest_write_conflict_snapshot,
2951 SnapshotChecker** snapshot_checker_ptr) {
2952 mutex_.AssertHeld();
2953 assert(job_context != nullptr);
2954 assert(snapshot_seqs != nullptr);
2955 assert(earliest_write_conflict_snapshot != nullptr);
2956 assert(snapshot_checker_ptr != nullptr);
2957
2958 *snapshot_checker_ptr = snapshot_checker_.get();
2959 if (use_custom_gc_ && *snapshot_checker_ptr == nullptr) {
2960 *snapshot_checker_ptr = DisableGCSnapshotChecker::Instance();
2961 }
2962 if (*snapshot_checker_ptr != nullptr) {
2963 // If snapshot_checker is used, that means the flush/compaction may
2964 // contain values not visible to snapshot taken after
2965 // flush/compaction job starts. Take a snapshot and it will appear
2966 // in snapshot_seqs and force compaction iterator to consider such
2967 // snapshots.
2968 const Snapshot* job_snapshot =
2969 GetSnapshotImpl(false /*write_conflict_boundary*/, false /*lock*/);
2970 job_context->job_snapshot.reset(new ManagedSnapshot(this, job_snapshot));
2971 }
2972 *snapshot_seqs = snapshots_.GetAll(earliest_write_conflict_snapshot);
2973}
7c673cae 2974} // namespace rocksdb