1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 #include "db/db_impl/db_impl.h"
10
11 #include <cinttypes>
12
13 #include "db/builder.h"
14 #include "db/error_handler.h"
15 #include "db/event_helpers.h"
16 #include "file/sst_file_manager_impl.h"
17 #include "monitoring/iostats_context_imp.h"
18 #include "monitoring/perf_context_imp.h"
19 #include "monitoring/thread_status_updater.h"
20 #include "monitoring/thread_status_util.h"
21 #include "test_util/sync_point.h"
22 #include "util/cast_util.h"
23 #include "util/concurrent_task_limiter_impl.h"
24
25 namespace ROCKSDB_NAMESPACE {
26
27 bool DBImpl::EnoughRoomForCompaction(
28 ColumnFamilyData* cfd, const std::vector<CompactionInputFiles>& inputs,
29 bool* sfm_reserved_compact_space, LogBuffer* log_buffer) {
30 // Check if we have enough room to do the compaction
31 bool enough_room = true;
32 #ifndef ROCKSDB_LITE
33 auto sfm = static_cast<SstFileManagerImpl*>(
34 immutable_db_options_.sst_file_manager.get());
35 if (sfm) {
36 // Pass the current bg_error_ to SFM so it can decide what checks to
37 // perform. If this DB instance hasn't seen any error yet, the SFM can be
38 // optimistic and not do disk space checks
39 enough_room =
40 sfm->EnoughRoomForCompaction(cfd, inputs, error_handler_.GetBGError());
41 if (enough_room) {
42 *sfm_reserved_compact_space = true;
43 }
44 }
45 #else
46 (void)cfd;
47 (void)inputs;
48 (void)sfm_reserved_compact_space;
49 #endif // ROCKSDB_LITE
50 if (!enough_room) {
51 // Just in case tests want to change the value of enough_room
52 TEST_SYNC_POINT_CALLBACK(
53 "DBImpl::BackgroundCompaction():CancelledCompaction", &enough_room);
54 ROCKS_LOG_BUFFER(log_buffer,
55 "Cancelled compaction because not enough room");
56 RecordTick(stats_, COMPACTION_CANCELLED, 1);
57 }
58 return enough_room;
59 }
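
// Illustrative usage sketch (not part of the original source): the
// SstFileManagerImpl consulted above is whatever the user installed via
// DBOptions::sst_file_manager. A minimal setup, assuming the public rocksdb
// API and a placeholder `options` object, might look like:
//
//   std::shared_ptr<rocksdb::SstFileManager> sfm(
//       rocksdb::NewSstFileManager(rocksdb::Env::Default()));
//   sfm->SetMaxAllowedSpaceUsage(64ULL << 30);  // optional space cap (64 GB)
//   options.sst_file_manager = sfm;
//
// With an SstFileManager installed, EnoughRoomForCompaction() can cancel a
// compaction when the manager decides there is not enough room for it.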
60
61 bool DBImpl::RequestCompactionToken(ColumnFamilyData* cfd, bool force,
62 std::unique_ptr<TaskLimiterToken>* token,
63 LogBuffer* log_buffer) {
64 assert(*token == nullptr);
65 auto limiter = static_cast<ConcurrentTaskLimiterImpl*>(
66 cfd->ioptions()->compaction_thread_limiter.get());
67 if (limiter == nullptr) {
68 return true;
69 }
70 *token = limiter->GetToken(force);
71 if (*token != nullptr) {
72 ROCKS_LOG_BUFFER(log_buffer,
73 "Thread limiter [%s] increase [%s] compaction task, "
74 "force: %s, tasks after: %d",
75 limiter->GetName().c_str(), cfd->GetName().c_str(),
76 force ? "true" : "false", limiter->GetOutstandingTask());
77 return true;
78 }
79 return false;
80 }
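
// Illustrative usage sketch (not part of the original source): the per-CF
// compaction thread limiter queried above is user-configured. Assuming the
// public rocksdb API and a placeholder `cf_options`:
//
//   std::shared_ptr<rocksdb::ConcurrentTaskLimiter> limiter(
//       rocksdb::NewConcurrentTaskLimiter("compactions", 4 /* max tasks */));
//   cf_options.compaction_thread_limiter = limiter;
//
// Column families sharing one limiter compete for at most 4 concurrent
// compaction tasks; manual compactions request a token with force=true and
// are counted but not throttled (see RequestCompactionToken above).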
81
82 Status DBImpl::SyncClosedLogs(JobContext* job_context) {
83 TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Start");
84 mutex_.AssertHeld();
85 autovector<log::Writer*, 1> logs_to_sync;
86 uint64_t current_log_number = logfile_number_;
87 while (logs_.front().number < current_log_number &&
88 logs_.front().getting_synced) {
89 log_sync_cv_.Wait();
90 }
91 for (auto it = logs_.begin();
92 it != logs_.end() && it->number < current_log_number; ++it) {
93 auto& log = *it;
94 assert(!log.getting_synced);
95 log.getting_synced = true;
96 logs_to_sync.push_back(log.writer);
97 }
98
99 Status s;
100 if (!logs_to_sync.empty()) {
101 mutex_.Unlock();
102
103 for (log::Writer* log : logs_to_sync) {
104 ROCKS_LOG_INFO(immutable_db_options_.info_log,
105 "[JOB %d] Syncing log #%" PRIu64, job_context->job_id,
106 log->get_log_number());
107 s = log->file()->Sync(immutable_db_options_.use_fsync);
108 if (!s.ok()) {
109 break;
110 }
111
112 if (immutable_db_options_.recycle_log_file_num > 0) {
113 s = log->Close();
114 if (!s.ok()) {
115 break;
116 }
117 }
118 }
119 if (s.ok()) {
120 s = directories_.GetWalDir()->Fsync();
121 }
122
123 mutex_.Lock();
124
125 // "number <= current_log_number - 1" is equivalent to
126 // "number < current_log_number".
127 MarkLogsSynced(current_log_number - 1, true, s);
128 if (!s.ok()) {
129 error_handler_.SetBGError(s, BackgroundErrorReason::kFlush);
130 TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Failed");
131 return s;
132 }
133 }
134 return s;
135 }
136
137 Status DBImpl::FlushMemTableToOutputFile(
138 ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options,
139 bool* made_progress, JobContext* job_context,
140 SuperVersionContext* superversion_context,
141 std::vector<SequenceNumber>& snapshot_seqs,
142 SequenceNumber earliest_write_conflict_snapshot,
143 SnapshotChecker* snapshot_checker, LogBuffer* log_buffer,
144 Env::Priority thread_pri) {
145 mutex_.AssertHeld();
146 assert(cfd->imm()->NumNotFlushed() != 0);
147 assert(cfd->imm()->IsFlushPending());
148
149 FlushJob flush_job(
150 dbname_, cfd, immutable_db_options_, mutable_cf_options,
151 nullptr /* memtable_id */, file_options_for_compaction_, versions_.get(),
152 &mutex_, &shutting_down_, snapshot_seqs, earliest_write_conflict_snapshot,
153 snapshot_checker, job_context, log_buffer, directories_.GetDbDir(),
154 GetDataDir(cfd, 0U),
155 GetCompressionFlush(*cfd->ioptions(), mutable_cf_options), stats_,
156 &event_logger_, mutable_cf_options.report_bg_io_stats,
157 true /* sync_output_directory */, true /* write_manifest */, thread_pri);
158
159 FileMetaData file_meta;
160
161 TEST_SYNC_POINT("DBImpl::FlushMemTableToOutputFile:BeforePickMemtables");
162 flush_job.PickMemTable();
163 TEST_SYNC_POINT("DBImpl::FlushMemTableToOutputFile:AfterPickMemtables");
164
165 #ifndef ROCKSDB_LITE
166 // may temporarily unlock and lock the mutex.
167 NotifyOnFlushBegin(cfd, &file_meta, mutable_cf_options, job_context->job_id);
168 #endif // ROCKSDB_LITE
169
170 Status s;
171 if (logfile_number_ > 0 &&
172 versions_->GetColumnFamilySet()->NumberOfColumnFamilies() > 1) {
173 // If there is more than one column family, we need to make sure that
174 // all the log files except the most recent one are synced. Otherwise, if
175 // the host crashes after flushing and before the WAL is persistent, the
176 // flushed SST may contain data from write batches whose updates to
177 // other column families are missing.
178 // SyncClosedLogs() may unlock and re-lock the db_mutex.
179 s = SyncClosedLogs(job_context);
180 } else {
181 TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Skip");
182 }
183
184 // Within flush_job.Run, RocksDB may call the event listeners to notify
185 // them of file creation and deletion.
186 //
187 // Note that flush_job.Run will unlock and re-lock the db_mutex,
188 // and the EventListener callbacks will be invoked while the db_mutex
189 // is unlocked by the current thread.
190 if (s.ok()) {
191 s = flush_job.Run(&logs_with_prep_tracker_, &file_meta);
192 } else {
193 flush_job.Cancel();
194 }
195
196 if (s.ok()) {
197 InstallSuperVersionAndScheduleWork(cfd, superversion_context,
198 mutable_cf_options);
199 if (made_progress) {
200 *made_progress = true;
201 }
202 VersionStorageInfo::LevelSummaryStorage tmp;
203 ROCKS_LOG_BUFFER(log_buffer, "[%s] Level summary: %s\n",
204 cfd->GetName().c_str(),
205 cfd->current()->storage_info()->LevelSummary(&tmp));
206 }
207
208 if (!s.ok() && !s.IsShutdownInProgress() && !s.IsColumnFamilyDropped()) {
209 Status new_bg_error = s;
210 error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush);
211 }
212 if (s.ok()) {
213 #ifndef ROCKSDB_LITE
214 // may temporarily unlock and lock the mutex.
215 NotifyOnFlushCompleted(cfd, mutable_cf_options,
216 flush_job.GetCommittedFlushJobsInfo());
217 auto sfm = static_cast<SstFileManagerImpl*>(
218 immutable_db_options_.sst_file_manager.get());
219 if (sfm) {
220 // Notify sst_file_manager that a new file was added
221 std::string file_path = MakeTableFileName(
222 cfd->ioptions()->cf_paths[0].path, file_meta.fd.GetNumber());
223 sfm->OnAddFile(file_path);
224 if (sfm->IsMaxAllowedSpaceReached()) {
225 Status new_bg_error =
226 Status::SpaceLimit("Max allowed space was reached");
227 TEST_SYNC_POINT_CALLBACK(
228 "DBImpl::FlushMemTableToOutputFile:MaxAllowedSpaceReached",
229 &new_bg_error);
230 error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush);
231 }
232 }
233 #endif // ROCKSDB_LITE
234 }
235 TEST_SYNC_POINT("DBImpl::FlushMemTableToOutputFile:Finish");
236 return s;
237 }
238
239 Status DBImpl::FlushMemTablesToOutputFiles(
240 const autovector<BGFlushArg>& bg_flush_args, bool* made_progress,
241 JobContext* job_context, LogBuffer* log_buffer, Env::Priority thread_pri) {
242 if (immutable_db_options_.atomic_flush) {
243 return AtomicFlushMemTablesToOutputFiles(
244 bg_flush_args, made_progress, job_context, log_buffer, thread_pri);
245 }
246 std::vector<SequenceNumber> snapshot_seqs;
247 SequenceNumber earliest_write_conflict_snapshot;
248 SnapshotChecker* snapshot_checker;
249 GetSnapshotContext(job_context, &snapshot_seqs,
250 &earliest_write_conflict_snapshot, &snapshot_checker);
251 Status status;
252 for (auto& arg : bg_flush_args) {
253 ColumnFamilyData* cfd = arg.cfd_;
254 MutableCFOptions mutable_cf_options = *cfd->GetLatestMutableCFOptions();
255 SuperVersionContext* superversion_context = arg.superversion_context_;
256 Status s = FlushMemTableToOutputFile(
257 cfd, mutable_cf_options, made_progress, job_context,
258 superversion_context, snapshot_seqs, earliest_write_conflict_snapshot,
259 snapshot_checker, log_buffer, thread_pri);
260 if (!s.ok()) {
261 status = s;
262 if (!s.IsShutdownInProgress() && !s.IsColumnFamilyDropped()) {
263 // At this point, DB is not shutting down, nor is cfd dropped.
264 // Something is wrong, thus we break out of the loop.
265 break;
266 }
267 }
268 }
269 return status;
270 }
271
272 /*
273 * Atomically flushes multiple column families.
274 *
275 * For each column family, all memtables with ID smaller than or equal to the
276 * ID specified in bg_flush_args will be flushed. Only after all column
277 * families finish flush will this function commit to MANIFEST. If any of the
278 * column families are not flushed successfully, this function does not have
279 * any side-effect on the state of the database.
280 */
281 Status DBImpl::AtomicFlushMemTablesToOutputFiles(
282 const autovector<BGFlushArg>& bg_flush_args, bool* made_progress,
283 JobContext* job_context, LogBuffer* log_buffer, Env::Priority thread_pri) {
284 mutex_.AssertHeld();
285
286 autovector<ColumnFamilyData*> cfds;
287 for (const auto& arg : bg_flush_args) {
288 cfds.emplace_back(arg.cfd_);
289 }
290
291 #ifndef NDEBUG
292 for (const auto cfd : cfds) {
293 assert(cfd->imm()->NumNotFlushed() != 0);
294 assert(cfd->imm()->IsFlushPending());
295 }
296 #endif /* !NDEBUG */
297
298 std::vector<SequenceNumber> snapshot_seqs;
299 SequenceNumber earliest_write_conflict_snapshot;
300 SnapshotChecker* snapshot_checker;
301 GetSnapshotContext(job_context, &snapshot_seqs,
302 &earliest_write_conflict_snapshot, &snapshot_checker);
303
304 autovector<Directory*> distinct_output_dirs;
305 autovector<std::string> distinct_output_dir_paths;
306 std::vector<std::unique_ptr<FlushJob>> jobs;
307 std::vector<MutableCFOptions> all_mutable_cf_options;
308 int num_cfs = static_cast<int>(cfds.size());
309 all_mutable_cf_options.reserve(num_cfs);
310 for (int i = 0; i < num_cfs; ++i) {
311 auto cfd = cfds[i];
312 Directory* data_dir = GetDataDir(cfd, 0U);
313 const std::string& curr_path = cfd->ioptions()->cf_paths[0].path;
314
315 // Add to distinct output directories if eligible. Use linear search. Since
316 // the number of elements in the vector is not large, performance should be
317 // tolerable.
318 bool found = false;
319 for (const auto& path : distinct_output_dir_paths) {
320 if (path == curr_path) {
321 found = true;
322 break;
323 }
324 }
325 if (!found) {
326 distinct_output_dir_paths.emplace_back(curr_path);
327 distinct_output_dirs.emplace_back(data_dir);
328 }
329
330 all_mutable_cf_options.emplace_back(*cfd->GetLatestMutableCFOptions());
331 const MutableCFOptions& mutable_cf_options = all_mutable_cf_options.back();
332 const uint64_t* max_memtable_id = &(bg_flush_args[i].max_memtable_id_);
333 jobs.emplace_back(new FlushJob(
334 dbname_, cfd, immutable_db_options_, mutable_cf_options,
335 max_memtable_id, file_options_for_compaction_, versions_.get(), &mutex_,
336 &shutting_down_, snapshot_seqs, earliest_write_conflict_snapshot,
337 snapshot_checker, job_context, log_buffer, directories_.GetDbDir(),
338 data_dir, GetCompressionFlush(*cfd->ioptions(), mutable_cf_options),
339 stats_, &event_logger_, mutable_cf_options.report_bg_io_stats,
340 false /* sync_output_directory */, false /* write_manifest */,
341 thread_pri));
342 jobs.back()->PickMemTable();
343 }
344
345 std::vector<FileMetaData> file_meta(num_cfs);
346 Status s;
347 assert(num_cfs == static_cast<int>(jobs.size()));
348
349 #ifndef ROCKSDB_LITE
350 for (int i = 0; i != num_cfs; ++i) {
351 const MutableCFOptions& mutable_cf_options = all_mutable_cf_options.at(i);
352 // may temporarily unlock and lock the mutex.
353 NotifyOnFlushBegin(cfds[i], &file_meta[i], mutable_cf_options,
354 job_context->job_id);
355 }
356 #endif /* !ROCKSDB_LITE */
357
358 if (logfile_number_ > 0) {
359 // TODO (yanqin) investigate whether we should sync the closed logs for
360 // the single column family case.
361 s = SyncClosedLogs(job_context);
362 }
363
364 // exec_status stores the execution status of flush_jobs as
365 // <bool /* executed */, Status /* status code */>
366 autovector<std::pair<bool, Status>> exec_status;
367 for (int i = 0; i != num_cfs; ++i) {
368 // Initially all jobs are not executed, with status OK.
369 exec_status.emplace_back(false, Status::OK());
370 }
371
372 if (s.ok()) {
373 // TODO (yanqin): parallelize jobs with threads.
374 for (int i = 1; i != num_cfs; ++i) {
375 exec_status[i].second =
376 jobs[i]->Run(&logs_with_prep_tracker_, &file_meta[i]);
377 exec_status[i].first = true;
378 }
379 if (num_cfs > 1) {
380 TEST_SYNC_POINT(
381 "DBImpl::AtomicFlushMemTablesToOutputFiles:SomeFlushJobsComplete:1");
382 TEST_SYNC_POINT(
383 "DBImpl::AtomicFlushMemTablesToOutputFiles:SomeFlushJobsComplete:2");
384 }
385 assert(exec_status.size() > 0);
386 assert(!file_meta.empty());
387 exec_status[0].second =
388 jobs[0]->Run(&logs_with_prep_tracker_, &file_meta[0]);
389 exec_status[0].first = true;
390
391 Status error_status;
392 for (const auto& e : exec_status) {
393 if (!e.second.ok()) {
394 s = e.second;
395 if (!e.second.IsShutdownInProgress() &&
396 !e.second.IsColumnFamilyDropped()) {
397 // If a flush job did not return OK, and the CF is not dropped, and
398 // the DB is not shutting down, then we have to return this result to
399 // the caller later.
400 error_status = e.second;
401 }
402 }
403 }
404
405 s = error_status.ok() ? s : error_status;
406 }
407
408 if (s.IsColumnFamilyDropped()) {
409 s = Status::OK();
410 }
411
412 if (s.ok() || s.IsShutdownInProgress()) {
413 // Sync on all distinct output directories.
414 for (auto dir : distinct_output_dirs) {
415 if (dir != nullptr) {
416 Status error_status = dir->Fsync();
417 if (!error_status.ok()) {
418 s = error_status;
419 break;
420 }
421 }
422 }
423 } else {
424 // Need to undo atomic flush if something went wrong, i.e. s is not OK and
425 // it is not because of CF drop.
426 // Have to cancel the flush jobs that have NOT executed because we need to
427 // unref the versions.
428 for (int i = 0; i != num_cfs; ++i) {
429 if (!exec_status[i].first) {
430 jobs[i]->Cancel();
431 }
432 }
433 for (int i = 0; i != num_cfs; ++i) {
434 if (exec_status[i].first && exec_status[i].second.ok()) {
435 auto& mems = jobs[i]->GetMemTables();
436 cfds[i]->imm()->RollbackMemtableFlush(mems,
437 file_meta[i].fd.GetNumber());
438 }
439 }
440 }
441
442 if (s.ok()) {
443 auto wait_to_install_func = [&]() {
444 bool ready = true;
445 for (size_t i = 0; i != cfds.size(); ++i) {
446 const auto& mems = jobs[i]->GetMemTables();
447 if (cfds[i]->IsDropped()) {
448 // If the column family is dropped, then do not wait.
449 continue;
450 } else if (!mems.empty() &&
451 cfds[i]->imm()->GetEarliestMemTableID() < mems[0]->GetID()) {
452 // If a flush job needs to install the flush result for mems and
453 // mems[0] is not the earliest memtable, it means another thread must
454 // be installing flush results for the same column family, so the
455 // current thread needs to wait.
456 ready = false;
457 break;
458 } else if (mems.empty() && cfds[i]->imm()->GetEarliestMemTableID() <=
459 bg_flush_args[i].max_memtable_id_) {
460 // If a flush job does not need to install flush results, then it has
461 // to wait until all memtables up to max_memtable_id_ (inclusive) are
462 // installed.
463 ready = false;
464 break;
465 }
466 }
467 return ready;
468 };
469
470 bool resuming_from_bg_err = error_handler_.IsDBStopped();
471 while ((!error_handler_.IsDBStopped() ||
472 error_handler_.GetRecoveryError().ok()) &&
473 !wait_to_install_func()) {
474 atomic_flush_install_cv_.Wait();
475 }
476
477 s = resuming_from_bg_err ? error_handler_.GetRecoveryError()
478 : error_handler_.GetBGError();
479 }
480
481 if (s.ok()) {
482 autovector<ColumnFamilyData*> tmp_cfds;
483 autovector<const autovector<MemTable*>*> mems_list;
484 autovector<const MutableCFOptions*> mutable_cf_options_list;
485 autovector<FileMetaData*> tmp_file_meta;
486 for (int i = 0; i != num_cfs; ++i) {
487 const auto& mems = jobs[i]->GetMemTables();
488 if (!cfds[i]->IsDropped() && !mems.empty()) {
489 tmp_cfds.emplace_back(cfds[i]);
490 mems_list.emplace_back(&mems);
491 mutable_cf_options_list.emplace_back(&all_mutable_cf_options[i]);
492 tmp_file_meta.emplace_back(&file_meta[i]);
493 }
494 }
495
496 s = InstallMemtableAtomicFlushResults(
497 nullptr /* imm_lists */, tmp_cfds, mutable_cf_options_list, mems_list,
498 versions_.get(), &mutex_, tmp_file_meta,
499 &job_context->memtables_to_free, directories_.GetDbDir(), log_buffer);
500 }
501
502 if (s.ok()) {
503 assert(num_cfs ==
504 static_cast<int>(job_context->superversion_contexts.size()));
505 for (int i = 0; i != num_cfs; ++i) {
506 if (cfds[i]->IsDropped()) {
507 continue;
508 }
509 InstallSuperVersionAndScheduleWork(cfds[i],
510 &job_context->superversion_contexts[i],
511 all_mutable_cf_options[i]);
512 VersionStorageInfo::LevelSummaryStorage tmp;
513 ROCKS_LOG_BUFFER(log_buffer, "[%s] Level summary: %s\n",
514 cfds[i]->GetName().c_str(),
515 cfds[i]->current()->storage_info()->LevelSummary(&tmp));
516 }
517 if (made_progress) {
518 *made_progress = true;
519 }
520 #ifndef ROCKSDB_LITE
521 auto sfm = static_cast<SstFileManagerImpl*>(
522 immutable_db_options_.sst_file_manager.get());
523 assert(all_mutable_cf_options.size() == static_cast<size_t>(num_cfs));
524 for (int i = 0; i != num_cfs; ++i) {
525 if (cfds[i]->IsDropped()) {
526 continue;
527 }
528 NotifyOnFlushCompleted(cfds[i], all_mutable_cf_options[i],
529 jobs[i]->GetCommittedFlushJobsInfo());
530 if (sfm) {
531 std::string file_path = MakeTableFileName(
532 cfds[i]->ioptions()->cf_paths[0].path, file_meta[i].fd.GetNumber());
533 sfm->OnAddFile(file_path);
534 if (sfm->IsMaxAllowedSpaceReached() &&
535 error_handler_.GetBGError().ok()) {
536 Status new_bg_error =
537 Status::SpaceLimit("Max allowed space was reached");
538 error_handler_.SetBGError(new_bg_error,
539 BackgroundErrorReason::kFlush);
540 }
541 }
542 }
543 #endif // ROCKSDB_LITE
544 }
545
546 if (!s.ok() && !s.IsShutdownInProgress()) {
547 Status new_bg_error = s;
548 error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush);
549 }
550
551 return s;
552 }
553
554 void DBImpl::NotifyOnFlushBegin(ColumnFamilyData* cfd, FileMetaData* file_meta,
555 const MutableCFOptions& mutable_cf_options,
556 int job_id) {
557 #ifndef ROCKSDB_LITE
558 if (immutable_db_options_.listeners.size() == 0U) {
559 return;
560 }
561 mutex_.AssertHeld();
562 if (shutting_down_.load(std::memory_order_acquire)) {
563 return;
564 }
565 bool triggered_writes_slowdown =
566 (cfd->current()->storage_info()->NumLevelFiles(0) >=
567 mutable_cf_options.level0_slowdown_writes_trigger);
568 bool triggered_writes_stop =
569 (cfd->current()->storage_info()->NumLevelFiles(0) >=
570 mutable_cf_options.level0_stop_writes_trigger);
571 // release lock while notifying events
572 mutex_.Unlock();
573 {
574 FlushJobInfo info{};
575 info.cf_id = cfd->GetID();
576 info.cf_name = cfd->GetName();
577 // TODO(yhchiang): make db_paths dynamic in case flush does not
578 // go to L0 in the future.
579 const uint64_t file_number = file_meta->fd.GetNumber();
580 info.file_path =
581 MakeTableFileName(cfd->ioptions()->cf_paths[0].path, file_number);
582 info.file_number = file_number;
583 info.thread_id = env_->GetThreadID();
584 info.job_id = job_id;
585 info.triggered_writes_slowdown = triggered_writes_slowdown;
586 info.triggered_writes_stop = triggered_writes_stop;
587 info.smallest_seqno = file_meta->fd.smallest_seqno;
588 info.largest_seqno = file_meta->fd.largest_seqno;
589 info.flush_reason = cfd->GetFlushReason();
590 for (auto listener : immutable_db_options_.listeners) {
591 listener->OnFlushBegin(this, info);
592 }
593 }
594 mutex_.Lock();
595 // no need to signal bg_cv_ as it will be signaled at the end of the
596 // flush process.
597 #else
598 (void)cfd;
599 (void)file_meta;
600 (void)mutable_cf_options;
601 (void)job_id;
602 #endif // ROCKSDB_LITE
603 }
604
605 void DBImpl::NotifyOnFlushCompleted(
606 ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options,
607 std::list<std::unique_ptr<FlushJobInfo>>* flush_jobs_info) {
608 #ifndef ROCKSDB_LITE
609 assert(flush_jobs_info != nullptr);
610 if (immutable_db_options_.listeners.size() == 0U) {
611 return;
612 }
613 mutex_.AssertHeld();
614 if (shutting_down_.load(std::memory_order_acquire)) {
615 return;
616 }
617 bool triggered_writes_slowdown =
618 (cfd->current()->storage_info()->NumLevelFiles(0) >=
619 mutable_cf_options.level0_slowdown_writes_trigger);
620 bool triggered_writes_stop =
621 (cfd->current()->storage_info()->NumLevelFiles(0) >=
622 mutable_cf_options.level0_stop_writes_trigger);
623 // release lock while notifying events
624 mutex_.Unlock();
625 {
626 for (auto& info : *flush_jobs_info) {
627 info->triggered_writes_slowdown = triggered_writes_slowdown;
628 info->triggered_writes_stop = triggered_writes_stop;
629 for (auto listener : immutable_db_options_.listeners) {
630 listener->OnFlushCompleted(this, *info);
631 }
632 }
633 flush_jobs_info->clear();
634 }
635 mutex_.Lock();
636 // no need to signal bg_cv_ as it will be signaled at the end of the
637 // flush process.
638 #else
639 (void)cfd;
640 (void)mutable_cf_options;
641 (void)flush_jobs_info;
642 #endif // ROCKSDB_LITE
643 }
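
// Illustrative sketch (not part of the original source): the two notification
// helpers above invoke user-registered listeners. A minimal listener,
// assuming the public rocksdb::EventListener API and a placeholder `options`:
//
//   class FlushLogger : public rocksdb::EventListener {
//    public:
//     void OnFlushBegin(rocksdb::DB* /*db*/,
//                       const rocksdb::FlushJobInfo& info) override {
//       // e.g. inspect info.cf_name, info.file_path, info.flush_reason
//     }
//     void OnFlushCompleted(rocksdb::DB* /*db*/,
//                           const rocksdb::FlushJobInfo& info) override {}
//   };
//   options.listeners.emplace_back(std::make_shared<FlushLogger>());
//
// As documented above, both callbacks run while the db_mutex is unlocked.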
644
645 Status DBImpl::CompactRange(const CompactRangeOptions& options,
646 ColumnFamilyHandle* column_family,
647 const Slice* begin, const Slice* end) {
648 auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
649 auto cfd = cfh->cfd();
650
651 if (options.target_path_id >= cfd->ioptions()->cf_paths.size()) {
652 return Status::InvalidArgument("Invalid target path ID");
653 }
654
655 bool exclusive = options.exclusive_manual_compaction;
656
657 bool flush_needed = true;
658 if (begin != nullptr && end != nullptr) {
659 // TODO(ajkr): We could also optimize away the flush in certain cases where
660 // one/both sides of the interval are unbounded. But it requires more
661 // changes to RangesOverlapWithMemtables.
662 Range range(*begin, *end);
663 SuperVersion* super_version = cfd->GetReferencedSuperVersion(this);
664 cfd->RangesOverlapWithMemtables({range}, super_version, &flush_needed);
665 CleanupSuperVersion(super_version);
666 }
667
668 Status s;
669 if (flush_needed) {
670 FlushOptions fo;
671 fo.allow_write_stall = options.allow_write_stall;
672 if (immutable_db_options_.atomic_flush) {
673 autovector<ColumnFamilyData*> cfds;
674 mutex_.Lock();
675 SelectColumnFamiliesForAtomicFlush(&cfds);
676 mutex_.Unlock();
677 s = AtomicFlushMemTables(cfds, fo, FlushReason::kManualCompaction,
678 false /* writes_stopped */);
679 } else {
680 s = FlushMemTable(cfd, fo, FlushReason::kManualCompaction,
681 false /* writes_stopped*/);
682 }
683 if (!s.ok()) {
684 LogFlush(immutable_db_options_.info_log);
685 return s;
686 }
687 }
688
689 int max_level_with_files = 0;
690 // max_file_num_to_ignore can be used to filter out newly created SST files,
691 // useful for bottom level compaction in a manual compaction
692 uint64_t max_file_num_to_ignore = port::kMaxUint64;
693 uint64_t next_file_number = port::kMaxUint64;
694 {
695 InstrumentedMutexLock l(&mutex_);
696 Version* base = cfd->current();
697 for (int level = 1; level < base->storage_info()->num_non_empty_levels();
698 level++) {
699 if (base->storage_info()->OverlapInLevel(level, begin, end)) {
700 max_level_with_files = level;
701 }
702 }
703 next_file_number = versions_->current_next_file_number();
704 }
705
706 int final_output_level = 0;
707
708 if (cfd->ioptions()->compaction_style == kCompactionStyleUniversal &&
709 cfd->NumberLevels() > 1) {
710 // Always compact all files together.
711 final_output_level = cfd->NumberLevels() - 1;
712 // if the bottommost level is reserved
713 if (immutable_db_options_.allow_ingest_behind) {
714 final_output_level--;
715 }
716 s = RunManualCompaction(cfd, ColumnFamilyData::kCompactAllLevels,
717 final_output_level, options, begin, end, exclusive,
718 false, max_file_num_to_ignore);
719 } else {
720 for (int level = 0; level <= max_level_with_files; level++) {
721 int output_level;
722 // If the compaction is universal, or if we're compacting the
723 // bottommost level, the output level will be the same as the input one.
724 // Level 0 can never be the bottommost level (i.e. if all files are in
725 // level 0, we will compact to level 1).
726 if (cfd->ioptions()->compaction_style == kCompactionStyleUniversal ||
727 cfd->ioptions()->compaction_style == kCompactionStyleFIFO) {
728 output_level = level;
729 } else if (level == max_level_with_files && level > 0) {
730 if (options.bottommost_level_compaction ==
731 BottommostLevelCompaction::kSkip) {
732 // Skip bottommost level compaction
733 continue;
734 } else if (options.bottommost_level_compaction ==
735 BottommostLevelCompaction::kIfHaveCompactionFilter &&
736 cfd->ioptions()->compaction_filter == nullptr &&
737 cfd->ioptions()->compaction_filter_factory == nullptr) {
738 // Skip bottommost level compaction since we don't have a compaction
739 // filter
740 continue;
741 }
742 output_level = level;
743 // update max_file_num_to_ignore only for bottom level compaction
744 // because data in newly compacted files in middle levels may still need
745 // to be pushed down
746 max_file_num_to_ignore = next_file_number;
747 } else {
748 output_level = level + 1;
749 if (cfd->ioptions()->compaction_style == kCompactionStyleLevel &&
750 cfd->ioptions()->level_compaction_dynamic_level_bytes &&
751 level == 0) {
752 output_level = ColumnFamilyData::kCompactToBaseLevel;
753 }
754 }
755 s = RunManualCompaction(cfd, level, output_level, options, begin, end,
756 exclusive, false, max_file_num_to_ignore);
757 if (!s.ok()) {
758 break;
759 }
760 if (output_level == ColumnFamilyData::kCompactToBaseLevel) {
761 final_output_level = cfd->NumberLevels() - 1;
762 } else if (output_level > final_output_level) {
763 final_output_level = output_level;
764 }
765 TEST_SYNC_POINT("DBImpl::RunManualCompaction()::1");
766 TEST_SYNC_POINT("DBImpl::RunManualCompaction()::2");
767 }
768 }
769 if (!s.ok()) {
770 LogFlush(immutable_db_options_.info_log);
771 return s;
772 }
773
774 if (options.change_level) {
775 ROCKS_LOG_INFO(immutable_db_options_.info_log,
776 "[RefitLevel] waiting for background threads to stop");
777 s = PauseBackgroundWork();
778 if (s.ok()) {
779 s = ReFitLevel(cfd, final_output_level, options.target_level);
780 }
781 ContinueBackgroundWork();
782 }
783 LogFlush(immutable_db_options_.info_log);
784
785 {
786 InstrumentedMutexLock l(&mutex_);
787 // An automatic compaction that has been scheduled might have been
788 // preempted by the manual compaction. We need to schedule it back.
789 MaybeScheduleFlushOrCompaction();
790 }
791
792 return s;
793 }
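
// Illustrative usage sketch (not part of the original source): a typical
// caller of the DB::CompactRange() entry point implemented above, assuming a
// placeholder `db` handle and the default column family:
//
//   rocksdb::CompactRangeOptions cro;
//   cro.change_level = true;   // exercises the ReFitLevel() path above
//   cro.bottommost_level_compaction =
//       rocksdb::BottommostLevelCompaction::kSkip;
//   rocksdb::Status s =
//       db->CompactRange(cro, nullptr /* begin */, nullptr /* end */);
//
// Passing nullptr for begin/end compacts the whole key range; as the code
// above shows, the memtable is flushed first unless the requested range is
// known not to overlap it.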
794
795 Status DBImpl::CompactFiles(const CompactionOptions& compact_options,
796 ColumnFamilyHandle* column_family,
797 const std::vector<std::string>& input_file_names,
798 const int output_level, const int output_path_id,
799 std::vector<std::string>* const output_file_names,
800 CompactionJobInfo* compaction_job_info) {
801 #ifdef ROCKSDB_LITE
802 (void)compact_options;
803 (void)column_family;
804 (void)input_file_names;
805 (void)output_level;
806 (void)output_path_id;
807 (void)output_file_names;
808 (void)compaction_job_info;
809 // not supported in lite version
810 return Status::NotSupported("Not supported in ROCKSDB LITE");
811 #else
812 if (column_family == nullptr) {
813 return Status::InvalidArgument("ColumnFamilyHandle must be non-null.");
814 }
815
816 auto cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
817 assert(cfd);
818
819 Status s;
820 JobContext job_context(0, true);
821 LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL,
822 immutable_db_options_.info_log.get());
823
824 // Perform CompactFiles
825 TEST_SYNC_POINT("TestCompactFiles::IngestExternalFile2");
826 {
827 InstrumentedMutexLock l(&mutex_);
828
829 // This call will unlock/lock the mutex to wait for current running
830 // IngestExternalFile() calls to finish.
831 WaitForIngestFile();
832
833 // We need to get current after `WaitForIngestFile`, because
834 // `IngestExternalFile` may add files that overlap with `input_file_names`
835 auto* current = cfd->current();
836 current->Ref();
837
838 s = CompactFilesImpl(compact_options, cfd, current, input_file_names,
839 output_file_names, output_level, output_path_id,
840 &job_context, &log_buffer, compaction_job_info);
841
842 current->Unref();
843 }
844
845 // Find and delete obsolete files
846 {
847 InstrumentedMutexLock l(&mutex_);
848 // If !s.ok(), this means that Compaction failed. In that case, we want
849 // to delete all obsolete files we might have created and we force
850 // FindObsoleteFiles(). This is because job_context does not
851 // catch all created files if compaction failed.
852 FindObsoleteFiles(&job_context, !s.ok());
853 } // release the mutex
854
855 // delete unnecessary files if any, this is done outside the mutex
856 if (job_context.HaveSomethingToClean() ||
857 job_context.HaveSomethingToDelete() || !log_buffer.IsEmpty()) {
858 // Have to flush the info logs before bg_compaction_scheduled_--
859 // because if bg_flush_scheduled_ becomes 0 and the lock is
860 // released, the destructor of DB can kick in and destroy all the
861 // state of DB, so info_log might not be available after that point.
862 // The same applies to accessing any other state that DB owns.
863 log_buffer.FlushBufferToLog();
864 if (job_context.HaveSomethingToDelete()) {
865 // no mutex is locked here. No need to Unlock() and Lock() here.
866 PurgeObsoleteFiles(job_context);
867 }
868 job_context.Clean();
869 }
870
871 return s;
872 #endif // ROCKSDB_LITE
873 }
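
// Illustrative usage sketch (not part of the original source): CompactFiles()
// is normally driven from metadata returned by GetColumnFamilyMetaData().
// Assuming a placeholder `db` handle and the default column family:
//
//   rocksdb::ColumnFamilyMetaData meta;
//   db->GetColumnFamilyMetaData(&meta);
//   std::vector<std::string> inputs;
//   for (const auto& f : meta.levels[0].files) {
//     inputs.push_back(f.name);  // SST file name relative to the CF path
//   }
//   rocksdb::Status s = db->CompactFiles(rocksdb::CompactionOptions(),
//                                        inputs, 1 /* output_level */);
//
// The compaction runs on the calling thread; obsolete inputs are cleaned up
// afterwards as shown above.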
874
875 #ifndef ROCKSDB_LITE
876 Status DBImpl::CompactFilesImpl(
877 const CompactionOptions& compact_options, ColumnFamilyData* cfd,
878 Version* version, const std::vector<std::string>& input_file_names,
879 std::vector<std::string>* const output_file_names, const int output_level,
880 int output_path_id, JobContext* job_context, LogBuffer* log_buffer,
881 CompactionJobInfo* compaction_job_info) {
882 mutex_.AssertHeld();
883
884 if (shutting_down_.load(std::memory_order_acquire)) {
885 return Status::ShutdownInProgress();
886 }
887 if (manual_compaction_paused_.load(std::memory_order_acquire)) {
888 return Status::Incomplete(Status::SubCode::kManualCompactionPaused);
889 }
890
891 std::unordered_set<uint64_t> input_set;
892 for (const auto& file_name : input_file_names) {
893 input_set.insert(TableFileNameToNumber(file_name));
894 }
895
896 ColumnFamilyMetaData cf_meta;
897 // TODO(yhchiang): can directly use version here if none of the
898 // following function calls is pluggable by external developers.
899 version->GetColumnFamilyMetaData(&cf_meta);
900
901 if (output_path_id < 0) {
902 if (cfd->ioptions()->cf_paths.size() == 1U) {
903 output_path_id = 0;
904 } else {
905 return Status::NotSupported(
906 "Automatic output path selection is not "
907 "yet supported in CompactFiles()");
908 }
909 }
910
911 Status s = cfd->compaction_picker()->SanitizeCompactionInputFiles(
912 &input_set, cf_meta, output_level);
913 if (!s.ok()) {
914 return s;
915 }
916
917 std::vector<CompactionInputFiles> input_files;
918 s = cfd->compaction_picker()->GetCompactionInputsFromFileNumbers(
919 &input_files, &input_set, version->storage_info(), compact_options);
920 if (!s.ok()) {
921 return s;
922 }
923
924 for (const auto& inputs : input_files) {
925 if (cfd->compaction_picker()->AreFilesInCompaction(inputs.files)) {
926 return Status::Aborted(
927 "Some of the necessary compaction input "
928 "files are already being compacted");
929 }
930 }
931 bool sfm_reserved_compact_space = false;
932 // First check if we have enough room to do the compaction
933 bool enough_room = EnoughRoomForCompaction(
934 cfd, input_files, &sfm_reserved_compact_space, log_buffer);
935
936 if (!enough_room) {
937 // m's vars will get set properly at the end of this function,
938 // as long as status == CompactionTooLarge
939 return Status::CompactionTooLarge();
940 }
941
942 // At this point, CompactFiles will be run.
943 bg_compaction_scheduled_++;
944
945 std::unique_ptr<Compaction> c;
946 assert(cfd->compaction_picker());
947 c.reset(cfd->compaction_picker()->CompactFiles(
948 compact_options, input_files, output_level, version->storage_info(),
949 *cfd->GetLatestMutableCFOptions(), output_path_id));
950 // we already sanitized the set of input files and checked for conflicts
951 // without releasing the lock, so we're guaranteed a compaction can be formed.
952 assert(c != nullptr);
953
954 c->SetInputVersion(version);
955 // deletion compaction currently not allowed in CompactFiles.
956 assert(!c->deletion_compaction());
957
958 std::vector<SequenceNumber> snapshot_seqs;
959 SequenceNumber earliest_write_conflict_snapshot;
960 SnapshotChecker* snapshot_checker;
961 GetSnapshotContext(job_context, &snapshot_seqs,
962 &earliest_write_conflict_snapshot, &snapshot_checker);
963
964 std::unique_ptr<std::list<uint64_t>::iterator> pending_outputs_inserted_elem(
965 new std::list<uint64_t>::iterator(
966 CaptureCurrentFileNumberInPendingOutputs()));
967
968 assert(is_snapshot_supported_ || snapshots_.empty());
969 CompactionJobStats compaction_job_stats;
970 CompactionJob compaction_job(
971 job_context->job_id, c.get(), immutable_db_options_,
972 file_options_for_compaction_, versions_.get(), &shutting_down_,
973 preserve_deletes_seqnum_.load(), log_buffer, directories_.GetDbDir(),
974 GetDataDir(c->column_family_data(), c->output_path_id()), stats_, &mutex_,
975 &error_handler_, snapshot_seqs, earliest_write_conflict_snapshot,
976 snapshot_checker, table_cache_, &event_logger_,
977 c->mutable_cf_options()->paranoid_file_checks,
978 c->mutable_cf_options()->report_bg_io_stats, dbname_,
979 &compaction_job_stats, Env::Priority::USER, &manual_compaction_paused_);
980
981 // Creating a compaction influences the compaction score because the score
982 // takes running compactions into account (by skipping files that are already
983 // being compacted). Since we just changed compaction score, we recalculate it
984 // here.
985 version->storage_info()->ComputeCompactionScore(*cfd->ioptions(),
986 *c->mutable_cf_options());
987
988 compaction_job.Prepare();
989
990 mutex_.Unlock();
991 TEST_SYNC_POINT("CompactFilesImpl:0");
992 TEST_SYNC_POINT("CompactFilesImpl:1");
993 compaction_job.Run();
994 TEST_SYNC_POINT("CompactFilesImpl:2");
995 TEST_SYNC_POINT("CompactFilesImpl:3");
996 mutex_.Lock();
997
998 Status status = compaction_job.Install(*c->mutable_cf_options());
999 if (status.ok()) {
1000 InstallSuperVersionAndScheduleWork(c->column_family_data(),
1001 &job_context->superversion_contexts[0],
1002 *c->mutable_cf_options());
1003 }
1004 c->ReleaseCompactionFiles(s);
1005 #ifndef ROCKSDB_LITE
1006 // Need to make sure SstFileManager does its bookkeeping
1007 auto sfm = static_cast<SstFileManagerImpl*>(
1008 immutable_db_options_.sst_file_manager.get());
1009 if (sfm && sfm_reserved_compact_space) {
1010 sfm->OnCompactionCompletion(c.get());
1011 }
1012 #endif // ROCKSDB_LITE
1013
1014 ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem);
1015
1016 if (compaction_job_info != nullptr) {
1017 BuildCompactionJobInfo(cfd, c.get(), s, compaction_job_stats,
1018 job_context->job_id, version, compaction_job_info);
1019 }
1020
1021 if (status.ok()) {
1022 // Done
1023 } else if (status.IsColumnFamilyDropped() || status.IsShutdownInProgress()) {
1024 // Ignore compaction errors found during shutting down
1025 } else if (status.IsManualCompactionPaused()) {
1026 // Don't report stopping manual compaction as error
1027 ROCKS_LOG_INFO(immutable_db_options_.info_log,
1028 "[%s] [JOB %d] Stopping manual compaction",
1029 c->column_family_data()->GetName().c_str(),
1030 job_context->job_id);
1031 } else {
1032 ROCKS_LOG_WARN(immutable_db_options_.info_log,
1033 "[%s] [JOB %d] Compaction error: %s",
1034 c->column_family_data()->GetName().c_str(),
1035 job_context->job_id, status.ToString().c_str());
1036 error_handler_.SetBGError(status, BackgroundErrorReason::kCompaction);
1037 }
1038
1039 if (output_file_names != nullptr) {
1040 for (const auto newf : c->edit()->GetNewFiles()) {
1041 (*output_file_names)
1042 .push_back(TableFileName(c->immutable_cf_options()->cf_paths,
1043 newf.second.fd.GetNumber(),
1044 newf.second.fd.GetPathId()));
1045 }
1046 }
1047
1048 c.reset();
1049
1050 bg_compaction_scheduled_--;
1051 if (bg_compaction_scheduled_ == 0) {
1052 bg_cv_.SignalAll();
1053 }
1054 MaybeScheduleFlushOrCompaction();
1055 TEST_SYNC_POINT("CompactFilesImpl:End");
1056
1057 return status;
1058 }
1059 #endif // ROCKSDB_LITE
1060
1061 Status DBImpl::PauseBackgroundWork() {
1062 InstrumentedMutexLock guard_lock(&mutex_);
1063 bg_compaction_paused_++;
1064 while (bg_bottom_compaction_scheduled_ > 0 || bg_compaction_scheduled_ > 0 ||
1065 bg_flush_scheduled_ > 0) {
1066 bg_cv_.Wait();
1067 }
1068 bg_work_paused_++;
1069 return Status::OK();
1070 }
1071
1072 Status DBImpl::ContinueBackgroundWork() {
1073 InstrumentedMutexLock guard_lock(&mutex_);
1074 if (bg_work_paused_ == 0) {
1075 return Status::InvalidArgument();
1076 }
1077 assert(bg_work_paused_ > 0);
1078 assert(bg_compaction_paused_ > 0);
1079 bg_compaction_paused_--;
1080 bg_work_paused_--;
1081 // It's sufficient to check just bg_work_paused_ here since
1082 // bg_work_paused_ is always no greater than bg_compaction_paused_
1083 if (bg_work_paused_ == 0) {
1084 MaybeScheduleFlushOrCompaction();
1085 }
1086 return Status::OK();
1087 }
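
// Illustrative usage sketch (not part of the original source): callers pair
// these two methods to obtain a quiescent window with no background flushes
// or compactions (CompactRange() above does the same around ReFitLevel()).
// Assuming a placeholder `db` handle:
//
//   rocksdb::Status s = db->PauseBackgroundWork();
//   if (s.ok()) {
//     // ... no background jobs are running here ...
//     s = db->ContinueBackgroundWork();
//   }
//
// The calls nest: bg_work_paused_ is a counter, so background work resumes
// only after every Pause has been matched by a Continue.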
1088
1089 void DBImpl::NotifyOnCompactionBegin(ColumnFamilyData* cfd, Compaction* c,
1090 const Status& st,
1091 const CompactionJobStats& job_stats,
1092 int job_id) {
1093 #ifndef ROCKSDB_LITE
1094 if (immutable_db_options_.listeners.empty()) {
1095 return;
1096 }
1097 mutex_.AssertHeld();
1098 if (shutting_down_.load(std::memory_order_acquire)) {
1099 return;
1100 }
1101 if (c->is_manual_compaction() &&
1102 manual_compaction_paused_.load(std::memory_order_acquire)) {
1103 return;
1104 }
1105 Version* current = cfd->current();
1106 current->Ref();
1107 // release lock while notifying events
1108 mutex_.Unlock();
1109 TEST_SYNC_POINT("DBImpl::NotifyOnCompactionBegin::UnlockMutex");
1110 {
1111 CompactionJobInfo info{};
1112 info.cf_name = cfd->GetName();
1113 info.status = st;
1114 info.thread_id = env_->GetThreadID();
1115 info.job_id = job_id;
1116 info.base_input_level = c->start_level();
1117 info.output_level = c->output_level();
1118 info.stats = job_stats;
1119 info.table_properties = c->GetOutputTableProperties();
1120 info.compaction_reason = c->compaction_reason();
1121 info.compression = c->output_compression();
1122 for (size_t i = 0; i < c->num_input_levels(); ++i) {
1123 for (const auto fmd : *c->inputs(i)) {
1124 const FileDescriptor& desc = fmd->fd;
1125 const uint64_t file_number = desc.GetNumber();
1126 auto fn = TableFileName(c->immutable_cf_options()->cf_paths,
1127 file_number, desc.GetPathId());
1128 info.input_files.push_back(fn);
1129 info.input_file_infos.push_back(CompactionFileInfo{
1130 static_cast<int>(i), file_number, fmd->oldest_blob_file_number});
1131 if (info.table_properties.count(fn) == 0) {
1132 std::shared_ptr<const TableProperties> tp;
1133 auto s = current->GetTableProperties(&tp, fmd, &fn);
1134 if (s.ok()) {
1135 info.table_properties[fn] = tp;
1136 }
1137 }
1138 }
1139 }
1140 for (const auto newf : c->edit()->GetNewFiles()) {
1141 const FileMetaData& meta = newf.second;
1142 const FileDescriptor& desc = meta.fd;
1143 const uint64_t file_number = desc.GetNumber();
1144 info.output_files.push_back(TableFileName(
1145 c->immutable_cf_options()->cf_paths, file_number, desc.GetPathId()));
1146 info.output_file_infos.push_back(CompactionFileInfo{
1147 newf.first, file_number, meta.oldest_blob_file_number});
1148 }
1149 for (auto listener : immutable_db_options_.listeners) {
1150 listener->OnCompactionBegin(this, info);
1151 }
1152 }
1153 mutex_.Lock();
1154 current->Unref();
1155 #else
1156 (void)cfd;
1157 (void)c;
1158 (void)st;
1159 (void)job_stats;
1160 (void)job_id;
1161 #endif // ROCKSDB_LITE
1162 }
1163
1164 void DBImpl::NotifyOnCompactionCompleted(
1165 ColumnFamilyData* cfd, Compaction* c, const Status& st,
1166 const CompactionJobStats& compaction_job_stats, const int job_id) {
1167 #ifndef ROCKSDB_LITE
1168 if (immutable_db_options_.listeners.size() == 0U) {
1169 return;
1170 }
1171 mutex_.AssertHeld();
1172 if (shutting_down_.load(std::memory_order_acquire)) {
1173 return;
1174 }
1175 if (c->is_manual_compaction() &&
1176 manual_compaction_paused_.load(std::memory_order_acquire)) {
1177 return;
1178 }
1179 Version* current = cfd->current();
1180 current->Ref();
1181 // release lock while notifying events
1182 mutex_.Unlock();
1183 TEST_SYNC_POINT("DBImpl::NotifyOnCompactionCompleted::UnlockMutex");
1184 {
1185 CompactionJobInfo info{};
1186 BuildCompactionJobInfo(cfd, c, st, compaction_job_stats, job_id, current,
1187 &info);
1188 for (auto listener : immutable_db_options_.listeners) {
1189 listener->OnCompactionCompleted(this, info);
1190 }
1191 }
1192 mutex_.Lock();
1193 current->Unref();
1194 // no need to signal bg_cv_ as it will be signaled at the end of the
1195 // flush process.
1196 #else
1197 (void)cfd;
1198 (void)c;
1199 (void)st;
1200 (void)compaction_job_stats;
1201 (void)job_id;
1202 #endif // ROCKSDB_LITE
1203 }
1204
1205 // REQUIREMENT: block all background work by calling PauseBackgroundWork()
1206 // before calling this function
1207 Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) {
1208 assert(level < cfd->NumberLevels());
1209 if (target_level >= cfd->NumberLevels()) {
1210 return Status::InvalidArgument("Target level exceeds number of levels");
1211 }
1212
1213 SuperVersionContext sv_context(/* create_superversion */ true);
1214
1215 Status status;
1216
1217 InstrumentedMutexLock guard_lock(&mutex_);
1218
1219 // only allow one thread refitting
1220 if (refitting_level_) {
1221 ROCKS_LOG_INFO(immutable_db_options_.info_log,
1222 "[ReFitLevel] another thread is refitting");
1223 return Status::NotSupported("another thread is refitting");
1224 }
1225 refitting_level_ = true;
1226
1227 const MutableCFOptions mutable_cf_options = *cfd->GetLatestMutableCFOptions();
1228 // move to a smaller level
1229 int to_level = target_level;
1230 if (target_level < 0) {
1231 to_level = FindMinimumEmptyLevelFitting(cfd, mutable_cf_options, level);
1232 }
1233
1234 auto* vstorage = cfd->current()->storage_info();
1235 if (to_level > level) {
1236 if (level == 0) {
1237 return Status::NotSupported(
1238 "Cannot change from level 0 to other levels.");
1239 }
1240 // Check levels are empty for a trivial move
1241 for (int l = level + 1; l <= to_level; l++) {
1242 if (vstorage->NumLevelFiles(l) > 0) {
1243 return Status::NotSupported(
1244 "Levels between source and target are not empty for a move.");
1245 }
1246 }
1247 }
1248 if (to_level != level) {
1249 ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
1250 "[%s] Before refitting:\n%s", cfd->GetName().c_str(),
1251 cfd->current()->DebugString().data());
1252
1253 VersionEdit edit;
1254 edit.SetColumnFamily(cfd->GetID());
1255 for (const auto& f : vstorage->LevelFiles(level)) {
1256 edit.DeleteFile(level, f->fd.GetNumber());
1257 edit.AddFile(to_level, f->fd.GetNumber(), f->fd.GetPathId(),
1258 f->fd.GetFileSize(), f->smallest, f->largest,
1259 f->fd.smallest_seqno, f->fd.largest_seqno,
1260 f->marked_for_compaction, f->oldest_blob_file_number,
1261 f->oldest_ancester_time, f->file_creation_time,
1262 f->file_checksum, f->file_checksum_func_name);
1263 }
1264 ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
1265 "[%s] Apply version edit:\n%s", cfd->GetName().c_str(),
1266 edit.DebugString().data());
1267
1268 status = versions_->LogAndApply(cfd, mutable_cf_options, &edit, &mutex_,
1269 directories_.GetDbDir());
1270 InstallSuperVersionAndScheduleWork(cfd, &sv_context, mutable_cf_options);
1271
1272 ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "[%s] LogAndApply: %s\n",
1273 cfd->GetName().c_str(), status.ToString().data());
1274
1275 if (status.ok()) {
1276 ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
1277 "[%s] After refitting:\n%s", cfd->GetName().c_str(),
1278 cfd->current()->DebugString().data());
1279 }
1280 }
1281
1282 sv_context.Clean();
1283 refitting_level_ = false;
1284
1285 return status;
1286 }
1287
1288 int DBImpl::NumberLevels(ColumnFamilyHandle* column_family) {
1289 auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
1290 return cfh->cfd()->NumberLevels();
1291 }
1292
1293 int DBImpl::MaxMemCompactionLevel(ColumnFamilyHandle* /*column_family*/) {
1294 return 0;
1295 }
1296
1297 int DBImpl::Level0StopWriteTrigger(ColumnFamilyHandle* column_family) {
1298 auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
1299 InstrumentedMutexLock l(&mutex_);
1300 return cfh->cfd()
1301 ->GetSuperVersion()
1302 ->mutable_cf_options.level0_stop_writes_trigger;
1303 }
1304
1305 Status DBImpl::Flush(const FlushOptions& flush_options,
1306 ColumnFamilyHandle* column_family) {
1307 auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
1308 ROCKS_LOG_INFO(immutable_db_options_.info_log, "[%s] Manual flush start.",
1309 cfh->GetName().c_str());
1310 Status s;
1311 if (immutable_db_options_.atomic_flush) {
1312 s = AtomicFlushMemTables({cfh->cfd()}, flush_options,
1313 FlushReason::kManualFlush);
1314 } else {
1315 s = FlushMemTable(cfh->cfd(), flush_options, FlushReason::kManualFlush);
1316 }
1317
1318 ROCKS_LOG_INFO(immutable_db_options_.info_log,
1319 "[%s] Manual flush finished, status: %s\n",
1320 cfh->GetName().c_str(), s.ToString().c_str());
1321 return s;
1322 }
1323
1324 Status DBImpl::Flush(const FlushOptions& flush_options,
1325 const std::vector<ColumnFamilyHandle*>& column_families) {
1326 Status s;
1327 if (!immutable_db_options_.atomic_flush) {
1328 for (auto cfh : column_families) {
1329 s = Flush(flush_options, cfh);
1330 if (!s.ok()) {
1331 break;
1332 }
1333 }
1334 } else {
1335 ROCKS_LOG_INFO(immutable_db_options_.info_log,
1336 "Manual atomic flush start.\n"
1337 "=====Column families:=====");
1338 for (auto cfh : column_families) {
1339 auto cfhi = static_cast<ColumnFamilyHandleImpl*>(cfh);
1340 ROCKS_LOG_INFO(immutable_db_options_.info_log, "%s",
1341 cfhi->GetName().c_str());
1342 }
1343 ROCKS_LOG_INFO(immutable_db_options_.info_log,
1344 "=====End of column families list=====");
1345 autovector<ColumnFamilyData*> cfds;
1346 std::for_each(column_families.begin(), column_families.end(),
1347 [&cfds](ColumnFamilyHandle* elem) {
1348 auto cfh = static_cast<ColumnFamilyHandleImpl*>(elem);
1349 cfds.emplace_back(cfh->cfd());
1350 });
1351 s = AtomicFlushMemTables(cfds, flush_options, FlushReason::kManualFlush);
1352 ROCKS_LOG_INFO(immutable_db_options_.info_log,
1353 "Manual atomic flush finished, status: %s\n"
1354 "=====Column families:=====",
1355 s.ToString().c_str());
1356 for (auto cfh : column_families) {
1357 auto cfhi = static_cast<ColumnFamilyHandleImpl*>(cfh);
1358 ROCKS_LOG_INFO(immutable_db_options_.info_log, "%s",
1359 cfhi->GetName().c_str());
1360 }
1361 ROCKS_LOG_INFO(immutable_db_options_.info_log,
1362 "=====End of column families list=====");
1363 }
1364 return s;
1365 }
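
// Illustrative usage sketch (not part of the original source): the
// multi-column-family overload above is most useful with atomic flush.
// Assuming a placeholder `db` handle, column family handles cf1 and cf2,
// and that DBOptions::atomic_flush was set to true at open time:
//
//   rocksdb::FlushOptions fo;
//   fo.wait = true;
//   rocksdb::Status s = db->Flush(fo, {cf1, cf2});
//
// This routes through AtomicFlushMemTables(), so the memtables of both
// column families are flushed and recorded in the MANIFEST together, or not
// at all.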
1366
1367 Status DBImpl::RunManualCompaction(
1368 ColumnFamilyData* cfd, int input_level, int output_level,
1369 const CompactRangeOptions& compact_range_options, const Slice* begin,
1370 const Slice* end, bool exclusive, bool disallow_trivial_move,
1371 uint64_t max_file_num_to_ignore) {
1372 assert(input_level == ColumnFamilyData::kCompactAllLevels ||
1373 input_level >= 0);
1374
1375 InternalKey begin_storage, end_storage;
1376 CompactionArg* ca;
1377
1378 bool scheduled = false;
1379 bool manual_conflict = false;
1380 ManualCompactionState manual;
1381 manual.cfd = cfd;
1382 manual.input_level = input_level;
1383 manual.output_level = output_level;
1384 manual.output_path_id = compact_range_options.target_path_id;
1385 manual.done = false;
1386 manual.in_progress = false;
1387 manual.incomplete = false;
1388 manual.exclusive = exclusive;
1389 manual.disallow_trivial_move = disallow_trivial_move;
1390 // For universal compaction, we enforce every manual compaction to compact
1391 // all files.
1392 if (begin == nullptr ||
1393 cfd->ioptions()->compaction_style == kCompactionStyleUniversal ||
1394 cfd->ioptions()->compaction_style == kCompactionStyleFIFO) {
1395 manual.begin = nullptr;
1396 } else {
1397 begin_storage.SetMinPossibleForUserKey(*begin);
1398 manual.begin = &begin_storage;
1399 }
1400 if (end == nullptr ||
1401 cfd->ioptions()->compaction_style == kCompactionStyleUniversal ||
1402 cfd->ioptions()->compaction_style == kCompactionStyleFIFO) {
1403 manual.end = nullptr;
1404 } else {
1405 end_storage.SetMaxPossibleForUserKey(*end);
1406 manual.end = &end_storage;
1407 }
1408
1409 TEST_SYNC_POINT("DBImpl::RunManualCompaction:0");
1410 TEST_SYNC_POINT("DBImpl::RunManualCompaction:1");
1411 InstrumentedMutexLock l(&mutex_);
1412
1413 // When a manual compaction arrives, temporarily disable scheduling of
1414 // non-manual compactions and wait until the number of scheduled compaction
1415 // jobs drops to zero. This is needed to ensure that this manual compaction
1416 // can compact any range of keys/files.
1417 //
1418 // HasPendingManualCompaction() is true when at least one thread is inside
1419 // RunManualCompaction(), i.e. during that time no other compaction will
1420 // get scheduled (see MaybeScheduleFlushOrCompaction).
1421 //
1422 // Note that the following loop doesn't stop more than one thread calling
1423 // RunManualCompaction() from getting to the second while loop below.
1424 // However, only one of them will actually schedule compaction, while
1425 // others will wait on a condition variable until it completes.
1426
1427 AddManualCompaction(&manual);
1428 TEST_SYNC_POINT_CALLBACK("DBImpl::RunManualCompaction:NotScheduled", &mutex_);
1429 if (exclusive) {
1430 while (bg_bottom_compaction_scheduled_ > 0 ||
1431 bg_compaction_scheduled_ > 0) {
1432 TEST_SYNC_POINT("DBImpl::RunManualCompaction:WaitScheduled");
1433 ROCKS_LOG_INFO(
1434 immutable_db_options_.info_log,
1435 "[%s] Manual compaction waiting for all other scheduled background "
1436 "compactions to finish",
1437 cfd->GetName().c_str());
1438 bg_cv_.Wait();
1439 }
1440 }
1441
1442 ROCKS_LOG_INFO(immutable_db_options_.info_log,
1443 "[%s] Manual compaction starting", cfd->GetName().c_str());
1444
1445 LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL,
1446 immutable_db_options_.info_log.get());
1447 // We don't check bg_error_ here, because if we get the error in compaction,
1448 // the compaction will set manual.status to bg_error_ and set manual.done to
1449 // true.
1450 while (!manual.done) {
1451 assert(HasPendingManualCompaction());
1452 manual_conflict = false;
1453 Compaction* compaction = nullptr;
1454 if (ShouldntRunManualCompaction(&manual) || (manual.in_progress == true) ||
1455 scheduled ||
1456 (((manual.manual_end = &manual.tmp_storage1) != nullptr) &&
1457 ((compaction = manual.cfd->CompactRange(
1458 *manual.cfd->GetLatestMutableCFOptions(), manual.input_level,
1459 manual.output_level, compact_range_options, manual.begin,
1460 manual.end, &manual.manual_end, &manual_conflict,
1461 max_file_num_to_ignore)) == nullptr &&
1462 manual_conflict))) {
1463 // exclusive manual compactions should not see a conflict during
1464 // CompactRange
1465 assert(!exclusive || !manual_conflict);
1466 // Running either this or some other manual compaction
1467 bg_cv_.Wait();
1468 if (scheduled && manual.incomplete == true) {
1469 assert(!manual.in_progress);
1470 scheduled = false;
1471 manual.incomplete = false;
1472 }
1473 } else if (!scheduled) {
1474 if (compaction == nullptr) {
1475 manual.done = true;
1476 bg_cv_.SignalAll();
1477 continue;
1478 }
1479 ca = new CompactionArg;
1480 ca->db = this;
1481 ca->prepicked_compaction = new PrepickedCompaction;
1482 ca->prepicked_compaction->manual_compaction_state = &manual;
1483 ca->prepicked_compaction->compaction = compaction;
1484 if (!RequestCompactionToken(
1485 cfd, true, &ca->prepicked_compaction->task_token, &log_buffer)) {
1486 // Don't throttle manual compaction, only count outstanding tasks.
1487 assert(false);
1488 }
1489 manual.incomplete = false;
1490 bg_compaction_scheduled_++;
1491 env_->Schedule(&DBImpl::BGWorkCompaction, ca, Env::Priority::LOW, this,
1492 &DBImpl::UnscheduleCompactionCallback);
1493 scheduled = true;
1494 }
1495 }
1496
1497 log_buffer.FlushBufferToLog();
1498 assert(!manual.in_progress);
1499 assert(HasPendingManualCompaction());
1500 RemoveManualCompaction(&manual);
1501 bg_cv_.SignalAll();
1502 return manual.status;
1503 }
1504
1505 void DBImpl::GenerateFlushRequest(const autovector<ColumnFamilyData*>& cfds,
1506 FlushRequest* req) {
1507 assert(req != nullptr);
1508 req->reserve(cfds.size());
1509 for (const auto cfd : cfds) {
1510 if (nullptr == cfd) {
1511 // cfd may be null, see DBImpl::ScheduleFlushes
1512 continue;
1513 }
1514 uint64_t max_memtable_id = cfd->imm()->GetLatestMemTableID();
1515 req->emplace_back(cfd, max_memtable_id);
1516 }
1517 }
1518
1519 Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
1520 const FlushOptions& flush_options,
1521 FlushReason flush_reason, bool writes_stopped) {
1522 Status s;
1523 uint64_t flush_memtable_id = 0;
1524 if (!flush_options.allow_write_stall) {
1525 bool flush_needed = true;
1526 s = WaitUntilFlushWouldNotStallWrites(cfd, &flush_needed);
1527 TEST_SYNC_POINT("DBImpl::FlushMemTable:StallWaitDone");
1528 if (!s.ok() || !flush_needed) {
1529 return s;
1530 }
1531 }
1532 FlushRequest flush_req;
1533 {
1534 WriteContext context;
1535 InstrumentedMutexLock guard_lock(&mutex_);
1536
1537 WriteThread::Writer w;
1538 WriteThread::Writer nonmem_w;
1539 if (!writes_stopped) {
1540 write_thread_.EnterUnbatched(&w, &mutex_);
1541 if (two_write_queues_) {
1542 nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_);
1543 }
1544 }
1545 WaitForPendingWrites();
1546
1547 if (!cfd->mem()->IsEmpty() || !cached_recoverable_state_empty_.load()) {
1548 s = SwitchMemtable(cfd, &context);
1549 }
1550 if (s.ok()) {
1551 if (cfd->imm()->NumNotFlushed() != 0 || !cfd->mem()->IsEmpty() ||
1552 !cached_recoverable_state_empty_.load()) {
1553 flush_memtable_id = cfd->imm()->GetLatestMemTableID();
1554 flush_req.emplace_back(cfd, flush_memtable_id);
1555 }
1556 if (immutable_db_options_.persist_stats_to_disk) {
1557 ColumnFamilyData* cfd_stats =
1558 versions_->GetColumnFamilySet()->GetColumnFamily(
1559 kPersistentStatsColumnFamilyName);
1560 if (cfd_stats != nullptr && cfd_stats != cfd &&
1561 !cfd_stats->mem()->IsEmpty()) {
1562 // only force flush stats CF when it will be the only CF lagging
1563 // behind after the current flush
1564 bool stats_cf_flush_needed = true;
1565 for (auto* loop_cfd : *versions_->GetColumnFamilySet()) {
1566 if (loop_cfd == cfd_stats || loop_cfd == cfd) {
1567 continue;
1568 }
1569 if (loop_cfd->GetLogNumber() <= cfd_stats->GetLogNumber()) {
1570 stats_cf_flush_needed = false;
1571 }
1572 }
1573 if (stats_cf_flush_needed) {
1574 ROCKS_LOG_INFO(immutable_db_options_.info_log,
1575 "Force flushing stats CF with manual flush of %s "
1576 "to avoid holding old logs",
1577 cfd->GetName().c_str());
1578 s = SwitchMemtable(cfd_stats, &context);
1579 flush_memtable_id = cfd_stats->imm()->GetLatestMemTableID();
1580 flush_req.emplace_back(cfd_stats, flush_memtable_id);
1581 }
1582 }
1583 }
1584 }
1585
1586 if (s.ok() && !flush_req.empty()) {
1587 for (auto& elem : flush_req) {
1588 ColumnFamilyData* loop_cfd = elem.first;
1589 loop_cfd->imm()->FlushRequested();
1590 }
1591 // If the caller wants to wait for this flush to complete, it indicates
1592 // that the caller expects the ColumnFamilyData not to be freed by
1593 // other threads which may drop the column family concurrently.
1594 // Therefore, we increase the cfd's ref count.
1595 if (flush_options.wait) {
1596 for (auto& elem : flush_req) {
1597 ColumnFamilyData* loop_cfd = elem.first;
1598 loop_cfd->Ref();
1599 }
1600 }
1601 SchedulePendingFlush(flush_req, flush_reason);
1602 MaybeScheduleFlushOrCompaction();
1603 }
1604
1605 if (!writes_stopped) {
1606 write_thread_.ExitUnbatched(&w);
1607 if (two_write_queues_) {
1608 nonmem_write_thread_.ExitUnbatched(&nonmem_w);
1609 }
1610 }
1611 }
1612 TEST_SYNC_POINT("DBImpl::FlushMemTable:AfterScheduleFlush");
1613 TEST_SYNC_POINT("DBImpl::FlushMemTable:BeforeWaitForBgFlush");
1614 if (s.ok() && flush_options.wait) {
1615 autovector<ColumnFamilyData*> cfds;
1616 autovector<const uint64_t*> flush_memtable_ids;
1617 for (auto& iter : flush_req) {
1618 cfds.push_back(iter.first);
1619 flush_memtable_ids.push_back(&(iter.second));
1620 }
1621 s = WaitForFlushMemTables(cfds, flush_memtable_ids,
1622 (flush_reason == FlushReason::kErrorRecovery));
1623 InstrumentedMutexLock lock_guard(&mutex_);
1624 for (auto* tmp_cfd : cfds) {
1625 tmp_cfd->UnrefAndTryDelete();
1626 }
1627 }
1628 TEST_SYNC_POINT("DBImpl::FlushMemTable:FlushMemTableFinished");
1629 return s;
1630 }
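
// Illustrative usage sketch (not part of this file's logic): a manual flush
// typically reaches FlushMemTable() through the public DB::Flush() API. The
// handle name below is a hypothetical placeholder.
//
//   rocksdb::FlushOptions fo;
//   fo.wait = true;               // block until WaitForFlushMemTables() returns
//   fo.allow_write_stall = false; // take the WaitUntilFlushWouldNotStallWrites() path above
//   rocksdb::Status s = db->Flush(fo, my_cf_handle);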
1631
1632 // Flush all elements in 'column_family_datas'
1633 // and atomically record the result to the MANIFEST.
1634 Status DBImpl::AtomicFlushMemTables(
1635 const autovector<ColumnFamilyData*>& column_family_datas,
1636 const FlushOptions& flush_options, FlushReason flush_reason,
1637 bool writes_stopped) {
1638 Status s;
1639 if (!flush_options.allow_write_stall) {
1640 int num_cfs_to_flush = 0;
1641 for (auto cfd : column_family_datas) {
1642 bool flush_needed = true;
1643 s = WaitUntilFlushWouldNotStallWrites(cfd, &flush_needed);
1644 if (!s.ok()) {
1645 return s;
1646 } else if (flush_needed) {
1647 ++num_cfs_to_flush;
1648 }
1649 }
1650 if (0 == num_cfs_to_flush) {
1651 return s;
1652 }
1653 }
1654 FlushRequest flush_req;
1655 autovector<ColumnFamilyData*> cfds;
1656 {
1657 WriteContext context;
1658 InstrumentedMutexLock guard_lock(&mutex_);
1659
1660 WriteThread::Writer w;
1661 WriteThread::Writer nonmem_w;
1662 if (!writes_stopped) {
1663 write_thread_.EnterUnbatched(&w, &mutex_);
1664 if (two_write_queues_) {
1665 nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_);
1666 }
1667 }
1668 WaitForPendingWrites();
1669
1670 for (auto cfd : column_family_datas) {
1671 if (cfd->IsDropped()) {
1672 continue;
1673 }
1674 if (cfd->imm()->NumNotFlushed() != 0 || !cfd->mem()->IsEmpty() ||
1675 !cached_recoverable_state_empty_.load()) {
1676 cfds.emplace_back(cfd);
1677 }
1678 }
1679 for (auto cfd : cfds) {
1680 if (cfd->mem()->IsEmpty() && cached_recoverable_state_empty_.load()) {
1681 continue;
1682 }
1683 cfd->Ref();
1684 s = SwitchMemtable(cfd, &context);
1685 cfd->UnrefAndTryDelete();
1686 if (!s.ok()) {
1687 break;
1688 }
1689 }
1690 if (s.ok()) {
1691 AssignAtomicFlushSeq(cfds);
1692 for (auto cfd : cfds) {
1693 cfd->imm()->FlushRequested();
1694 }
1695 // If the caller wants to wait for this flush to complete, it indicates
1696 // that the caller expects the ColumnFamilyData not to be freed by
1697 // other threads which may drop the column family concurrently.
1698 // Therefore, we increase the cfd's ref count.
1699 if (flush_options.wait) {
1700 for (auto cfd : cfds) {
1701 cfd->Ref();
1702 }
1703 }
1704 GenerateFlushRequest(cfds, &flush_req);
1705 SchedulePendingFlush(flush_req, flush_reason);
1706 MaybeScheduleFlushOrCompaction();
1707 }
1708
1709 if (!writes_stopped) {
1710 write_thread_.ExitUnbatched(&w);
1711 if (two_write_queues_) {
1712 nonmem_write_thread_.ExitUnbatched(&nonmem_w);
1713 }
1714 }
1715 }
1716 TEST_SYNC_POINT("DBImpl::AtomicFlushMemTables:AfterScheduleFlush");
1717 TEST_SYNC_POINT("DBImpl::AtomicFlushMemTables:BeforeWaitForBgFlush");
1718 if (s.ok() && flush_options.wait) {
1719 autovector<const uint64_t*> flush_memtable_ids;
1720 for (auto& iter : flush_req) {
1721 flush_memtable_ids.push_back(&(iter.second));
1722 }
1723 s = WaitForFlushMemTables(cfds, flush_memtable_ids,
1724 (flush_reason == FlushReason::kErrorRecovery));
1725 InstrumentedMutexLock lock_guard(&mutex_);
1726 for (auto* cfd : cfds) {
1727 cfd->UnrefAndTryDelete();
1728 }
1729 }
1730 return s;
1731 }
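
// Illustrative sketch (hedged, not part of this file): atomic flush across
// column families is normally enabled with DBOptions::atomic_flush = true and
// driven through the multi-CF Flush() overload; the handles below are
// hypothetical placeholders.
//
//   rocksdb::Status s =
//       db->Flush(rocksdb::FlushOptions(), {cf_handle_a, cf_handle_b});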
1732
1733 // Calling FlushMemTable(), whether from DB::Flush() or from Backup Engine, can
1734 // cause a write stall, for example if one memtable is already being flushed.
1735 // This method tries to avoid a write stall (similar to CompactRange() behavior):
1736 // it emulates how the SuperVersion / LSM would change if the flush happened,
1737 // checks that against various constraints, and delays the flush if it would stall.
1738 // The caller should check status and flush_needed to see if the flush already happened.
1739 Status DBImpl::WaitUntilFlushWouldNotStallWrites(ColumnFamilyData* cfd,
1740 bool* flush_needed) {
1741 {
1742 *flush_needed = true;
1743 InstrumentedMutexLock l(&mutex_);
1744 uint64_t orig_active_memtable_id = cfd->mem()->GetID();
1745 WriteStallCondition write_stall_condition = WriteStallCondition::kNormal;
1746 do {
1747 if (write_stall_condition != WriteStallCondition::kNormal) {
1748 // Same error handling as user writes: Don't wait if there's a
1749 // background error, even if it's a soft error. We might wait here
1750 // indefinitely as the pending flushes/compactions may never finish
1751 // successfully, resulting in the stall condition lasting indefinitely
1752 if (error_handler_.IsBGWorkStopped()) {
1753 return error_handler_.GetBGError();
1754 }
1755
1756 TEST_SYNC_POINT("DBImpl::WaitUntilFlushWouldNotStallWrites:StallWait");
1757 ROCKS_LOG_INFO(immutable_db_options_.info_log,
1758 "[%s] WaitUntilFlushWouldNotStallWrites"
1759 " waiting on stall conditions to clear",
1760 cfd->GetName().c_str());
1761 bg_cv_.Wait();
1762 }
1763 if (cfd->IsDropped()) {
1764 return Status::ColumnFamilyDropped();
1765 }
1766 if (shutting_down_.load(std::memory_order_acquire)) {
1767 return Status::ShutdownInProgress();
1768 }
1769
1770 uint64_t earliest_memtable_id =
1771 std::min(cfd->mem()->GetID(), cfd->imm()->GetEarliestMemTableID());
1772 if (earliest_memtable_id > orig_active_memtable_id) {
1773 // We waited so long that the memtable we were originally waiting on was
1774 // flushed.
1775 *flush_needed = false;
1776 return Status::OK();
1777 }
1778
1779 const auto& mutable_cf_options = *cfd->GetLatestMutableCFOptions();
1780 const auto* vstorage = cfd->current()->storage_info();
1781
1782 // Skip stalling check if we're below auto-flush and auto-compaction
1783 // triggers. If it stalled in these conditions, that'd mean the stall
1784 // triggers are so low that stalling is needed for any background work. In
1785 // that case we shouldn't wait since background work won't be scheduled.
1786 if (cfd->imm()->NumNotFlushed() <
1787 cfd->ioptions()->min_write_buffer_number_to_merge &&
1788 vstorage->l0_delay_trigger_count() <
1789 mutable_cf_options.level0_file_num_compaction_trigger) {
1790 break;
1791 }
1792
1793 // check whether one extra immutable memtable or an extra L0 file would
1794 // cause write stalling mode to be entered. It could still enter stall
1795 // mode due to pending compaction bytes, but that's less common
1796 write_stall_condition =
1797 ColumnFamilyData::GetWriteStallConditionAndCause(
1798 cfd->imm()->NumNotFlushed() + 1,
1799 vstorage->l0_delay_trigger_count() + 1,
1800 vstorage->estimated_compaction_needed_bytes(), mutable_cf_options)
1801 .first;
1802 } while (write_stall_condition != WriteStallCondition::kNormal);
1803 }
1804 return Status::OK();
1805 }
1806
1807 // Wait for memtables to be flushed for multiple column families.
1808 // let N = cfds.size()
1809 // for i in [0, N),
1810 // 1) if flush_memtable_ids[i] is not null, then the memtables with lower IDs
1811 // have to be flushed for THIS column family;
1812 // 2) if flush_memtable_ids[i] is null, then all memtables in THIS column
1813 // family have to be flushed.
1814 // Finish waiting when ALL column families finish flushing memtables.
1815 // resuming_from_bg_err indicates whether the caller is trying to resume from
1816 // background error or in normal processing.
1817 Status DBImpl::WaitForFlushMemTables(
1818 const autovector<ColumnFamilyData*>& cfds,
1819 const autovector<const uint64_t*>& flush_memtable_ids,
1820 bool resuming_from_bg_err) {
1821 int num = static_cast<int>(cfds.size());
1822 // Wait until the flush completes
1823 InstrumentedMutexLock l(&mutex_);
1824 // If the caller is trying to resume from bg error, then
1825 // error_handler_.IsDBStopped() is true.
1826 while (resuming_from_bg_err || !error_handler_.IsDBStopped()) {
1827 if (shutting_down_.load(std::memory_order_acquire)) {
1828 return Status::ShutdownInProgress();
1829 }
1830 // If an error has occurred during resumption, then no need to wait.
1831 if (!error_handler_.GetRecoveryError().ok()) {
1832 break;
1833 }
1834 // Number of column families that have been dropped.
1835 int num_dropped = 0;
1836 // Number of column families that have finished flush.
1837 int num_finished = 0;
1838 for (int i = 0; i < num; ++i) {
1839 if (cfds[i]->IsDropped()) {
1840 ++num_dropped;
1841 } else if (cfds[i]->imm()->NumNotFlushed() == 0 ||
1842 (flush_memtable_ids[i] != nullptr &&
1843 cfds[i]->imm()->GetEarliestMemTableID() >
1844 *flush_memtable_ids[i])) {
1845 ++num_finished;
1846 }
1847 }
1848 if (1 == num_dropped && 1 == num) {
1849 return Status::InvalidArgument("Cannot flush a dropped CF");
1850 }
1851 // Column families involved in this flush request have either been dropped
1852 // or finished flush. Then it's time to finish waiting.
1853 if (num_dropped + num_finished == num) {
1854 break;
1855 }
1856 bg_cv_.Wait();
1857 }
1858 Status s;
1859 // If not resuming from bg error, and an error has caused the DB to stop,
1860 // then report the bg error to caller.
1861 if (!resuming_from_bg_err && error_handler_.IsDBStopped()) {
1862 s = error_handler_.GetBGError();
1863 }
1864 return s;
1865 }
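
// Worked example of the wait predicate above: with cfds = {cf1, cf2} and
// flush_memtable_ids = {&5, nullptr}, waiting finishes once cf1's earliest
// immutable memtable ID exceeds 5 (or cf1 has no unflushed memtables, or is
// dropped) and cf2 has no unflushed memtables at all (or is dropped).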
1866
1867 Status DBImpl::EnableAutoCompaction(
1868 const std::vector<ColumnFamilyHandle*>& column_family_handles) {
1869 Status s;
1870 for (auto cf_ptr : column_family_handles) {
1871 Status status =
1872 this->SetOptions(cf_ptr, {{"disable_auto_compactions", "false"}});
1873 if (!status.ok()) {
1874 s = status;
1875 }
1876 }
1877
1878 return s;
1879 }
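
// Illustrative sketch: EnableAutoCompaction() is equivalent to calling
// SetOptions() per column family, as the loop above does; the handle name is
// a hypothetical placeholder.
//
//   db->SetOptions(my_cf_handle, {{"disable_auto_compactions", "false"}});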
1880
1881 void DBImpl::DisableManualCompaction() {
1882 manual_compaction_paused_.store(true, std::memory_order_release);
1883 }
1884
1885 void DBImpl::EnableManualCompaction() {
1886 manual_compaction_paused_.store(false, std::memory_order_release);
1887 }
1888
1889 void DBImpl::MaybeScheduleFlushOrCompaction() {
1890 mutex_.AssertHeld();
1891 if (!opened_successfully_) {
1892 // Compaction may introduce a data race with DB open
1893 return;
1894 }
1895 if (bg_work_paused_ > 0) {
1896 // we paused the background work
1897 return;
1898 } else if (error_handler_.IsBGWorkStopped() &&
1899 !error_handler_.IsRecoveryInProgress()) {
1900 // There has been a hard error and this call is not part of the recovery
1901 // sequence. Bail out here so we don't get into an endless loop of
1902 // scheduling BG work which will again call this function
1903 return;
1904 } else if (shutting_down_.load(std::memory_order_acquire)) {
1905 // DB is being deleted; no more background compactions
1906 return;
1907 }
1908 auto bg_job_limits = GetBGJobLimits();
1909 bool is_flush_pool_empty =
1910 env_->GetBackgroundThreads(Env::Priority::HIGH) == 0;
1911 while (!is_flush_pool_empty && unscheduled_flushes_ > 0 &&
1912 bg_flush_scheduled_ < bg_job_limits.max_flushes) {
1913 bg_flush_scheduled_++;
1914 FlushThreadArg* fta = new FlushThreadArg;
1915 fta->db_ = this;
1916 fta->thread_pri_ = Env::Priority::HIGH;
1917 env_->Schedule(&DBImpl::BGWorkFlush, fta, Env::Priority::HIGH, this,
1918 &DBImpl::UnscheduleFlushCallback);
1919 --unscheduled_flushes_;
1920 TEST_SYNC_POINT_CALLBACK(
1921 "DBImpl::MaybeScheduleFlushOrCompaction:AfterSchedule:0",
1922 &unscheduled_flushes_);
1923 }
1924
1925 // special case -- if high-pri (flush) thread pool is empty, then schedule
1926 // flushes in low-pri (compaction) thread pool.
1927 if (is_flush_pool_empty) {
1928 while (unscheduled_flushes_ > 0 &&
1929 bg_flush_scheduled_ + bg_compaction_scheduled_ <
1930 bg_job_limits.max_flushes) {
1931 bg_flush_scheduled_++;
1932 FlushThreadArg* fta = new FlushThreadArg;
1933 fta->db_ = this;
1934 fta->thread_pri_ = Env::Priority::LOW;
1935 env_->Schedule(&DBImpl::BGWorkFlush, fta, Env::Priority::LOW, this,
1936 &DBImpl::UnscheduleFlushCallback);
1937 --unscheduled_flushes_;
1938 }
1939 }
1940
1941 if (bg_compaction_paused_ > 0) {
1942 // we paused the background compaction
1943 return;
1944 } else if (error_handler_.IsBGWorkStopped()) {
1945 // Compaction is not part of the recovery sequence from a hard error. We
1946 // might get here because recovery might do a flush and install a new
1947 // super version, which will try to schedule pending compactions. Bail
1948 // out here and let the higher level recovery handle compactions
1949 return;
1950 }
1951
1952 if (HasExclusiveManualCompaction()) {
1953 // only manual compactions are allowed to run. don't schedule automatic
1954 // compactions
1955 TEST_SYNC_POINT("DBImpl::MaybeScheduleFlushOrCompaction:Conflict");
1956 return;
1957 }
1958
1959 while (bg_compaction_scheduled_ < bg_job_limits.max_compactions &&
1960 unscheduled_compactions_ > 0) {
1961 CompactionArg* ca = new CompactionArg;
1962 ca->db = this;
1963 ca->prepicked_compaction = nullptr;
1964 bg_compaction_scheduled_++;
1965 unscheduled_compactions_--;
1966 env_->Schedule(&DBImpl::BGWorkCompaction, ca, Env::Priority::LOW, this,
1967 &DBImpl::UnscheduleCompactionCallback);
1968 }
1969 }
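
// Sizing note (informal sketch, not part of this file): the HIGH pool serves
// flushes and the LOW pool serves compactions; when the HIGH pool has zero
// threads, the loop above falls back to scheduling flushes on the LOW pool.
// Pool sizes are commonly set via the existing Env::SetBackgroundThreads()
// API; the values below are arbitrary examples.
//
//   options.env->SetBackgroundThreads(2, rocksdb::Env::Priority::HIGH);  // flushes
//   options.env->SetBackgroundThreads(6, rocksdb::Env::Priority::LOW);   // compactions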
1970
1971 DBImpl::BGJobLimits DBImpl::GetBGJobLimits() const {
1972 mutex_.AssertHeld();
1973 return GetBGJobLimits(immutable_db_options_.max_background_flushes,
1974 mutable_db_options_.max_background_compactions,
1975 mutable_db_options_.max_background_jobs,
1976 write_controller_.NeedSpeedupCompaction());
1977 }
1978
1979 DBImpl::BGJobLimits DBImpl::GetBGJobLimits(int max_background_flushes,
1980 int max_background_compactions,
1981 int max_background_jobs,
1982 bool parallelize_compactions) {
1983 BGJobLimits res;
1984 if (max_background_flushes == -1 && max_background_compactions == -1) {
1985 // for our first stab implementing max_background_jobs, simply allocate a
1986 // quarter of the threads to flushes.
1987 res.max_flushes = std::max(1, max_background_jobs / 4);
1988 res.max_compactions = std::max(1, max_background_jobs - res.max_flushes);
1989 } else {
1990 // compatibility code in case users haven't migrated to max_background_jobs,
1991 // which automatically computes flush/compaction limits
1992 res.max_flushes = std::max(1, max_background_flushes);
1993 res.max_compactions = std::max(1, max_background_compactions);
1994 }
1995 if (!parallelize_compactions) {
1996 // throttle background compactions until we deem necessary
1997 res.max_compactions = 1;
1998 }
1999 return res;
2000 }
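
// Worked example of the allocation above: when both legacy options are -1 and
// max_background_jobs = 8, max_flushes = std::max(1, 8 / 4) = 2 and
// max_compactions = std::max(1, 8 - 2) = 6; if parallelize_compactions is
// false, max_compactions is then capped at 1.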
2001
2002 void DBImpl::AddToCompactionQueue(ColumnFamilyData* cfd) {
2003 assert(!cfd->queued_for_compaction());
2004 cfd->Ref();
2005 compaction_queue_.push_back(cfd);
2006 cfd->set_queued_for_compaction(true);
2007 }
2008
2009 ColumnFamilyData* DBImpl::PopFirstFromCompactionQueue() {
2010 assert(!compaction_queue_.empty());
2011 auto cfd = *compaction_queue_.begin();
2012 compaction_queue_.pop_front();
2013 assert(cfd->queued_for_compaction());
2014 cfd->set_queued_for_compaction(false);
2015 return cfd;
2016 }
2017
2018 DBImpl::FlushRequest DBImpl::PopFirstFromFlushQueue() {
2019 assert(!flush_queue_.empty());
2020 FlushRequest flush_req = flush_queue_.front();
2021 flush_queue_.pop_front();
2022 // TODO: need to unset flush reason?
2023 return flush_req;
2024 }
2025
2026 ColumnFamilyData* DBImpl::PickCompactionFromQueue(
2027 std::unique_ptr<TaskLimiterToken>* token, LogBuffer* log_buffer) {
2028 assert(!compaction_queue_.empty());
2029 assert(*token == nullptr);
2030 autovector<ColumnFamilyData*> throttled_candidates;
2031 ColumnFamilyData* cfd = nullptr;
2032 while (!compaction_queue_.empty()) {
2033 auto first_cfd = *compaction_queue_.begin();
2034 compaction_queue_.pop_front();
2035 assert(first_cfd->queued_for_compaction());
2036 if (!RequestCompactionToken(first_cfd, false, token, log_buffer)) {
2037 throttled_candidates.push_back(first_cfd);
2038 continue;
2039 }
2040 cfd = first_cfd;
2041 cfd->set_queued_for_compaction(false);
2042 break;
2043 }
2044 // Add throttled compaction candidates back to queue in the original order.
2045 for (auto iter = throttled_candidates.rbegin();
2046 iter != throttled_candidates.rend(); ++iter) {
2047 compaction_queue_.push_front(*iter);
2048 }
2049 return cfd;
2050 }
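
// Illustrative sketch (hedged, not part of this file): the per-CF throttling
// above is driven by ColumnFamilyOptions::compaction_thread_limiter, which an
// application may share across column families; the name and limit below are
// hypothetical.
//
//   std::shared_ptr<rocksdb::ConcurrentTaskLimiter> limiter(
//       rocksdb::NewConcurrentTaskLimiter("cf_compactions", 4));
//   cf_options_a.compaction_thread_limiter = limiter;
//   cf_options_b.compaction_thread_limiter = limiter;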
2051
2052 void DBImpl::SchedulePendingFlush(const FlushRequest& flush_req,
2053 FlushReason flush_reason) {
2054 if (flush_req.empty()) {
2055 return;
2056 }
2057 for (auto& iter : flush_req) {
2058 ColumnFamilyData* cfd = iter.first;
2059 cfd->Ref();
2060 cfd->SetFlushReason(flush_reason);
2061 }
2062 ++unscheduled_flushes_;
2063 flush_queue_.push_back(flush_req);
2064 }
2065
2066 void DBImpl::SchedulePendingCompaction(ColumnFamilyData* cfd) {
2067 if (!cfd->queued_for_compaction() && cfd->NeedsCompaction()) {
2068 AddToCompactionQueue(cfd);
2069 ++unscheduled_compactions_;
2070 }
2071 }
2072
2073 void DBImpl::SchedulePendingPurge(std::string fname, std::string dir_to_sync,
2074 FileType type, uint64_t number, int job_id) {
2075 mutex_.AssertHeld();
2076 PurgeFileInfo file_info(fname, dir_to_sync, type, number, job_id);
2077 purge_files_.insert({{number, std::move(file_info)}});
2078 }
2079
2080 void DBImpl::BGWorkFlush(void* arg) {
2081 FlushThreadArg fta = *(reinterpret_cast<FlushThreadArg*>(arg));
2082 delete reinterpret_cast<FlushThreadArg*>(arg);
2083
2084 IOSTATS_SET_THREAD_POOL_ID(fta.thread_pri_);
2085 TEST_SYNC_POINT("DBImpl::BGWorkFlush");
2086 static_cast_with_check<DBImpl, DB>(fta.db_)->BackgroundCallFlush(
2087 fta.thread_pri_);
2088 TEST_SYNC_POINT("DBImpl::BGWorkFlush:done");
2089 }
2090
2091 void DBImpl::BGWorkCompaction(void* arg) {
2092 CompactionArg ca = *(reinterpret_cast<CompactionArg*>(arg));
2093 delete reinterpret_cast<CompactionArg*>(arg);
2094 IOSTATS_SET_THREAD_POOL_ID(Env::Priority::LOW);
2095 TEST_SYNC_POINT("DBImpl::BGWorkCompaction");
2096 auto prepicked_compaction =
2097 static_cast<PrepickedCompaction*>(ca.prepicked_compaction);
2098 static_cast_with_check<DBImpl, DB>(ca.db)->BackgroundCallCompaction(
2099 prepicked_compaction, Env::Priority::LOW);
2100 delete prepicked_compaction;
2101 }
2102
2103 void DBImpl::BGWorkBottomCompaction(void* arg) {
2104 CompactionArg ca = *(static_cast<CompactionArg*>(arg));
2105 delete static_cast<CompactionArg*>(arg);
2106 IOSTATS_SET_THREAD_POOL_ID(Env::Priority::BOTTOM);
2107 TEST_SYNC_POINT("DBImpl::BGWorkBottomCompaction");
2108 auto* prepicked_compaction = ca.prepicked_compaction;
2109 assert(prepicked_compaction && prepicked_compaction->compaction &&
2110 !prepicked_compaction->manual_compaction_state);
2111 ca.db->BackgroundCallCompaction(prepicked_compaction, Env::Priority::BOTTOM);
2112 delete prepicked_compaction;
2113 }
2114
2115 void DBImpl::BGWorkPurge(void* db) {
2116 IOSTATS_SET_THREAD_POOL_ID(Env::Priority::HIGH);
2117 TEST_SYNC_POINT("DBImpl::BGWorkPurge:start");
2118 reinterpret_cast<DBImpl*>(db)->BackgroundCallPurge();
2119 TEST_SYNC_POINT("DBImpl::BGWorkPurge:end");
2120 }
2121
2122 void DBImpl::UnscheduleCompactionCallback(void* arg) {
2123 CompactionArg ca = *(reinterpret_cast<CompactionArg*>(arg));
2124 delete reinterpret_cast<CompactionArg*>(arg);
2125 if (ca.prepicked_compaction != nullptr) {
2126 if (ca.prepicked_compaction->compaction != nullptr) {
2127 delete ca.prepicked_compaction->compaction;
2128 }
2129 delete ca.prepicked_compaction;
2130 }
2131 TEST_SYNC_POINT("DBImpl::UnscheduleCompactionCallback");
2132 }
2133
2134 void DBImpl::UnscheduleFlushCallback(void* arg) {
2135 delete reinterpret_cast<FlushThreadArg*>(arg);
2136 TEST_SYNC_POINT("DBImpl::UnscheduleFlushCallback");
2137 }
2138
2139 Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context,
2140 LogBuffer* log_buffer, FlushReason* reason,
2141 Env::Priority thread_pri) {
2142 mutex_.AssertHeld();
2143
2144 Status status;
2145 *reason = FlushReason::kOthers;
2146 // If BG work is stopped due to an error, but a recovery is in progress,
2147 // that means this flush is part of the recovery. So allow it to go through
2148 if (!error_handler_.IsBGWorkStopped()) {
2149 if (shutting_down_.load(std::memory_order_acquire)) {
2150 status = Status::ShutdownInProgress();
2151 }
2152 } else if (!error_handler_.IsRecoveryInProgress()) {
2153 status = error_handler_.GetBGError();
2154 }
2155
2156 if (!status.ok()) {
2157 return status;
2158 }
2159
2160 autovector<BGFlushArg> bg_flush_args;
2161 std::vector<SuperVersionContext>& superversion_contexts =
2162 job_context->superversion_contexts;
2163 autovector<ColumnFamilyData*> column_families_not_to_flush;
2164 while (!flush_queue_.empty()) {
2165 // This cfd is already referenced
2166 const FlushRequest& flush_req = PopFirstFromFlushQueue();
2167 superversion_contexts.clear();
2168 superversion_contexts.reserve(flush_req.size());
2169
2170 for (const auto& iter : flush_req) {
2171 ColumnFamilyData* cfd = iter.first;
2172 if (cfd->IsDropped() || !cfd->imm()->IsFlushPending()) {
2173 // can't flush this CF, try next one
2174 column_families_not_to_flush.push_back(cfd);
2175 continue;
2176 }
2177 superversion_contexts.emplace_back(SuperVersionContext(true));
2178 bg_flush_args.emplace_back(cfd, iter.second,
2179 &(superversion_contexts.back()));
2180 }
2181 if (!bg_flush_args.empty()) {
2182 break;
2183 }
2184 }
2185
2186 if (!bg_flush_args.empty()) {
2187 auto bg_job_limits = GetBGJobLimits();
2188 for (const auto& arg : bg_flush_args) {
2189 ColumnFamilyData* cfd = arg.cfd_;
2190 ROCKS_LOG_BUFFER(
2191 log_buffer,
2192 "Calling FlushMemTableToOutputFile with column "
2193 "family [%s], flush slots available %d, compaction slots available "
2194 "%d, "
2195 "flush slots scheduled %d, compaction slots scheduled %d",
2196 cfd->GetName().c_str(), bg_job_limits.max_flushes,
2197 bg_job_limits.max_compactions, bg_flush_scheduled_,
2198 bg_compaction_scheduled_);
2199 }
2200 status = FlushMemTablesToOutputFiles(bg_flush_args, made_progress,
2201 job_context, log_buffer, thread_pri);
2202 TEST_SYNC_POINT("DBImpl::BackgroundFlush:BeforeFlush");
2203 // All the CFDs in the FlushReq must have the same flush reason, so just
2204 // grab the first one
2205 *reason = bg_flush_args[0].cfd_->GetFlushReason();
2206 for (auto& arg : bg_flush_args) {
2207 ColumnFamilyData* cfd = arg.cfd_;
2208 if (cfd->UnrefAndTryDelete()) {
2209 arg.cfd_ = nullptr;
2210 }
2211 }
2212 }
2213 for (auto cfd : column_families_not_to_flush) {
2214 cfd->UnrefAndTryDelete();
2215 }
2216 return status;
2217 }
2218
2219 void DBImpl::BackgroundCallFlush(Env::Priority thread_pri) {
2220 bool made_progress = false;
2221 JobContext job_context(next_job_id_.fetch_add(1), true);
2222
2223 TEST_SYNC_POINT("DBImpl::BackgroundCallFlush:start");
2224
2225 LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL,
2226 immutable_db_options_.info_log.get());
2227 {
2228 InstrumentedMutexLock l(&mutex_);
2229 assert(bg_flush_scheduled_);
2230 num_running_flushes_++;
2231
2232 std::unique_ptr<std::list<uint64_t>::iterator>
2233 pending_outputs_inserted_elem(new std::list<uint64_t>::iterator(
2234 CaptureCurrentFileNumberInPendingOutputs()));
2235 FlushReason reason;
2236
2237 Status s = BackgroundFlush(&made_progress, &job_context, &log_buffer,
2238 &reason, thread_pri);
2239 if (!s.ok() && !s.IsShutdownInProgress() && !s.IsColumnFamilyDropped() &&
2240 reason != FlushReason::kErrorRecovery) {
2241 // Wait a little bit before retrying background flush in
2242 // case this is an environmental problem and we do not want to
2243 // chew up resources for failed flushes for the duration of
2244 // the problem.
2245 uint64_t error_cnt =
2246 default_cf_internal_stats_->BumpAndGetBackgroundErrorCount();
2247 bg_cv_.SignalAll(); // In case a waiter can proceed despite the error
2248 mutex_.Unlock();
2249 ROCKS_LOG_ERROR(immutable_db_options_.info_log,
2250 "Waiting after background flush error: %s"
2251 "Accumulated background error counts: %" PRIu64,
2252 s.ToString().c_str(), error_cnt);
2253 log_buffer.FlushBufferToLog();
2254 LogFlush(immutable_db_options_.info_log);
2255 env_->SleepForMicroseconds(1000000);
2256 mutex_.Lock();
2257 }
2258
2259 TEST_SYNC_POINT("DBImpl::BackgroundCallFlush:FlushFinish:0");
2260 ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem);
2261
2262 // If flush failed, we want to delete all temporary files that we might have
2263 // created. Thus, we force full scan in FindObsoleteFiles()
2264 FindObsoleteFiles(&job_context, !s.ok() && !s.IsShutdownInProgress() &&
2265 !s.IsColumnFamilyDropped());
2266 // delete unnecessary files if any, this is done outside the mutex
2267 if (job_context.HaveSomethingToClean() ||
2268 job_context.HaveSomethingToDelete() || !log_buffer.IsEmpty()) {
2269 mutex_.Unlock();
2270 TEST_SYNC_POINT("DBImpl::BackgroundCallFlush:FilesFound");
2271 // Have to flush the info logs before bg_flush_scheduled_--
2272 // because if bg_flush_scheduled_ becomes 0 and the lock is
2273 // released, the destructor of DB can kick in and destroy all the
2274 // state of DB, so info_log might not be available after that point.
2275 // The same applies to accessing other state that DB owns.
2276 log_buffer.FlushBufferToLog();
2277 if (job_context.HaveSomethingToDelete()) {
2278 PurgeObsoleteFiles(job_context);
2279 }
2280 job_context.Clean();
2281 mutex_.Lock();
2282 }
2283 TEST_SYNC_POINT("DBImpl::BackgroundCallFlush:ContextCleanedUp");
2284
2285 assert(num_running_flushes_ > 0);
2286 num_running_flushes_--;
2287 bg_flush_scheduled_--;
2288 // See if there's more work to be done
2289 MaybeScheduleFlushOrCompaction();
2290 atomic_flush_install_cv_.SignalAll();
2291 bg_cv_.SignalAll();
2292 // IMPORTANT: there should be no code after calling SignalAll. This call may
2293 // signal the DB destructor that it's OK to proceed with destruction. In
2294 // that case, all DB variables will be deallocated and referencing them
2295 // will cause trouble.
2296 }
2297 }
2298
2299 void DBImpl::BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction,
2300 Env::Priority bg_thread_pri) {
2301 bool made_progress = false;
2302 JobContext job_context(next_job_id_.fetch_add(1), true);
2303 TEST_SYNC_POINT("BackgroundCallCompaction:0");
2304 LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL,
2305 immutable_db_options_.info_log.get());
2306 {
2307 InstrumentedMutexLock l(&mutex_);
2308
2309 // This call will unlock/lock the mutex to wait for current running
2310 // IngestExternalFile() calls to finish.
2311 WaitForIngestFile();
2312
2313 num_running_compactions_++;
2314
2315 std::unique_ptr<std::list<uint64_t>::iterator>
2316 pending_outputs_inserted_elem(new std::list<uint64_t>::iterator(
2317 CaptureCurrentFileNumberInPendingOutputs()));
2318
2319 assert((bg_thread_pri == Env::Priority::BOTTOM &&
2320 bg_bottom_compaction_scheduled_) ||
2321 (bg_thread_pri == Env::Priority::LOW && bg_compaction_scheduled_));
2322 Status s = BackgroundCompaction(&made_progress, &job_context, &log_buffer,
2323 prepicked_compaction, bg_thread_pri);
2324 TEST_SYNC_POINT("BackgroundCallCompaction:1");
2325 if (s.IsBusy()) {
2326 bg_cv_.SignalAll(); // In case a waiter can proceed despite the error
2327 mutex_.Unlock();
2328 env_->SleepForMicroseconds(10000); // prevent hot loop
2329 mutex_.Lock();
2330 } else if (!s.ok() && !s.IsShutdownInProgress() &&
2331 !s.IsManualCompactionPaused() && !s.IsColumnFamilyDropped()) {
2332 // Wait a little bit before retrying background compaction in
2333 // case this is an environmental problem and we do not want to
2334 // chew up resources for failed compactions for the duration of
2335 // the problem.
2336 uint64_t error_cnt =
2337 default_cf_internal_stats_->BumpAndGetBackgroundErrorCount();
2338 bg_cv_.SignalAll(); // In case a waiter can proceed despite the error
2339 mutex_.Unlock();
2340 log_buffer.FlushBufferToLog();
2341 ROCKS_LOG_ERROR(immutable_db_options_.info_log,
2342 "Waiting after background compaction error: %s, "
2343 "Accumulated background error counts: %" PRIu64,
2344 s.ToString().c_str(), error_cnt);
2345 LogFlush(immutable_db_options_.info_log);
2346 env_->SleepForMicroseconds(1000000);
2347 mutex_.Lock();
2348 } else if (s.IsManualCompactionPaused()) {
2349 ManualCompactionState* m = prepicked_compaction->manual_compaction_state;
2350 assert(m);
2351 ROCKS_LOG_BUFFER(&log_buffer, "[%s] [JOB %d] Manual compaction paused",
2352 m->cfd->GetName().c_str(), job_context.job_id);
2353 }
2354
2355 ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem);
2356
2357 // If compaction failed, we want to delete all temporary files that we might
2358 // have created (they might not be all recorded in job_context in case of a
2359 // failure). Thus, we force full scan in FindObsoleteFiles()
2360 FindObsoleteFiles(&job_context, !s.ok() && !s.IsShutdownInProgress() &&
2361 !s.IsManualCompactionPaused() &&
2362 !s.IsColumnFamilyDropped());
2363 TEST_SYNC_POINT("DBImpl::BackgroundCallCompaction:FoundObsoleteFiles");
2364
2365 // delete unnecessary files if any, this is done outside the mutex
2366 if (job_context.HaveSomethingToClean() ||
2367 job_context.HaveSomethingToDelete() || !log_buffer.IsEmpty()) {
2368 mutex_.Unlock();
2369 // Have to flush the info logs before bg_compaction_scheduled_--
2370 // because if bg_compaction_scheduled_ becomes 0 and the lock is
2371 // released, the destructor of DB can kick in and destroy all the
2372 // state of DB, so info_log might not be available after that point.
2373 // The same applies to accessing other state that DB owns.
2374 log_buffer.FlushBufferToLog();
2375 if (job_context.HaveSomethingToDelete()) {
2376 PurgeObsoleteFiles(job_context);
2377 TEST_SYNC_POINT("DBImpl::BackgroundCallCompaction:PurgedObsoleteFiles");
2378 }
2379 job_context.Clean();
2380 mutex_.Lock();
2381 }
2382
2383 assert(num_running_compactions_ > 0);
2384 num_running_compactions_--;
2385 if (bg_thread_pri == Env::Priority::LOW) {
2386 bg_compaction_scheduled_--;
2387 } else {
2388 assert(bg_thread_pri == Env::Priority::BOTTOM);
2389 bg_bottom_compaction_scheduled_--;
2390 }
2391
2392 versions_->GetColumnFamilySet()->FreeDeadColumnFamilies();
2393
2394 // See if there's more work to be done
2395 MaybeScheduleFlushOrCompaction();
2396 if (made_progress ||
2397 (bg_compaction_scheduled_ == 0 &&
2398 bg_bottom_compaction_scheduled_ == 0) ||
2399 HasPendingManualCompaction() || unscheduled_compactions_ == 0) {
2400 // signal if
2401 // * made_progress -- need to wakeup DelayWrite
2402 // * bg_{bottom,}_compaction_scheduled_ == 0 -- need to wakeup ~DBImpl
2403 // * HasPendingManualCompaction -- need to wakeup RunManualCompaction
2404 // If none of this is true, there is no need to signal since nobody is
2405 // waiting for it
2406 bg_cv_.SignalAll();
2407 }
2408 // IMPORTANT: there should be no code after calling SignalAll. This call may
2409 // signal the DB destructor that it's OK to proceed with destruction. In
2410 // that case, all DB variables will be deallocated and referencing them
2411 // will cause trouble.
2412 }
2413 }
2414
2415 Status DBImpl::BackgroundCompaction(bool* made_progress,
2416 JobContext* job_context,
2417 LogBuffer* log_buffer,
2418 PrepickedCompaction* prepicked_compaction,
2419 Env::Priority thread_pri) {
2420 ManualCompactionState* manual_compaction =
2421 prepicked_compaction == nullptr
2422 ? nullptr
2423 : prepicked_compaction->manual_compaction_state;
2424 *made_progress = false;
2425 mutex_.AssertHeld();
2426 TEST_SYNC_POINT("DBImpl::BackgroundCompaction:Start");
2427
2428 bool is_manual = (manual_compaction != nullptr);
2429 std::unique_ptr<Compaction> c;
2430 if (prepicked_compaction != nullptr &&
2431 prepicked_compaction->compaction != nullptr) {
2432 c.reset(prepicked_compaction->compaction);
2433 }
2434 bool is_prepicked = is_manual || c;
2435
2436 // (manual_compaction->in_progress == false);
2437 bool trivial_move_disallowed =
2438 is_manual && manual_compaction->disallow_trivial_move;
2439
2440 CompactionJobStats compaction_job_stats;
2441 Status status;
2442 if (!error_handler_.IsBGWorkStopped()) {
2443 if (shutting_down_.load(std::memory_order_acquire)) {
2444 status = Status::ShutdownInProgress();
2445 } else if (is_manual &&
2446 manual_compaction_paused_.load(std::memory_order_acquire)) {
2447 status = Status::Incomplete(Status::SubCode::kManualCompactionPaused);
2448 }
2449 } else {
2450 status = error_handler_.GetBGError();
2451 // If we get here, it means a hard error happened after this compaction
2452 // was scheduled by MaybeScheduleFlushOrCompaction(), but before it got
2453 // a chance to execute. Since we didn't pop a cfd from the compaction
2454 // queue, increment unscheduled_compactions_
2455 unscheduled_compactions_++;
2456 }
2457
2458 if (!status.ok()) {
2459 if (is_manual) {
2460 manual_compaction->status = status;
2461 manual_compaction->done = true;
2462 manual_compaction->in_progress = false;
2463 manual_compaction = nullptr;
2464 }
2465 if (c) {
2466 c->ReleaseCompactionFiles(status);
2467 c.reset();
2468 }
2469 return status;
2470 }
2471
2472 if (is_manual) {
2473 // another thread cannot pick up the same work
2474 manual_compaction->in_progress = true;
2475 }
2476
2477 std::unique_ptr<TaskLimiterToken> task_token;
2478
2479 // InternalKey manual_end_storage;
2480 // InternalKey* manual_end = &manual_end_storage;
2481 bool sfm_reserved_compact_space = false;
2482 if (is_manual) {
2483 ManualCompactionState* m = manual_compaction;
2484 assert(m->in_progress);
2485 if (!c) {
2486 m->done = true;
2487 m->manual_end = nullptr;
2488 ROCKS_LOG_BUFFER(log_buffer,
2489 "[%s] Manual compaction from level-%d from %s .. "
2490 "%s; nothing to do\n",
2491 m->cfd->GetName().c_str(), m->input_level,
2492 (m->begin ? m->begin->DebugString().c_str() : "(begin)"),
2493 (m->end ? m->end->DebugString().c_str() : "(end)"));
2494 } else {
2495 // First check if we have enough room to do the compaction
2496 bool enough_room = EnoughRoomForCompaction(
2497 m->cfd, *(c->inputs()), &sfm_reserved_compact_space, log_buffer);
2498
2499 if (!enough_room) {
2500 // Then don't do the compaction
2501 c->ReleaseCompactionFiles(status);
2502 c.reset();
2503 // m's vars will get set properly at the end of this function,
2504 // as long as status == CompactionTooLarge
2505 status = Status::CompactionTooLarge();
2506 } else {
2507 ROCKS_LOG_BUFFER(
2508 log_buffer,
2509 "[%s] Manual compaction from level-%d to level-%d from %s .. "
2510 "%s; will stop at %s\n",
2511 m->cfd->GetName().c_str(), m->input_level, c->output_level(),
2512 (m->begin ? m->begin->DebugString().c_str() : "(begin)"),
2513 (m->end ? m->end->DebugString().c_str() : "(end)"),
2514 ((m->done || m->manual_end == nullptr)
2515 ? "(end)"
2516 : m->manual_end->DebugString().c_str()));
2517 }
2518 }
2519 } else if (!is_prepicked && !compaction_queue_.empty()) {
2520 if (HasExclusiveManualCompaction()) {
2521 // Can't compact right now, but try again later
2522 TEST_SYNC_POINT("DBImpl::BackgroundCompaction()::Conflict");
2523
2524 // Stay in the compaction queue.
2525 unscheduled_compactions_++;
2526
2527 return Status::OK();
2528 }
2529
2530 auto cfd = PickCompactionFromQueue(&task_token, log_buffer);
2531 if (cfd == nullptr) {
2532 // Can't find any executable task from the compaction queue.
2533 // All tasks have been throttled by compaction thread limiter.
2534 ++unscheduled_compactions_;
2535 return Status::Busy();
2536 }
2537
2538 // We unreference here because the following code will take a Ref() on
2539 // this cfd if it is going to use it (Compaction class holds a
2540 // reference).
2541 // This will all happen under a mutex so we don't have to be afraid of
2542 // somebody else deleting it.
2543 if (cfd->UnrefAndTryDelete()) {
2544 // This was the last reference of the column family, so no need to
2545 // compact.
2546 return Status::OK();
2547 }
2548
2549 // Pick up latest mutable CF Options and use it throughout the
2550 // compaction job
2551 // Compaction makes a copy of the latest MutableCFOptions. It should be used
2552 // throughout the compaction procedure to ensure consistency. It will
2553 // eventually be installed into SuperVersion.
2554 auto* mutable_cf_options = cfd->GetLatestMutableCFOptions();
2555 if (!mutable_cf_options->disable_auto_compactions && !cfd->IsDropped()) {
2556 // NOTE: try to avoid unnecessary copy of MutableCFOptions if
2557 // compaction is not necessary. Need to make sure mutex is held
2558 // until we make a copy in the following code
2559 TEST_SYNC_POINT("DBImpl::BackgroundCompaction():BeforePickCompaction");
2560 c.reset(cfd->PickCompaction(*mutable_cf_options, log_buffer));
2561 TEST_SYNC_POINT("DBImpl::BackgroundCompaction():AfterPickCompaction");
2562
2563 if (c != nullptr) {
2564 bool enough_room = EnoughRoomForCompaction(
2565 cfd, *(c->inputs()), &sfm_reserved_compact_space, log_buffer);
2566
2567 if (!enough_room) {
2568 // Then don't do the compaction
2569 c->ReleaseCompactionFiles(status);
2570 c->column_family_data()
2571 ->current()
2572 ->storage_info()
2573 ->ComputeCompactionScore(*(c->immutable_cf_options()),
2574 *(c->mutable_cf_options()));
2575 AddToCompactionQueue(cfd);
2576 ++unscheduled_compactions_;
2577
2578 c.reset();
2579 // Don't need to sleep here, because BackgroundCallCompaction
2580 // will sleep if !s.ok()
2581 status = Status::CompactionTooLarge();
2582 } else {
2583 // update statistics
2584 RecordInHistogram(stats_, NUM_FILES_IN_SINGLE_COMPACTION,
2585 c->inputs(0)->size());
2586 // There are three things that can change compaction score:
2587 // 1) When flush or compaction finish. This case is covered by
2588 // InstallSuperVersionAndScheduleWork
2589 // 2) When MutableCFOptions changes. This case is also covered by
2590 // InstallSuperVersionAndScheduleWork, because this is when the new
2591 // options take effect.
2592 // 3) When we Pick a new compaction, we "remove" those files being
2593 // compacted from the calculation, which then influences compaction
2594 // score. Here we check if we need the new compaction even without the
2595 // files that are currently being compacted. If we need another
2596 // compaction, we might be able to execute it in parallel, so we add
2597 // it to the queue and schedule a new thread.
2598 if (cfd->NeedsCompaction()) {
2599 // Yes, we need more compactions!
2600 AddToCompactionQueue(cfd);
2601 ++unscheduled_compactions_;
2602 MaybeScheduleFlushOrCompaction();
2603 }
2604 }
2605 }
2606 }
2607 }
2608
2609 if (!c) {
2610 // Nothing to do
2611 ROCKS_LOG_BUFFER(log_buffer, "Compaction nothing to do");
2612 } else if (c->deletion_compaction()) {
2613 // TODO(icanadi) Do we want to honor snapshots here? i.e. not delete old
2614 // file if there is a live snapshot pointing to it
2615 TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:BeforeCompaction",
2616 c->column_family_data());
2617 assert(c->num_input_files(1) == 0);
2618 assert(c->level() == 0);
2619 assert(c->column_family_data()->ioptions()->compaction_style ==
2620 kCompactionStyleFIFO);
2621
2622 compaction_job_stats.num_input_files = c->num_input_files(0);
2623
2624 NotifyOnCompactionBegin(c->column_family_data(), c.get(), status,
2625 compaction_job_stats, job_context->job_id);
2626
2627 for (const auto& f : *c->inputs(0)) {
2628 c->edit()->DeleteFile(c->level(), f->fd.GetNumber());
2629 }
2630 status = versions_->LogAndApply(c->column_family_data(),
2631 *c->mutable_cf_options(), c->edit(),
2632 &mutex_, directories_.GetDbDir());
2633 InstallSuperVersionAndScheduleWork(c->column_family_data(),
2634 &job_context->superversion_contexts[0],
2635 *c->mutable_cf_options());
2636 ROCKS_LOG_BUFFER(log_buffer, "[%s] Deleted %d files\n",
2637 c->column_family_data()->GetName().c_str(),
2638 c->num_input_files(0));
2639 *made_progress = true;
2640 TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:AfterCompaction",
2641 c->column_family_data());
2642 } else if (!trivial_move_disallowed && c->IsTrivialMove()) {
2643 TEST_SYNC_POINT("DBImpl::BackgroundCompaction:TrivialMove");
2644 TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:BeforeCompaction",
2645 c->column_family_data());
2646 // Instrument for event update
2647 // TODO(yhchiang): add op details for showing trivial-move.
2648 ThreadStatusUtil::SetColumnFamily(
2649 c->column_family_data(), c->column_family_data()->ioptions()->env,
2650 immutable_db_options_.enable_thread_tracking);
2651 ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_COMPACTION);
2652
2653 compaction_job_stats.num_input_files = c->num_input_files(0);
2654
2655 NotifyOnCompactionBegin(c->column_family_data(), c.get(), status,
2656 compaction_job_stats, job_context->job_id);
2657
2658 // Move files to next level
2659 int32_t moved_files = 0;
2660 int64_t moved_bytes = 0;
2661 for (unsigned int l = 0; l < c->num_input_levels(); l++) {
2662 if (c->level(l) == c->output_level()) {
2663 continue;
2664 }
2665 for (size_t i = 0; i < c->num_input_files(l); i++) {
2666 FileMetaData* f = c->input(l, i);
2667 c->edit()->DeleteFile(c->level(l), f->fd.GetNumber());
2668 c->edit()->AddFile(c->output_level(), f->fd.GetNumber(),
2669 f->fd.GetPathId(), f->fd.GetFileSize(), f->smallest,
2670 f->largest, f->fd.smallest_seqno,
2671 f->fd.largest_seqno, f->marked_for_compaction,
2672 f->oldest_blob_file_number, f->oldest_ancester_time,
2673 f->file_creation_time, f->file_checksum,
2674 f->file_checksum_func_name);
2675
2676 ROCKS_LOG_BUFFER(
2677 log_buffer,
2678 "[%s] Moving #%" PRIu64 " to level-%d %" PRIu64 " bytes\n",
2679 c->column_family_data()->GetName().c_str(), f->fd.GetNumber(),
2680 c->output_level(), f->fd.GetFileSize());
2681 ++moved_files;
2682 moved_bytes += f->fd.GetFileSize();
2683 }
2684 }
2685
2686 status = versions_->LogAndApply(c->column_family_data(),
2687 *c->mutable_cf_options(), c->edit(),
2688 &mutex_, directories_.GetDbDir());
2689 // Use latest MutableCFOptions
2690 InstallSuperVersionAndScheduleWork(c->column_family_data(),
2691 &job_context->superversion_contexts[0],
2692 *c->mutable_cf_options());
2693
2694 VersionStorageInfo::LevelSummaryStorage tmp;
2695 c->column_family_data()->internal_stats()->IncBytesMoved(c->output_level(),
2696 moved_bytes);
2697 {
2698 event_logger_.LogToBuffer(log_buffer)
2699 << "job" << job_context->job_id << "event"
2700 << "trivial_move"
2701 << "destination_level" << c->output_level() << "files" << moved_files
2702 << "total_files_size" << moved_bytes;
2703 }
2704 ROCKS_LOG_BUFFER(
2705 log_buffer,
2706 "[%s] Moved #%d files to level-%d %" PRIu64 " bytes %s: %s\n",
2707 c->column_family_data()->GetName().c_str(), moved_files,
2708 c->output_level(), moved_bytes, status.ToString().c_str(),
2709 c->column_family_data()->current()->storage_info()->LevelSummary(&tmp));
2710 *made_progress = true;
2711
2712 // Clear Instrument
2713 ThreadStatusUtil::ResetThreadStatus();
2714 TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:AfterCompaction",
2715 c->column_family_data());
2716 } else if (!is_prepicked && c->output_level() > 0 &&
2717 c->output_level() ==
2718 c->column_family_data()
2719 ->current()
2720 ->storage_info()
2721 ->MaxOutputLevel(
2722 immutable_db_options_.allow_ingest_behind) &&
2723 env_->GetBackgroundThreads(Env::Priority::BOTTOM) > 0) {
2724 // Forward compactions involving last level to the bottom pool if it exists,
2725 // such that compactions unlikely to contribute to write stalls can be
2726 // delayed or deprioritized.
2727 TEST_SYNC_POINT("DBImpl::BackgroundCompaction:ForwardToBottomPriPool");
2728 CompactionArg* ca = new CompactionArg;
2729 ca->db = this;
2730 ca->prepicked_compaction = new PrepickedCompaction;
2731 ca->prepicked_compaction->compaction = c.release();
2732 ca->prepicked_compaction->manual_compaction_state = nullptr;
2733 // Transfer the requested token, so the bottom-priority job doesn't need to request it again.
2734 ca->prepicked_compaction->task_token = std::move(task_token);
2735 ++bg_bottom_compaction_scheduled_;
2736 env_->Schedule(&DBImpl::BGWorkBottomCompaction, ca, Env::Priority::BOTTOM,
2737 this, &DBImpl::UnscheduleCompactionCallback);
2738 } else {
2739 TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:BeforeCompaction",
2740 c->column_family_data());
2741 int output_level __attribute__((__unused__));
2742 output_level = c->output_level();
2743 TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:NonTrivial",
2744 &output_level);
2745 std::vector<SequenceNumber> snapshot_seqs;
2746 SequenceNumber earliest_write_conflict_snapshot;
2747 SnapshotChecker* snapshot_checker;
2748 GetSnapshotContext(job_context, &snapshot_seqs,
2749 &earliest_write_conflict_snapshot, &snapshot_checker);
2750 assert(is_snapshot_supported_ || snapshots_.empty());
2751 CompactionJob compaction_job(
2752 job_context->job_id, c.get(), immutable_db_options_,
2753 file_options_for_compaction_, versions_.get(), &shutting_down_,
2754 preserve_deletes_seqnum_.load(), log_buffer, directories_.GetDbDir(),
2755 GetDataDir(c->column_family_data(), c->output_path_id()), stats_,
2756 &mutex_, &error_handler_, snapshot_seqs,
2757 earliest_write_conflict_snapshot, snapshot_checker, table_cache_,
2758 &event_logger_, c->mutable_cf_options()->paranoid_file_checks,
2759 c->mutable_cf_options()->report_bg_io_stats, dbname_,
2760 &compaction_job_stats, thread_pri,
2761 is_manual ? &manual_compaction_paused_ : nullptr);
2762 compaction_job.Prepare();
2763
2764 NotifyOnCompactionBegin(c->column_family_data(), c.get(), status,
2765 compaction_job_stats, job_context->job_id);
2766
2767 mutex_.Unlock();
2768 TEST_SYNC_POINT_CALLBACK(
2769 "DBImpl::BackgroundCompaction:NonTrivial:BeforeRun", nullptr);
2770 compaction_job.Run();
2771 TEST_SYNC_POINT("DBImpl::BackgroundCompaction:NonTrivial:AfterRun");
2772 mutex_.Lock();
2773
2774 status = compaction_job.Install(*c->mutable_cf_options());
2775 if (status.ok()) {
2776 InstallSuperVersionAndScheduleWork(c->column_family_data(),
2777 &job_context->superversion_contexts[0],
2778 *c->mutable_cf_options());
2779 }
2780 *made_progress = true;
2781 TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:AfterCompaction",
2782 c->column_family_data());
2783 }
2784 if (c != nullptr) {
2785 c->ReleaseCompactionFiles(status);
2786 *made_progress = true;
2787
2788 #ifndef ROCKSDB_LITE
2789 // Need to make sure SstFileManager does its bookkeeping
2790 auto sfm = static_cast<SstFileManagerImpl*>(
2791 immutable_db_options_.sst_file_manager.get());
2792 if (sfm && sfm_reserved_compact_space) {
2793 sfm->OnCompactionCompletion(c.get());
2794 }
2795 #endif // ROCKSDB_LITE
2796
2797 NotifyOnCompactionCompleted(c->column_family_data(), c.get(), status,
2798 compaction_job_stats, job_context->job_id);
2799 }
2800
2801 if (status.ok() || status.IsCompactionTooLarge() ||
2802 status.IsManualCompactionPaused()) {
2803 // Done
2804 } else if (status.IsColumnFamilyDropped() || status.IsShutdownInProgress()) {
2805 // Ignore compaction errors found during shutting down
2806 } else {
2807 ROCKS_LOG_WARN(immutable_db_options_.info_log, "Compaction error: %s",
2808 status.ToString().c_str());
2809 error_handler_.SetBGError(status, BackgroundErrorReason::kCompaction);
2810 if (c != nullptr && !is_manual && !error_handler_.IsBGWorkStopped()) {
2811 // Put this cfd back in the compaction queue so we can retry after some
2812 // time
2813 auto cfd = c->column_family_data();
2814 assert(cfd != nullptr);
2815 // Since this compaction failed, we need to recompute the score so it
2816 // takes the original input files into account
2817 c->column_family_data()
2818 ->current()
2819 ->storage_info()
2820 ->ComputeCompactionScore(*(c->immutable_cf_options()),
2821 *(c->mutable_cf_options()));
2822 if (!cfd->queued_for_compaction()) {
2823 AddToCompactionQueue(cfd);
2824 ++unscheduled_compactions_;
2825 }
2826 }
2827 }
2828 // this will unref its input_version and column_family_data
2829 c.reset();
2830
2831 if (is_manual) {
2832 ManualCompactionState* m = manual_compaction;
2833 if (!status.ok()) {
2834 m->status = status;
2835 m->done = true;
2836 }
2837 // For universal compaction:
2838 // Because universal compaction always happens at level 0, one compaction
2839 // will pick up all overlapping files. No files will be
2840 // filtered out due to the size limit and left for a successive compaction,
2841 // so we can safely conclude the current compaction.
2842 //
2843 // Also note that, if we don't stop here, then the current compaction
2844 // writes a new file back to level 0, which will be used in a successive
2845 // compaction. Hence the manual compaction will never finish.
2846 //
2847 // Stop the compaction if manual_end points to nullptr -- this means
2848 // that we compacted the whole range. manual_end should always point
2849 // to nullptr in case of universal compaction
2850 if (m->manual_end == nullptr) {
2851 m->done = true;
2852 }
2853 if (!m->done) {
2854 // We only compacted part of the requested range. Update *m
2855 // to the range that is left to be compacted.
2856 // Universal and FIFO compactions should always compact the whole range
2857 assert(m->cfd->ioptions()->compaction_style !=
2858 kCompactionStyleUniversal ||
2859 m->cfd->ioptions()->num_levels > 1);
2860 assert(m->cfd->ioptions()->compaction_style != kCompactionStyleFIFO);
2861 m->tmp_storage = *m->manual_end;
2862 m->begin = &m->tmp_storage;
2863 m->incomplete = true;
2864 }
2865 m->in_progress = false; // not being processed anymore
2866 }
2867 TEST_SYNC_POINT("DBImpl::BackgroundCompaction:Finish");
2868 return status;
2869 }
2870
2871 bool DBImpl::HasPendingManualCompaction() {
2872 return (!manual_compaction_dequeue_.empty());
2873 }
2874
2875 void DBImpl::AddManualCompaction(DBImpl::ManualCompactionState* m) {
2876 manual_compaction_dequeue_.push_back(m);
2877 }
2878
2879 void DBImpl::RemoveManualCompaction(DBImpl::ManualCompactionState* m) {
2880 // Remove from queue
2881 std::deque<ManualCompactionState*>::iterator it =
2882 manual_compaction_dequeue_.begin();
2883 while (it != manual_compaction_dequeue_.end()) {
2884 if (m == (*it)) {
2885 it = manual_compaction_dequeue_.erase(it);
2886 return;
2887 }
2888 ++it;
2889 }
2890 assert(false);
2891 return;
2892 }
2893
2894 bool DBImpl::ShouldntRunManualCompaction(ManualCompactionState* m) {
2895 if (num_running_ingest_file_ > 0) {
2896 // We need to wait for other IngestExternalFile() calls to finish
2897 // before running a manual compaction.
2898 return true;
2899 }
2900 if (m->exclusive) {
2901 return (bg_bottom_compaction_scheduled_ > 0 ||
2902 bg_compaction_scheduled_ > 0);
2903 }
2904 std::deque<ManualCompactionState*>::iterator it =
2905 manual_compaction_dequeue_.begin();
2906 bool seen = false;
2907 while (it != manual_compaction_dequeue_.end()) {
2908 if (m == (*it)) {
2909 ++it;
2910 seen = true;
2911 continue;
2912 } else if (MCOverlap(m, (*it)) && (!seen && !(*it)->in_progress)) {
2913 // The other manual compaction *it conflicts with m if it
2914 // overlaps with m
2915 // and is ahead of m in the queue and is not yet in progress.
2916 return true;
2917 }
2918 ++it;
2919 }
2920 return false;
2921 }
2922
2923 bool DBImpl::HaveManualCompaction(ColumnFamilyData* cfd) {
2924 // Scan the manual compaction queue
2925 std::deque<ManualCompactionState*>::iterator it =
2926 manual_compaction_dequeue_.begin();
2927 while (it != manual_compaction_dequeue_.end()) {
2928 if ((*it)->exclusive) {
2929 return true;
2930 }
2931 if ((cfd == (*it)->cfd) && (!((*it)->in_progress || (*it)->done))) {
2932 // Allow automatic compaction if manual compaction is
2933 // in progress
2934 return true;
2935 }
2936 ++it;
2937 }
2938 return false;
2939 }
2940
2941 bool DBImpl::HasExclusiveManualCompaction() {
2942 // Scan the manual compaction queue
2943 std::deque<ManualCompactionState*>::iterator it =
2944 manual_compaction_dequeue_.begin();
2945 while (it != manual_compaction_dequeue_.end()) {
2946 if ((*it)->exclusive) {
2947 return true;
2948 }
2949 ++it;
2950 }
2951 return false;
2952 }
2953
2954 bool DBImpl::MCOverlap(ManualCompactionState* m, ManualCompactionState* m1) {
2955 if ((m->exclusive) || (m1->exclusive)) {
2956 return true;
2957 }
2958 if (m->cfd != m1->cfd) {
2959 return false;
2960 }
2961 return true;
2962 }
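
// Worked example of MCOverlap(): two non-exclusive manual compactions on
// different column families do not overlap; any pair on the same column
// family overlaps; and an exclusive manual compaction overlaps with everything.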
2963
2964 #ifndef ROCKSDB_LITE
2965 void DBImpl::BuildCompactionJobInfo(
2966 const ColumnFamilyData* cfd, Compaction* c, const Status& st,
2967 const CompactionJobStats& compaction_job_stats, const int job_id,
2968 const Version* current, CompactionJobInfo* compaction_job_info) const {
2969 assert(compaction_job_info != nullptr);
2970 compaction_job_info->cf_id = cfd->GetID();
2971 compaction_job_info->cf_name = cfd->GetName();
2972 compaction_job_info->status = st;
2973 compaction_job_info->thread_id = env_->GetThreadID();
2974 compaction_job_info->job_id = job_id;
2975 compaction_job_info->base_input_level = c->start_level();
2976 compaction_job_info->output_level = c->output_level();
2977 compaction_job_info->stats = compaction_job_stats;
2978 compaction_job_info->table_properties = c->GetOutputTableProperties();
2979 compaction_job_info->compaction_reason = c->compaction_reason();
2980 compaction_job_info->compression = c->output_compression();
2981 for (size_t i = 0; i < c->num_input_levels(); ++i) {
2982 for (const auto fmd : *c->inputs(i)) {
2983 const FileDescriptor& desc = fmd->fd;
2984 const uint64_t file_number = desc.GetNumber();
2985 auto fn = TableFileName(c->immutable_cf_options()->cf_paths, file_number,
2986 desc.GetPathId());
2987 compaction_job_info->input_files.push_back(fn);
2988 compaction_job_info->input_file_infos.push_back(CompactionFileInfo{
2989 static_cast<int>(i), file_number, fmd->oldest_blob_file_number});
2990 if (compaction_job_info->table_properties.count(fn) == 0) {
2991 std::shared_ptr<const TableProperties> tp;
2992 auto s = current->GetTableProperties(&tp, fmd, &fn);
2993 if (s.ok()) {
2994 compaction_job_info->table_properties[fn] = tp;
2995 }
2996 }
2997 }
2998 }
2999 for (const auto& newf : c->edit()->GetNewFiles()) {
3000 const FileMetaData& meta = newf.second;
3001 const FileDescriptor& desc = meta.fd;
3002 const uint64_t file_number = desc.GetNumber();
3003 compaction_job_info->output_files.push_back(TableFileName(
3004 c->immutable_cf_options()->cf_paths, file_number, desc.GetPathId()));
3005 compaction_job_info->output_file_infos.push_back(CompactionFileInfo{
3006 newf.first, file_number, meta.oldest_blob_file_number});
3007 }
3008 }
3009 #endif
3010
3011 // SuperVersionContext gets created and destructed outside of the lock --
3012 // we use this conveniently to:
3013 // * malloc one SuperVersion() outside of the lock -- new_superversion
3014 // * delete SuperVersion()s outside of the lock -- superversions_to_free
3015 //
3016 // However, if InstallSuperVersionAndScheduleWork() gets called twice with the
3017 // same sv_context, we can't reuse the SuperVersion() that got
3018 // malloced, because the
3019 // first call already used it. In that rare case, we take a hit and create a
3020 // new SuperVersion() inside of the mutex. We do a similar thing
3021 // for superversion_to_free.
3022
void DBImpl::InstallSuperVersionAndScheduleWork(
    ColumnFamilyData* cfd, SuperVersionContext* sv_context,
    const MutableCFOptions& mutable_cf_options) {
  mutex_.AssertHeld();

  // Update max_total_in_memory_state_
  size_t old_memtable_size = 0;
  auto* old_sv = cfd->GetSuperVersion();
  if (old_sv) {
    old_memtable_size = old_sv->mutable_cf_options.write_buffer_size *
                        old_sv->mutable_cf_options.max_write_buffer_number;
  }

  // This branch is rarely taken: sv_context normally arrives with a
  // SuperVersion pre-allocated outside the lock (see the comment above).
  if (UNLIKELY(sv_context->new_superversion == nullptr)) {
    sv_context->NewSuperVersion();
  }
  cfd->InstallSuperVersion(sv_context, &mutex_, mutable_cf_options);

  // There may be a small data race here. The snapshot triggering bottommost
  // compaction may already have been released by this point. But assuming
  // newer snapshots are created and released frequently, the compaction will
  // be triggered again soon anyway.
  bottommost_files_mark_threshold_ = kMaxSequenceNumber;
  for (auto* my_cfd : *versions_->GetColumnFamilySet()) {
    bottommost_files_mark_threshold_ = std::min(
        bottommost_files_mark_threshold_,
        my_cfd->current()->storage_info()->bottommost_files_mark_threshold());
  }

  // Whenever we install a new SuperVersion, we might need to issue new
  // flushes or compactions.
  SchedulePendingCompaction(cfd);
  MaybeScheduleFlushOrCompaction();

  // Update max_total_in_memory_state_
  max_total_in_memory_state_ = max_total_in_memory_state_ - old_memtable_size +
                               mutable_cf_options.write_buffer_size *
                                   mutable_cf_options.max_write_buffer_number;
}
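
// Caller-side pattern (a sketch mirroring the comment above; the real call
// sites are in the flush and compaction paths earlier in this file). The
// point is that both the SuperVersion allocation and the cleanup of the
// superseded ones happen outside the db mutex:
//
//   SuperVersionContext sv_context(/* create_superversion */ true);
//   {
//     InstrumentedMutexLock l(&mutex_);
//     InstallSuperVersionAndScheduleWork(cfd, &sv_context,
//                                        *cfd->GetLatestMutableCFOptions());
//   }
//   sv_context.Clean();  // deletes superseded SuperVersions outside the lock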

// ShouldPurge is called by FindObsoleteFiles when doing a full scan,
// and the db mutex (mutex_) should already be held.
// Actually, the current implementation of FindObsoleteFiles with
// full_scan=true can issue I/O requests to obtain the list of files in
// directories (e.g. env_->GetChildren()) while holding the db mutex.
bool DBImpl::ShouldPurge(uint64_t file_number) const {
  return files_grabbed_for_purge_.find(file_number) ==
             files_grabbed_for_purge_.end() &&
         purge_files_.find(file_number) == purge_files_.end();
}

// MarkAsGrabbedForPurge is called by FindObsoleteFiles, and the db mutex
// (mutex_) should already be held.
void DBImpl::MarkAsGrabbedForPurge(uint64_t file_number) {
  files_grabbed_for_purge_.insert(file_number);
}

void DBImpl::SetSnapshotChecker(SnapshotChecker* snapshot_checker) {
  InstrumentedMutexLock l(&mutex_);
  // snapshot_checker_ should only be set once. If we ever need to set it
  // multiple times, we need to make sure the old one is not deleted while it
  // is still being used by a compaction job.
  assert(!snapshot_checker_);
  snapshot_checker_.reset(snapshot_checker);
}

void DBImpl::GetSnapshotContext(
    JobContext* job_context, std::vector<SequenceNumber>* snapshot_seqs,
    SequenceNumber* earliest_write_conflict_snapshot,
    SnapshotChecker** snapshot_checker_ptr) {
  mutex_.AssertHeld();
  assert(job_context != nullptr);
  assert(snapshot_seqs != nullptr);
  assert(earliest_write_conflict_snapshot != nullptr);
  assert(snapshot_checker_ptr != nullptr);

  *snapshot_checker_ptr = snapshot_checker_.get();
  if (use_custom_gc_ && *snapshot_checker_ptr == nullptr) {
    *snapshot_checker_ptr = DisableGCSnapshotChecker::Instance();
  }
  if (*snapshot_checker_ptr != nullptr) {
    // If a snapshot_checker is used, the flush/compaction may contain values
    // that are not visible to a snapshot taken after the flush/compaction job
    // starts. Take a snapshot here so that it appears in snapshot_seqs and
    // forces the compaction iterator to consider such snapshots.
    const Snapshot* job_snapshot =
        GetSnapshotImpl(false /*write_conflict_boundary*/, false /*lock*/);
    job_context->job_snapshot.reset(new ManagedSnapshot(this, job_snapshot));
  }
  *snapshot_seqs = snapshots_.GetAll(earliest_write_conflict_snapshot);
}
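
// Sketch of how a flush/compaction setup path (earlier in this file)
// typically consumes this context while holding mutex_; the variable names
// here are illustrative:
//
//   std::vector<SequenceNumber> snapshot_seqs;
//   SequenceNumber earliest_write_conflict_snapshot;
//   SnapshotChecker* snapshot_checker = nullptr;
//   GetSnapshotContext(job_context, &snapshot_seqs,
//                      &earliest_write_conflict_snapshot, &snapshot_checker);
//   // ... the three outputs are then handed to the FlushJob/CompactionJob
//   // being constructed.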
}  // namespace ROCKSDB_NAMESPACE