// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include <cinttypes>
#include <deque>

#include "db/builder.h"
#include "db/db_impl/db_impl.h"
#include "db/error_handler.h"
#include "db/event_helpers.h"
#include "file/sst_file_manager_impl.h"
#include "logging/logging.h"
#include "monitoring/iostats_context_imp.h"
#include "monitoring/perf_context_imp.h"
#include "monitoring/thread_status_updater.h"
#include "monitoring/thread_status_util.h"
#include "test_util/sync_point.h"
#include "util/cast_util.h"
#include "util/concurrent_task_limiter_impl.h"

namespace ROCKSDB_NAMESPACE {

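// Asks the SstFileManager (if one is configured) whether there is enough free
// disk space to run the compaction described by `inputs`. When there is, the
// SFM's space reservation is recorded via *sfm_reserved_compact_space so it
// can be released once the compaction finishes; otherwise the cancelled
// compaction is logged and counted.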
bool DBImpl::EnoughRoomForCompaction(
    ColumnFamilyData* cfd, const std::vector<CompactionInputFiles>& inputs,
    bool* sfm_reserved_compact_space, LogBuffer* log_buffer) {
  // Check if we have enough room to do the compaction
  bool enough_room = true;
#ifndef ROCKSDB_LITE
  auto sfm = static_cast<SstFileManagerImpl*>(
      immutable_db_options_.sst_file_manager.get());
  if (sfm) {
    // Pass the current bg_error_ to SFM so it can decide what checks to
    // perform. If this DB instance hasn't seen any error yet, the SFM can be
    // optimistic and not do disk space checks
    Status bg_error = error_handler_.GetBGError();
    enough_room = sfm->EnoughRoomForCompaction(cfd, inputs, bg_error);
    bg_error.PermitUncheckedError();  // bg_error is just a copy of the Status
                                      // from the error_handler_
    if (enough_room) {
      *sfm_reserved_compact_space = true;
    }
  }
#else
  (void)cfd;
  (void)inputs;
  (void)sfm_reserved_compact_space;
#endif  // ROCKSDB_LITE
  if (!enough_room) {
    // Just in case tests want to change the value of enough_room
    TEST_SYNC_POINT_CALLBACK(
        "DBImpl::BackgroundCompaction():CancelledCompaction", &enough_room);
    ROCKS_LOG_BUFFER(log_buffer,
                     "Cancelled compaction because not enough room");
    RecordTick(stats_, COMPACTION_CANCELLED, 1);
  }
  return enough_room;
}

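// Tries to obtain a task token from the column family's compaction thread
// limiter. Returns true if no limiter is configured or a token was granted
// (stored in *token); returns false if the limiter rejected the request.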
bool DBImpl::RequestCompactionToken(ColumnFamilyData* cfd, bool force,
                                    std::unique_ptr<TaskLimiterToken>* token,
                                    LogBuffer* log_buffer) {
  assert(*token == nullptr);
  auto limiter = static_cast<ConcurrentTaskLimiterImpl*>(
      cfd->ioptions()->compaction_thread_limiter.get());
  if (limiter == nullptr) {
    return true;
  }
  *token = limiter->GetToken(force);
  if (*token != nullptr) {
    ROCKS_LOG_BUFFER(log_buffer,
                     "Thread limiter [%s] increase [%s] compaction task, "
                     "force: %s, tasks after: %d",
                     limiter->GetName().c_str(), cfd->GetName().c_str(),
                     force ? "true" : "false", limiter->GetOutstandingTask());
    return true;
  }
  return false;
}

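// Syncs every closed WAL (all logs with a number lower than the currently
// active one) and marks them as synced, recording any WAL additions in
// `synced_wals` so the caller can apply them to the MANIFEST. Both call sites
// release the db mutex before calling this; log_write_mutex_ is acquired here
// and temporarily released around the file I/O.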
IOStatus DBImpl::SyncClosedLogs(JobContext* job_context,
                                VersionEdit* synced_wals) {
  TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Start");
  InstrumentedMutexLock l(&log_write_mutex_);
  autovector<log::Writer*, 1> logs_to_sync;
  uint64_t current_log_number = logfile_number_;
  while (logs_.front().number < current_log_number &&
         logs_.front().IsSyncing()) {
    log_sync_cv_.Wait();
  }
  for (auto it = logs_.begin();
       it != logs_.end() && it->number < current_log_number; ++it) {
    auto& log = *it;
    log.PrepareForSync();
    logs_to_sync.push_back(log.writer);
  }

  IOStatus io_s;
  if (!logs_to_sync.empty()) {
    log_write_mutex_.Unlock();

    assert(job_context);

    for (log::Writer* log : logs_to_sync) {
      ROCKS_LOG_INFO(immutable_db_options_.info_log,
                     "[JOB %d] Syncing log #%" PRIu64, job_context->job_id,
                     log->get_log_number());
      if (error_handler_.IsRecoveryInProgress()) {
        log->file()->reset_seen_error();
      }
      io_s = log->file()->Sync(immutable_db_options_.use_fsync);
      if (!io_s.ok()) {
        break;
      }

      if (immutable_db_options_.recycle_log_file_num > 0) {
        if (error_handler_.IsRecoveryInProgress()) {
          log->file()->reset_seen_error();
        }
        io_s = log->Close();
        if (!io_s.ok()) {
          break;
        }
      }
    }
    if (io_s.ok()) {
      io_s = directories_.GetWalDir()->FsyncWithDirOptions(
          IOOptions(), nullptr,
          DirFsyncOptions(DirFsyncOptions::FsyncReason::kNewFileSynced));
    }

    TEST_SYNC_POINT_CALLBACK("DBImpl::SyncClosedLogs:BeforeReLock",
                             /*arg=*/nullptr);
    log_write_mutex_.Lock();

    // "number <= current_log_number - 1" is equivalent to
    // "number < current_log_number".
    if (io_s.ok()) {
      MarkLogsSynced(current_log_number - 1, true, synced_wals);
    } else {
      MarkLogsNotSynced(current_log_number - 1);
    }
    if (!io_s.ok()) {
      TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Failed");
      return io_s;
    }
  }
  TEST_SYNC_POINT("DBImpl::SyncClosedLogs:end");
  return io_s;
}

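// Flushes the immutable memtables of a single column family into an L0 SST
// file (the flush job may instead switch to an in-memory mempurge). When more
// than one column family exists, closed WALs are synced first; on success the
// new super version is installed and listeners are notified, and any failure
// is forwarded to the error handler. Expects the db mutex to be held.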
Status DBImpl::FlushMemTableToOutputFile(
    ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options,
    bool* made_progress, JobContext* job_context,
    SuperVersionContext* superversion_context,
    std::vector<SequenceNumber>& snapshot_seqs,
    SequenceNumber earliest_write_conflict_snapshot,
    SnapshotChecker* snapshot_checker, LogBuffer* log_buffer,
    Env::Priority thread_pri) {
  mutex_.AssertHeld();
  assert(cfd);
  assert(cfd->imm());
  assert(cfd->imm()->NumNotFlushed() != 0);
  assert(cfd->imm()->IsFlushPending());
  assert(versions_);
  assert(versions_->GetColumnFamilySet());
  // If there is more than one column family, we need to make sure that
  // all the log files except the most recent one are synced. Otherwise if
  // the host crashes after flushing and before the WAL is persistent, the
  // flushed SST may contain data from write batches whose updates to
  // other (unflushed) column families are missing.
  const bool needs_to_sync_closed_wals =
      logfile_number_ > 0 &&
      versions_->GetColumnFamilySet()->NumberOfColumnFamilies() > 1;

  // If needs_to_sync_closed_wals is true, we need to record the current
  // maximum memtable ID of this column family so that a later PickMemtables()
  // call will not pick memtables whose IDs are higher. This is due to the fact
  // that SyncClosedLogs() may release the db mutex, and a memtable switch can
  // happen for this column family in the meantime. The newly created memtables
  // have their data backed by unsynced WALs, thus they cannot be included in
  // this flush job.
  // Another reason why we must record the current maximum memtable ID of this
  // column family: SyncClosedLogs() may release the db mutex, so it is
  // possible for the application to continue to insert into memtables,
  // increasing the db's sequence number. The application may take a snapshot,
  // but this snapshot is not included in `snapshot_seqs`, which is passed to
  // the flush job, because `snapshot_seqs` has already been computed before
  // this function starts. Recording the max memtable ID ensures that the
  // flush job does not flush a memtable without knowing such snapshot(s).
  uint64_t max_memtable_id = needs_to_sync_closed_wals
                                 ? cfd->imm()->GetLatestMemTableID()
                                 : std::numeric_limits<uint64_t>::max();

  // If needs_to_sync_closed_wals is false, then the flush job will pick ALL
  // existing memtables of the column family when PickMemTable() is called
  // later. Although we won't call SyncClosedLogs() in this case, we may still
  // call the callbacks of the listeners, i.e. NotifyOnFlushBegin(), which also
  // releases and re-acquires the db mutex. In the meantime, the application
  // can still insert into the memtables and increase the db's sequence number.
  // The application can take a snapshot, hoping that the latest visible state
  // to this snapshot is preserved. This is hard to guarantee since the db
  // mutex is not held. This newly-created snapshot is not included in
  // `snapshot_seqs` and the flush job is unaware of its presence. Consequently,
  // the flush job may drop certain keys when generating the L0, causing
  // incorrect data to be returned for snapshot reads using this snapshot.
  // To address this, we make sure NotifyOnFlushBegin() executes after memtable
  // picking so that no new snapshot can be taken between the two functions.

  FlushJob flush_job(
      dbname_, cfd, immutable_db_options_, mutable_cf_options, max_memtable_id,
      file_options_for_compaction_, versions_.get(), &mutex_, &shutting_down_,
      snapshot_seqs, earliest_write_conflict_snapshot, snapshot_checker,
      job_context, log_buffer, directories_.GetDbDir(), GetDataDir(cfd, 0U),
      GetCompressionFlush(*cfd->ioptions(), mutable_cf_options), stats_,
      &event_logger_, mutable_cf_options.report_bg_io_stats,
      true /* sync_output_directory */, true /* write_manifest */, thread_pri,
      io_tracer_, seqno_time_mapping_, db_id_, db_session_id_,
      cfd->GetFullHistoryTsLow(), &blob_callback_);
  FileMetaData file_meta;

  Status s;
  bool need_cancel = false;
  IOStatus log_io_s = IOStatus::OK();
  if (needs_to_sync_closed_wals) {
    // SyncClosedLogs() may unlock and re-lock the log_write_mutex multiple
    // times.
    VersionEdit synced_wals;
    mutex_.Unlock();
    log_io_s = SyncClosedLogs(job_context, &synced_wals);
    mutex_.Lock();
    if (log_io_s.ok() && synced_wals.IsWalAddition()) {
      log_io_s = status_to_io_status(ApplyWALToManifest(&synced_wals));
      TEST_SYNC_POINT_CALLBACK("DBImpl::FlushMemTableToOutputFile:CommitWal:1",
                               nullptr);
    }

    if (!log_io_s.ok() && !log_io_s.IsShutdownInProgress() &&
        !log_io_s.IsColumnFamilyDropped()) {
      error_handler_.SetBGError(log_io_s, BackgroundErrorReason::kFlush);
    }
  } else {
    TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Skip");
  }
  s = log_io_s;

  // If the log sync failed, we do not need to pick the memtable. Otherwise,
  // num_flush_not_started_ needs to be rolled back.
  TEST_SYNC_POINT("DBImpl::FlushMemTableToOutputFile:BeforePickMemtables");
  if (s.ok()) {
    flush_job.PickMemTable();
    need_cancel = true;
  }
  TEST_SYNC_POINT_CALLBACK(
      "DBImpl::FlushMemTableToOutputFile:AfterPickMemtables", &flush_job);

#ifndef ROCKSDB_LITE
  // may temporarily unlock and lock the mutex.
  NotifyOnFlushBegin(cfd, &file_meta, mutable_cf_options, job_context->job_id);
#endif  // ROCKSDB_LITE

  bool switched_to_mempurge = false;
  // Within flush_job.Run, rocksdb may call event listener to notify
  // file creation and deletion.
  //
  // Note that flush_job.Run will unlock and lock the db_mutex,
  // and EventListener callback will be called when the db_mutex
  // is unlocked by the current thread.
  if (s.ok()) {
    s = flush_job.Run(&logs_with_prep_tracker_, &file_meta,
                      &switched_to_mempurge);
    need_cancel = false;
  }

  if (!s.ok() && need_cancel) {
    flush_job.Cancel();
  }

  if (s.ok()) {
    InstallSuperVersionAndScheduleWork(cfd, superversion_context,
                                       mutable_cf_options);
    if (made_progress) {
      *made_progress = true;
    }

    const std::string& column_family_name = cfd->GetName();

    Version* const current = cfd->current();
    assert(current);

    const VersionStorageInfo* const storage_info = current->storage_info();
    assert(storage_info);

    VersionStorageInfo::LevelSummaryStorage tmp;
    ROCKS_LOG_BUFFER(log_buffer, "[%s] Level summary: %s\n",
                     column_family_name.c_str(),
                     storage_info->LevelSummary(&tmp));

    const auto& blob_files = storage_info->GetBlobFiles();
    if (!blob_files.empty()) {
      assert(blob_files.front());
      assert(blob_files.back());

      ROCKS_LOG_BUFFER(
          log_buffer,
          "[%s] Blob file summary: head=%" PRIu64 ", tail=%" PRIu64 "\n",
          column_family_name.c_str(), blob_files.front()->GetBlobFileNumber(),
          blob_files.back()->GetBlobFileNumber());
    }
  }

  if (!s.ok() && !s.IsShutdownInProgress() && !s.IsColumnFamilyDropped()) {
    if (log_io_s.ok()) {
      // Error while writing to MANIFEST.
      // In fact, versions_->io_status() can also be the result of renaming
      // the CURRENT file. With the current code, it's just difficult to tell.
      // So just be pessimistic and try to write to a new MANIFEST.
      // TODO: distinguish between MANIFEST write and CURRENT renaming
      if (!versions_->io_status().ok()) {
        // If WAL sync is successful (either the WAL size is 0 or there is no
        // IO error), all Manifest write errors are mapped to soft errors.
        // TODO: kManifestWriteNoWAL and kFlushNoWAL are misleading. Refactor is
        // needed.
        error_handler_.SetBGError(s,
                                  BackgroundErrorReason::kManifestWriteNoWAL);
      } else {
        // If WAL sync is successful (either the WAL size is 0 or there is no
        // IO error), all the other SST file write errors will be set as
        // kFlushNoWAL.
        error_handler_.SetBGError(s, BackgroundErrorReason::kFlushNoWAL);
      }
    } else {
      assert(s == log_io_s);
      Status new_bg_error = s;
      error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush);
    }
  }
  // If flush ran smoothly and no mempurge happened
  // install new SST file path.
  if (s.ok() && (!switched_to_mempurge)) {
#ifndef ROCKSDB_LITE
    // may temporarily unlock and lock the mutex.
    NotifyOnFlushCompleted(cfd, mutable_cf_options,
                           flush_job.GetCommittedFlushJobsInfo());
    auto sfm = static_cast<SstFileManagerImpl*>(
        immutable_db_options_.sst_file_manager.get());
    if (sfm) {
      // Notify sst_file_manager that a new file was added
      std::string file_path = MakeTableFileName(
          cfd->ioptions()->cf_paths[0].path, file_meta.fd.GetNumber());
      // TODO (PR7798). We should only add the file to the FileManager if it
      // exists. Otherwise, some tests may fail. Ignore the error in the
      // interim.
      sfm->OnAddFile(file_path).PermitUncheckedError();
      if (sfm->IsMaxAllowedSpaceReached()) {
        Status new_bg_error =
            Status::SpaceLimit("Max allowed space was reached");
        TEST_SYNC_POINT_CALLBACK(
            "DBImpl::FlushMemTableToOutputFile:MaxAllowedSpaceReached",
            &new_bg_error);
        error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush);
      }
    }
#endif  // ROCKSDB_LITE
  }
  TEST_SYNC_POINT("DBImpl::FlushMemTableToOutputFile:Finish");
  return s;
}

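// Background-flush entry point: dispatches to the atomic multi-CF path when
// atomic_flush is enabled, otherwise flushes the single column family
// described by bg_flush_args[0].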
Status DBImpl::FlushMemTablesToOutputFiles(
    const autovector<BGFlushArg>& bg_flush_args, bool* made_progress,
    JobContext* job_context, LogBuffer* log_buffer, Env::Priority thread_pri) {
  if (immutable_db_options_.atomic_flush) {
    return AtomicFlushMemTablesToOutputFiles(
        bg_flush_args, made_progress, job_context, log_buffer, thread_pri);
  }
  assert(bg_flush_args.size() == 1);
  std::vector<SequenceNumber> snapshot_seqs;
  SequenceNumber earliest_write_conflict_snapshot;
  SnapshotChecker* snapshot_checker;
  GetSnapshotContext(job_context, &snapshot_seqs,
                     &earliest_write_conflict_snapshot, &snapshot_checker);
  const auto& bg_flush_arg = bg_flush_args[0];
  ColumnFamilyData* cfd = bg_flush_arg.cfd_;
  // intentional infrequent copy for each flush
  MutableCFOptions mutable_cf_options_copy = *cfd->GetLatestMutableCFOptions();
  SuperVersionContext* superversion_context =
      bg_flush_arg.superversion_context_;
  Status s = FlushMemTableToOutputFile(
      cfd, mutable_cf_options_copy, made_progress, job_context,
      superversion_context, snapshot_seqs, earliest_write_conflict_snapshot,
      snapshot_checker, log_buffer, thread_pri);
  return s;
}

/*
 * Atomically flushes multiple column families.
 *
 * For each column family, all memtables with ID smaller than or equal to the
 * ID specified in bg_flush_args will be flushed. Only after all column
 * families finish flush will this function commit to MANIFEST. If any of the
 * column families are not flushed successfully, this function does not have
 * any side-effect on the state of the database.
 */
Status DBImpl::AtomicFlushMemTablesToOutputFiles(
    const autovector<BGFlushArg>& bg_flush_args, bool* made_progress,
    JobContext* job_context, LogBuffer* log_buffer, Env::Priority thread_pri) {
  mutex_.AssertHeld();

  autovector<ColumnFamilyData*> cfds;
  for (const auto& arg : bg_flush_args) {
    cfds.emplace_back(arg.cfd_);
  }

#ifndef NDEBUG
  for (const auto cfd : cfds) {
    assert(cfd->imm()->NumNotFlushed() != 0);
    assert(cfd->imm()->IsFlushPending());
    assert(cfd->GetFlushReason() == cfds[0]->GetFlushReason());
  }
#endif /* !NDEBUG */

  std::vector<SequenceNumber> snapshot_seqs;
  SequenceNumber earliest_write_conflict_snapshot;
  SnapshotChecker* snapshot_checker;
  GetSnapshotContext(job_context, &snapshot_seqs,
                     &earliest_write_conflict_snapshot, &snapshot_checker);

  autovector<FSDirectory*> distinct_output_dirs;
  autovector<std::string> distinct_output_dir_paths;
  std::vector<std::unique_ptr<FlushJob>> jobs;
  std::vector<MutableCFOptions> all_mutable_cf_options;
  int num_cfs = static_cast<int>(cfds.size());
  all_mutable_cf_options.reserve(num_cfs);
  for (int i = 0; i < num_cfs; ++i) {
    auto cfd = cfds[i];
    FSDirectory* data_dir = GetDataDir(cfd, 0U);
    const std::string& curr_path = cfd->ioptions()->cf_paths[0].path;

    // Add to distinct output directories if eligible. Use linear search. Since
    // the number of elements in the vector is not large, performance should be
    // tolerable.
    bool found = false;
    for (const auto& path : distinct_output_dir_paths) {
      if (path == curr_path) {
        found = true;
        break;
      }
    }
    if (!found) {
      distinct_output_dir_paths.emplace_back(curr_path);
      distinct_output_dirs.emplace_back(data_dir);
    }

    all_mutable_cf_options.emplace_back(*cfd->GetLatestMutableCFOptions());
    const MutableCFOptions& mutable_cf_options = all_mutable_cf_options.back();
    uint64_t max_memtable_id = bg_flush_args[i].max_memtable_id_;
    jobs.emplace_back(new FlushJob(
        dbname_, cfd, immutable_db_options_, mutable_cf_options,
        max_memtable_id, file_options_for_compaction_, versions_.get(),
        &mutex_, &shutting_down_, snapshot_seqs,
        earliest_write_conflict_snapshot, snapshot_checker, job_context,
        log_buffer, directories_.GetDbDir(), data_dir,
        GetCompressionFlush(*cfd->ioptions(), mutable_cf_options), stats_,
        &event_logger_, mutable_cf_options.report_bg_io_stats,
        false /* sync_output_directory */, false /* write_manifest */,
        thread_pri, io_tracer_, seqno_time_mapping_, db_id_, db_session_id_,
        cfd->GetFullHistoryTsLow(), &blob_callback_));
  }

  std::vector<FileMetaData> file_meta(num_cfs);
  // Use std::deque<bool> because std::vector<bool> is specialized and does
  // not allow taking the address of an element (&v[i]).
  std::deque<bool> switched_to_mempurge(num_cfs, false);
  Status s;
  IOStatus log_io_s = IOStatus::OK();
  assert(num_cfs == static_cast<int>(jobs.size()));

#ifndef ROCKSDB_LITE
  for (int i = 0; i != num_cfs; ++i) {
    const MutableCFOptions& mutable_cf_options = all_mutable_cf_options.at(i);
    // may temporarily unlock and lock the mutex.
    NotifyOnFlushBegin(cfds[i], &file_meta[i], mutable_cf_options,
                       job_context->job_id);
  }
#endif /* !ROCKSDB_LITE */

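  // As in FlushMemTableToOutputFile(), sync the closed WALs before running
  // the flush jobs; a WAL sync failure is reported to the error handler and
  // prevents memtable picking below.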
  if (logfile_number_ > 0) {
    // TODO (yanqin) investigate whether we should sync the closed logs for
    // single column family case.
    VersionEdit synced_wals;
    mutex_.Unlock();
    log_io_s = SyncClosedLogs(job_context, &synced_wals);
    mutex_.Lock();
    if (log_io_s.ok() && synced_wals.IsWalAddition()) {
      log_io_s = status_to_io_status(ApplyWALToManifest(&synced_wals));
    }

    if (!log_io_s.ok() && !log_io_s.IsShutdownInProgress() &&
        !log_io_s.IsColumnFamilyDropped()) {
      if (total_log_size_ > 0) {
        error_handler_.SetBGError(log_io_s, BackgroundErrorReason::kFlush);
      } else {
        // If the WAL is empty, we use different error reason
        error_handler_.SetBGError(log_io_s, BackgroundErrorReason::kFlushNoWAL);
      }
    }
  }
  s = log_io_s;

  // exec_status stores the execution status of flush_jobs as
  // <bool /* executed */, Status /* status code */>
  autovector<std::pair<bool, Status>> exec_status;
  std::vector<bool> pick_status;
  for (int i = 0; i != num_cfs; ++i) {
    // Initially all jobs are not executed, with status OK.
    exec_status.emplace_back(false, Status::OK());
    pick_status.push_back(false);
  }

  if (s.ok()) {
    for (int i = 0; i != num_cfs; ++i) {
      jobs[i]->PickMemTable();
      pick_status[i] = true;
    }
  }

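  // Run the flush jobs on this thread, recording each job's result in
  // exec_status. Jobs 1..num_cfs-1 run first, then job 0, with test sync
  // points in between when more than one column family is involved.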
  if (s.ok()) {
    assert(switched_to_mempurge.size() ==
           static_cast<long unsigned int>(num_cfs));
    // TODO (yanqin): parallelize jobs with threads.
    for (int i = 1; i != num_cfs; ++i) {
      exec_status[i].second =
          jobs[i]->Run(&logs_with_prep_tracker_, &file_meta[i],
                       &(switched_to_mempurge.at(i)));
      exec_status[i].first = true;
    }
    if (num_cfs > 1) {
      TEST_SYNC_POINT(
          "DBImpl::AtomicFlushMemTablesToOutputFiles:SomeFlushJobsComplete:1");
      TEST_SYNC_POINT(
          "DBImpl::AtomicFlushMemTablesToOutputFiles:SomeFlushJobsComplete:2");
    }
    assert(exec_status.size() > 0);
    assert(!file_meta.empty());
    exec_status[0].second = jobs[0]->Run(
        &logs_with_prep_tracker_, file_meta.data() /* &file_meta[0] */,
        switched_to_mempurge.empty() ? nullptr : &(switched_to_mempurge.at(0)));
    exec_status[0].first = true;

    Status error_status;
    for (const auto& e : exec_status) {
      if (!e.second.ok()) {
        s = e.second;
        if (!e.second.IsShutdownInProgress() &&
            !e.second.IsColumnFamilyDropped()) {
          // If a flush job did not return OK, and the CF is not dropped, and
          // the DB is not shutting down, then we have to return this result
          // to the caller later.
          error_status = e.second;
        }
      }
    }

    s = error_status.ok() ? s : error_status;
  }

  if (s.IsColumnFamilyDropped()) {
    s = Status::OK();
  }

  if (s.ok() || s.IsShutdownInProgress()) {
    // Sync on all distinct output directories.
    for (auto dir : distinct_output_dirs) {
      if (dir != nullptr) {
        Status error_status = dir->FsyncWithDirOptions(
            IOOptions(), nullptr,
            DirFsyncOptions(DirFsyncOptions::FsyncReason::kNewFileSynced));
        if (!error_status.ok()) {
          s = error_status;
          break;
        }
      }
    }
  } else {
    // Need to undo the atomic flush if something went wrong, i.e. s is not OK
    // and it is not because of a CF drop.
    // Have to cancel the flush jobs that have NOT executed because we need to
    // unref the versions.
    for (int i = 0; i != num_cfs; ++i) {
      if (pick_status[i] && !exec_status[i].first) {
        jobs[i]->Cancel();
      }
    }
    for (int i = 0; i != num_cfs; ++i) {
      if (exec_status[i].second.ok() && exec_status[i].first) {
        auto& mems = jobs[i]->GetMemTables();
        cfds[i]->imm()->RollbackMemtableFlush(mems,
                                              file_meta[i].fd.GetNumber());
      }
    }
  }

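  // Flush results must be installed in memtable-ID order. Wait here until
  // this thread's memtables are the earliest ones pending installation, or
  // until an error, shutdown, or recovery failure ends the wait.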
  if (s.ok()) {
    const auto wait_to_install_func =
        [&]() -> std::pair<Status, bool /*continue to wait*/> {
      if (!versions_->io_status().ok()) {
        // Something went wrong elsewhere, we cannot count on waiting for our
        // turn to write/sync to MANIFEST or CURRENT. Just return.
        return std::make_pair(versions_->io_status(), false);
      } else if (shutting_down_.load(std::memory_order_acquire)) {
        return std::make_pair(Status::ShutdownInProgress(), false);
      }
      bool ready = true;
      for (size_t i = 0; i != cfds.size(); ++i) {
        const auto& mems = jobs[i]->GetMemTables();
        if (cfds[i]->IsDropped()) {
          // If the column family is dropped, then do not wait.
          continue;
        } else if (!mems.empty() &&
                   cfds[i]->imm()->GetEarliestMemTableID() < mems[0]->GetID()) {
          // If a flush job needs to install the flush result for mems and
          // mems[0] is not the earliest memtable, it means another thread must
          // be installing flush results for the same column family, then the
          // current thread needs to wait.
          ready = false;
          break;
        } else if (mems.empty() && cfds[i]->imm()->GetEarliestMemTableID() <=
                                       bg_flush_args[i].max_memtable_id_) {
          // If a flush job does not need to install flush results, then it has
          // to wait until all memtables up to max_memtable_id_ (inclusive) are
          // installed.
          ready = false;
          break;
        }
      }
      return std::make_pair(Status::OK(), !ready);
    };

    bool resuming_from_bg_err =
        error_handler_.IsDBStopped() ||
        (cfds[0]->GetFlushReason() == FlushReason::kErrorRecovery ||
         cfds[0]->GetFlushReason() == FlushReason::kErrorRecoveryRetryFlush);
    while ((!resuming_from_bg_err || error_handler_.GetRecoveryError().ok())) {
      std::pair<Status, bool> res = wait_to_install_func();

      TEST_SYNC_POINT_CALLBACK(
          "DBImpl::AtomicFlushMemTablesToOutputFiles:WaitToCommit", &res);

      if (!res.first.ok()) {
        s = res.first;
        break;
      } else if (!res.second) {
        break;
      }
      atomic_flush_install_cv_.Wait();

      resuming_from_bg_err =
          error_handler_.IsDBStopped() ||
          (cfds[0]->GetFlushReason() == FlushReason::kErrorRecovery ||
           cfds[0]->GetFlushReason() == FlushReason::kErrorRecoveryRetryFlush);
    }

    if (!resuming_from_bg_err) {
      // If not resuming from bg err, then we determine future action based on
      // whether we hit background error.
      if (s.ok()) {
        s = error_handler_.GetBGError();
      }
    } else if (s.ok()) {
      // If resuming from bg err, we still rely on wait_to_install_func()'s
      // result to determine future action. If wait_to_install_func() returns
      // non-ok already, then we should not proceed to flush result
      // installation.
      s = error_handler_.GetRecoveryError();
    }
  }

  if (s.ok()) {
    autovector<ColumnFamilyData*> tmp_cfds;
    autovector<const autovector<MemTable*>*> mems_list;
    autovector<const MutableCFOptions*> mutable_cf_options_list;
    autovector<FileMetaData*> tmp_file_meta;
    autovector<std::list<std::unique_ptr<FlushJobInfo>>*>
        committed_flush_jobs_info;
    for (int i = 0; i != num_cfs; ++i) {
      const auto& mems = jobs[i]->GetMemTables();
      if (!cfds[i]->IsDropped() && !mems.empty()) {
        tmp_cfds.emplace_back(cfds[i]);
        mems_list.emplace_back(&mems);
        mutable_cf_options_list.emplace_back(&all_mutable_cf_options[i]);
        tmp_file_meta.emplace_back(&file_meta[i]);
#ifndef ROCKSDB_LITE
        committed_flush_jobs_info.emplace_back(
            jobs[i]->GetCommittedFlushJobsInfo());
#endif  //! ROCKSDB_LITE
      }
    }

    s = InstallMemtableAtomicFlushResults(
        nullptr /* imm_lists */, tmp_cfds, mutable_cf_options_list, mems_list,
        versions_.get(), &logs_with_prep_tracker_, &mutex_, tmp_file_meta,
        committed_flush_jobs_info, &job_context->memtables_to_free,
        directories_.GetDbDir(), log_buffer);
  }

  if (s.ok()) {
    assert(num_cfs ==
           static_cast<int>(job_context->superversion_contexts.size()));
    for (int i = 0; i != num_cfs; ++i) {
      assert(cfds[i]);

      if (cfds[i]->IsDropped()) {
        continue;
      }
      InstallSuperVersionAndScheduleWork(
          cfds[i], &job_context->superversion_contexts[i],
          all_mutable_cf_options[i]);

      const std::string& column_family_name = cfds[i]->GetName();

      Version* const current = cfds[i]->current();
      assert(current);

      const VersionStorageInfo* const storage_info = current->storage_info();
      assert(storage_info);

      VersionStorageInfo::LevelSummaryStorage tmp;
      ROCKS_LOG_BUFFER(log_buffer, "[%s] Level summary: %s\n",
                       column_family_name.c_str(),
                       storage_info->LevelSummary(&tmp));

      const auto& blob_files = storage_info->GetBlobFiles();
      if (!blob_files.empty()) {
        assert(blob_files.front());
        assert(blob_files.back());

        ROCKS_LOG_BUFFER(
            log_buffer,
            "[%s] Blob file summary: head=%" PRIu64 ", tail=%" PRIu64 "\n",
            column_family_name.c_str(),
            blob_files.front()->GetBlobFileNumber(),
            blob_files.back()->GetBlobFileNumber());
      }
    }
    if (made_progress) {
      *made_progress = true;
    }
#ifndef ROCKSDB_LITE
    auto sfm = static_cast<SstFileManagerImpl*>(
        immutable_db_options_.sst_file_manager.get());
    assert(all_mutable_cf_options.size() == static_cast<size_t>(num_cfs));
    for (int i = 0; s.ok() && i != num_cfs; ++i) {
      // If mempurge happened instead of Flush,
      // no NotifyOnFlushCompleted call (no SST file created).
      if (switched_to_mempurge[i]) {
        continue;
      }
      if (cfds[i]->IsDropped()) {
        continue;
      }
      NotifyOnFlushCompleted(cfds[i], all_mutable_cf_options[i],
                             jobs[i]->GetCommittedFlushJobsInfo());
      if (sfm) {
        std::string file_path = MakeTableFileName(
            cfds[i]->ioptions()->cf_paths[0].path,
            file_meta[i].fd.GetNumber());
        // TODO (PR7798). We should only add the file to the FileManager if it
        // exists. Otherwise, some tests may fail. Ignore the error in the
        // interim.
        sfm->OnAddFile(file_path).PermitUncheckedError();
        if (sfm->IsMaxAllowedSpaceReached() &&
            error_handler_.GetBGError().ok()) {
          Status new_bg_error =
              Status::SpaceLimit("Max allowed space was reached");
          error_handler_.SetBGError(new_bg_error,
                                    BackgroundErrorReason::kFlush);
        }
      }
    }
#endif  // ROCKSDB_LITE
  }

  // Need to undo the atomic flush if something went wrong, i.e. s is not OK
  // and it is not because of a CF drop.
  if (!s.ok() && !s.IsColumnFamilyDropped()) {
    if (log_io_s.ok()) {
      // Error while writing to MANIFEST.
      // In fact, versions_->io_status() can also be the result of renaming
      // the CURRENT file. With the current code, it's just difficult to tell.
      // So just be pessimistic and try to write to a new MANIFEST.
      // TODO: distinguish between MANIFEST write and CURRENT renaming
      if (!versions_->io_status().ok()) {
        // If WAL sync is successful (either the WAL size is 0 or there is no
        // IO error), all Manifest write errors are mapped to soft errors.
        // TODO: kManifestWriteNoWAL and kFlushNoWAL are misleading. Refactor
        // is needed.
        error_handler_.SetBGError(s,
                                  BackgroundErrorReason::kManifestWriteNoWAL);
      } else {
        // If WAL sync is successful (either the WAL size is 0 or there is no
        // IO error), all the other SST file write errors will be set as
        // kFlushNoWAL.
        error_handler_.SetBGError(s, BackgroundErrorReason::kFlushNoWAL);
      }
    } else {
      assert(s == log_io_s);
      Status new_bg_error = s;
      error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush);
    }
  }

  return s;
}

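// Invokes the OnFlushBegin listener callbacks with a FlushJobInfo describing
// the flush that is about to start. The db mutex is released while the
// listeners run and re-acquired afterwards.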
void DBImpl::NotifyOnFlushBegin(ColumnFamilyData* cfd, FileMetaData* file_meta,
                                const MutableCFOptions& mutable_cf_options,
                                int job_id) {
#ifndef ROCKSDB_LITE
  if (immutable_db_options_.listeners.size() == 0U) {
    return;
  }
  mutex_.AssertHeld();
  if (shutting_down_.load(std::memory_order_acquire)) {
    return;
  }
  bool triggered_writes_slowdown =
      (cfd->current()->storage_info()->NumLevelFiles(0) >=
       mutable_cf_options.level0_slowdown_writes_trigger);
  bool triggered_writes_stop =
      (cfd->current()->storage_info()->NumLevelFiles(0) >=
       mutable_cf_options.level0_stop_writes_trigger);
  // release lock while notifying events
  mutex_.Unlock();
  {
    FlushJobInfo info{};
    info.cf_id = cfd->GetID();
    info.cf_name = cfd->GetName();
    // TODO(yhchiang): make db_paths dynamic in case flush does not
    // go to L0 in the future.
    const uint64_t file_number = file_meta->fd.GetNumber();
    info.file_path =
        MakeTableFileName(cfd->ioptions()->cf_paths[0].path, file_number);
    info.file_number = file_number;
    info.thread_id = env_->GetThreadID();
    info.job_id = job_id;
    info.triggered_writes_slowdown = triggered_writes_slowdown;
    info.triggered_writes_stop = triggered_writes_stop;
    info.smallest_seqno = file_meta->fd.smallest_seqno;
    info.largest_seqno = file_meta->fd.largest_seqno;
    info.flush_reason = cfd->GetFlushReason();
    for (auto listener : immutable_db_options_.listeners) {
      listener->OnFlushBegin(this, info);
    }
  }
  mutex_.Lock();
  // no need to signal bg_cv_ as it will be signaled at the end of the
  // flush process.
#else
  (void)cfd;
  (void)file_meta;
  (void)mutable_cf_options;
  (void)job_id;
#endif  // ROCKSDB_LITE
}

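// Invokes the OnFlushCompleted listener callbacks for every committed flush
// job in `flush_jobs_info`, then clears the list. The db mutex is released
// while the listeners run and re-acquired afterwards.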
void DBImpl::NotifyOnFlushCompleted(
    ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options,
    std::list<std::unique_ptr<FlushJobInfo>>* flush_jobs_info) {
#ifndef ROCKSDB_LITE
  assert(flush_jobs_info != nullptr);
  if (immutable_db_options_.listeners.size() == 0U) {
    return;
  }
  mutex_.AssertHeld();
  if (shutting_down_.load(std::memory_order_acquire)) {
    return;
  }
  bool triggered_writes_slowdown =
      (cfd->current()->storage_info()->NumLevelFiles(0) >=
       mutable_cf_options.level0_slowdown_writes_trigger);
  bool triggered_writes_stop =
      (cfd->current()->storage_info()->NumLevelFiles(0) >=
       mutable_cf_options.level0_stop_writes_trigger);
  // release lock while notifying events
  mutex_.Unlock();
  {
    for (auto& info : *flush_jobs_info) {
      info->triggered_writes_slowdown = triggered_writes_slowdown;
      info->triggered_writes_stop = triggered_writes_stop;
      for (auto listener : immutable_db_options_.listeners) {
        listener->OnFlushCompleted(this, *info);
      }
      TEST_SYNC_POINT(
          "DBImpl::NotifyOnFlushCompleted::PostAllOnFlushCompleted");
    }
    flush_jobs_info->clear();
  }
  mutex_.Lock();
  // no need to signal bg_cv_ as it will be signaled at the end of the
  // flush process.
#else
  (void)cfd;
  (void)mutable_cf_options;
  (void)flush_jobs_info;
#endif  // ROCKSDB_LITE
}

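// Public CompactRange() entry point. For column families with user-defined
// timestamps, the user keys are first extended with the maximum timestamp for
// `begin` and the minimum timestamp for `end` so that the whole inclusive
// range is covered; the actual work happens in CompactRangeInternal().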
Status DBImpl::CompactRange(const CompactRangeOptions& options,
                            ColumnFamilyHandle* column_family,
                            const Slice* begin_without_ts,
                            const Slice* end_without_ts) {
  if (manual_compaction_paused_.load(std::memory_order_acquire) > 0) {
    return Status::Incomplete(Status::SubCode::kManualCompactionPaused);
  }

  if (options.canceled && options.canceled->load(std::memory_order_acquire)) {
    return Status::Incomplete(Status::SubCode::kManualCompactionPaused);
  }

  const Comparator* const ucmp = column_family->GetComparator();
  assert(ucmp);
  size_t ts_sz = ucmp->timestamp_size();
  if (ts_sz == 0) {
    return CompactRangeInternal(options, column_family, begin_without_ts,
                                end_without_ts, "" /*trim_ts*/);
  }

  std::string begin_str;
  std::string end_str;

  // CompactRange compacts all keys in [begin, end] inclusively. Add the
  // maximum timestamp to include all `begin` keys, and add the minimal
  // timestamp to include all `end` keys.
  if (begin_without_ts != nullptr) {
    AppendKeyWithMaxTimestamp(&begin_str, *begin_without_ts, ts_sz);
  }
  if (end_without_ts != nullptr) {
    AppendKeyWithMinTimestamp(&end_str, *end_without_ts, ts_sz);
  }
  Slice begin(begin_str);
  Slice end(end_str);

  Slice* begin_with_ts = begin_without_ts ? &begin : nullptr;
  Slice* end_with_ts = end_without_ts ? &end : nullptr;

  return CompactRangeInternal(options, column_family, begin_with_ts,
                              end_with_ts, "" /*trim_ts*/);
}

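// Validates that the column family uses user-defined timestamps of the
// expected size before raising full_history_ts_low through
// IncreaseFullHistoryTsLowImpl().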
Status DBImpl::IncreaseFullHistoryTsLow(ColumnFamilyHandle* column_family,
                                        std::string ts_low) {
  ColumnFamilyData* cfd = nullptr;
  if (column_family == nullptr) {
    cfd = default_cf_handle_->cfd();
  } else {
    auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
    assert(cfh != nullptr);
    cfd = cfh->cfd();
  }
  assert(cfd != nullptr && cfd->user_comparator() != nullptr);
  if (cfd->user_comparator()->timestamp_size() == 0) {
    return Status::InvalidArgument(
        "Timestamp is not enabled in this column family");
  }
  if (cfd->user_comparator()->timestamp_size() != ts_low.size()) {
    return Status::InvalidArgument("ts_low size mismatch");
  }
  return IncreaseFullHistoryTsLowImpl(cfd, ts_low);
}

Status DBImpl::IncreaseFullHistoryTsLowImpl(ColumnFamilyData* cfd,
                                            std::string ts_low) {
  VersionEdit edit;
  edit.SetColumnFamily(cfd->GetID());
  edit.SetFullHistoryTsLow(ts_low);
  TEST_SYNC_POINT_CALLBACK("DBImpl::IncreaseFullHistoryTsLowImpl:BeforeEdit",
                           &edit);

  InstrumentedMutexLock l(&mutex_);
  std::string current_ts_low = cfd->GetFullHistoryTsLow();
  const Comparator* ucmp = cfd->user_comparator();
  assert(ucmp->timestamp_size() == ts_low.size() && !ts_low.empty());
  if (!current_ts_low.empty() &&
      ucmp->CompareTimestamp(ts_low, current_ts_low) < 0) {
    return Status::InvalidArgument("Cannot decrease full_history_ts_low");
  }

  Status s = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(),
                                    &edit, &mutex_, directories_.GetDbDir());
  if (!s.ok()) {
    return s;
  }
  current_ts_low = cfd->GetFullHistoryTsLow();
  if (!current_ts_low.empty() &&
      ucmp->CompareTimestamp(current_ts_low, ts_low) > 0) {
    std::stringstream oss;
    oss << "full_history_ts_low: " << Slice(current_ts_low).ToString(true)
        << " is set to be higher than the requested "
           "timestamp: "
        << Slice(ts_low).ToString(true) << std::endl;
    return Status::TryAgain(oss.str());
  }
  return Status::OK();
}

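// Implements CompactRange(): optionally raises full_history_ts_low, flushes
// memtables that overlap the requested range, runs manual compactions level
// by level over the overlapping levels, and finally refits the output to the
// requested level when options.change_level is set.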
Status DBImpl::CompactRangeInternal(const CompactRangeOptions& options,
                                    ColumnFamilyHandle* column_family,
                                    const Slice* begin, const Slice* end,
                                    const std::string& trim_ts) {
  auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
  auto cfd = cfh->cfd();

  if (options.target_path_id >= cfd->ioptions()->cf_paths.size()) {
    return Status::InvalidArgument("Invalid target path ID");
  }

  bool flush_needed = true;

  // Update full_history_ts_low if it's set
  if (options.full_history_ts_low != nullptr &&
      !options.full_history_ts_low->empty()) {
    std::string ts_low = options.full_history_ts_low->ToString();
    if (begin != nullptr || end != nullptr) {
      return Status::InvalidArgument(
          "Cannot specify compaction range with full_history_ts_low");
    }
    Status s = IncreaseFullHistoryTsLowImpl(cfd, ts_low);
    if (!s.ok()) {
      LogFlush(immutable_db_options_.info_log);
      return s;
    }
  }

  Status s;
  if (begin != nullptr && end != nullptr) {
    // TODO(ajkr): We could also optimize away the flush in certain cases where
    // one/both sides of the interval are unbounded. But it requires more
    // changes to RangesOverlapWithMemtables.
    Range range(*begin, *end);
    SuperVersion* super_version = cfd->GetReferencedSuperVersion(this);
    s = cfd->RangesOverlapWithMemtables(
        {range}, super_version, immutable_db_options_.allow_data_in_errors,
        &flush_needed);
    CleanupSuperVersion(super_version);
  }

  if (s.ok() && flush_needed) {
    FlushOptions fo;
    fo.allow_write_stall = options.allow_write_stall;
    if (immutable_db_options_.atomic_flush) {
      autovector<ColumnFamilyData*> cfds;
      mutex_.Lock();
      SelectColumnFamiliesForAtomicFlush(&cfds);
      mutex_.Unlock();
      s = AtomicFlushMemTables(cfds, fo, FlushReason::kManualCompaction,
                               false /* entered_write_thread */);
    } else {
      s = FlushMemTable(cfd, fo, FlushReason::kManualCompaction,
                        false /* entered_write_thread */);
    }
    if (!s.ok()) {
      LogFlush(immutable_db_options_.info_log);
      return s;
    }
  }

  constexpr int kInvalidLevel = -1;
  int final_output_level = kInvalidLevel;
  bool exclusive = options.exclusive_manual_compaction;
  if (cfd->ioptions()->compaction_style == kCompactionStyleUniversal &&
      cfd->NumberLevels() > 1) {
    // Always compact all files together.
    final_output_level = cfd->NumberLevels() - 1;
    // if the bottommost level is reserved
    if (immutable_db_options_.allow_ingest_behind) {
      final_output_level--;
    }
    s = RunManualCompaction(cfd, ColumnFamilyData::kCompactAllLevels,
                            final_output_level, options, begin, end, exclusive,
                            false, std::numeric_limits<uint64_t>::max(),
                            trim_ts);
  } else {
    int first_overlapped_level = kInvalidLevel;
    int max_overlapped_level = kInvalidLevel;
    {
      SuperVersion* super_version = cfd->GetReferencedSuperVersion(this);
      Version* current_version = super_version->current;
      ReadOptions ro;
      ro.total_order_seek = true;
      bool overlap;
      for (int level = 0;
           level < current_version->storage_info()->num_non_empty_levels();
           level++) {
        overlap = true;
        if (begin != nullptr && end != nullptr) {
          Status status = current_version->OverlapWithLevelIterator(
              ro, file_options_, *begin, *end, level, &overlap);
          if (!status.ok()) {
            overlap = current_version->storage_info()->OverlapInLevel(
                level, begin, end);
          }
        } else {
          overlap = current_version->storage_info()->OverlapInLevel(level,
                                                                    begin, end);
        }
        if (overlap) {
          if (first_overlapped_level == kInvalidLevel) {
            first_overlapped_level = level;
          }
          max_overlapped_level = level;
        }
      }
      CleanupSuperVersion(super_version);
    }
    if (s.ok() && first_overlapped_level != kInvalidLevel) {
      // max_file_num_to_ignore can be used to filter out newly created SST
      // files, useful for bottom level compaction in a manual compaction
      uint64_t max_file_num_to_ignore = std::numeric_limits<uint64_t>::max();
      uint64_t next_file_number = versions_->current_next_file_number();
      final_output_level = max_overlapped_level;
      int output_level;
      for (int level = first_overlapped_level; level <= max_overlapped_level;
           level++) {
        bool disallow_trivial_move = false;
        // in case the compaction is universal or if we're compacting the
        // bottom-most level, the output level will be the same as the input
        // one. level 0 can never be the bottommost level (i.e. if all files
        // are in level 0, we will compact to level 1)
        if (cfd->ioptions()->compaction_style == kCompactionStyleUniversal ||
            cfd->ioptions()->compaction_style == kCompactionStyleFIFO) {
          output_level = level;
        } else if (level == max_overlapped_level && level > 0) {
          if (options.bottommost_level_compaction ==
              BottommostLevelCompaction::kSkip) {
            // Skip bottommost level compaction
            continue;
          } else if (options.bottommost_level_compaction ==
                         BottommostLevelCompaction::kIfHaveCompactionFilter &&
                     cfd->ioptions()->compaction_filter == nullptr &&
                     cfd->ioptions()->compaction_filter_factory == nullptr) {
            // Skip bottommost level compaction since we don't have a
            // compaction filter
            continue;
          }
          output_level = level;
          // update max_file_num_to_ignore only for bottom level compaction
          // because data in newly compacted files in middle levels may still
          // need to be pushed down
          max_file_num_to_ignore = next_file_number;
        } else {
          output_level = level + 1;
          if (cfd->ioptions()->compaction_style == kCompactionStyleLevel &&
              cfd->ioptions()->level_compaction_dynamic_level_bytes &&
              level == 0) {
            output_level = ColumnFamilyData::kCompactToBaseLevel;
          }
          // if it's a bottommost-level compaction and `kForce*` compaction is
          // set, disallow trivial move
          if (level == max_overlapped_level &&
              (options.bottommost_level_compaction ==
                   BottommostLevelCompaction::kForce ||
               options.bottommost_level_compaction ==
                   BottommostLevelCompaction::kForceOptimized)) {
            disallow_trivial_move = true;
          }
        }
        // trim_ts needs a real compaction to remove the latest record
        if (!trim_ts.empty()) {
          disallow_trivial_move = true;
        }
        s = RunManualCompaction(cfd, level, output_level, options, begin, end,
                                exclusive, disallow_trivial_move,
                                max_file_num_to_ignore, trim_ts);
        if (!s.ok()) {
          break;
        }
        if (output_level == ColumnFamilyData::kCompactToBaseLevel) {
          final_output_level = cfd->NumberLevels() - 1;
        } else if (output_level > final_output_level) {
          final_output_level = output_level;
        }
        TEST_SYNC_POINT("DBImpl::RunManualCompaction()::1");
        TEST_SYNC_POINT("DBImpl::RunManualCompaction()::2");
      }
    }
  }
  if (!s.ok() || final_output_level == kInvalidLevel) {
    LogFlush(immutable_db_options_.info_log);
    return s;
  }

  if (options.change_level) {
    TEST_SYNC_POINT("DBImpl::CompactRange:BeforeRefit:1");
    TEST_SYNC_POINT("DBImpl::CompactRange:BeforeRefit:2");

    ROCKS_LOG_INFO(immutable_db_options_.info_log,
                   "[RefitLevel] waiting for background threads to stop");
    DisableManualCompaction();
    s = PauseBackgroundWork();
    if (s.ok()) {
      TEST_SYNC_POINT("DBImpl::CompactRange:PreRefitLevel");
      s = ReFitLevel(cfd, final_output_level, options.target_level);
      TEST_SYNC_POINT("DBImpl::CompactRange:PostRefitLevel");
      // ContinueBackgroundWork always returns Status::OK().
      Status temp_s = ContinueBackgroundWork();
      assert(temp_s.ok());
    }
    EnableManualCompaction();
    TEST_SYNC_POINT(
        "DBImpl::CompactRange:PostRefitLevel:ManualCompactionEnabled");
  }
  LogFlush(immutable_db_options_.info_log);

  {
    InstrumentedMutexLock l(&mutex_);
    // an automatic compaction that has been scheduled might have been
    // preempted by the manual compactions. Need to schedule it back.
    MaybeScheduleFlushOrCompaction();
  }

  return s;
}

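// Public CompactFiles() entry point: waits for pending file ingestion, pins
// the current version, delegates to CompactFilesImpl(), and afterwards finds
// and purges any obsolete files left behind by the compaction.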
Status DBImpl::CompactFiles(const CompactionOptions& compact_options,
                            ColumnFamilyHandle* column_family,
                            const std::vector<std::string>& input_file_names,
                            const int output_level, const int output_path_id,
                            std::vector<std::string>* const output_file_names,
                            CompactionJobInfo* compaction_job_info) {
#ifdef ROCKSDB_LITE
  (void)compact_options;
  (void)column_family;
  (void)input_file_names;
  (void)output_level;
  (void)output_path_id;
  (void)output_file_names;
  (void)compaction_job_info;
  // not supported in lite version
  return Status::NotSupported("Not supported in ROCKSDB LITE");
#else
  if (column_family == nullptr) {
    return Status::InvalidArgument("ColumnFamilyHandle must be non-null.");
  }

  auto cfd =
      static_cast_with_check<ColumnFamilyHandleImpl>(column_family)->cfd();
  assert(cfd);

  Status s;
  JobContext job_context(next_job_id_.fetch_add(1), true);
  LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL,
                       immutable_db_options_.info_log.get());

  // Perform CompactFiles
  TEST_SYNC_POINT("TestCompactFiles::IngestExternalFile2");
  TEST_SYNC_POINT_CALLBACK(
      "TestCompactFiles:PausingManualCompaction:3",
      reinterpret_cast<void*>(
          const_cast<std::atomic<int>*>(&manual_compaction_paused_)));
  {
    InstrumentedMutexLock l(&mutex_);

    // This call will unlock/lock the mutex to wait for current running
    // IngestExternalFile() calls to finish.
    WaitForIngestFile();

    // We need to get current after `WaitForIngestFile`, because
    // `IngestExternalFile` may add files that overlap with `input_file_names`
    auto* current = cfd->current();
    current->Ref();

    s = CompactFilesImpl(compact_options, cfd, current, input_file_names,
                         output_file_names, output_level, output_path_id,
                         &job_context, &log_buffer, compaction_job_info);

    current->Unref();
  }

  // Find and delete obsolete files
  {
    InstrumentedMutexLock l(&mutex_);
    // If !s.ok(), this means that Compaction failed. In that case, we want
    // to delete all obsolete files we might have created and we force
    // FindObsoleteFiles(). This is because job_context does not
    // catch all created files if the compaction failed.
    FindObsoleteFiles(&job_context, !s.ok());
  }  // release the mutex

  // delete unnecessary files if any, this is done outside the mutex
  if (job_context.HaveSomethingToClean() ||
      job_context.HaveSomethingToDelete() || !log_buffer.IsEmpty()) {
    // Have to flush the info logs before bg_compaction_scheduled_--
    // because if bg_flush_scheduled_ becomes 0 and the lock is
    // released, the destructor of DB can kick in and destroy all the
    // states of DB, so info_log might not be available after that point.
    // It also applies to accessing other states that DB owns.
    log_buffer.FlushBufferToLog();
    if (job_context.HaveSomethingToDelete()) {
      // no mutex is locked here. No need to Unlock() and Lock() here.
      PurgeObsoleteFiles(job_context);
    }
    job_context.Clean();
  }

  return s;
#endif  // ROCKSDB_LITE
}

#ifndef ROCKSDB_LITE
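// Runs a user-requested compaction over an explicit set of input files:
// sanitizes the inputs, checks disk space, builds and runs a CompactionJob,
// and installs the result. Called with the db mutex held; the mutex is
// released while the compaction itself runs.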
1312Status DBImpl::CompactFilesImpl(
1313 const CompactionOptions& compact_options, ColumnFamilyData* cfd,
1314 Version* version, const std::vector<std::string>& input_file_names,
11fdf7f2 1315 std::vector<std::string>* const output_file_names, const int output_level,
494da23a
TL
1316 int output_path_id, JobContext* job_context, LogBuffer* log_buffer,
1317 CompactionJobInfo* compaction_job_info) {
7c673cae
FG
1318 mutex_.AssertHeld();
1319
1320 if (shutting_down_.load(std::memory_order_acquire)) {
1321 return Status::ShutdownInProgress();
1322 }
20effc67 1323 if (manual_compaction_paused_.load(std::memory_order_acquire) > 0) {
f67539c2
TL
1324 return Status::Incomplete(Status::SubCode::kManualCompactionPaused);
1325 }
7c673cae
FG
1326
1327 std::unordered_set<uint64_t> input_set;
494da23a 1328 for (const auto& file_name : input_file_names) {
7c673cae
FG
1329 input_set.insert(TableFileNameToNumber(file_name));
1330 }
1331
1332 ColumnFamilyMetaData cf_meta;
1333 // TODO(yhchiang): can directly use version here if none of the
1334 // following functions call is pluggable to external developers.
1335 version->GetColumnFamilyMetaData(&cf_meta);
1336
1337 if (output_path_id < 0) {
11fdf7f2 1338 if (cfd->ioptions()->cf_paths.size() == 1U) {
7c673cae
FG
1339 output_path_id = 0;
1340 } else {
1341 return Status::NotSupported(
1342 "Automatic output path selection is not "
1343 "yet supported in CompactFiles()");
1344 }
1345 }
1346
1347 Status s = cfd->compaction_picker()->SanitizeCompactionInputFiles(
1348 &input_set, cf_meta, output_level);
1349 if (!s.ok()) {
1350 return s;
1351 }
1352
1353 std::vector<CompactionInputFiles> input_files;
1354 s = cfd->compaction_picker()->GetCompactionInputsFromFileNumbers(
1355 &input_files, &input_set, version->storage_info(), compact_options);
1356 if (!s.ok()) {
1357 return s;
1358 }
1359
494da23a 1360 for (const auto& inputs : input_files) {
7c673cae
FG
1361 if (cfd->compaction_picker()->AreFilesInCompaction(inputs.files)) {
1362 return Status::Aborted(
1363 "Some of the necessary compaction input "
1364 "files are already being compacted");
1365 }
1366 }
11fdf7f2
TL
1367 bool sfm_reserved_compact_space = false;
1368 // First check if we have enough room to do the compaction
1369 bool enough_room = EnoughRoomForCompaction(
1370 cfd, input_files, &sfm_reserved_compact_space, log_buffer);
1371
1372 if (!enough_room) {
1373 // m's vars will get set properly at the end of this function,
1374 // as long as status == CompactionTooLarge
1375 return Status::CompactionTooLarge();
1376 }
7c673cae
FG
1377
1378 // At this point, CompactFiles will be run.
1379 bg_compaction_scheduled_++;
1380
494da23a 1381 std::unique_ptr<Compaction> c;
7c673cae
FG
1382 assert(cfd->compaction_picker());
1383 c.reset(cfd->compaction_picker()->CompactFiles(
1384 compact_options, input_files, output_level, version->storage_info(),
20effc67 1385 *cfd->GetLatestMutableCFOptions(), mutable_db_options_, output_path_id));
11fdf7f2
TL
1386 // we already sanitized the set of input files and checked for conflicts
1387 // without releasing the lock, so we're guaranteed a compaction can be formed.
1388 assert(c != nullptr);
1389
7c673cae
FG
1390 c->SetInputVersion(version);
1391 // deletion compaction currently not allowed in CompactFiles.
1392 assert(!c->deletion_compaction());
1393
494da23a 1394 std::vector<SequenceNumber> snapshot_seqs;
7c673cae 1395 SequenceNumber earliest_write_conflict_snapshot;
494da23a
TL
1396 SnapshotChecker* snapshot_checker;
1397 GetSnapshotContext(job_context, &snapshot_seqs,
1398 &earliest_write_conflict_snapshot, &snapshot_checker);
7c673cae 1399
f67539c2
TL
1400 std::unique_ptr<std::list<uint64_t>::iterator> pending_outputs_inserted_elem(
1401 new std::list<uint64_t>::iterator(
1402 CaptureCurrentFileNumberInPendingOutputs()));
7c673cae
FG
1403
1404 assert(is_snapshot_supported_ || snapshots_.empty());
494da23a 1405 CompactionJobStats compaction_job_stats;
7c673cae 1406 CompactionJob compaction_job(
1e59de90 1407 job_context->job_id, c.get(), immutable_db_options_, mutable_db_options_,
f67539c2 1408 file_options_for_compaction_, versions_.get(), &shutting_down_,
1e59de90 1409 log_buffer, directories_.GetDbDir(),
20effc67
TL
1410 GetDataDir(c->column_family_data(), c->output_path_id()),
1411 GetDataDir(c->column_family_data(), 0), stats_, &mutex_, &error_handler_,
1412 snapshot_seqs, earliest_write_conflict_snapshot, snapshot_checker,
1e59de90 1413 job_context, table_cache_, &event_logger_,
11fdf7f2 1414 c->mutable_cf_options()->paranoid_file_checks,
7c673cae 1415 c->mutable_cf_options()->report_bg_io_stats, dbname_,
20effc67 1416 &compaction_job_stats, Env::Priority::USER, io_tracer_,
1e59de90
TL
1417 kManualCompactionCanceledFalse_, db_id_, db_session_id_,
1418 c->column_family_data()->GetFullHistoryTsLow(), c->trim_ts(),
1419 &blob_callback_, &bg_compaction_scheduled_,
1420 &bg_bottom_compaction_scheduled_);
7c673cae
FG
1421
1422 // Creating a compaction influences the compaction score because the score
1423 // takes running compactions into account (by skipping files that are already
1424 // being compacted). Since we just changed compaction score, we recalculate it
1425 // here.
1426 version->storage_info()->ComputeCompactionScore(*cfd->ioptions(),
1427 *c->mutable_cf_options());
1428
1429 compaction_job.Prepare();
1430
1431 mutex_.Unlock();
1432 TEST_SYNC_POINT("CompactFilesImpl:0");
1433 TEST_SYNC_POINT("CompactFilesImpl:1");
1e59de90
TL
1434 // Ignore the status here, as it will be checked in the Install down below...
1435 compaction_job.Run().PermitUncheckedError();
7c673cae
FG
1436 TEST_SYNC_POINT("CompactFilesImpl:2");
1437 TEST_SYNC_POINT("CompactFilesImpl:3");
1438 mutex_.Lock();
1439
1440 Status status = compaction_job.Install(*c->mutable_cf_options());
1441 if (status.ok()) {
20effc67 1442 assert(compaction_job.io_status().ok());
494da23a
TL
1443 InstallSuperVersionAndScheduleWork(c->column_family_data(),
1444 &job_context->superversion_contexts[0],
1445 *c->mutable_cf_options());
7c673cae 1446 }
20effc67
TL
1447 // status above captures any error during compaction_job.Install, so it's ok
1448 // to not check compaction_job.io_status() explicitly if we're not calling
1449 // SetBGError
1450 compaction_job.io_status().PermitUncheckedError();
7c673cae 1451 c->ReleaseCompactionFiles(s);
11fdf7f2
TL
1452#ifndef ROCKSDB_LITE
1453 // Need to make sure SstFileManager does its bookkeeping
1454 auto sfm = static_cast<SstFileManagerImpl*>(
1455 immutable_db_options_.sst_file_manager.get());
1456 if (sfm && sfm_reserved_compact_space) {
1457 sfm->OnCompactionCompletion(c.get());
1458 }
1459#endif // ROCKSDB_LITE
7c673cae
FG
1460
1461 ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem);
1462
494da23a
TL
1463 if (compaction_job_info != nullptr) {
1464 BuildCompactionJobInfo(cfd, c.get(), s, compaction_job_stats,
1465 job_context->job_id, version, compaction_job_info);
1466 }
1467
7c673cae
FG
1468 if (status.ok()) {
1469 // Done
f67539c2 1470 } else if (status.IsColumnFamilyDropped() || status.IsShutdownInProgress()) {
7c673cae 1471 // Ignore compaction errors found during shutting down
f67539c2
TL
1472 } else if (status.IsManualCompactionPaused()) {
1473 // Don't report stopping manual compaction as error
1474 ROCKS_LOG_INFO(immutable_db_options_.info_log,
1475 "[%s] [JOB %d] Stopping manual compaction",
1476 c->column_family_data()->GetName().c_str(),
1477 job_context->job_id);
7c673cae
FG
1478 } else {
1479 ROCKS_LOG_WARN(immutable_db_options_.info_log,
1480 "[%s] [JOB %d] Compaction error: %s",
1481 c->column_family_data()->GetName().c_str(),
1482 job_context->job_id, status.ToString().c_str());
20effc67
TL
1483 IOStatus io_s = compaction_job.io_status();
1484 if (!io_s.ok()) {
1e59de90 1485 error_handler_.SetBGError(io_s, BackgroundErrorReason::kCompaction);
20effc67 1486 } else {
1e59de90 1487 error_handler_.SetBGError(status, BackgroundErrorReason::kCompaction);
20effc67 1488 }
11fdf7f2
TL
1489 }
1490
1491 if (output_file_names != nullptr) {
20effc67 1492 for (const auto& newf : c->edit()->GetNewFiles()) {
1e59de90
TL
1493 output_file_names->push_back(TableFileName(
1494 c->immutable_options()->cf_paths, newf.second.fd.GetNumber(),
1495 newf.second.fd.GetPathId()));
1496 }
1497
1498 for (const auto& blob_file : c->edit()->GetBlobFileAdditions()) {
1499 output_file_names->push_back(
1500 BlobFileName(c->immutable_options()->cf_paths.front().path,
1501 blob_file.GetBlobFileNumber()));
7c673cae
FG
1502 }
1503 }
1504
1505 c.reset();
1506
1507 bg_compaction_scheduled_--;
1508 if (bg_compaction_scheduled_ == 0) {
1509 bg_cv_.SignalAll();
1510 }
494da23a 1511 MaybeScheduleFlushOrCompaction();
11fdf7f2 1512 TEST_SYNC_POINT("CompactFilesImpl:End");
7c673cae
FG
1513
1514 return status;
1515}
1516#endif // ROCKSDB_LITE
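// Illustrative usage sketch of the public DB::CompactFiles() API that funnels
// into CompactFilesImpl() above. This is a hedged example, not part of the
// original source; the helper name and the db/cf handles are assumptions.
//
//   #include <cassert>
//   #include <string>
//   #include <vector>
//   #include "rocksdb/db.h"
//   #include "rocksdb/metadata.h"
//
//   void CompactAllL0Files(rocksdb::DB* db, rocksdb::ColumnFamilyHandle* cf) {
//     rocksdb::ColumnFamilyMetaData meta;
//     db->GetColumnFamilyMetaData(cf, &meta);
//     std::vector<std::string> input_files;
//     for (const auto& file : meta.levels[0].files) {
//       input_files.push_back(file.name);  // e.g. "/000123.sst"
//     }
//     if (input_files.empty()) {
//       return;
//     }
//     rocksdb::CompactionOptions copts;
//     std::vector<std::string> output_files;
//     // Output file names come back through `output_files`, mirroring the
//     // output_file_names parameter handled above.
//     rocksdb::Status s =
//         db->CompactFiles(copts, cf, input_files, /*output_level=*/1,
//                          /*output_path_id=*/-1, &output_files);
//     assert(s.ok() || s.IsAborted() || s.IsCompactionTooLarge());
//   }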
1517
1518Status DBImpl::PauseBackgroundWork() {
1519 InstrumentedMutexLock guard_lock(&mutex_);
1520 bg_compaction_paused_++;
11fdf7f2
TL
1521 while (bg_bottom_compaction_scheduled_ > 0 || bg_compaction_scheduled_ > 0 ||
1522 bg_flush_scheduled_ > 0) {
7c673cae
FG
1523 bg_cv_.Wait();
1524 }
1525 bg_work_paused_++;
1526 return Status::OK();
1527}
1528
1529Status DBImpl::ContinueBackgroundWork() {
1530 InstrumentedMutexLock guard_lock(&mutex_);
1531 if (bg_work_paused_ == 0) {
1532 return Status::InvalidArgument();
1533 }
1534 assert(bg_work_paused_ > 0);
1535 assert(bg_compaction_paused_ > 0);
1536 bg_compaction_paused_--;
1537 bg_work_paused_--;
1538 // It's sufficient to check just bg_work_paused_ here since
1539 // bg_work_paused_ is always no greater than bg_compaction_paused_
1540 if (bg_work_paused_ == 0) {
1541 MaybeScheduleFlushOrCompaction();
1542 }
1543 return Status::OK();
1544}
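// A minimal sketch (not part of the original source) of how a caller might
// bracket external maintenance work with the PauseBackgroundWork() /
// ContinueBackgroundWork() pair above; `db` and the callback are assumptions.
//
//   #include <functional>
//   #include "rocksdb/db.h"
//
//   rocksdb::Status WithBackgroundWorkPaused(rocksdb::DB* db,
//                                            const std::function<void()>& fn) {
//     rocksdb::Status s = db->PauseBackgroundWork();  // waits for running jobs
//     if (!s.ok()) {
//       return s;
//     }
//     fn();  // e.g. take a filesystem-level snapshot of the DB directory
//     return db->ContinueBackgroundWork();  // resume flushes and compactions
//   }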
1545
494da23a
TL
1546void DBImpl::NotifyOnCompactionBegin(ColumnFamilyData* cfd, Compaction* c,
1547 const Status& st,
1548 const CompactionJobStats& job_stats,
1549 int job_id) {
7c673cae 1550#ifndef ROCKSDB_LITE
494da23a 1551 if (immutable_db_options_.listeners.empty()) {
7c673cae
FG
1552 return;
1553 }
1554 mutex_.AssertHeld();
1555 if (shutting_down_.load(std::memory_order_acquire)) {
1556 return;
1557 }
f67539c2 1558 if (c->is_manual_compaction() &&
20effc67 1559 manual_compaction_paused_.load(std::memory_order_acquire) > 0) {
f67539c2
TL
1560 return;
1561 }
1e59de90
TL
1562
1563 c->SetNotifyOnCompactionCompleted();
11fdf7f2
TL
1564 Version* current = cfd->current();
1565 current->Ref();
7c673cae
FG
1566 // release lock while notifying events
1567 mutex_.Unlock();
494da23a 1568 TEST_SYNC_POINT("DBImpl::NotifyOnCompactionBegin::UnlockMutex");
7c673cae 1569 {
f67539c2 1570 CompactionJobInfo info{};
20effc67 1571 BuildCompactionJobInfo(cfd, c, st, job_stats, job_id, current, &info);
494da23a
TL
1572 for (auto listener : immutable_db_options_.listeners) {
1573 listener->OnCompactionBegin(this, info);
1574 }
20effc67 1575 info.status.PermitUncheckedError();
494da23a
TL
1576 }
1577 mutex_.Lock();
1578 current->Unref();
1579#else
1580 (void)cfd;
1581 (void)c;
1582 (void)st;
1583 (void)job_stats;
1584 (void)job_id;
1585#endif // ROCKSDB_LITE
1586}
1587
1588void DBImpl::NotifyOnCompactionCompleted(
1589 ColumnFamilyData* cfd, Compaction* c, const Status& st,
1590 const CompactionJobStats& compaction_job_stats, const int job_id) {
1591#ifndef ROCKSDB_LITE
1592 if (immutable_db_options_.listeners.size() == 0U) {
1593 return;
1594 }
1595 mutex_.AssertHeld();
1596 if (shutting_down_.load(std::memory_order_acquire)) {
1597 return;
1598 }
1e59de90
TL
1599
1600 if (c->ShouldNotifyOnCompactionCompleted() == false) {
f67539c2
TL
1601 return;
1602 }
1e59de90 1603
494da23a
TL
1604 Version* current = cfd->current();
1605 current->Ref();
1606 // release lock while notifying events
1607 mutex_.Unlock();
1608 TEST_SYNC_POINT("DBImpl::NotifyOnCompactionCompleted::UnlockMutex");
1609 {
f67539c2 1610 CompactionJobInfo info{};
494da23a
TL
1611 BuildCompactionJobInfo(cfd, c, st, compaction_job_stats, job_id, current,
1612 &info);
7c673cae
FG
1613 for (auto listener : immutable_db_options_.listeners) {
1614 listener->OnCompactionCompleted(this, info);
1615 }
1616 }
1617 mutex_.Lock();
11fdf7f2 1618 current->Unref();
7c673cae
FG
1619 // no need to signal bg_cv_ as it will be signaled at the end of the
1620 // background compaction process.
11fdf7f2
TL
1621#else
1622 (void)cfd;
1623 (void)c;
1624 (void)st;
1625 (void)compaction_job_stats;
1626 (void)job_id;
7c673cae
FG
1627#endif // ROCKSDB_LITE
1628}
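// The two notification helpers above invoke user-registered EventListener
// callbacks. A minimal, hypothetical listener (class name and logging are
// assumptions, not part of this source) could look like:
//
//   #include <cstdio>
//   #include "rocksdb/listener.h"
//
//   class CompactionLogger : public rocksdb::EventListener {
//    public:
//     void OnCompactionBegin(rocksdb::DB* /*db*/,
//                            const rocksdb::CompactionJobInfo& info) override {
//       std::fprintf(stderr, "compaction begin: cf=%s job=%d\n",
//                    info.cf_name.c_str(), info.job_id);
//     }
//     void OnCompactionCompleted(rocksdb::DB* /*db*/,
//                                const rocksdb::CompactionJobInfo& info) override {
//       std::fprintf(stderr, "compaction done: cf=%s status=%s\n",
//                    info.cf_name.c_str(), info.status.ToString().c_str());
//     }
//   };
//
// It would typically be registered before DB::Open() via
// options.listeners.emplace_back(std::make_shared<CompactionLogger>()).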
1629
1630// REQUIREMENT: block all background work by calling PauseBackgroundWork()
1631// before calling this function
1632Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) {
1633 assert(level < cfd->NumberLevels());
1634 if (target_level >= cfd->NumberLevels()) {
1635 return Status::InvalidArgument("Target level exceeds number of levels");
1636 }
1637
11fdf7f2 1638 SuperVersionContext sv_context(/* create_superversion */ true);
7c673cae 1639
7c673cae
FG
1640 InstrumentedMutexLock guard_lock(&mutex_);
1641
1642 // only allow one thread refitting
1643 if (refitting_level_) {
1644 ROCKS_LOG_INFO(immutable_db_options_.info_log,
1645 "[ReFitLevel] another thread is refitting");
1646 return Status::NotSupported("another thread is refitting");
1647 }
1648 refitting_level_ = true;
1649
1650 const MutableCFOptions mutable_cf_options = *cfd->GetLatestMutableCFOptions();
1651 // move to a smaller level
1652 int to_level = target_level;
1653 if (target_level < 0) {
1654 to_level = FindMinimumEmptyLevelFitting(cfd, mutable_cf_options, level);
1655 }
1656
1657 auto* vstorage = cfd->current()->storage_info();
20effc67
TL
1658 if (to_level != level) {
1659 if (to_level > level) {
1660 if (level == 0) {
1661 refitting_level_ = false;
7c673cae 1662 return Status::NotSupported(
20effc67
TL
1663 "Cannot change from level 0 to other levels.");
1664 }
1665 // Check levels are empty for a trivial move
1666 for (int l = level + 1; l <= to_level; l++) {
1667 if (vstorage->NumLevelFiles(l) > 0) {
1668 refitting_level_ = false;
1669 return Status::NotSupported(
1670 "Levels between source and target are not empty for a move.");
1671 }
1672 }
1673 } else {
1674 // to_level < level
1675 // Check levels are empty for a trivial move
1676 for (int l = to_level; l < level; l++) {
1677 if (vstorage->NumLevelFiles(l) > 0) {
1678 refitting_level_ = false;
1679 return Status::NotSupported(
1680 "Levels between source and target are not empty for a move.");
1681 }
7c673cae
FG
1682 }
1683 }
7c673cae
FG
1684 ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
1685 "[%s] Before refitting:\n%s", cfd->GetName().c_str(),
1686 cfd->current()->DebugString().data());
1687
1688 VersionEdit edit;
1689 edit.SetColumnFamily(cfd->GetID());
1690 for (const auto& f : vstorage->LevelFiles(level)) {
1691 edit.DeleteFile(level, f->fd.GetNumber());
1e59de90
TL
1692 edit.AddFile(
1693 to_level, f->fd.GetNumber(), f->fd.GetPathId(), f->fd.GetFileSize(),
1694 f->smallest, f->largest, f->fd.smallest_seqno, f->fd.largest_seqno,
1695 f->marked_for_compaction, f->temperature, f->oldest_blob_file_number,
1696 f->oldest_ancester_time, f->file_creation_time, f->file_checksum,
1697 f->file_checksum_func_name, f->unique_id);
7c673cae
FG
1698 }
1699 ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
1700 "[%s] Apply version edit:\n%s", cfd->GetName().c_str(),
1701 edit.DebugString().data());
1702
1e59de90
TL
1703 Status status = versions_->LogAndApply(cfd, mutable_cf_options, &edit,
1704 &mutex_, directories_.GetDbDir());
1705
11fdf7f2 1706 InstallSuperVersionAndScheduleWork(cfd, &sv_context, mutable_cf_options);
7c673cae
FG
1707
1708 ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "[%s] LogAndApply: %s\n",
1709 cfd->GetName().c_str(), status.ToString().data());
1710
1711 if (status.ok()) {
1712 ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
1713 "[%s] After refitting:\n%s", cfd->GetName().c_str(),
1714 cfd->current()->DebugString().data());
1715 }
1e59de90
TL
1716 sv_context.Clean();
1717 refitting_level_ = false;
1718
1719 return status;
7c673cae
FG
1720 }
1721
1722 refitting_level_ = false;
1e59de90 1723 return Status::OK();
7c673cae
FG
1724}
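// ReFitLevel() is reached from the public DB::CompactRange() API when
// CompactRangeOptions::change_level is set; CompactRange() pauses background
// work around it as required above. A hedged sketch with an assumed `db`
// handle (not part of the original source):
//
//   #include "rocksdb/db.h"
//   #include "rocksdb/options.h"
//
//   rocksdb::Status MoveAllDataToLevel(rocksdb::DB* db, int target_level) {
//     rocksdb::CompactRangeOptions cro;
//     cro.change_level = true;          // triggers the ReFitLevel() path
//     cro.target_level = target_level;  // e.g. options.num_levels - 1
//     // nullptr begin/end means the whole key range is compacted first.
//     return db->CompactRange(cro, /*begin=*/nullptr, /*end=*/nullptr);
//   }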
1725
1726int DBImpl::NumberLevels(ColumnFamilyHandle* column_family) {
20effc67 1727 auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
7c673cae
FG
1728 return cfh->cfd()->NumberLevels();
1729}
1730
11fdf7f2 1731int DBImpl::MaxMemCompactionLevel(ColumnFamilyHandle* /*column_family*/) {
7c673cae
FG
1732 return 0;
1733}
1734
1735int DBImpl::Level0StopWriteTrigger(ColumnFamilyHandle* column_family) {
20effc67 1736 auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
7c673cae 1737 InstrumentedMutexLock l(&mutex_);
11fdf7f2
TL
1738 return cfh->cfd()
1739 ->GetSuperVersion()
1740 ->mutable_cf_options.level0_stop_writes_trigger;
7c673cae
FG
1741}
1742
1743Status DBImpl::Flush(const FlushOptions& flush_options,
1744 ColumnFamilyHandle* column_family) {
20effc67 1745 auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
11fdf7f2
TL
1746 ROCKS_LOG_INFO(immutable_db_options_.info_log, "[%s] Manual flush start.",
1747 cfh->GetName().c_str());
494da23a
TL
1748 Status s;
1749 if (immutable_db_options_.atomic_flush) {
1750 s = AtomicFlushMemTables({cfh->cfd()}, flush_options,
1751 FlushReason::kManualFlush);
1752 } else {
1753 s = FlushMemTable(cfh->cfd(), flush_options, FlushReason::kManualFlush);
1754 }
1755
11fdf7f2
TL
1756 ROCKS_LOG_INFO(immutable_db_options_.info_log,
1757 "[%s] Manual flush finished, status: %s\n",
1758 cfh->GetName().c_str(), s.ToString().c_str());
1759 return s;
1760}
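// A minimal sketch of a caller of the manual flush path above; the helper name
// and the db/cf handles are assumptions, not part of this source.
//
//   #include "rocksdb/db.h"
//
//   rocksdb::Status FlushNow(rocksdb::DB* db, rocksdb::ColumnFamilyHandle* cf) {
//     rocksdb::FlushOptions fo;
//     fo.wait = true;                // block until the memtables are flushed
//     fo.allow_write_stall = false;  // wait out stall conditions first
//     return db->Flush(fo, cf);
//   }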
1761
494da23a
TL
1762Status DBImpl::Flush(const FlushOptions& flush_options,
1763 const std::vector<ColumnFamilyHandle*>& column_families) {
11fdf7f2 1764 Status s;
494da23a
TL
1765 if (!immutable_db_options_.atomic_flush) {
1766 for (auto cfh : column_families) {
1767 s = Flush(flush_options, cfh);
1768 if (!s.ok()) {
1769 break;
1770 }
11fdf7f2 1771 }
494da23a
TL
1772 } else {
1773 ROCKS_LOG_INFO(immutable_db_options_.info_log,
1774 "Manual atomic flush start.\n"
1775 "=====Column families:=====");
1776 for (auto cfh : column_families) {
1777 auto cfhi = static_cast<ColumnFamilyHandleImpl*>(cfh);
1778 ROCKS_LOG_INFO(immutable_db_options_.info_log, "%s",
1779 cfhi->GetName().c_str());
11fdf7f2 1780 }
494da23a
TL
1781 ROCKS_LOG_INFO(immutable_db_options_.info_log,
1782 "=====End of column families list=====");
1783 autovector<ColumnFamilyData*> cfds;
1784 std::for_each(column_families.begin(), column_families.end(),
1785 [&cfds](ColumnFamilyHandle* elem) {
1786 auto cfh = static_cast<ColumnFamilyHandleImpl*>(elem);
1787 cfds.emplace_back(cfh->cfd());
1788 });
1789 s = AtomicFlushMemTables(cfds, flush_options, FlushReason::kManualFlush);
1790 ROCKS_LOG_INFO(immutable_db_options_.info_log,
1791 "Manual atomic flush finished, status: %s\n"
1792 "=====Column families:=====",
1793 s.ToString().c_str());
1794 for (auto cfh : column_families) {
1795 auto cfhi = static_cast<ColumnFamilyHandleImpl*>(cfh);
1796 ROCKS_LOG_INFO(immutable_db_options_.info_log, "%s",
1797 cfhi->GetName().c_str());
11fdf7f2 1798 }
494da23a
TL
1799 ROCKS_LOG_INFO(immutable_db_options_.info_log,
1800 "=====End of column families list=====");
11fdf7f2 1801 }
11fdf7f2 1802 return s;
7c673cae
FG
1803}
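// Illustrative sketch of the multi-column-family overload above when atomic
// flush is enabled (assumed names; the DB must have been opened with
// DBOptions::atomic_flush = true for the AtomicFlushMemTables() branch):
//
//   #include <vector>
//   #include "rocksdb/db.h"
//
//   rocksdb::Status FlushAllAtomically(
//       rocksdb::DB* db, const std::vector<rocksdb::ColumnFamilyHandle*>& cfs) {
//     rocksdb::FlushOptions fo;
//     fo.wait = true;
//     return db->Flush(fo, cfs);  // results recorded atomically in the MANIFEST
//   }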
1804
f67539c2
TL
1805Status DBImpl::RunManualCompaction(
1806 ColumnFamilyData* cfd, int input_level, int output_level,
1807 const CompactRangeOptions& compact_range_options, const Slice* begin,
1808 const Slice* end, bool exclusive, bool disallow_trivial_move,
1e59de90 1809 uint64_t max_file_num_to_ignore, const std::string& trim_ts) {
7c673cae
FG
1810 assert(input_level == ColumnFamilyData::kCompactAllLevels ||
1811 input_level >= 0);
1812
1813 InternalKey begin_storage, end_storage;
1e59de90 1814 CompactionArg* ca = nullptr;
7c673cae
FG
1815
1816 bool scheduled = false;
1e59de90
TL
1817 bool unscheduled = false;
1818 Env::Priority thread_pool_priority = Env::Priority::TOTAL;
7c673cae 1819 bool manual_conflict = false;
1e59de90
TL
1820
1821 ManualCompactionState manual(
1822 cfd, input_level, output_level, compact_range_options.target_path_id,
1823 exclusive, disallow_trivial_move, compact_range_options.canceled);
7c673cae
FG
1824 // For universal compaction, we enforce every manual compaction to compact
1825 // all files.
1826 if (begin == nullptr ||
1827 cfd->ioptions()->compaction_style == kCompactionStyleUniversal ||
1828 cfd->ioptions()->compaction_style == kCompactionStyleFIFO) {
1829 manual.begin = nullptr;
1830 } else {
11fdf7f2 1831 begin_storage.SetMinPossibleForUserKey(*begin);
7c673cae
FG
1832 manual.begin = &begin_storage;
1833 }
1834 if (end == nullptr ||
1835 cfd->ioptions()->compaction_style == kCompactionStyleUniversal ||
1836 cfd->ioptions()->compaction_style == kCompactionStyleFIFO) {
1837 manual.end = nullptr;
1838 } else {
11fdf7f2 1839 end_storage.SetMaxPossibleForUserKey(*end);
7c673cae
FG
1840 manual.end = &end_storage;
1841 }
1842
1843 TEST_SYNC_POINT("DBImpl::RunManualCompaction:0");
1844 TEST_SYNC_POINT("DBImpl::RunManualCompaction:1");
1845 InstrumentedMutexLock l(&mutex_);
1846
1e59de90
TL
1847 if (manual_compaction_paused_ > 0) {
1848 // Does not make sense to `AddManualCompaction()` in this scenario since
1849 // `DisableManualCompaction()` just waited for the manual compaction queue
1850 // to drain. So return immediately.
1851 TEST_SYNC_POINT("DBImpl::RunManualCompaction:PausedAtStart");
1852 manual.status =
1853 Status::Incomplete(Status::SubCode::kManualCompactionPaused);
1854 manual.done = true;
1855 return manual.status;
1856 }
1857
7c673cae
FG
1858 // When a manual compaction arrives, temporarily disable scheduling of
1859 // non-manual compactions and wait until the number of scheduled compaction
1e59de90
TL
1860 // jobs drops to zero. This used to be needed to ensure that this manual
1861 // compaction can compact any range of keys/files. Now it is optional
1862 // (see `CompactRangeOptions::exclusive_manual_compaction`). The use case for
1863 // `exclusive_manual_compaction=true` is unclear beyond not trusting the code.
7c673cae
FG
1864 //
1865 // HasPendingManualCompaction() is true when at least one thread is inside
1866 // RunManualCompaction(), i.e. during that time no other compaction will
1867 // get scheduled (see MaybeScheduleFlushOrCompaction).
1868 //
1869 // Note that the following loop doesn't stop more than one thread calling
1870 // RunManualCompaction() from getting to the second while loop below.
1871 // However, only one of them will actually schedule compaction, while
1872 // others will wait on a condition variable until it completes.
1873
1874 AddManualCompaction(&manual);
1875 TEST_SYNC_POINT_CALLBACK("DBImpl::RunManualCompaction:NotScheduled", &mutex_);
1876 if (exclusive) {
1e59de90
TL
1877 // Limitation: there's no way to wake up the below loop when user sets
1878 // `*manual.canceled`. So `CompactRangeOptions::exclusive_manual_compaction`
1879 // and `CompactRangeOptions::canceled` might not work well together.
11fdf7f2
TL
1880 while (bg_bottom_compaction_scheduled_ > 0 ||
1881 bg_compaction_scheduled_ > 0) {
1e59de90
TL
1882 if (manual_compaction_paused_ > 0 || manual.canceled == true) {
1883 // Pretend the error came from compaction so the below cleanup/error
1884 // handling code can process it.
1885 manual.done = true;
1886 manual.status =
1887 Status::Incomplete(Status::SubCode::kManualCompactionPaused);
1888 break;
1889 }
11fdf7f2 1890 TEST_SYNC_POINT("DBImpl::RunManualCompaction:WaitScheduled");
7c673cae
FG
1891 ROCKS_LOG_INFO(
1892 immutable_db_options_.info_log,
1893 "[%s] Manual compaction waiting for all other scheduled background "
1894 "compactions to finish",
1895 cfd->GetName().c_str());
1896 bg_cv_.Wait();
1897 }
1898 }
1899
494da23a
TL
1900 LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL,
1901 immutable_db_options_.info_log.get());
1e59de90
TL
1902
1903 ROCKS_LOG_BUFFER(&log_buffer, "[%s] Manual compaction starting",
1904 cfd->GetName().c_str());
1905
7c673cae
FG
1906 // We don't check bg_error_ here, because if we get the error in compaction,
1907 // the compaction will set manual.status to bg_error_ and set manual.done to
1908 // true.
1909 while (!manual.done) {
1910 assert(HasPendingManualCompaction());
1911 manual_conflict = false;
11fdf7f2 1912 Compaction* compaction = nullptr;
7c673cae
FG
1913 if (ShouldntRunManualCompaction(&manual) || (manual.in_progress == true) ||
1914 scheduled ||
11fdf7f2
TL
1915 (((manual.manual_end = &manual.tmp_storage1) != nullptr) &&
1916 ((compaction = manual.cfd->CompactRange(
20effc67
TL
1917 *manual.cfd->GetLatestMutableCFOptions(), mutable_db_options_,
1918 manual.input_level, manual.output_level, compact_range_options,
1919 manual.begin, manual.end, &manual.manual_end, &manual_conflict,
1e59de90 1920 max_file_num_to_ignore, trim_ts)) == nullptr &&
11fdf7f2 1921 manual_conflict))) {
7c673cae
FG
1922 // exclusive manual compactions should not see a conflict during
1923 // CompactRange
1924 assert(!exclusive || !manual_conflict);
1925 // Running either this or some other manual compaction
1926 bg_cv_.Wait();
1e59de90
TL
1927 if (manual_compaction_paused_ > 0 && scheduled && !unscheduled) {
1928 assert(thread_pool_priority != Env::Priority::TOTAL);
1929 // unschedule all manual compactions
1930 auto unscheduled_task_num = env_->UnSchedule(
1931 GetTaskTag(TaskType::kManualCompaction), thread_pool_priority);
1932 if (unscheduled_task_num > 0) {
1933 ROCKS_LOG_INFO(
1934 immutable_db_options_.info_log,
1935 "[%s] Unscheduled %d number of manual compactions from the "
1936 "thread-pool",
1937 cfd->GetName().c_str(), unscheduled_task_num);
1938 // it may unschedule other manual compactions, notify others.
1939 bg_cv_.SignalAll();
1940 }
1941 unscheduled = true;
1942 TEST_SYNC_POINT("DBImpl::RunManualCompaction:Unscheduled");
1943 }
7c673cae
FG
1944 if (scheduled && manual.incomplete == true) {
1945 assert(!manual.in_progress);
1946 scheduled = false;
1947 manual.incomplete = false;
1948 }
1949 } else if (!scheduled) {
11fdf7f2 1950 if (compaction == nullptr) {
7c673cae
FG
1951 manual.done = true;
1952 bg_cv_.SignalAll();
1953 continue;
1954 }
1955 ca = new CompactionArg;
1956 ca->db = this;
11fdf7f2
TL
1957 ca->prepicked_compaction = new PrepickedCompaction;
1958 ca->prepicked_compaction->manual_compaction_state = &manual;
1959 ca->prepicked_compaction->compaction = compaction;
494da23a
TL
1960 if (!RequestCompactionToken(
1961 cfd, true, &ca->prepicked_compaction->task_token, &log_buffer)) {
1962 // Don't throttle manual compaction, only count outstanding tasks.
1963 assert(false);
1964 }
7c673cae 1965 manual.incomplete = false;
20effc67
TL
1966 if (compaction->bottommost_level() &&
1967 env_->GetBackgroundThreads(Env::Priority::BOTTOM) > 0) {
1e59de90
TL
1968 bg_bottom_compaction_scheduled_++;
1969 ca->compaction_pri_ = Env::Priority::BOTTOM;
1970 env_->Schedule(&DBImpl::BGWorkBottomCompaction, ca,
1971 Env::Priority::BOTTOM,
1972 GetTaskTag(TaskType::kManualCompaction),
1973 &DBImpl::UnscheduleCompactionCallback);
1974 thread_pool_priority = Env::Priority::BOTTOM;
1975 } else {
1976 bg_compaction_scheduled_++;
1977 ca->compaction_pri_ = Env::Priority::LOW;
1978 env_->Schedule(&DBImpl::BGWorkCompaction, ca, Env::Priority::LOW,
1979 GetTaskTag(TaskType::kManualCompaction),
1980 &DBImpl::UnscheduleCompactionCallback);
1981 thread_pool_priority = Env::Priority::LOW;
20effc67 1982 }
7c673cae 1983 scheduled = true;
1e59de90 1984 TEST_SYNC_POINT("DBImpl::RunManualCompaction:Scheduled");
7c673cae
FG
1985 }
1986 }
1987
494da23a 1988 log_buffer.FlushBufferToLog();
7c673cae
FG
1989 assert(!manual.in_progress);
1990 assert(HasPendingManualCompaction());
1991 RemoveManualCompaction(&manual);
1e59de90
TL
1992 // if the manual job is unscheduled, try schedule other jobs in case there's
1993 // any unscheduled compaction job which was blocked by exclusive manual
1994 // compaction.
1995 if (manual.status.IsIncomplete() &&
1996 manual.status.subcode() == Status::SubCode::kManualCompactionPaused) {
1997 MaybeScheduleFlushOrCompaction();
1998 }
7c673cae
FG
1999 bg_cv_.SignalAll();
2000 return manual.status;
2001}
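// RunManualCompaction() above is driven by DB::CompactRange(). A hedged sketch
// of a cancelable, non-exclusive manual compaction over a key range; `db` and
// `cancel_flag` are assumptions, not part of this source:
//
//   #include <atomic>
//   #include "rocksdb/db.h"
//
//   std::atomic<bool> cancel_flag{false};
//
//   rocksdb::Status CompactUserRange(rocksdb::DB* db, const rocksdb::Slice& lo,
//                                    const rocksdb::Slice& hi) {
//     rocksdb::CompactRangeOptions cro;
//     cro.exclusive_manual_compaction = false;  // let automatic compactions run
//     cro.canceled = &cancel_flag;  // another thread may set this to abort
//     return db->CompactRange(cro, &lo, &hi);  // Incomplete(...) if canceled
//   }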
2002
494da23a
TL
2003void DBImpl::GenerateFlushRequest(const autovector<ColumnFamilyData*>& cfds,
2004 FlushRequest* req) {
2005 assert(req != nullptr);
2006 req->reserve(cfds.size());
2007 for (const auto cfd : cfds) {
2008 if (nullptr == cfd) {
2009 // cfd may be null, see DBImpl::ScheduleFlushes
2010 continue;
2011 }
2012 uint64_t max_memtable_id = cfd->imm()->GetLatestMemTableID();
2013 req->emplace_back(cfd, max_memtable_id);
2014 }
2015}
2016
7c673cae
FG
2017Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
2018 const FlushOptions& flush_options,
1e59de90
TL
2019 FlushReason flush_reason,
2020 bool entered_write_thread) {
2021 // This method should not be called if atomic_flush is true.
2022 assert(!immutable_db_options_.atomic_flush);
2023 if (!flush_options.wait && write_controller_.IsStopped()) {
2024 std::ostringstream oss;
2025 oss << "Writes have been stopped, thus unable to perform manual flush. "
2026 "Please try again later after writes are resumed";
2027 return Status::TryAgain(oss.str());
2028 }
7c673cae 2029 Status s;
11fdf7f2
TL
2030 if (!flush_options.allow_write_stall) {
2031 bool flush_needed = true;
2032 s = WaitUntilFlushWouldNotStallWrites(cfd, &flush_needed);
2033 TEST_SYNC_POINT("DBImpl::FlushMemTable:StallWaitDone");
2034 if (!s.ok() || !flush_needed) {
2035 return s;
2036 }
2037 }
20effc67 2038
1e59de90
TL
2039 const bool needs_to_join_write_thread = !entered_write_thread;
2040 autovector<FlushRequest> flush_reqs;
2041 autovector<uint64_t> memtable_ids_to_wait;
7c673cae
FG
2042 {
2043 WriteContext context;
2044 InstrumentedMutexLock guard_lock(&mutex_);
2045
7c673cae 2046 WriteThread::Writer w;
f67539c2 2047 WriteThread::Writer nonmem_w;
1e59de90 2048 if (needs_to_join_write_thread) {
7c673cae 2049 write_thread_.EnterUnbatched(&w, &mutex_);
f67539c2
TL
2050 if (two_write_queues_) {
2051 nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_);
2052 }
7c673cae 2053 }
f67539c2 2054 WaitForPendingWrites();
7c673cae 2055
1e59de90
TL
2056 if (flush_reason != FlushReason::kErrorRecoveryRetryFlush &&
2057 (!cfd->mem()->IsEmpty() || !cached_recoverable_state_empty_.load())) {
2058 // Note that, when flush reason is kErrorRecoveryRetryFlush, during the
2059 // auto retry resume, we want to avoid creating new small memtables.
2060 // Therefore, SwitchMemtable will not be called. Also, since ResumeImpl
2061 // will iterate through all the CFs and call FlushMemtable during auto
2062 // retry resume, it is possible that in some CFs,
2064 // cfd->imm()->NumNotFlushed() = 0. In this case, no flush request will
2065 // be created or scheduled, and Status::OK() will be returned.
2065 s = SwitchMemtable(cfd, &context);
494da23a 2066 }
1e59de90 2067 const uint64_t flush_memtable_id = std::numeric_limits<uint64_t>::max();
494da23a
TL
2068 if (s.ok()) {
2069 if (cfd->imm()->NumNotFlushed() != 0 || !cfd->mem()->IsEmpty() ||
2070 !cached_recoverable_state_empty_.load()) {
1e59de90
TL
2071 FlushRequest req{{cfd, flush_memtable_id}};
2072 flush_reqs.emplace_back(std::move(req));
2073 memtable_ids_to_wait.emplace_back(cfd->imm()->GetLatestMemTableID());
494da23a 2074 }
20effc67
TL
2075 if (immutable_db_options_.persist_stats_to_disk &&
2076 flush_reason != FlushReason::kErrorRecoveryRetryFlush) {
f67539c2
TL
2077 ColumnFamilyData* cfd_stats =
2078 versions_->GetColumnFamilySet()->GetColumnFamily(
2079 kPersistentStatsColumnFamilyName);
2080 if (cfd_stats != nullptr && cfd_stats != cfd &&
2081 !cfd_stats->mem()->IsEmpty()) {
2082 // only force flush stats CF when it will be the only CF lagging
2083 // behind after the current flush
2084 bool stats_cf_flush_needed = true;
2085 for (auto* loop_cfd : *versions_->GetColumnFamilySet()) {
2086 if (loop_cfd == cfd_stats || loop_cfd == cfd) {
2087 continue;
2088 }
2089 if (loop_cfd->GetLogNumber() <= cfd_stats->GetLogNumber()) {
2090 stats_cf_flush_needed = false;
2091 }
2092 }
2093 if (stats_cf_flush_needed) {
2094 ROCKS_LOG_INFO(immutable_db_options_.info_log,
2095 "Force flushing stats CF with manual flush of %s "
2096 "to avoid holding old logs",
2097 cfd->GetName().c_str());
2098 s = SwitchMemtable(cfd_stats, &context);
1e59de90
TL
2099 FlushRequest req{{cfd_stats, flush_memtable_id}};
2100 flush_reqs.emplace_back(std::move(req));
2101 memtable_ids_to_wait.emplace_back(
2102 cfd->imm()->GetLatestMemTableID());
f67539c2
TL
2103 }
2104 }
2105 }
11fdf7f2 2106 }
1e59de90
TL
2107
2108 if (s.ok() && !flush_reqs.empty()) {
2109 for (const auto& req : flush_reqs) {
2110 assert(req.size() == 1);
2111 ColumnFamilyData* loop_cfd = req[0].first;
11fdf7f2
TL
2112 loop_cfd->imm()->FlushRequested();
2113 }
f67539c2
TL
2114 // If the caller wants to wait for this flush to complete, it indicates
2115 // that the caller expects the ColumnFamilyData not to be freed by
2116 // other threads which may drop the column family concurrently.
2117 // Therefore, we increase the cfd's ref count.
2118 if (flush_options.wait) {
1e59de90
TL
2119 for (const auto& req : flush_reqs) {
2120 assert(req.size() == 1);
2121 ColumnFamilyData* loop_cfd = req[0].first;
f67539c2
TL
2122 loop_cfd->Ref();
2123 }
2124 }
1e59de90
TL
2125 for (const auto& req : flush_reqs) {
2126 SchedulePendingFlush(req, flush_reason);
2127 }
11fdf7f2
TL
2128 MaybeScheduleFlushOrCompaction();
2129 }
7c673cae 2130
1e59de90 2131 if (needs_to_join_write_thread) {
7c673cae 2132 write_thread_.ExitUnbatched(&w);
f67539c2
TL
2133 if (two_write_queues_) {
2134 nonmem_write_thread_.ExitUnbatched(&nonmem_w);
2135 }
7c673cae 2136 }
7c673cae 2137 }
f67539c2
TL
2138 TEST_SYNC_POINT("DBImpl::FlushMemTable:AfterScheduleFlush");
2139 TEST_SYNC_POINT("DBImpl::FlushMemTable:BeforeWaitForBgFlush");
7c673cae 2140 if (s.ok() && flush_options.wait) {
11fdf7f2
TL
2141 autovector<ColumnFamilyData*> cfds;
2142 autovector<const uint64_t*> flush_memtable_ids;
1e59de90
TL
2143 assert(flush_reqs.size() == memtable_ids_to_wait.size());
2144 for (size_t i = 0; i < flush_reqs.size(); ++i) {
2145 assert(flush_reqs[i].size() == 1);
2146 cfds.push_back(flush_reqs[i][0].first);
2147 flush_memtable_ids.push_back(&(memtable_ids_to_wait[i]));
11fdf7f2 2148 }
20effc67
TL
2149 s = WaitForFlushMemTables(
2150 cfds, flush_memtable_ids,
2151 (flush_reason == FlushReason::kErrorRecovery ||
2152 flush_reason == FlushReason::kErrorRecoveryRetryFlush));
f67539c2
TL
2153 InstrumentedMutexLock lock_guard(&mutex_);
2154 for (auto* tmp_cfd : cfds) {
2155 tmp_cfd->UnrefAndTryDelete();
2156 }
7c673cae 2157 }
f67539c2 2158 TEST_SYNC_POINT("DBImpl::FlushMemTable:FlushMemTableFinished");
7c673cae
FG
2159 return s;
2160}
2161
f67539c2 2162// Flush all elements in 'column_family_datas'
494da23a
TL
2163// and atomically record the result to the MANIFEST.
2164Status DBImpl::AtomicFlushMemTables(
2165 const autovector<ColumnFamilyData*>& column_family_datas,
2166 const FlushOptions& flush_options, FlushReason flush_reason,
1e59de90
TL
2167 bool entered_write_thread) {
2168 assert(immutable_db_options_.atomic_flush);
2169 if (!flush_options.wait && write_controller_.IsStopped()) {
2170 std::ostringstream oss;
2171 oss << "Writes have been stopped, thus unable to perform manual flush. "
2172 "Please try again later after writes are resumed";
2173 return Status::TryAgain(oss.str());
2174 }
494da23a
TL
2175 Status s;
2176 if (!flush_options.allow_write_stall) {
2177 int num_cfs_to_flush = 0;
2178 for (auto cfd : column_family_datas) {
2179 bool flush_needed = true;
2180 s = WaitUntilFlushWouldNotStallWrites(cfd, &flush_needed);
2181 if (!s.ok()) {
2182 return s;
2183 } else if (flush_needed) {
2184 ++num_cfs_to_flush;
2185 }
2186 }
2187 if (0 == num_cfs_to_flush) {
2188 return s;
2189 }
2190 }
1e59de90 2191 const bool needs_to_join_write_thread = !entered_write_thread;
494da23a
TL
2192 FlushRequest flush_req;
2193 autovector<ColumnFamilyData*> cfds;
2194 {
2195 WriteContext context;
2196 InstrumentedMutexLock guard_lock(&mutex_);
2197
2198 WriteThread::Writer w;
f67539c2 2199 WriteThread::Writer nonmem_w;
1e59de90 2200 if (needs_to_join_write_thread) {
494da23a 2201 write_thread_.EnterUnbatched(&w, &mutex_);
f67539c2
TL
2202 if (two_write_queues_) {
2203 nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_);
2204 }
494da23a 2205 }
f67539c2 2206 WaitForPendingWrites();
494da23a
TL
2207
2208 for (auto cfd : column_family_datas) {
2209 if (cfd->IsDropped()) {
2210 continue;
2211 }
2212 if (cfd->imm()->NumNotFlushed() != 0 || !cfd->mem()->IsEmpty() ||
2213 !cached_recoverable_state_empty_.load()) {
2214 cfds.emplace_back(cfd);
2215 }
2216 }
2217 for (auto cfd : cfds) {
20effc67
TL
2218 if ((cfd->mem()->IsEmpty() && cached_recoverable_state_empty_.load()) ||
2219 flush_reason == FlushReason::kErrorRecoveryRetryFlush) {
494da23a
TL
2220 continue;
2221 }
2222 cfd->Ref();
2223 s = SwitchMemtable(cfd, &context);
f67539c2 2224 cfd->UnrefAndTryDelete();
494da23a
TL
2225 if (!s.ok()) {
2226 break;
2227 }
2228 }
2229 if (s.ok()) {
2230 AssignAtomicFlushSeq(cfds);
2231 for (auto cfd : cfds) {
2232 cfd->imm()->FlushRequested();
2233 }
f67539c2
TL
2234 // If the caller wants to wait for this flush to complete, it indicates
2235 // that the caller expects the ColumnFamilyData not to be freed by
2236 // other threads which may drop the column family concurrently.
2237 // Therefore, we increase the cfd's ref count.
2238 if (flush_options.wait) {
2239 for (auto cfd : cfds) {
2240 cfd->Ref();
2241 }
2242 }
494da23a
TL
2243 GenerateFlushRequest(cfds, &flush_req);
2244 SchedulePendingFlush(flush_req, flush_reason);
2245 MaybeScheduleFlushOrCompaction();
2246 }
2247
1e59de90 2248 if (needs_to_join_write_thread) {
494da23a 2249 write_thread_.ExitUnbatched(&w);
f67539c2
TL
2250 if (two_write_queues_) {
2251 nonmem_write_thread_.ExitUnbatched(&nonmem_w);
2252 }
494da23a
TL
2253 }
2254 }
2255 TEST_SYNC_POINT("DBImpl::AtomicFlushMemTables:AfterScheduleFlush");
f67539c2 2256 TEST_SYNC_POINT("DBImpl::AtomicFlushMemTables:BeforeWaitForBgFlush");
494da23a
TL
2257 if (s.ok() && flush_options.wait) {
2258 autovector<const uint64_t*> flush_memtable_ids;
2259 for (auto& iter : flush_req) {
2260 flush_memtable_ids.push_back(&(iter.second));
2261 }
20effc67
TL
2262 s = WaitForFlushMemTables(
2263 cfds, flush_memtable_ids,
2264 (flush_reason == FlushReason::kErrorRecovery ||
2265 flush_reason == FlushReason::kErrorRecoveryRetryFlush));
f67539c2
TL
2266 InstrumentedMutexLock lock_guard(&mutex_);
2267 for (auto* cfd : cfds) {
2268 cfd->UnrefAndTryDelete();
2269 }
494da23a
TL
2270 }
2271 return s;
2272}
2273
11fdf7f2
TL
2274// Calling FlushMemTable(), whether from DB::Flush() or from Backup Engine, can
2275// cause a write stall, for example if one memtable is already being flushed.
2276// This method tries to avoid a write stall (similar to CompactRange() behavior):
2277// it emulates how the SuperVersion / LSM would change if the flush happened, checks
2278// that against various constraints, and delays the flush if it would cause a write stall.
1e59de90 2279// Caller should check status and flush_needed to see if flush already happened.
11fdf7f2 2280Status DBImpl::WaitUntilFlushWouldNotStallWrites(ColumnFamilyData* cfd,
494da23a 2281 bool* flush_needed) {
11fdf7f2
TL
2282 {
2283 *flush_needed = true;
2284 InstrumentedMutexLock l(&mutex_);
2285 uint64_t orig_active_memtable_id = cfd->mem()->GetID();
2286 WriteStallCondition write_stall_condition = WriteStallCondition::kNormal;
2287 do {
2288 if (write_stall_condition != WriteStallCondition::kNormal) {
494da23a
TL
2289 // Same error handling as user writes: Don't wait if there's a
2290 // background error, even if it's a soft error. We might wait here
2291 // indefinitely as the pending flushes/compactions may never finish
2292 // successfully, resulting in the stall condition lasting indefinitely
2293 if (error_handler_.IsBGWorkStopped()) {
2294 return error_handler_.GetBGError();
2295 }
2296
11fdf7f2
TL
2297 TEST_SYNC_POINT("DBImpl::WaitUntilFlushWouldNotStallWrites:StallWait");
2298 ROCKS_LOG_INFO(immutable_db_options_.info_log,
2299 "[%s] WaitUntilFlushWouldNotStallWrites"
2300 " waiting on stall conditions to clear",
2301 cfd->GetName().c_str());
2302 bg_cv_.Wait();
2303 }
f67539c2
TL
2304 if (cfd->IsDropped()) {
2305 return Status::ColumnFamilyDropped();
2306 }
2307 if (shutting_down_.load(std::memory_order_acquire)) {
11fdf7f2
TL
2308 return Status::ShutdownInProgress();
2309 }
2310
2311 uint64_t earliest_memtable_id =
2312 std::min(cfd->mem()->GetID(), cfd->imm()->GetEarliestMemTableID());
2313 if (earliest_memtable_id > orig_active_memtable_id) {
2314 // We waited so long that the memtable we were originally waiting on was
2315 // flushed.
2316 *flush_needed = false;
2317 return Status::OK();
2318 }
2319
2320 const auto& mutable_cf_options = *cfd->GetLatestMutableCFOptions();
2321 const auto* vstorage = cfd->current()->storage_info();
2322
2323 // Skip stalling check if we're below auto-flush and auto-compaction
2324 // triggers. If it stalled in these conditions, that'd mean the stall
2325 // triggers are so low that stalling is needed for any background work. In
2326 // that case we shouldn't wait since background work won't be scheduled.
2327 if (cfd->imm()->NumNotFlushed() <
2328 cfd->ioptions()->min_write_buffer_number_to_merge &&
2329 vstorage->l0_delay_trigger_count() <
2330 mutable_cf_options.level0_file_num_compaction_trigger) {
2331 break;
2332 }
2333
2334 // check whether one extra immutable memtable or an extra L0 file would
2335 // cause write stalling mode to be entered. It could still enter stall
2336 // mode due to pending compaction bytes, but that's less common
1e59de90
TL
2337 write_stall_condition = ColumnFamilyData::GetWriteStallConditionAndCause(
2338 cfd->imm()->NumNotFlushed() + 1,
2339 vstorage->l0_delay_trigger_count() + 1,
2340 vstorage->estimated_compaction_needed_bytes(),
2341 mutable_cf_options, *cfd->ioptions())
2342 .first;
11fdf7f2
TL
2343 } while (write_stall_condition != WriteStallCondition::kNormal);
2344 }
2345 return Status::OK();
2346}
2347
2348// Wait for memtables to be flushed for multiple column families.
2349// let N = cfds.size()
2350// for i in [0, N),
2351// 1) if flush_memtable_ids[i] is not null, then the memtables with lower IDs
2352// have to be flushed for THIS column family;
2353// 2) if flush_memtable_ids[i] is null, then all memtables in THIS column
2354// family have to be flushed.
2355// Finish waiting when ALL column families finish flushing memtables.
494da23a
TL
2356// resuming_from_bg_err indicates whether the caller is trying to resume from
2357// background error or in normal processing.
11fdf7f2
TL
2358Status DBImpl::WaitForFlushMemTables(
2359 const autovector<ColumnFamilyData*>& cfds,
494da23a
TL
2360 const autovector<const uint64_t*>& flush_memtable_ids,
2361 bool resuming_from_bg_err) {
11fdf7f2 2362 int num = static_cast<int>(cfds.size());
7c673cae
FG
2363 // Wait until the compaction completes
2364 InstrumentedMutexLock l(&mutex_);
1e59de90 2365 Status s;
494da23a
TL
2366 // If the caller is trying to resume from bg error, then
2367 // error_handler_.IsDBStopped() is true.
2368 while (resuming_from_bg_err || !error_handler_.IsDBStopped()) {
7c673cae 2369 if (shutting_down_.load(std::memory_order_acquire)) {
1e59de90
TL
2370 s = Status::ShutdownInProgress();
2371 return s;
7c673cae 2372 }
494da23a 2373 // If an error has occurred during resumption, then no need to wait.
1e59de90
TL
2374 // But flush operation may fail because of this error, so need to
2375 // return the status.
494da23a 2376 if (!error_handler_.GetRecoveryError().ok()) {
1e59de90 2377 s = error_handler_.GetRecoveryError();
494da23a
TL
2378 break;
2379 }
20effc67
TL
2380 // If BG work is stopped, it indicates that there is a BG error and either
2381 // 1) it is a soft error that requires no BG work, or 2) we are not in auto_recovery_
2382 if (!resuming_from_bg_err && error_handler_.IsBGWorkStopped() &&
2383 error_handler_.GetBGError().severity() < Status::Severity::kHardError) {
1e59de90
TL
2384 s = error_handler_.GetBGError();
2385 return s;
20effc67
TL
2386 }
2387
11fdf7f2
TL
2388 // Number of column families that have been dropped.
2389 int num_dropped = 0;
2390 // Number of column families that have finished flush.
2391 int num_finished = 0;
2392 for (int i = 0; i < num; ++i) {
2393 if (cfds[i]->IsDropped()) {
2394 ++num_dropped;
2395 } else if (cfds[i]->imm()->NumNotFlushed() == 0 ||
2396 (flush_memtable_ids[i] != nullptr &&
2397 cfds[i]->imm()->GetEarliestMemTableID() >
2398 *flush_memtable_ids[i])) {
2399 ++num_finished;
2400 }
2401 }
2402 if (1 == num_dropped && 1 == num) {
1e59de90
TL
2403 s = Status::ColumnFamilyDropped();
2404 return s;
7c673cae 2405 }
11fdf7f2
TL
2406 // Column families involved in this flush request have either been dropped
2407 // or finished flush. Then it's time to finish waiting.
2408 if (num_dropped + num_finished == num) {
2409 break;
2410 }
7c673cae
FG
2411 bg_cv_.Wait();
2412 }
494da23a
TL
2413 // If not resuming from bg error, and an error has caused the DB to stop,
2414 // then report the bg error to caller.
2415 if (!resuming_from_bg_err && error_handler_.IsDBStopped()) {
11fdf7f2 2416 s = error_handler_.GetBGError();
7c673cae
FG
2417 }
2418 return s;
2419}
2420
2421Status DBImpl::EnableAutoCompaction(
2422 const std::vector<ColumnFamilyHandle*>& column_family_handles) {
2423 Status s;
2424 for (auto cf_ptr : column_family_handles) {
2425 Status status =
2426 this->SetOptions(cf_ptr, {{"disable_auto_compactions", "false"}});
2427 if (!status.ok()) {
2428 s = status;
2429 }
2430 }
2431
2432 return s;
2433}
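// EnableAutoCompaction() above is SetOptions() in a loop. A hypothetical
// sketch of the pattern it complements -- disabling automatic compactions for
// a bulk load and re-enabling them afterwards (db/cf are assumed handles):
//
//   #include "rocksdb/db.h"
//
//   rocksdb::Status BulkLoadThenReenable(rocksdb::DB* db,
//                                        rocksdb::ColumnFamilyHandle* cf) {
//     rocksdb::Status s =
//         db->SetOptions(cf, {{"disable_auto_compactions", "true"}});
//     if (!s.ok()) {
//       return s;
//     }
//     // ... ingest data here ...
//     return db->EnableAutoCompaction({cf});  // same as setting "false" above
//   }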
2434
1e59de90
TL
2435// NOTE: Calling DisableManualCompaction() may overwrite the
2436// user-provided canceled variable in CompactRangeOptions
f67539c2 2437void DBImpl::DisableManualCompaction() {
20effc67
TL
2438 InstrumentedMutexLock l(&mutex_);
2439 manual_compaction_paused_.fetch_add(1, std::memory_order_release);
1e59de90
TL
2440
2441 // Mark the canceled as true when the cancellation is triggered by
2442 // manual_compaction_paused (may overwrite user-provided `canceled`)
2443 for (const auto& manual_compaction : manual_compaction_dequeue_) {
2444 manual_compaction->canceled = true;
2445 }
2446
2447 // Wake up manual compactions waiting to start.
2448 bg_cv_.SignalAll();
2449
20effc67
TL
2450 // Wait for any pending manual compactions to finish (typically through
2451 // failing with `Status::Incomplete`) prior to returning. This way we are
2452 // guaranteed no pending manual compaction will commit while manual
2453 // compactions are "disabled".
2454 while (HasPendingManualCompaction()) {
2455 bg_cv_.Wait();
2456 }
f67539c2
TL
2457}
2458
1e59de90
TL
2459// NOTE: In contrast to DisableManualCompaction(), calling
2460// EnableManualCompaction() does NOT overwrite the user-provided *canceled
2461// variable to be false since there is NO CHANCE a canceled compaction
2462// is uncanceled. In other words, a canceled compaction must have been
2463// dropped out of the manual compaction queue when we disabled it.
f67539c2 2464void DBImpl::EnableManualCompaction() {
20effc67
TL
2465 InstrumentedMutexLock l(&mutex_);
2466 assert(manual_compaction_paused_ > 0);
2467 manual_compaction_paused_.fetch_sub(1, std::memory_order_release);
f67539c2
TL
2468}
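// Illustrative pairing of the two methods above (assumed `db` handle; not part
// of the original source). DisableManualCompaction() returns only after
// in-flight manual compactions bail out with Incomplete(kManualCompactionPaused).
//
//   #include "rocksdb/db.h"
//
//   void QuiesceManualCompactions(rocksdb::DB* db) {
//     db->DisableManualCompaction();  // pending CompactRange() calls fail fast
//     // ... e.g. prepare for shutdown or reconfiguration ...
//     db->EnableManualCompaction();   // allow manual compactions again
//   }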
2469
7c673cae
FG
2470void DBImpl::MaybeScheduleFlushOrCompaction() {
2471 mutex_.AssertHeld();
2472 if (!opened_successfully_) {
2473 // Compaction may introduce data race to DB open
2474 return;
2475 }
2476 if (bg_work_paused_ > 0) {
2477 // we paused the background work
2478 return;
11fdf7f2 2479 } else if (error_handler_.IsBGWorkStopped() &&
494da23a 2480 !error_handler_.IsRecoveryInProgress()) {
11fdf7f2
TL
2481 // There has been a hard error and this call is not part of the recovery
2482 // sequence. Bail out here so we don't get into an endless loop of
2483 // scheduling BG work which will again call this function
2484 return;
7c673cae
FG
2485 } else if (shutting_down_.load(std::memory_order_acquire)) {
2486 // DB is being deleted; no more background compactions
2487 return;
2488 }
11fdf7f2
TL
2489 auto bg_job_limits = GetBGJobLimits();
2490 bool is_flush_pool_empty =
2491 env_->GetBackgroundThreads(Env::Priority::HIGH) == 0;
2492 while (!is_flush_pool_empty && unscheduled_flushes_ > 0 &&
2493 bg_flush_scheduled_ < bg_job_limits.max_flushes) {
7c673cae 2494 bg_flush_scheduled_++;
494da23a
TL
2495 FlushThreadArg* fta = new FlushThreadArg;
2496 fta->db_ = this;
2497 fta->thread_pri_ = Env::Priority::HIGH;
2498 env_->Schedule(&DBImpl::BGWorkFlush, fta, Env::Priority::HIGH, this,
2499 &DBImpl::UnscheduleFlushCallback);
f67539c2
TL
2500 --unscheduled_flushes_;
2501 TEST_SYNC_POINT_CALLBACK(
2502 "DBImpl::MaybeScheduleFlushOrCompaction:AfterSchedule:0",
2503 &unscheduled_flushes_);
7c673cae
FG
2504 }
2505
11fdf7f2
TL
2506 // special case -- if high-pri (flush) thread pool is empty, then schedule
2507 // flushes in low-pri (compaction) thread pool.
2508 if (is_flush_pool_empty) {
7c673cae
FG
2509 while (unscheduled_flushes_ > 0 &&
2510 bg_flush_scheduled_ + bg_compaction_scheduled_ <
11fdf7f2 2511 bg_job_limits.max_flushes) {
7c673cae 2512 bg_flush_scheduled_++;
494da23a
TL
2513 FlushThreadArg* fta = new FlushThreadArg;
2514 fta->db_ = this;
2515 fta->thread_pri_ = Env::Priority::LOW;
2516 env_->Schedule(&DBImpl::BGWorkFlush, fta, Env::Priority::LOW, this,
2517 &DBImpl::UnscheduleFlushCallback);
f67539c2 2518 --unscheduled_flushes_;
7c673cae
FG
2519 }
2520 }
2521
2522 if (bg_compaction_paused_ > 0) {
2523 // we paused the background compaction
2524 return;
11fdf7f2
TL
2525 } else if (error_handler_.IsBGWorkStopped()) {
2526 // Compaction is not part of the recovery sequence from a hard error. We
2527 // might get here because recovery might do a flush and install a new
2528 // super version, which will try to schedule pending compactions. Bail
2529 // out here and let the higher level recovery handle compactions
2530 return;
7c673cae
FG
2531 }
2532
2533 if (HasExclusiveManualCompaction()) {
2534 // only manual compactions are allowed to run. don't schedule automatic
2535 // compactions
11fdf7f2 2536 TEST_SYNC_POINT("DBImpl::MaybeScheduleFlushOrCompaction:Conflict");
7c673cae
FG
2537 return;
2538 }
2539
1e59de90
TL
2540 while (bg_compaction_scheduled_ + bg_bottom_compaction_scheduled_ <
2541 bg_job_limits.max_compactions &&
7c673cae
FG
2542 unscheduled_compactions_ > 0) {
2543 CompactionArg* ca = new CompactionArg;
2544 ca->db = this;
1e59de90 2545 ca->compaction_pri_ = Env::Priority::LOW;
11fdf7f2 2546 ca->prepicked_compaction = nullptr;
7c673cae
FG
2547 bg_compaction_scheduled_++;
2548 unscheduled_compactions_--;
2549 env_->Schedule(&DBImpl::BGWorkCompaction, ca, Env::Priority::LOW, this,
494da23a 2550 &DBImpl::UnscheduleCompactionCallback);
7c673cae
FG
2551 }
2552}
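// The scheduling decisions above depend on the sizes of the Env thread pools.
// A hedged configuration sketch (thread counts are illustrative assumptions):
// a non-empty HIGH pool keeps flushes out of the LOW (compaction) pool, and a
// non-empty BOTTOM pool lets bottommost compactions run at lower priority.
//
//   #include "rocksdb/env.h"
//   #include "rocksdb/options.h"
//
//   void ConfigureBackgroundPools(rocksdb::Options* options) {
//     options->env->SetBackgroundThreads(2, rocksdb::Env::Priority::HIGH);
//     options->env->SetBackgroundThreads(4, rocksdb::Env::Priority::LOW);
//     options->env->SetBackgroundThreads(1, rocksdb::Env::Priority::BOTTOM);
//     options->max_background_jobs = 6;  // see GetBGJobLimits() below
//   }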
2553
11fdf7f2 2554DBImpl::BGJobLimits DBImpl::GetBGJobLimits() const {
7c673cae 2555 mutex_.AssertHeld();
20effc67 2556 return GetBGJobLimits(mutable_db_options_.max_background_flushes,
11fdf7f2
TL
2557 mutable_db_options_.max_background_compactions,
2558 mutable_db_options_.max_background_jobs,
2559 write_controller_.NeedSpeedupCompaction());
2560}
2561
2562DBImpl::BGJobLimits DBImpl::GetBGJobLimits(int max_background_flushes,
2563 int max_background_compactions,
2564 int max_background_jobs,
2565 bool parallelize_compactions) {
2566 BGJobLimits res;
2567 if (max_background_flushes == -1 && max_background_compactions == -1) {
2568 // for our first stab implementing max_background_jobs, simply allocate a
2569 // quarter of the threads to flushes.
2570 res.max_flushes = std::max(1, max_background_jobs / 4);
2571 res.max_compactions = std::max(1, max_background_jobs - res.max_flushes);
7c673cae 2572 } else {
11fdf7f2
TL
2573 // compatibility code in case users haven't migrated to max_background_jobs,
2574 // which automatically computes flush/compaction limits
2575 res.max_flushes = std::max(1, max_background_flushes);
2576 res.max_compactions = std::max(1, max_background_compactions);
7c673cae 2577 }
11fdf7f2
TL
2578 if (!parallelize_compactions) {
2579 // throttle background compactions until we deem it necessary
2580 res.max_compactions = 1;
2581 }
2582 return res;
7c673cae
FG
2583}
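// Worked example of the limit computation above (illustrative values, not part
// of the source): with max_background_flushes == -1,
// max_background_compactions == -1 and max_background_jobs == 8,
//   res.max_flushes     = std::max(1, 8 / 4) = 2
//   res.max_compactions = std::max(1, 8 - 2) = 6
// and if parallelize_compactions is false, max_compactions is clamped to 1.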
2584
2585void DBImpl::AddToCompactionQueue(ColumnFamilyData* cfd) {
11fdf7f2 2586 assert(!cfd->queued_for_compaction());
7c673cae
FG
2587 cfd->Ref();
2588 compaction_queue_.push_back(cfd);
11fdf7f2 2589 cfd->set_queued_for_compaction(true);
7c673cae
FG
2590}
2591
2592ColumnFamilyData* DBImpl::PopFirstFromCompactionQueue() {
2593 assert(!compaction_queue_.empty());
2594 auto cfd = *compaction_queue_.begin();
2595 compaction_queue_.pop_front();
11fdf7f2
TL
2596 assert(cfd->queued_for_compaction());
2597 cfd->set_queued_for_compaction(false);
7c673cae
FG
2598 return cfd;
2599}
2600
11fdf7f2 2601DBImpl::FlushRequest DBImpl::PopFirstFromFlushQueue() {
7c673cae 2602 assert(!flush_queue_.empty());
11fdf7f2 2603 FlushRequest flush_req = flush_queue_.front();
7c673cae 2604 flush_queue_.pop_front();
1e59de90
TL
2605 if (!immutable_db_options_.atomic_flush) {
2606 assert(flush_req.size() == 1);
2607 }
2608 for (const auto& elem : flush_req) {
2609 if (!immutable_db_options_.atomic_flush) {
2610 ColumnFamilyData* cfd = elem.first;
2611 assert(cfd);
2612 assert(cfd->queued_for_flush());
2613 cfd->set_queued_for_flush(false);
2614 }
2615 }
11fdf7f2
TL
2616 // TODO: need to unset flush reason?
2617 return flush_req;
7c673cae
FG
2618}
2619
494da23a
TL
2620ColumnFamilyData* DBImpl::PickCompactionFromQueue(
2621 std::unique_ptr<TaskLimiterToken>* token, LogBuffer* log_buffer) {
2622 assert(!compaction_queue_.empty());
2623 assert(*token == nullptr);
2624 autovector<ColumnFamilyData*> throttled_candidates;
2625 ColumnFamilyData* cfd = nullptr;
2626 while (!compaction_queue_.empty()) {
2627 auto first_cfd = *compaction_queue_.begin();
2628 compaction_queue_.pop_front();
2629 assert(first_cfd->queued_for_compaction());
2630 if (!RequestCompactionToken(first_cfd, false, token, log_buffer)) {
2631 throttled_candidates.push_back(first_cfd);
2632 continue;
2633 }
2634 cfd = first_cfd;
2635 cfd->set_queued_for_compaction(false);
2636 break;
2637 }
2638 // Add throttled compaction candidates back to queue in the original order.
2639 for (auto iter = throttled_candidates.rbegin();
2640 iter != throttled_candidates.rend(); ++iter) {
2641 compaction_queue_.push_front(*iter);
2642 }
2643 return cfd;
2644}
2645
11fdf7f2
TL
2646void DBImpl::SchedulePendingFlush(const FlushRequest& flush_req,
2647 FlushReason flush_reason) {
1e59de90 2648 mutex_.AssertHeld();
11fdf7f2
TL
2649 if (flush_req.empty()) {
2650 return;
2651 }
1e59de90
TL
2652 if (!immutable_db_options_.atomic_flush) {
2653 // For the non-atomic flush case, we never schedule multiple column
2654 // families in the same flush request.
2655 assert(flush_req.size() == 1);
2656 ColumnFamilyData* cfd = flush_req[0].first;
2657 assert(cfd);
2658
2659 if (!cfd->queued_for_flush() && cfd->imm()->IsFlushPending()) {
2660 cfd->Ref();
2661 cfd->set_queued_for_flush(true);
2662 cfd->SetFlushReason(flush_reason);
2663 ++unscheduled_flushes_;
2664 flush_queue_.push_back(flush_req);
2665 }
2666 } else {
2667 for (auto& iter : flush_req) {
2668 ColumnFamilyData* cfd = iter.first;
2669 cfd->Ref();
2670 cfd->SetFlushReason(flush_reason);
2671 }
2672 ++unscheduled_flushes_;
2673 flush_queue_.push_back(flush_req);
7c673cae
FG
2674 }
2675}
2676
2677void DBImpl::SchedulePendingCompaction(ColumnFamilyData* cfd) {
1e59de90 2678 mutex_.AssertHeld();
11fdf7f2 2679 if (!cfd->queued_for_compaction() && cfd->NeedsCompaction()) {
7c673cae
FG
2680 AddToCompactionQueue(cfd);
2681 ++unscheduled_compactions_;
2682 }
2683}
2684
11fdf7f2
TL
2685void DBImpl::SchedulePendingPurge(std::string fname, std::string dir_to_sync,
2686 FileType type, uint64_t number, int job_id) {
7c673cae 2687 mutex_.AssertHeld();
11fdf7f2 2688 PurgeFileInfo file_info(fname, dir_to_sync, type, number, job_id);
f67539c2 2689 purge_files_.insert({{number, std::move(file_info)}});
7c673cae
FG
2690}
2691
494da23a
TL
2692void DBImpl::BGWorkFlush(void* arg) {
2693 FlushThreadArg fta = *(reinterpret_cast<FlushThreadArg*>(arg));
2694 delete reinterpret_cast<FlushThreadArg*>(arg);
2695
2696 IOSTATS_SET_THREAD_POOL_ID(fta.thread_pri_);
7c673cae 2697 TEST_SYNC_POINT("DBImpl::BGWorkFlush");
20effc67 2698 static_cast_with_check<DBImpl>(fta.db_)->BackgroundCallFlush(fta.thread_pri_);
7c673cae
FG
2699 TEST_SYNC_POINT("DBImpl::BGWorkFlush:done");
2700}
2701
2702void DBImpl::BGWorkCompaction(void* arg) {
2703 CompactionArg ca = *(reinterpret_cast<CompactionArg*>(arg));
2704 delete reinterpret_cast<CompactionArg*>(arg);
2705 IOSTATS_SET_THREAD_POOL_ID(Env::Priority::LOW);
2706 TEST_SYNC_POINT("DBImpl::BGWorkCompaction");
11fdf7f2
TL
2707 auto prepicked_compaction =
2708 static_cast<PrepickedCompaction*>(ca.prepicked_compaction);
20effc67 2709 static_cast_with_check<DBImpl>(ca.db)->BackgroundCallCompaction(
11fdf7f2
TL
2710 prepicked_compaction, Env::Priority::LOW);
2711 delete prepicked_compaction;
2712}
2713
2714void DBImpl::BGWorkBottomCompaction(void* arg) {
2715 CompactionArg ca = *(static_cast<CompactionArg*>(arg));
2716 delete static_cast<CompactionArg*>(arg);
2717 IOSTATS_SET_THREAD_POOL_ID(Env::Priority::BOTTOM);
2718 TEST_SYNC_POINT("DBImpl::BGWorkBottomCompaction");
2719 auto* prepicked_compaction = ca.prepicked_compaction;
1e59de90 2720 assert(prepicked_compaction && prepicked_compaction->compaction);
11fdf7f2
TL
2721 ca.db->BackgroundCallCompaction(prepicked_compaction, Env::Priority::BOTTOM);
2722 delete prepicked_compaction;
7c673cae
FG
2723}
2724
2725void DBImpl::BGWorkPurge(void* db) {
2726 IOSTATS_SET_THREAD_POOL_ID(Env::Priority::HIGH);
2727 TEST_SYNC_POINT("DBImpl::BGWorkPurge:start");
2728 reinterpret_cast<DBImpl*>(db)->BackgroundCallPurge();
2729 TEST_SYNC_POINT("DBImpl::BGWorkPurge:end");
2730}
2731
494da23a 2732void DBImpl::UnscheduleCompactionCallback(void* arg) {
1e59de90
TL
2733 CompactionArg* ca_ptr = reinterpret_cast<CompactionArg*>(arg);
2734 Env::Priority compaction_pri = ca_ptr->compaction_pri_;
2735 if (Env::Priority::BOTTOM == compaction_pri) {
2736 // Decrement bg_bottom_compaction_scheduled_ if priority is BOTTOM
2737 ca_ptr->db->bg_bottom_compaction_scheduled_--;
2738 } else if (Env::Priority::LOW == compaction_pri) {
2739 // Decrement bg_compaction_scheduled_ if priority is LOW
2740 ca_ptr->db->bg_compaction_scheduled_--;
2741 }
2742 CompactionArg ca = *(ca_ptr);
7c673cae 2743 delete reinterpret_cast<CompactionArg*>(arg);
11fdf7f2 2744 if (ca.prepicked_compaction != nullptr) {
1e59de90
TL
2745 // if it's a manual compaction, set status to ManualCompactionPaused
2746 if (ca.prepicked_compaction->manual_compaction_state) {
2747 ca.prepicked_compaction->manual_compaction_state->done = true;
2748 ca.prepicked_compaction->manual_compaction_state->status =
2749 Status::Incomplete(Status::SubCode::kManualCompactionPaused);
2750 }
11fdf7f2 2751 if (ca.prepicked_compaction->compaction != nullptr) {
1e59de90
TL
2752 ca.prepicked_compaction->compaction->ReleaseCompactionFiles(
2753 Status::Incomplete(Status::SubCode::kManualCompactionPaused));
11fdf7f2
TL
2754 delete ca.prepicked_compaction->compaction;
2755 }
2756 delete ca.prepicked_compaction;
7c673cae 2757 }
494da23a
TL
2758 TEST_SYNC_POINT("DBImpl::UnscheduleCompactionCallback");
2759}
2760
2761void DBImpl::UnscheduleFlushCallback(void* arg) {
1e59de90
TL
2762 // Decrement bg_flush_scheduled_ in flush callback
2763 reinterpret_cast<FlushThreadArg*>(arg)->db_->bg_flush_scheduled_--;
2764 Env::Priority flush_pri = reinterpret_cast<FlushThreadArg*>(arg)->thread_pri_;
2765 if (Env::Priority::LOW == flush_pri) {
2766 TEST_SYNC_POINT("DBImpl::UnscheduleLowFlushCallback");
2767 } else if (Env::Priority::HIGH == flush_pri) {
2768 TEST_SYNC_POINT("DBImpl::UnscheduleHighFlushCallback");
2769 }
494da23a
TL
2770 delete reinterpret_cast<FlushThreadArg*>(arg);
2771 TEST_SYNC_POINT("DBImpl::UnscheduleFlushCallback");
7c673cae
FG
2772}
2773
2774Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context,
494da23a
TL
2775 LogBuffer* log_buffer, FlushReason* reason,
2776 Env::Priority thread_pri) {
7c673cae
FG
2777 mutex_.AssertHeld();
2778
11fdf7f2
TL
2779 Status status;
2780 *reason = FlushReason::kOthers;
2781 // If BG work is stopped due to an error, but a recovery is in progress,
2782 // that means this flush is part of the recovery. So allow it to go through
2783 if (!error_handler_.IsBGWorkStopped()) {
2784 if (shutting_down_.load(std::memory_order_acquire)) {
2785 status = Status::ShutdownInProgress();
2786 }
2787 } else if (!error_handler_.IsRecoveryInProgress()) {
2788 status = error_handler_.GetBGError();
7c673cae
FG
2789 }
2790
2791 if (!status.ok()) {
2792 return status;
2793 }
2794
11fdf7f2
TL
2795 autovector<BGFlushArg> bg_flush_args;
2796 std::vector<SuperVersionContext>& superversion_contexts =
2797 job_context->superversion_contexts;
f67539c2 2798 autovector<ColumnFamilyData*> column_families_not_to_flush;
7c673cae
FG
2799 while (!flush_queue_.empty()) {
2800 // This cfd is already referenced
11fdf7f2
TL
2801 const FlushRequest& flush_req = PopFirstFromFlushQueue();
2802 superversion_contexts.clear();
2803 superversion_contexts.reserve(flush_req.size());
2804
2805 for (const auto& iter : flush_req) {
2806 ColumnFamilyData* cfd = iter.first;
1e59de90
TL
2807 if (cfd->GetMempurgeUsed()) {
2808 // If imm() contains silent memtables (e.g., because
2809 // MemPurge was activated), requesting a flush will
2810 // mark the imm_needed as true.
2811 cfd->imm()->FlushRequested();
2812 }
2813
11fdf7f2
TL
2814 if (cfd->IsDropped() || !cfd->imm()->IsFlushPending()) {
2815 // can't flush this CF, try next one
f67539c2 2816 column_families_not_to_flush.push_back(cfd);
11fdf7f2 2817 continue;
7c673cae 2818 }
11fdf7f2
TL
2819 superversion_contexts.emplace_back(SuperVersionContext(true));
2820 bg_flush_args.emplace_back(cfd, iter.second,
2821 &(superversion_contexts.back()));
2822 }
2823 if (!bg_flush_args.empty()) {
2824 break;
7c673cae 2825 }
7c673cae
FG
2826 }
2827
11fdf7f2
TL
2828 if (!bg_flush_args.empty()) {
2829 auto bg_job_limits = GetBGJobLimits();
2830 for (const auto& arg : bg_flush_args) {
2831 ColumnFamilyData* cfd = arg.cfd_;
2832 ROCKS_LOG_BUFFER(
2833 log_buffer,
2834 "Calling FlushMemTableToOutputFile with column "
2835 "family [%s], flush slots available %d, compaction slots available "
2836 "%d, "
2837 "flush slots scheduled %d, compaction slots scheduled %d",
2838 cfd->GetName().c_str(), bg_job_limits.max_flushes,
2839 bg_job_limits.max_compactions, bg_flush_scheduled_,
2840 bg_compaction_scheduled_);
2841 }
2842 status = FlushMemTablesToOutputFiles(bg_flush_args, made_progress,
494da23a 2843 job_context, log_buffer, thread_pri);
f67539c2 2844 TEST_SYNC_POINT("DBImpl::BackgroundFlush:BeforeFlush");
11fdf7f2
TL
2845 // All the CFDs in the FlushReq must have the same flush reason, so just
2846 // grab the first one
2847 *reason = bg_flush_args[0].cfd_->GetFlushReason();
2848 for (auto& arg : bg_flush_args) {
2849 ColumnFamilyData* cfd = arg.cfd_;
f67539c2 2850 if (cfd->UnrefAndTryDelete()) {
11fdf7f2
TL
2851 arg.cfd_ = nullptr;
2852 }
7c673cae
FG
2853 }
2854 }
f67539c2
TL
2855 for (auto cfd : column_families_not_to_flush) {
2856 cfd->UnrefAndTryDelete();
2857 }
7c673cae
FG
2858 return status;
2859}
2860
494da23a 2861void DBImpl::BackgroundCallFlush(Env::Priority thread_pri) {
7c673cae
FG
2862 bool made_progress = false;
2863 JobContext job_context(next_job_id_.fetch_add(1), true);
7c673cae 2864
1e59de90 2865 TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCallFlush:start", nullptr);
7c673cae
FG
2866
2867 LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL,
2868 immutable_db_options_.info_log.get());
1e59de90
TL
2869 TEST_SYNC_POINT("DBImpl::BackgroundCallFlush:Start:1");
2870 TEST_SYNC_POINT("DBImpl::BackgroundCallFlush:Start:2");
7c673cae
FG
2871 {
2872 InstrumentedMutexLock l(&mutex_);
11fdf7f2 2873 assert(bg_flush_scheduled_);
7c673cae
FG
2874 num_running_flushes_++;
2875
f67539c2
TL
2876 std::unique_ptr<std::list<uint64_t>::iterator>
2877 pending_outputs_inserted_elem(new std::list<uint64_t>::iterator(
2878 CaptureCurrentFileNumberInPendingOutputs()));
11fdf7f2 2879 FlushReason reason;
7c673cae 2880
494da23a
TL
2881 Status s = BackgroundFlush(&made_progress, &job_context, &log_buffer,
2882 &reason, thread_pri);
f67539c2 2883 if (!s.ok() && !s.IsShutdownInProgress() && !s.IsColumnFamilyDropped() &&
11fdf7f2 2884 reason != FlushReason::kErrorRecovery) {
7c673cae
FG
2885 // Wait a little bit before retrying background flush in
2886 // case this is an environmental problem and we do not want to
2887 // chew up resources for failed flushes for the duration of
2888 // the problem.
2889 uint64_t error_cnt =
11fdf7f2 2890 default_cf_internal_stats_->BumpAndGetBackgroundErrorCount();
7c673cae
FG
2891 bg_cv_.SignalAll(); // In case a waiter can proceed despite the error
2892 mutex_.Unlock();
2893 ROCKS_LOG_ERROR(immutable_db_options_.info_log,
2894 "Waiting after background flush error: %s"
2895 "Accumulated background error counts: %" PRIu64,
2896 s.ToString().c_str(), error_cnt);
2897 log_buffer.FlushBufferToLog();
2898 LogFlush(immutable_db_options_.info_log);
1e59de90 2899 immutable_db_options_.clock->SleepForMicroseconds(1000000);
7c673cae
FG
2900 mutex_.Lock();
2901 }
2902
494da23a 2903 TEST_SYNC_POINT("DBImpl::BackgroundCallFlush:FlushFinish:0");
7c673cae
FG
2904 ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem);
2905
2906 // If flush failed, we want to delete all temporary files that we might have
2907 // created. Thus, we force a full scan in FindObsoleteFiles()
f67539c2
TL
2908 FindObsoleteFiles(&job_context, !s.ok() && !s.IsShutdownInProgress() &&
2909 !s.IsColumnFamilyDropped());
7c673cae 2910 // delete unnecessary files if any, this is done outside the mutex
11fdf7f2
TL
2911 if (job_context.HaveSomethingToClean() ||
2912 job_context.HaveSomethingToDelete() || !log_buffer.IsEmpty()) {
7c673cae 2913 mutex_.Unlock();
11fdf7f2 2914 TEST_SYNC_POINT("DBImpl::BackgroundCallFlush:FilesFound");
7c673cae
FG
2915 // Have to flush the info logs before bg_flush_scheduled_--
2916 // because if bg_flush_scheduled_ becomes 0 and the lock is
2917 // released, the destructor of DB can kick in and destroy all the
2918 // state of the DB, so info_log might not be available after that point.
2919 // The same applies to accessing any other state that the DB owns.
2920 log_buffer.FlushBufferToLog();
2921 if (job_context.HaveSomethingToDelete()) {
2922 PurgeObsoleteFiles(job_context);
2923 }
2924 job_context.Clean();
2925 mutex_.Lock();
2926 }
494da23a 2927 TEST_SYNC_POINT("DBImpl::BackgroundCallFlush:ContextCleanedUp");
7c673cae
FG
2928
2929 assert(num_running_flushes_ > 0);
2930 num_running_flushes_--;
2931 bg_flush_scheduled_--;
2932 // See if there's more work to be done
2933 MaybeScheduleFlushOrCompaction();
494da23a 2934 atomic_flush_install_cv_.SignalAll();
7c673cae
FG
2935 bg_cv_.SignalAll();
2936 // IMPORTANT: there should be no code after calling SignalAll. This call may
2937 // signal the DB destructor that it's OK to proceed with destruction. In
2938 // that case, all DB variables will be deallocated and referencing them
2939 // will cause trouble.
2940 }
2941}
2942
11fdf7f2
TL
2943void DBImpl::BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction,
2944 Env::Priority bg_thread_pri) {
7c673cae 2945 bool made_progress = false;
7c673cae
FG
2946 JobContext job_context(next_job_id_.fetch_add(1), true);
2947 TEST_SYNC_POINT("BackgroundCallCompaction:0");
7c673cae
FG
2948 LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL,
2949 immutable_db_options_.info_log.get());
2950 {
2951 InstrumentedMutexLock l(&mutex_);
2952
2953 // This call will unlock/lock the mutex to wait for current running
2954 // IngestExternalFile() calls to finish.
2955 WaitForIngestFile();
2956
2957 num_running_compactions_++;
2958
f67539c2
TL
2959 std::unique_ptr<std::list<uint64_t>::iterator>
2960 pending_outputs_inserted_elem(new std::list<uint64_t>::iterator(
2961 CaptureCurrentFileNumberInPendingOutputs()));
7c673cae 2962
11fdf7f2
TL
2963 assert((bg_thread_pri == Env::Priority::BOTTOM &&
2964 bg_bottom_compaction_scheduled_) ||
2965 (bg_thread_pri == Env::Priority::LOW && bg_compaction_scheduled_));
2966 Status s = BackgroundCompaction(&made_progress, &job_context, &log_buffer,
494da23a 2967 prepicked_compaction, bg_thread_pri);
7c673cae 2968 TEST_SYNC_POINT("BackgroundCallCompaction:1");
494da23a
TL
2969 if (s.IsBusy()) {
2970 bg_cv_.SignalAll(); // In case a waiter can proceed despite the error
2971 mutex_.Unlock();
1e59de90
TL
2972 immutable_db_options_.clock->SleepForMicroseconds(
2973 10000); // prevent hot loop
494da23a 2974 mutex_.Lock();
f67539c2
TL
2975 } else if (!s.ok() && !s.IsShutdownInProgress() &&
2976 !s.IsManualCompactionPaused() && !s.IsColumnFamilyDropped()) {
7c673cae
FG
2977 // Wait a little bit before retrying background compaction in
2978 // case this is an environmental problem and we do not want to
2979 // chew up resources for failed compactions for the duration of
2980 // the problem.
2981 uint64_t error_cnt =
2982 default_cf_internal_stats_->BumpAndGetBackgroundErrorCount();
2983 bg_cv_.SignalAll(); // In case a waiter can proceed despite the error
2984 mutex_.Unlock();
2985 log_buffer.FlushBufferToLog();
2986 ROCKS_LOG_ERROR(immutable_db_options_.info_log,
2987 "Waiting after background compaction error: %s, "
2988 "Accumulated background error counts: %" PRIu64,
2989 s.ToString().c_str(), error_cnt);
2990 LogFlush(immutable_db_options_.info_log);
1e59de90 2991 immutable_db_options_.clock->SleepForMicroseconds(1000000);
7c673cae 2992 mutex_.Lock();
f67539c2 2993 } else if (s.IsManualCompactionPaused()) {
1e59de90 2994 assert(prepicked_compaction);
f67539c2
TL
2995 ManualCompactionState* m = prepicked_compaction->manual_compaction_state;
2996 assert(m);
2997 ROCKS_LOG_BUFFER(&log_buffer, "[%s] [JOB %d] Manual compaction paused",
2998 m->cfd->GetName().c_str(), job_context.job_id);
7c673cae
FG
2999 }
3000
3001 ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem);
3002
1e59de90
TL
3003 // If compaction failed, we want to delete all temporary files that we
3004 // might have created (they might not all be recorded in job_context in
3005 // case of a failure). Thus, we force a full scan in FindObsoleteFiles()
f67539c2
TL
3006 FindObsoleteFiles(&job_context, !s.ok() && !s.IsShutdownInProgress() &&
3007 !s.IsManualCompactionPaused() &&
1e59de90
TL
3008 !s.IsColumnFamilyDropped() &&
3009 !s.IsBusy());
11fdf7f2 3010 TEST_SYNC_POINT("DBImpl::BackgroundCallCompaction:FoundObsoleteFiles");
7c673cae
FG
3011
3012 // delete unnecessary files if any, this is done outside the mutex
11fdf7f2
TL
3013 if (job_context.HaveSomethingToClean() ||
3014 job_context.HaveSomethingToDelete() || !log_buffer.IsEmpty()) {
7c673cae
FG
3015 mutex_.Unlock();
3016 // Have to flush the info logs before bg_compaction_scheduled_--
3017 // because if bg_compaction_scheduled_ becomes 0 and the lock is
3018 // released, the destructor of DB can kick in and destroy all the
3019 // state of the DB, so info_log might not be available after that point.
3020 // The same applies to accessing any other state that the DB owns.
3021 log_buffer.FlushBufferToLog();
3022 if (job_context.HaveSomethingToDelete()) {
3023 PurgeObsoleteFiles(job_context);
11fdf7f2 3024 TEST_SYNC_POINT("DBImpl::BackgroundCallCompaction:PurgedObsoleteFiles");
7c673cae
FG
3025 }
3026 job_context.Clean();
3027 mutex_.Lock();
3028 }
3029
3030 assert(num_running_compactions_ > 0);
3031 num_running_compactions_--;
1e59de90 3032
11fdf7f2
TL
3033 if (bg_thread_pri == Env::Priority::LOW) {
3034 bg_compaction_scheduled_--;
3035 } else {
3036 assert(bg_thread_pri == Env::Priority::BOTTOM);
3037 bg_bottom_compaction_scheduled_--;
3038 }
7c673cae 3039
7c673cae
FG
3040 // See if there's more work to be done
3041 MaybeScheduleFlushOrCompaction();
1e59de90
TL
3042
3043 if (prepicked_compaction != nullptr &&
3044 prepicked_compaction->task_token != nullptr) {
3045 // Releasing task tokens affects (and asserts on) the DB state, so
3046 // must be done before we potentially signal the DB close process to
3047 // proceed below.
3048 prepicked_compaction->task_token.reset();
3049 }
3050
11fdf7f2
TL
3051 if (made_progress ||
3052 (bg_compaction_scheduled_ == 0 &&
3053 bg_bottom_compaction_scheduled_ == 0) ||
3054 HasPendingManualCompaction() || unscheduled_compactions_ == 0) {
7c673cae
FG
3055 // signal if
3056 // * made_progress -- need to wakeup DelayWrite
11fdf7f2 3057 // * bg_{bottom_,}compaction_scheduled_ == 0 -- need to wakeup ~DBImpl
7c673cae
FG
3058 // * HasPendingManualCompaction -- need to wakeup RunManualCompaction
3059 // If none of this is true, there is no need to signal since nobody is
3060 // waiting for it
3061 bg_cv_.SignalAll();
3062 }
3063 // IMPORTANT: there should be no code after calling SignalAll. This call may
3064 // signal the DB destructor that it's OK to proceed with destruction. In
3065 // that case, all DB variables will be deallocated and referencing them
3066 // will cause trouble.
3067 }
3068}
3069
3070Status DBImpl::BackgroundCompaction(bool* made_progress,
3071 JobContext* job_context,
11fdf7f2 3072 LogBuffer* log_buffer,
494da23a
TL
3073 PrepickedCompaction* prepicked_compaction,
3074 Env::Priority thread_pri) {
11fdf7f2
TL
3075 ManualCompactionState* manual_compaction =
3076 prepicked_compaction == nullptr
3077 ? nullptr
3078 : prepicked_compaction->manual_compaction_state;
7c673cae
FG
3079 *made_progress = false;
3080 mutex_.AssertHeld();
3081 TEST_SYNC_POINT("DBImpl::BackgroundCompaction:Start");
3082
3083 bool is_manual = (manual_compaction != nullptr);
494da23a 3084 std::unique_ptr<Compaction> c;
11fdf7f2
TL
3085 if (prepicked_compaction != nullptr &&
3086 prepicked_compaction->compaction != nullptr) {
3087 c.reset(prepicked_compaction->compaction);
3088 }
3089 bool is_prepicked = is_manual || c;
7c673cae
FG
3090
3091 // (manual_compaction->in_progress == false);
3092 bool trivial_move_disallowed =
3093 is_manual && manual_compaction->disallow_trivial_move;
3094
3095 CompactionJobStats compaction_job_stats;
11fdf7f2
TL
3096 Status status;
3097 if (!error_handler_.IsBGWorkStopped()) {
3098 if (shutting_down_.load(std::memory_order_acquire)) {
3099 status = Status::ShutdownInProgress();
f67539c2 3100 } else if (is_manual &&
1e59de90 3101 manual_compaction->canceled.load(std::memory_order_acquire)) {
f67539c2 3102 status = Status::Incomplete(Status::SubCode::kManualCompactionPaused);
11fdf7f2
TL
3103 }
3104 } else {
3105 status = error_handler_.GetBGError();
3106 // If we get here, it means a hard error happened after this compaction
3107 // was scheduled by MaybeScheduleFlushOrCompaction(), but before it got
3108 // a chance to execute. Since we didn't pop a cfd from the compaction
3109 // queue, increment unscheduled_compactions_
3110 unscheduled_compactions_++;
7c673cae
FG
3111 }
3112
3113 if (!status.ok()) {
3114 if (is_manual) {
3115 manual_compaction->status = status;
3116 manual_compaction->done = true;
3117 manual_compaction->in_progress = false;
7c673cae
FG
3118 manual_compaction = nullptr;
3119 }
494da23a
TL
3120 if (c) {
3121 c->ReleaseCompactionFiles(status);
3122 c.reset();
3123 }
7c673cae
FG
3124 return status;
3125 }
3126
3127 if (is_manual) {
3128 // another thread cannot pick up the same work
3129 manual_compaction->in_progress = true;
3130 }
3131
1e59de90
TL
3132 TEST_SYNC_POINT("DBImpl::BackgroundCompaction:InProgress");
3133
494da23a
TL
3134 std::unique_ptr<TaskLimiterToken> task_token;
3135
7c673cae
FG
3136 // InternalKey manual_end_storage;
3137 // InternalKey* manual_end = &manual_end_storage;
11fdf7f2 3138 bool sfm_reserved_compact_space = false;
7c673cae 3139 if (is_manual) {
11fdf7f2 3140 ManualCompactionState* m = manual_compaction;
7c673cae 3141 assert(m->in_progress);
7c673cae
FG
3142 if (!c) {
3143 m->done = true;
3144 m->manual_end = nullptr;
20effc67
TL
3145 ROCKS_LOG_BUFFER(
3146 log_buffer,
3147 "[%s] Manual compaction from level-%d from %s .. "
3148 "%s; nothing to do\n",
3149 m->cfd->GetName().c_str(), m->input_level,
3150 (m->begin ? m->begin->DebugString(true).c_str() : "(begin)"),
3151 (m->end ? m->end->DebugString(true).c_str() : "(end)"));
7c673cae 3152 } else {
11fdf7f2
TL
3153 // First check if we have enough room to do the compaction
3154 bool enough_room = EnoughRoomForCompaction(
3155 m->cfd, *(c->inputs()), &sfm_reserved_compact_space, log_buffer);
3156
3157 if (!enough_room) {
3158 // Then don't do the compaction
3159 c->ReleaseCompactionFiles(status);
3160 c.reset();
3161 // m's vars will get set properly at the end of this function,
3162 // as long as status == CompactionTooLarge
3163 status = Status::CompactionTooLarge();
3164 } else {
3165 ROCKS_LOG_BUFFER(
3166 log_buffer,
3167 "[%s] Manual compaction from level-%d to level-%d from %s .. "
3168 "%s; will stop at %s\n",
3169 m->cfd->GetName().c_str(), m->input_level, c->output_level(),
20effc67
TL
3170 (m->begin ? m->begin->DebugString(true).c_str() : "(begin)"),
3171 (m->end ? m->end->DebugString(true).c_str() : "(end)"),
11fdf7f2
TL
3172 ((m->done || m->manual_end == nullptr)
3173 ? "(end)"
20effc67 3174 : m->manual_end->DebugString(true).c_str()));
11fdf7f2
TL
3175 }
3176 }
3177 } else if (!is_prepicked && !compaction_queue_.empty()) {
3178 if (HasExclusiveManualCompaction()) {
3179 // Can't compact right now, but try again later
3180 TEST_SYNC_POINT("DBImpl::BackgroundCompaction()::Conflict");
3181
3182 // Stay in the compaction queue.
3183 unscheduled_compactions_++;
3184
3185 return Status::OK();
3186 }
3187
494da23a
TL
3188 auto cfd = PickCompactionFromQueue(&task_token, log_buffer);
3189 if (cfd == nullptr) {
3190 // Can't find any executable task from the compaction queue.
3191 // All tasks have been throttled by compaction thread limiter.
3192 ++unscheduled_compactions_;
3193 return Status::Busy();
3194 }
3195
7c673cae
FG
3196 // We unreference here because the following code will take a Ref() on
3197 // this cfd if it is going to use it (Compaction class holds a
3198 // reference).
3199 // This will all happen under a mutex so we don't have to be afraid of
3200 // somebody else deleting it.
f67539c2 3201 if (cfd->UnrefAndTryDelete()) {
7c673cae
FG
3202 // This was the last reference of the column family, so no need to
3203 // compact.
3204 return Status::OK();
3205 }
3206
7c673cae
FG
3207 // Pick up latest mutable CF Options and use it throughout the
3208 // compaction job
3209 // Compaction makes a copy of the latest MutableCFOptions. It should be used
3210 // throughout the compaction procedure to ensure consistency. It will
3211 // eventually be installed into the SuperVersion.
3212 auto* mutable_cf_options = cfd->GetLatestMutableCFOptions();
3213 if (!mutable_cf_options->disable_auto_compactions && !cfd->IsDropped()) {
3214 // NOTE: try to avoid unnecessary copy of MutableCFOptions if
3215 // compaction is not necessary. Need to make sure mutex is held
3216 // until we make a copy in the following code
3217 TEST_SYNC_POINT("DBImpl::BackgroundCompaction():BeforePickCompaction");
20effc67
TL
3218 c.reset(cfd->PickCompaction(*mutable_cf_options, mutable_db_options_,
3219 log_buffer));
7c673cae 3220 TEST_SYNC_POINT("DBImpl::BackgroundCompaction():AfterPickCompaction");
11fdf7f2 3221
7c673cae 3222 if (c != nullptr) {
11fdf7f2
TL
3223 bool enough_room = EnoughRoomForCompaction(
3224 cfd, *(c->inputs()), &sfm_reserved_compact_space, log_buffer);
3225
3226 if (!enough_room) {
3227 // Then don't do the compaction
3228 c->ReleaseCompactionFiles(status);
3229 c->column_family_data()
3230 ->current()
3231 ->storage_info()
1e59de90 3232 ->ComputeCompactionScore(*(c->immutable_options()),
11fdf7f2 3233 *(c->mutable_cf_options()));
7c673cae
FG
3234 AddToCompactionQueue(cfd);
3235 ++unscheduled_compactions_;
11fdf7f2
TL
3236
3237 c.reset();
3238 // Don't need to sleep here, because BackgroundCallCompaction
3239 // will sleep if !s.ok()
3240 status = Status::CompactionTooLarge();
3241 } else {
3242 // update statistics
1e59de90
TL
3243 size_t num_files = 0;
3244 for (auto& each_level : *c->inputs()) {
3245 num_files += each_level.files.size();
3246 }
3247 RecordInHistogram(stats_, NUM_FILES_IN_SINGLE_COMPACTION, num_files);
3248
11fdf7f2
TL
3249 // There are three things that can change compaction score:
3250 // 1) When flush or compaction finish. This case is covered by
3251 // InstallSuperVersionAndScheduleWork
3252 // 2) When MutableCFOptions changes. This case is also covered by
3253 // InstallSuperVersionAndScheduleWork, because this is when the new
3254 // options take effect.
3255 // 3) When we Pick a new compaction, we "remove" those files being
3256 // compacted from the calculation, which then influences compaction
3257 // score. Here we check if we need the new compaction even without the
3258 // files that are currently being compacted. If we need another
3259 // compaction, we might be able to execute it in parallel, so we add
3260 // it to the queue and schedule a new thread.
3261 if (cfd->NeedsCompaction()) {
3262 // Yes, we need more compactions!
3263 AddToCompactionQueue(cfd);
3264 ++unscheduled_compactions_;
3265 MaybeScheduleFlushOrCompaction();
3266 }
7c673cae
FG
3267 }
3268 }
3269 }
3270 }
3271
20effc67 3272 IOStatus io_s;
7c673cae
FG
3273 if (!c) {
3274 // Nothing to do
3275 ROCKS_LOG_BUFFER(log_buffer, "Compaction nothing to do");
3276 } else if (c->deletion_compaction()) {
3277 // TODO(icanadi) Do we want to honor snapshots here? i.e. not delete old
3278 // file if there is alive snapshot pointing to it
494da23a
TL
3279 TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:BeforeCompaction",
3280 c->column_family_data());
7c673cae 3281 assert(c->num_input_files(1) == 0);
7c673cae
FG
3282 assert(c->column_family_data()->ioptions()->compaction_style ==
3283 kCompactionStyleFIFO);
3284
3285 compaction_job_stats.num_input_files = c->num_input_files(0);
3286
494da23a
TL
3287 NotifyOnCompactionBegin(c->column_family_data(), c.get(), status,
3288 compaction_job_stats, job_context->job_id);
3289
7c673cae
FG
3290 for (const auto& f : *c->inputs(0)) {
3291 c->edit()->DeleteFile(c->level(), f->fd.GetNumber());
3292 }
3293 status = versions_->LogAndApply(c->column_family_data(),
3294 *c->mutable_cf_options(), c->edit(),
3295 &mutex_, directories_.GetDbDir());
20effc67 3296 io_s = versions_->io_status();
494da23a
TL
3297 InstallSuperVersionAndScheduleWork(c->column_family_data(),
3298 &job_context->superversion_contexts[0],
3299 *c->mutable_cf_options());
7c673cae
FG
3300 ROCKS_LOG_BUFFER(log_buffer, "[%s] Deleted %d files\n",
3301 c->column_family_data()->GetName().c_str(),
3302 c->num_input_files(0));
3303 *made_progress = true;
494da23a
TL
3304 TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:AfterCompaction",
3305 c->column_family_data());
7c673cae
FG
3306 } else if (!trivial_move_disallowed && c->IsTrivialMove()) {
3307 TEST_SYNC_POINT("DBImpl::BackgroundCompaction:TrivialMove");
494da23a
TL
3308 TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:BeforeCompaction",
3309 c->column_family_data());
7c673cae
FG
3310 // Instrument for event update
3311 // TODO(yhchiang): add op details for showing trivial-move.
3312 ThreadStatusUtil::SetColumnFamily(
3313 c->column_family_data(), c->column_family_data()->ioptions()->env,
3314 immutable_db_options_.enable_thread_tracking);
3315 ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_COMPACTION);
3316
3317 compaction_job_stats.num_input_files = c->num_input_files(0);
3318
494da23a
TL
3319 NotifyOnCompactionBegin(c->column_family_data(), c.get(), status,
3320 compaction_job_stats, job_context->job_id);
3321
7c673cae
FG
3322 // Move files to next level
3323 int32_t moved_files = 0;
3324 int64_t moved_bytes = 0;
3325 for (unsigned int l = 0; l < c->num_input_levels(); l++) {
3326 if (c->level(l) == c->output_level()) {
3327 continue;
3328 }
3329 for (size_t i = 0; i < c->num_input_files(l); i++) {
3330 FileMetaData* f = c->input(l, i);
3331 c->edit()->DeleteFile(c->level(l), f->fd.GetNumber());
1e59de90
TL
3332 c->edit()->AddFile(
3333 c->output_level(), f->fd.GetNumber(), f->fd.GetPathId(),
3334 f->fd.GetFileSize(), f->smallest, f->largest, f->fd.smallest_seqno,
3335 f->fd.largest_seqno, f->marked_for_compaction, f->temperature,
3336 f->oldest_blob_file_number, f->oldest_ancester_time,
3337 f->file_creation_time, f->file_checksum, f->file_checksum_func_name,
3338 f->unique_id);
11fdf7f2
TL
3339
3340 ROCKS_LOG_BUFFER(
3341 log_buffer,
3342 "[%s] Moving #%" PRIu64 " to level-%d %" PRIu64 " bytes\n",
3343 c->column_family_data()->GetName().c_str(), f->fd.GetNumber(),
3344 c->output_level(), f->fd.GetFileSize());
7c673cae
FG
3345 ++moved_files;
3346 moved_bytes += f->fd.GetFileSize();
3347 }
3348 }
1e59de90
TL
3349 if (c->compaction_reason() == CompactionReason::kLevelMaxLevelSize &&
3350 c->immutable_options()->compaction_pri == kRoundRobin) {
3351 int start_level = c->start_level();
3352 if (start_level > 0) {
3353 auto vstorage = c->input_version()->storage_info();
3354 c->edit()->AddCompactCursor(
3355 start_level,
3356 vstorage->GetNextCompactCursor(start_level, c->num_input_files(0)));
3357 }
3358 }
7c673cae
FG
3359 status = versions_->LogAndApply(c->column_family_data(),
3360 *c->mutable_cf_options(), c->edit(),
3361 &mutex_, directories_.GetDbDir());
20effc67 3362 io_s = versions_->io_status();
7c673cae 3363 // Use latest MutableCFOptions
494da23a
TL
3364 InstallSuperVersionAndScheduleWork(c->column_family_data(),
3365 &job_context->superversion_contexts[0],
3366 *c->mutable_cf_options());
7c673cae
FG
3367
3368 VersionStorageInfo::LevelSummaryStorage tmp;
3369 c->column_family_data()->internal_stats()->IncBytesMoved(c->output_level(),
3370 moved_bytes);
3371 {
3372 event_logger_.LogToBuffer(log_buffer)
3373 << "job" << job_context->job_id << "event"
3374 << "trivial_move"
3375 << "destination_level" << c->output_level() << "files" << moved_files
3376 << "total_files_size" << moved_bytes;
3377 }
3378 ROCKS_LOG_BUFFER(
3379 log_buffer,
3380 "[%s] Moved #%d files to level-%d %" PRIu64 " bytes %s: %s\n",
3381 c->column_family_data()->GetName().c_str(), moved_files,
3382 c->output_level(), moved_bytes, status.ToString().c_str(),
3383 c->column_family_data()->current()->storage_info()->LevelSummary(&tmp));
3384 *made_progress = true;
3385
3386 // Clear Instrument
3387 ThreadStatusUtil::ResetThreadStatus();
494da23a
TL
3388 TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:AfterCompaction",
3389 c->column_family_data());
11fdf7f2
TL
3390 } else if (!is_prepicked && c->output_level() > 0 &&
3391 c->output_level() ==
3392 c->column_family_data()
3393 ->current()
3394 ->storage_info()
3395 ->MaxOutputLevel(
3396 immutable_db_options_.allow_ingest_behind) &&
3397 env_->GetBackgroundThreads(Env::Priority::BOTTOM) > 0) {
3398 // Forward compactions involving last level to the bottom pool if it exists,
3399 // such that compactions unlikely to contribute to write stalls can be
3400 // delayed or deprioritized.
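    // (Illustrative note, not part of the upstream file: this branch is only
    //  reachable when the BOTTOM pool actually has threads, which a user
    //  would typically configure before opening the DB, e.g.:
    //
    //    Env* env = Env::Default();
    //    env->SetBackgroundThreads(2, Env::Priority::BOTTOM);
    //    Options options;
    //    options.env = env;
    //
    //  Otherwise GetBackgroundThreads(Env::Priority::BOTTOM) returns 0 and
    //  the compaction runs in the current LOW-pool thread via the else
    //  branch below.)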
3401 TEST_SYNC_POINT("DBImpl::BackgroundCompaction:ForwardToBottomPriPool");
3402 CompactionArg* ca = new CompactionArg;
3403 ca->db = this;
1e59de90 3404 ca->compaction_pri_ = Env::Priority::BOTTOM;
11fdf7f2
TL
3405 ca->prepicked_compaction = new PrepickedCompaction;
3406 ca->prepicked_compaction->compaction = c.release();
3407 ca->prepicked_compaction->manual_compaction_state = nullptr;
494da23a
TL
3408 // Transfer requested token, so it doesn't need to do it again.
3409 ca->prepicked_compaction->task_token = std::move(task_token);
11fdf7f2
TL
3410 ++bg_bottom_compaction_scheduled_;
3411 env_->Schedule(&DBImpl::BGWorkBottomCompaction, ca, Env::Priority::BOTTOM,
494da23a 3412 this, &DBImpl::UnscheduleCompactionCallback);
7c673cae 3413 } else {
494da23a
TL
3414 TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:BeforeCompaction",
3415 c->column_family_data());
11fdf7f2
TL
3416 int output_level __attribute__((__unused__));
3417 output_level = c->output_level();
7c673cae
FG
3418 TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:NonTrivial",
3419 &output_level);
494da23a 3420 std::vector<SequenceNumber> snapshot_seqs;
7c673cae 3421 SequenceNumber earliest_write_conflict_snapshot;
494da23a
TL
3422 SnapshotChecker* snapshot_checker;
3423 GetSnapshotContext(job_context, &snapshot_seqs,
3424 &earliest_write_conflict_snapshot, &snapshot_checker);
7c673cae 3425 assert(is_snapshot_supported_ || snapshots_.empty());
1e59de90 3426
7c673cae 3427 CompactionJob compaction_job(
11fdf7f2 3428 job_context->job_id, c.get(), immutable_db_options_,
1e59de90
TL
3429 mutable_db_options_, file_options_for_compaction_, versions_.get(),
3430 &shutting_down_, log_buffer, directories_.GetDbDir(),
20effc67
TL
3431 GetDataDir(c->column_family_data(), c->output_path_id()),
3432 GetDataDir(c->column_family_data(), 0), stats_, &mutex_,
3433 &error_handler_, snapshot_seqs, earliest_write_conflict_snapshot,
1e59de90 3434 snapshot_checker, job_context, table_cache_, &event_logger_,
20effc67 3435 c->mutable_cf_options()->paranoid_file_checks,
7c673cae 3436 c->mutable_cf_options()->report_bg_io_stats, dbname_,
20effc67 3437 &compaction_job_stats, thread_pri, io_tracer_,
1e59de90
TL
3438 is_manual ? manual_compaction->canceled
3439 : kManualCompactionCanceledFalse_,
3440 db_id_, db_session_id_, c->column_family_data()->GetFullHistoryTsLow(),
3441 c->trim_ts(), &blob_callback_, &bg_compaction_scheduled_,
3442 &bg_bottom_compaction_scheduled_);
7c673cae
FG
3443 compaction_job.Prepare();
3444
494da23a
TL
3445 NotifyOnCompactionBegin(c->column_family_data(), c.get(), status,
3446 compaction_job_stats, job_context->job_id);
7c673cae 3447 mutex_.Unlock();
f67539c2
TL
3448 TEST_SYNC_POINT_CALLBACK(
3449 "DBImpl::BackgroundCompaction:NonTrivial:BeforeRun", nullptr);
20effc67
TL
3450 // Should we handle the error here?
3451 compaction_job.Run().PermitUncheckedError();
7c673cae
FG
3452 TEST_SYNC_POINT("DBImpl::BackgroundCompaction:NonTrivial:AfterRun");
3453 mutex_.Lock();
3454
3455 status = compaction_job.Install(*c->mutable_cf_options());
20effc67 3456 io_s = compaction_job.io_status();
7c673cae 3457 if (status.ok()) {
494da23a
TL
3458 InstallSuperVersionAndScheduleWork(c->column_family_data(),
3459 &job_context->superversion_contexts[0],
3460 *c->mutable_cf_options());
7c673cae
FG
3461 }
3462 *made_progress = true;
494da23a
TL
3463 TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:AfterCompaction",
3464 c->column_family_data());
7c673cae 3465 }
20effc67
TL
3466
3467 if (status.ok() && !io_s.ok()) {
3468 status = io_s;
3469 } else {
3470 io_s.PermitUncheckedError();
3471 }
3472
7c673cae
FG
3473 if (c != nullptr) {
3474 c->ReleaseCompactionFiles(status);
3475 *made_progress = true;
11fdf7f2
TL
3476
3477#ifndef ROCKSDB_LITE
3478 // Need to make sure SstFileManager does its bookkeeping
3479 auto sfm = static_cast<SstFileManagerImpl*>(
3480 immutable_db_options_.sst_file_manager.get());
3481 if (sfm && sfm_reserved_compact_space) {
3482 sfm->OnCompactionCompletion(c.get());
3483 }
3484#endif // ROCKSDB_LITE
3485
3486 NotifyOnCompactionCompleted(c->column_family_data(), c.get(), status,
3487 compaction_job_stats, job_context->job_id);
7c673cae 3488 }
7c673cae 3489
f67539c2
TL
3490 if (status.ok() || status.IsCompactionTooLarge() ||
3491 status.IsManualCompactionPaused()) {
7c673cae 3492 // Done
f67539c2 3493 } else if (status.IsColumnFamilyDropped() || status.IsShutdownInProgress()) {
7c673cae
FG
3494 // Ignore compaction errors found during shutting down
3495 } else {
3496 ROCKS_LOG_WARN(immutable_db_options_.info_log, "Compaction error: %s",
3497 status.ToString().c_str());
20effc67
TL
3498 if (!io_s.ok()) {
3499 // Error while writing to MANIFEST.
3500 // In fact, versions_->io_status() can also be the result of renaming
3501 // CURRENT file. With current code, it's just difficult to tell. So just
3502 // be pessimistic and try write to a new MANIFEST.
3503 // TODO: distinguish between MANIFEST write and CURRENT renaming
3504 auto err_reason = versions_->io_status().ok()
3505 ? BackgroundErrorReason::kCompaction
3506 : BackgroundErrorReason::kManifestWrite;
1e59de90 3507 error_handler_.SetBGError(io_s, err_reason);
20effc67 3508 } else {
1e59de90 3509 error_handler_.SetBGError(status, BackgroundErrorReason::kCompaction);
20effc67 3510 }
11fdf7f2
TL
3511 if (c != nullptr && !is_manual && !error_handler_.IsBGWorkStopped()) {
3512 // Put this cfd back in the compaction queue so we can retry after some
3513 // time
3514 auto cfd = c->column_family_data();
3515 assert(cfd != nullptr);
3516 // Since this compaction failed, we need to recompute the score so it
3517 // takes the original input files into account
3518 c->column_family_data()
3519 ->current()
3520 ->storage_info()
1e59de90 3521 ->ComputeCompactionScore(*(c->immutable_options()),
11fdf7f2
TL
3522 *(c->mutable_cf_options()));
3523 if (!cfd->queued_for_compaction()) {
3524 AddToCompactionQueue(cfd);
3525 ++unscheduled_compactions_;
3526 }
7c673cae
FG
3527 }
3528 }
11fdf7f2
TL
3529 // this will unref its input_version and column_family_data
3530 c.reset();
7c673cae
FG
3531
3532 if (is_manual) {
11fdf7f2 3533 ManualCompactionState* m = manual_compaction;
7c673cae
FG
3534 if (!status.ok()) {
3535 m->status = status;
3536 m->done = true;
3537 }
3538 // For universal compaction:
3539 // Because universal compaction always happens at level 0, one
3540 // compaction will pick up all overlapping files. No files will be
3541 // filtered out due to size limit and left for a successive compaction.
3542 // So we can safely conclude the current compaction.
3543 //
3544 // Also note that, if we don't stop here, then the current compaction
3545 // writes a new file back to level 0, which will be used in successive
3546 // compaction. Hence the manual compaction will never finish.
3547 //
3548 // Stop the compaction if manual_end points to nullptr -- this means
3549 // that we compacted the whole range. manual_end should always point
3550 // to nullptr in case of universal compaction
3551 if (m->manual_end == nullptr) {
3552 m->done = true;
3553 }
3554 if (!m->done) {
3555 // We only compacted part of the requested range. Update *m
3556 // to the range that is left to be compacted.
3557 // Universal and FIFO compactions should always compact the whole range
3558 assert(m->cfd->ioptions()->compaction_style !=
3559 kCompactionStyleUniversal ||
3560 m->cfd->ioptions()->num_levels > 1);
3561 assert(m->cfd->ioptions()->compaction_style != kCompactionStyleFIFO);
3562 m->tmp_storage = *m->manual_end;
3563 m->begin = &m->tmp_storage;
3564 m->incomplete = true;
3565 }
11fdf7f2 3566 m->in_progress = false; // not being processed anymore
7c673cae
FG
3567 }
3568 TEST_SYNC_POINT("DBImpl::BackgroundCompaction:Finish");
3569 return status;
3570}
3571
3572bool DBImpl::HasPendingManualCompaction() {
3573 return (!manual_compaction_dequeue_.empty());
3574}
3575
11fdf7f2 3576void DBImpl::AddManualCompaction(DBImpl::ManualCompactionState* m) {
1e59de90 3577 assert(manual_compaction_paused_ == 0);
7c673cae
FG
3578 manual_compaction_dequeue_.push_back(m);
3579}
3580
11fdf7f2 3581void DBImpl::RemoveManualCompaction(DBImpl::ManualCompactionState* m) {
7c673cae 3582 // Remove from queue
11fdf7f2 3583 std::deque<ManualCompactionState*>::iterator it =
7c673cae
FG
3584 manual_compaction_dequeue_.begin();
3585 while (it != manual_compaction_dequeue_.end()) {
3586 if (m == (*it)) {
3587 it = manual_compaction_dequeue_.erase(it);
3588 return;
3589 }
f67539c2 3590 ++it;
7c673cae
FG
3591 }
3592 assert(false);
3593 return;
3594}
3595
11fdf7f2 3596bool DBImpl::ShouldntRunManualCompaction(ManualCompactionState* m) {
7c673cae
FG
3597 if (num_running_ingest_file_ > 0) {
3598 // We need to wait for other IngestExternalFile() calls to finish
3599 // before running a manual compaction.
3600 return true;
3601 }
3602 if (m->exclusive) {
11fdf7f2
TL
3603 return (bg_bottom_compaction_scheduled_ > 0 ||
3604 bg_compaction_scheduled_ > 0);
7c673cae 3605 }
11fdf7f2 3606 std::deque<ManualCompactionState*>::iterator it =
7c673cae
FG
3607 manual_compaction_dequeue_.begin();
3608 bool seen = false;
3609 while (it != manual_compaction_dequeue_.end()) {
3610 if (m == (*it)) {
f67539c2 3611 ++it;
7c673cae
FG
3612 seen = true;
3613 continue;
3614 } else if (MCOverlap(m, (*it)) && (!seen && !(*it)->in_progress)) {
3615 // Consider the other manual compaction *it, conflicts if:
3616 // overlaps with m
3617 // and (*it) is ahead in the queue and is not yet in progress
3618 return true;
3619 }
f67539c2 3620 ++it;
7c673cae
FG
3621 }
3622 return false;
3623}
3624
3625bool DBImpl::HaveManualCompaction(ColumnFamilyData* cfd) {
3626 // Scan the manual compaction queue
11fdf7f2 3627 std::deque<ManualCompactionState*>::iterator it =
7c673cae
FG
3628 manual_compaction_dequeue_.begin();
3629 while (it != manual_compaction_dequeue_.end()) {
3630 if ((*it)->exclusive) {
3631 return true;
3632 }
3633 if ((cfd == (*it)->cfd) && (!((*it)->in_progress || (*it)->done))) {
3634 // Allow automatic compaction if manual compaction is
11fdf7f2 3635 // in progress
7c673cae
FG
3636 return true;
3637 }
f67539c2 3638 ++it;
7c673cae
FG
3639 }
3640 return false;
3641}
3642
3643bool DBImpl::HasExclusiveManualCompaction() {
3645 // Scan the manual compaction queue
11fdf7f2 3645 std::deque<ManualCompactionState*>::iterator it =
7c673cae
FG
3646 manual_compaction_dequeue_.begin();
3647 while (it != manual_compaction_dequeue_.end()) {
3648 if ((*it)->exclusive) {
3649 return true;
3650 }
f67539c2 3651 ++it;
7c673cae
FG
3652 }
3653 return false;
3654}
3655
11fdf7f2 3656bool DBImpl::MCOverlap(ManualCompactionState* m, ManualCompactionState* m1) {
7c673cae
FG
3657 if ((m->exclusive) || (m1->exclusive)) {
3658 return true;
3659 }
3660 if (m->cfd != m1->cfd) {
3661 return false;
3662 }
1e59de90 3663 return false;
7c673cae
FG
3664}
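
// (Illustrative note, not part of the upstream file: as written, MCOverlap()
//  reduces to a coarse conflict test:
//
//    m or m1 marked exclusive  -> true  (treated as conflicting)
//    anything else             -> false (the final `return false` means no
//                                        key-range comparison happens here)
//
//  so ShouldntRunManualCompaction() above only defers a manual compaction to
//  an exclusive request that is ahead in the queue; file-level conflicts are
//  avoided later, when compaction picking skips files already being
//  compacted.)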
3665
494da23a
TL
3666#ifndef ROCKSDB_LITE
3667void DBImpl::BuildCompactionJobInfo(
3668 const ColumnFamilyData* cfd, Compaction* c, const Status& st,
3669 const CompactionJobStats& compaction_job_stats, const int job_id,
3670 const Version* current, CompactionJobInfo* compaction_job_info) const {
3671 assert(compaction_job_info != nullptr);
3672 compaction_job_info->cf_id = cfd->GetID();
3673 compaction_job_info->cf_name = cfd->GetName();
3674 compaction_job_info->status = st;
3675 compaction_job_info->thread_id = env_->GetThreadID();
3676 compaction_job_info->job_id = job_id;
3677 compaction_job_info->base_input_level = c->start_level();
3678 compaction_job_info->output_level = c->output_level();
3679 compaction_job_info->stats = compaction_job_stats;
3680 compaction_job_info->table_properties = c->GetOutputTableProperties();
3681 compaction_job_info->compaction_reason = c->compaction_reason();
3682 compaction_job_info->compression = c->output_compression();
3683 for (size_t i = 0; i < c->num_input_levels(); ++i) {
3684 for (const auto fmd : *c->inputs(i)) {
f67539c2
TL
3685 const FileDescriptor& desc = fmd->fd;
3686 const uint64_t file_number = desc.GetNumber();
1e59de90 3687 auto fn = TableFileName(c->immutable_options()->cf_paths, file_number,
f67539c2 3688 desc.GetPathId());
494da23a 3689 compaction_job_info->input_files.push_back(fn);
f67539c2
TL
3690 compaction_job_info->input_file_infos.push_back(CompactionFileInfo{
3691 static_cast<int>(i), file_number, fmd->oldest_blob_file_number});
494da23a
TL
3692 if (compaction_job_info->table_properties.count(fn) == 0) {
3693 std::shared_ptr<const TableProperties> tp;
3694 auto s = current->GetTableProperties(&tp, fmd, &fn);
3695 if (s.ok()) {
3696 compaction_job_info->table_properties[fn] = tp;
3697 }
3698 }
3699 }
3700 }
3701 for (const auto& newf : c->edit()->GetNewFiles()) {
f67539c2
TL
3702 const FileMetaData& meta = newf.second;
3703 const FileDescriptor& desc = meta.fd;
3704 const uint64_t file_number = desc.GetNumber();
3705 compaction_job_info->output_files.push_back(TableFileName(
1e59de90 3706 c->immutable_options()->cf_paths, file_number, desc.GetPathId()));
f67539c2
TL
3707 compaction_job_info->output_file_infos.push_back(CompactionFileInfo{
3708 newf.first, file_number, meta.oldest_blob_file_number});
494da23a 3709 }
1e59de90
TL
3710 compaction_job_info->blob_compression_type =
3711 c->mutable_cf_options()->blob_compression_type;
3712
3713 // Update BlobFilesInfo.
3714 for (const auto& blob_file : c->edit()->GetBlobFileAdditions()) {
3715 BlobFileAdditionInfo blob_file_addition_info(
3716 BlobFileName(c->immutable_options()->cf_paths.front().path,
3717 blob_file.GetBlobFileNumber()) /*blob_file_path*/,
3718 blob_file.GetBlobFileNumber(), blob_file.GetTotalBlobCount(),
3719 blob_file.GetTotalBlobBytes());
3720 compaction_job_info->blob_file_addition_infos.emplace_back(
3721 std::move(blob_file_addition_info));
3722 }
3723
3724 // Update BlobFilesGarbageInfo.
3725 for (const auto& blob_file : c->edit()->GetBlobFileGarbages()) {
3726 BlobFileGarbageInfo blob_file_garbage_info(
3727 BlobFileName(c->immutable_options()->cf_paths.front().path,
3728 blob_file.GetBlobFileNumber()) /*blob_file_path*/,
3729 blob_file.GetBlobFileNumber(), blob_file.GetGarbageBlobCount(),
3730 blob_file.GetGarbageBlobBytes());
3731 compaction_job_info->blob_file_garbage_infos.emplace_back(
3732 std::move(blob_file_garbage_info));
3733 }
494da23a
TL
3734}
3735#endif
3736
11fdf7f2
TL
3737// SuperVersionContext gets created and destructed outside of the lock --
3738// we use this conveniently to:
7c673cae
FG
3739// * malloc one SuperVersion() outside of the lock -- new_superversion
3740// * delete SuperVersion()s outside of the lock -- superversions_to_free
3741//
3742// However, if InstallSuperVersionAndScheduleWork() gets called twice with the
11fdf7f2 3743// same sv_context, we can't reuse the SuperVersion() that got
7c673cae
FG
3744 // malloced, because the
3745 // first call already used it. In that rare case, we take a hit and create a
3746 // new SuperVersion() inside of the mutex. We do a similar thing
3747 // for superversions_to_free.
7c673cae 3748
11fdf7f2
TL
3749void DBImpl::InstallSuperVersionAndScheduleWork(
3750 ColumnFamilyData* cfd, SuperVersionContext* sv_context,
494da23a 3751 const MutableCFOptions& mutable_cf_options) {
7c673cae
FG
3752 mutex_.AssertHeld();
3753
3754 // Update max_total_in_memory_state_
3755 size_t old_memtable_size = 0;
3756 auto* old_sv = cfd->GetSuperVersion();
3757 if (old_sv) {
3758 old_memtable_size = old_sv->mutable_cf_options.write_buffer_size *
3759 old_sv->mutable_cf_options.max_write_buffer_number;
3760 }
3761
11fdf7f2
TL
3762 // this branch is unlikely to step in
3763 if (UNLIKELY(sv_context->new_superversion == nullptr)) {
3764 sv_context->NewSuperVersion();
3765 }
1e59de90 3766 cfd->InstallSuperVersion(sv_context, mutable_cf_options);
7c673cae 3767
494da23a
TL
3768 // There may be a small data race here. The snapshot triggering bottommost
3769 // compaction may already be released here. But assuming there will always be
3770 // newer snapshots created and released frequently, the compaction will be
3771 // triggered soon anyway.
3772 bottommost_files_mark_threshold_ = kMaxSequenceNumber;
3773 for (auto* my_cfd : *versions_->GetColumnFamilySet()) {
1e59de90
TL
3774 if (!my_cfd->ioptions()->allow_ingest_behind) {
3775 bottommost_files_mark_threshold_ = std::min(
3776 bottommost_files_mark_threshold_,
3777 my_cfd->current()->storage_info()->bottommost_files_mark_threshold());
3778 }
494da23a
TL
3779 }
3780
7c673cae
FG
3781 // Whenever we install new SuperVersion, we might need to issue new flushes or
3782 // compactions.
7c673cae
FG
3783 SchedulePendingCompaction(cfd);
3784 MaybeScheduleFlushOrCompaction();
3785
3786 // Update max_total_in_memory_state_
11fdf7f2
TL
3787 max_total_in_memory_state_ = max_total_in_memory_state_ - old_memtable_size +
3788 mutable_cf_options.write_buffer_size *
3789 mutable_cf_options.max_write_buffer_number;
3790}
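
// (Illustrative worked example, not part of the upstream file: with the
//  bookkeeping above, if the old SuperVersion had write_buffer_size = 64MB
//  and max_write_buffer_number = 3 (old_memtable_size = 192MB) and the new
//  MutableCFOptions use 128MB x 2, then
//
//    max_total_in_memory_state_ += 256MB - 192MB = +64MB
//
//  i.e. the counter approximates the sum of
//  write_buffer_size * max_write_buffer_number across live column families.)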
3791
3792// ShouldPurge is called by FindObsoleteFiles when doing a full scan,
f67539c2 3793// and db mutex (mutex_) should already be held.
11fdf7f2
TL
3794// Actually, the current implementation of FindObsoleteFiles with
3795 // full_scan=true can issue I/O requests to obtain the list of files in
3796 // directories, e.g. env_->GetChildren(), while holding the db mutex.
11fdf7f2 3797bool DBImpl::ShouldPurge(uint64_t file_number) const {
f67539c2
TL
3798 return files_grabbed_for_purge_.find(file_number) ==
3799 files_grabbed_for_purge_.end() &&
3800 purge_files_.find(file_number) == purge_files_.end();
11fdf7f2
TL
3801}
3802
3803// MarkAsGrabbedForPurge is called by FindObsoleteFiles, and db mutex
3804// (mutex_) should already be held.
3805void DBImpl::MarkAsGrabbedForPurge(uint64_t file_number) {
f67539c2 3806 files_grabbed_for_purge_.insert(file_number);
11fdf7f2
TL
3807}
3808
3809void DBImpl::SetSnapshotChecker(SnapshotChecker* snapshot_checker) {
3810 InstrumentedMutexLock l(&mutex_);
3811 // snapshot_checker_ should only be set once. If we need to set it multiple
3812 // times, we need to make sure the old one is not deleted while it is still
3813 // being used by a compaction job.
3814 assert(!snapshot_checker_);
3815 snapshot_checker_.reset(snapshot_checker);
7c673cae 3816}
494da23a
TL
3817
3818void DBImpl::GetSnapshotContext(
3819 JobContext* job_context, std::vector<SequenceNumber>* snapshot_seqs,
3820 SequenceNumber* earliest_write_conflict_snapshot,
3821 SnapshotChecker** snapshot_checker_ptr) {
3822 mutex_.AssertHeld();
3823 assert(job_context != nullptr);
3824 assert(snapshot_seqs != nullptr);
3825 assert(earliest_write_conflict_snapshot != nullptr);
3826 assert(snapshot_checker_ptr != nullptr);
3827
3828 *snapshot_checker_ptr = snapshot_checker_.get();
3829 if (use_custom_gc_ && *snapshot_checker_ptr == nullptr) {
3830 *snapshot_checker_ptr = DisableGCSnapshotChecker::Instance();
3831 }
3832 if (*snapshot_checker_ptr != nullptr) {
3833 // If snapshot_checker is used, that means the flush/compaction may
3834 // contain values not visible to snapshots taken after the
3835 // flush/compaction job starts. Take a snapshot and it will appear
3836 // in snapshot_seqs and force the compaction iterator to consider such
3837 // snapshots.
3838 const Snapshot* job_snapshot =
3839 GetSnapshotImpl(false /*write_conflict_boundary*/, false /*lock*/);
3840 job_context->job_snapshot.reset(new ManagedSnapshot(this, job_snapshot));
3841 }
3842 *snapshot_seqs = snapshots_.GetAll(earliest_write_conflict_snapshot);
3843}
1e59de90
TL
3844
3845Status DBImpl::WaitForCompact(bool wait_unscheduled) {
3846 // Wait until the compaction completes
3847 InstrumentedMutexLock l(&mutex_);
3848 while ((bg_bottom_compaction_scheduled_ || bg_compaction_scheduled_ ||
3849 bg_flush_scheduled_ ||
3850 (wait_unscheduled && unscheduled_compactions_)) &&
3851 (error_handler_.GetBGError().ok())) {
3852 bg_cv_.Wait();
3853 }
3854 return error_handler_.GetBGError();
3855}
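
// (Illustrative usage sketch, not part of the upstream file: internal callers
//  such as test helpers typically drain background work with something like
//
//    Status s = dbimpl->WaitForCompact(/*wait_unscheduled=*/true);
//
//  where `dbimpl` is a DBImpl*. Passing true also waits for compactions that
//  are counted in unscheduled_compactions_ but not yet handed to the thread
//  pool, while false only waits for work that has already been scheduled.)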
3856
f67539c2 3857} // namespace ROCKSDB_NAMESPACE