// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "db/db_impl/db_impl.h"

#include "db/builder.h"
#include "db/error_handler.h"
#include "db/event_helpers.h"
#include "file/sst_file_manager_impl.h"
#include "monitoring/iostats_context_imp.h"
#include "monitoring/perf_context_imp.h"
#include "monitoring/thread_status_updater.h"
#include "monitoring/thread_status_util.h"
#include "test_util/sync_point.h"
#include "util/cast_util.h"
#include "util/concurrent_task_limiter_impl.h"

namespace ROCKSDB_NAMESPACE {

bool DBImpl::EnoughRoomForCompaction(
    ColumnFamilyData* cfd, const std::vector<CompactionInputFiles>& inputs,
    bool* sfm_reserved_compact_space, LogBuffer* log_buffer) {
  // Check if we have enough room to do the compaction
  bool enough_room = true;
  auto sfm = static_cast<SstFileManagerImpl*>(
      immutable_db_options_.sst_file_manager.get());
  // Pass the current bg_error_ to SFM so it can decide what checks to
  // perform. If this DB instance hasn't seen any error yet, the SFM can be
  // optimistic and not do disk space checks
  enough_room =
      sfm->EnoughRoomForCompaction(cfd, inputs, error_handler_.GetBGError());
  *sfm_reserved_compact_space = true;
  (void)sfm_reserved_compact_space;
#endif  // ROCKSDB_LITE
  // Just in case tests want to change the value of enough_room
  TEST_SYNC_POINT_CALLBACK(
      "DBImpl::BackgroundCompaction():CancelledCompaction", &enough_room);
  ROCKS_LOG_BUFFER(log_buffer,
                   "Cancelled compaction because not enough room");
  RecordTick(stats_, COMPACTION_CANCELLED, 1);
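// A minimal caller-side sketch (illustrative only, not part of this file):
// how the two outputs of EnoughRoomForCompaction() are typically consumed.
// `cfd`, `inputs`, and `log_buffer` are assumed to be in scope.
//
//   bool sfm_reserved_compact_space = false;
//   if (!EnoughRoomForCompaction(cfd, inputs, &sfm_reserved_compact_space,
//                                log_buffer)) {
//     // Not enough disk space; the compaction is cancelled and the
//     // COMPACTION_CANCELLED tick has already been recorded.
//   }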
bool DBImpl::RequestCompactionToken(ColumnFamilyData* cfd, bool force,
                                    std::unique_ptr<TaskLimiterToken>* token,
                                    LogBuffer* log_buffer) {
  assert(*token == nullptr);
  auto limiter = static_cast<ConcurrentTaskLimiterImpl*>(
      cfd->ioptions()->compaction_thread_limiter.get());
  if (limiter == nullptr) {
    return true;
  }
  *token = limiter->GetToken(force);
  if (*token != nullptr) {
    ROCKS_LOG_BUFFER(log_buffer,
                     "Thread limiter [%s] increase [%s] compaction task, "
                     "force: %s, tasks after: %d",
                     limiter->GetName().c_str(), cfd->GetName().c_str(),
                     force ? "true" : "false", limiter->GetOutstandingTask());
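// Hedged note (added commentary, not in the original source): the
// TaskLimiterToken returned through `token` is an RAII handle. In the
// visible call site in RunManualCompaction() below it is stored in
// ca->prepicked_compaction->task_token, and destroying that unique_ptr is
// what releases the slot counted by GetOutstandingTask().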
Status DBImpl::SyncClosedLogs(JobContext* job_context) {
  TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Start");
  autovector<log::Writer*, 1> logs_to_sync;
  uint64_t current_log_number = logfile_number_;
  while (logs_.front().number < current_log_number &&
         logs_.front().getting_synced) {
  for (auto it = logs_.begin();
       it != logs_.end() && it->number < current_log_number; ++it) {
    auto& log = *it;
    assert(!log.getting_synced);
    log.getting_synced = true;
    logs_to_sync.push_back(log.writer);
  if (!logs_to_sync.empty()) {
    for (log::Writer* log : logs_to_sync) {
      ROCKS_LOG_INFO(immutable_db_options_.info_log,
                     "[JOB %d] Syncing log #%" PRIu64, job_context->job_id,
                     log->get_log_number());
      s = log->file()->Sync(immutable_db_options_.use_fsync);
      if (immutable_db_options_.recycle_log_file_num > 0) {
    s = directories_.GetWalDir()->Fsync();
    // "number <= current_log_number - 1" is equivalent to
    // "number < current_log_number".
    MarkLogsSynced(current_log_number - 1, true, s);
      error_handler_.SetBGError(s, BackgroundErrorReason::kFlush);
      TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Failed");
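// Illustrative note (added commentary): SyncClosedLogs() is entered with the
// DB mutex held; it collects the already-closed WAL writers, performs the
// Sync()/Fsync() I/O, and then calls MarkLogsSynced() under the mutex again.
// A failed sync is surfaced through
// error_handler_.SetBGError(..., BackgroundErrorReason::kFlush) rather than
// being dropped silently.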
Status DBImpl::FlushMemTableToOutputFile(
    ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options,
    bool* made_progress, JobContext* job_context,
    SuperVersionContext* superversion_context,
    std::vector<SequenceNumber>& snapshot_seqs,
    SequenceNumber earliest_write_conflict_snapshot,
    SnapshotChecker* snapshot_checker, LogBuffer* log_buffer,
    Env::Priority thread_pri) {
  assert(cfd->imm()->NumNotFlushed() != 0);
  assert(cfd->imm()->IsFlushPending());
  FlushJob flush_job(
      dbname_, cfd, immutable_db_options_, mutable_cf_options,
      nullptr /* memtable_id */, file_options_for_compaction_, versions_.get(),
      &mutex_, &shutting_down_, snapshot_seqs, earliest_write_conflict_snapshot,
      snapshot_checker, job_context, log_buffer, directories_.GetDbDir(),
      GetCompressionFlush(*cfd->ioptions(), mutable_cf_options), stats_,
      &event_logger_, mutable_cf_options.report_bg_io_stats,
      true /* sync_output_directory */, true /* write_manifest */, thread_pri);
  FileMetaData file_meta;
  TEST_SYNC_POINT("DBImpl::FlushMemTableToOutputFile:BeforePickMemtables");
  flush_job.PickMemTable();
  TEST_SYNC_POINT("DBImpl::FlushMemTableToOutputFile:AfterPickMemtables");
  // may temporarily unlock and lock the mutex.
  NotifyOnFlushBegin(cfd, &file_meta, mutable_cf_options, job_context->job_id);
#endif  // ROCKSDB_LITE
  if (logfile_number_ > 0 &&
      versions_->GetColumnFamilySet()->NumberOfColumnFamilies() > 1) {
    // If there is more than one column family, we need to make sure that
    // all the log files except the most recent one are synced. Otherwise if
    // the host crashes after flushing and before WAL is persistent, the
    // flushed SST may contain data from write batches whose updates to
    // other column families are missing.
    // SyncClosedLogs() may unlock and re-lock the db_mutex.
    s = SyncClosedLogs(job_context);
    TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Skip");
  // Within flush_job.Run, rocksdb may call event listener to notify
  // file creation and deletion.
  //
  // Note that flush_job.Run will unlock and lock the db_mutex,
  // and EventListener callback will be called when the db_mutex
  // is unlocked by the current thread.
    s = flush_job.Run(&logs_with_prep_tracker_, &file_meta);
    InstallSuperVersionAndScheduleWork(cfd, superversion_context,
    *made_progress = true;
    VersionStorageInfo::LevelSummaryStorage tmp;
    ROCKS_LOG_BUFFER(log_buffer, "[%s] Level summary: %s\n",
                     cfd->GetName().c_str(),
                     cfd->current()->storage_info()->LevelSummary(&tmp));
  if (!s.ok() && !s.IsShutdownInProgress() && !s.IsColumnFamilyDropped()) {
    Status new_bg_error = s;
    error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush);
    // may temporarily unlock and lock the mutex.
    NotifyOnFlushCompleted(cfd, mutable_cf_options,
                           flush_job.GetCommittedFlushJobsInfo());
    auto sfm = static_cast<SstFileManagerImpl*>(
        immutable_db_options_.sst_file_manager.get());
      // Notify sst_file_manager that a new file was added
      std::string file_path = MakeTableFileName(
          cfd->ioptions()->cf_paths[0].path, file_meta.fd.GetNumber());
      sfm->OnAddFile(file_path);
      if (sfm->IsMaxAllowedSpaceReached()) {
        Status new_bg_error =
            Status::SpaceLimit("Max allowed space was reached");
        TEST_SYNC_POINT_CALLBACK(
            "DBImpl::FlushMemTableToOutputFile:MaxAllowedSpaceReached",
        error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush);
#endif  // ROCKSDB_LITE
  TEST_SYNC_POINT("DBImpl::FlushMemTableToOutputFile:Finish");
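// Hedged summary (added for readability; based only on the calls visible in
// this excerpt): the single-CF flush path is
//   PickMemTable() -> NotifyOnFlushBegin() -> SyncClosedLogs() (multi-CF
//   only) -> FlushJob::Run() -> InstallSuperVersionAndScheduleWork()
//   -> NotifyOnFlushCompleted() -> SstFileManager::OnAddFile().
// Any failure other than shutdown or CF drop is escalated via
// error_handler_.SetBGError(..., BackgroundErrorReason::kFlush).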
Status DBImpl::FlushMemTablesToOutputFiles(
    const autovector<BGFlushArg>& bg_flush_args, bool* made_progress,
    JobContext* job_context, LogBuffer* log_buffer, Env::Priority thread_pri) {
  if (immutable_db_options_.atomic_flush) {
    return AtomicFlushMemTablesToOutputFiles(
        bg_flush_args, made_progress, job_context, log_buffer, thread_pri);
  std::vector<SequenceNumber> snapshot_seqs;
  SequenceNumber earliest_write_conflict_snapshot;
  SnapshotChecker* snapshot_checker;
  GetSnapshotContext(job_context, &snapshot_seqs,
                     &earliest_write_conflict_snapshot, &snapshot_checker);
  for (auto& arg : bg_flush_args) {
    ColumnFamilyData* cfd = arg.cfd_;
    MutableCFOptions mutable_cf_options = *cfd->GetLatestMutableCFOptions();
    SuperVersionContext* superversion_context = arg.superversion_context_;
    Status s = FlushMemTableToOutputFile(
        cfd, mutable_cf_options, made_progress, job_context,
        superversion_context, snapshot_seqs, earliest_write_conflict_snapshot,
        snapshot_checker, log_buffer, thread_pri);
      if (!s.IsShutdownInProgress() && !s.IsColumnFamilyDropped()) {
        // At this point, DB is not shutting down, nor is cfd dropped.
        // Something is wrong, thus we break out of the loop.
/*
 * Atomically flushes multiple column families.
 *
 * For each column family, all memtables with ID smaller than or equal to the
 * ID specified in bg_flush_args will be flushed. Only after all column
 * families finish flush will this function commit to MANIFEST. If any of the
 * column families are not flushed successfully, this function does not have
 * any side-effect on the state of the database.
 */
Status DBImpl::AtomicFlushMemTablesToOutputFiles(
    const autovector<BGFlushArg>& bg_flush_args, bool* made_progress,
    JobContext* job_context, LogBuffer* log_buffer, Env::Priority thread_pri) {
  autovector<ColumnFamilyData*> cfds;
  for (const auto& arg : bg_flush_args) {
    cfds.emplace_back(arg.cfd_);
  for (const auto cfd : cfds) {
    assert(cfd->imm()->NumNotFlushed() != 0);
    assert(cfd->imm()->IsFlushPending());
  std::vector<SequenceNumber> snapshot_seqs;
  SequenceNumber earliest_write_conflict_snapshot;
  SnapshotChecker* snapshot_checker;
  GetSnapshotContext(job_context, &snapshot_seqs,
                     &earliest_write_conflict_snapshot, &snapshot_checker);
  autovector<Directory*> distinct_output_dirs;
  autovector<std::string> distinct_output_dir_paths;
  std::vector<std::unique_ptr<FlushJob>> jobs;
  std::vector<MutableCFOptions> all_mutable_cf_options;
  int num_cfs = static_cast<int>(cfds.size());
  all_mutable_cf_options.reserve(num_cfs);
  for (int i = 0; i < num_cfs; ++i) {
    auto cfd = cfds[i];
    Directory* data_dir = GetDataDir(cfd, 0U);
    const std::string& curr_path = cfd->ioptions()->cf_paths[0].path;
    // Add to distinct output directories if eligible. Use linear search. Since
    // the number of elements in the vector is not large, performance should be
    for (const auto& path : distinct_output_dir_paths) {
      if (path == curr_path) {
      distinct_output_dir_paths.emplace_back(curr_path);
      distinct_output_dirs.emplace_back(data_dir);
    all_mutable_cf_options.emplace_back(*cfd->GetLatestMutableCFOptions());
    const MutableCFOptions& mutable_cf_options = all_mutable_cf_options.back();
    const uint64_t* max_memtable_id = &(bg_flush_args[i].max_memtable_id_);
    jobs.emplace_back(new FlushJob(
        dbname_, cfd, immutable_db_options_, mutable_cf_options,
        max_memtable_id, file_options_for_compaction_, versions_.get(), &mutex_,
        &shutting_down_, snapshot_seqs, earliest_write_conflict_snapshot,
        snapshot_checker, job_context, log_buffer, directories_.GetDbDir(),
        data_dir, GetCompressionFlush(*cfd->ioptions(), mutable_cf_options),
        stats_, &event_logger_, mutable_cf_options.report_bg_io_stats,
        false /* sync_output_directory */, false /* write_manifest */,
        thread_pri));
    jobs.back()->PickMemTable();
  std::vector<FileMetaData> file_meta(num_cfs);
  assert(num_cfs == static_cast<int>(jobs.size()));
  for (int i = 0; i != num_cfs; ++i) {
    const MutableCFOptions& mutable_cf_options = all_mutable_cf_options.at(i);
    // may temporarily unlock and lock the mutex.
    NotifyOnFlushBegin(cfds[i], &file_meta[i], mutable_cf_options,
                       job_context->job_id);
#endif /* !ROCKSDB_LITE */
  if (logfile_number_ > 0) {
    // TODO (yanqin) investigate whether we should sync the closed logs for
    // single column family case.
    s = SyncClosedLogs(job_context);
  // exec_status stores the execution status of flush_jobs as
  // <bool /* executed */, Status /* status code */>
  autovector<std::pair<bool, Status>> exec_status;
  for (int i = 0; i != num_cfs; ++i) {
    // Initially all jobs are not executed, with status OK.
    exec_status.emplace_back(false, Status::OK());
    // TODO (yanqin): parallelize jobs with threads.
    for (int i = 1; i != num_cfs; ++i) {
      exec_status[i].second =
          jobs[i]->Run(&logs_with_prep_tracker_, &file_meta[i]);
      exec_status[i].first = true;
    TEST_SYNC_POINT(
        "DBImpl::AtomicFlushMemTablesToOutputFiles:SomeFlushJobsComplete:1");
    TEST_SYNC_POINT(
        "DBImpl::AtomicFlushMemTablesToOutputFiles:SomeFlushJobsComplete:2");
    assert(exec_status.size() > 0);
    assert(!file_meta.empty());
    exec_status[0].second =
        jobs[0]->Run(&logs_with_prep_tracker_, &file_meta[0]);
    exec_status[0].first = true;
    for (const auto& e : exec_status) {
      if (!e.second.ok()) {
        if (!e.second.IsShutdownInProgress() &&
            !e.second.IsColumnFamilyDropped()) {
          // If a flush job did not return OK, and the CF is not dropped, and
          // the DB is not shutting down, then we have to return this result to
          error_status = e.second;
    s = error_status.ok() ? s : error_status;
  if (s.IsColumnFamilyDropped()) {
  if (s.ok() || s.IsShutdownInProgress()) {
    // Sync on all distinct output directories.
    for (auto dir : distinct_output_dirs) {
      if (dir != nullptr) {
        Status error_status = dir->Fsync();
        if (!error_status.ok()) {
    // Need to undo atomic flush if something went wrong, i.e. s is not OK and
    // it is not because of CF drop.
    // Have to cancel the flush jobs that have NOT executed because we need to
    // unref the versions.
    for (int i = 0; i != num_cfs; ++i) {
      if (!exec_status[i].first) {
    for (int i = 0; i != num_cfs; ++i) {
      if (exec_status[i].first && exec_status[i].second.ok()) {
        auto& mems = jobs[i]->GetMemTables();
        cfds[i]->imm()->RollbackMemtableFlush(mems,
                                              file_meta[i].fd.GetNumber());
    auto wait_to_install_func = [&]() {
      for (size_t i = 0; i != cfds.size(); ++i) {
        const auto& mems = jobs[i]->GetMemTables();
        if (cfds[i]->IsDropped()) {
          // If the column family is dropped, then do not wait.
        } else if (!mems.empty() &&
                   cfds[i]->imm()->GetEarliestMemTableID() < mems[0]->GetID()) {
          // If a flush job needs to install the flush result for mems and
          // mems[0] is not the earliest memtable, it means another thread must
          // be installing flush results for the same column family, then the
          // current thread needs to wait.
        } else if (mems.empty() && cfds[i]->imm()->GetEarliestMemTableID() <=
                                       bg_flush_args[i].max_memtable_id_) {
          // If a flush job does not need to install flush results, then it has
          // to wait until all memtables up to max_memtable_id_ (inclusive) are
    bool resuming_from_bg_err = error_handler_.IsDBStopped();
    while ((!error_handler_.IsDBStopped() ||
            error_handler_.GetRecoveryError().ok()) &&
           !wait_to_install_func()) {
      atomic_flush_install_cv_.Wait();
    s = resuming_from_bg_err ? error_handler_.GetRecoveryError()
                             : error_handler_.GetBGError();
    autovector<ColumnFamilyData*> tmp_cfds;
    autovector<const autovector<MemTable*>*> mems_list;
    autovector<const MutableCFOptions*> mutable_cf_options_list;
    autovector<FileMetaData*> tmp_file_meta;
    for (int i = 0; i != num_cfs; ++i) {
      const auto& mems = jobs[i]->GetMemTables();
      if (!cfds[i]->IsDropped() && !mems.empty()) {
        tmp_cfds.emplace_back(cfds[i]);
        mems_list.emplace_back(&mems);
        mutable_cf_options_list.emplace_back(&all_mutable_cf_options[i]);
        tmp_file_meta.emplace_back(&file_meta[i]);
    s = InstallMemtableAtomicFlushResults(
        nullptr /* imm_lists */, tmp_cfds, mutable_cf_options_list, mems_list,
        versions_.get(), &mutex_, tmp_file_meta,
        &job_context->memtables_to_free, directories_.GetDbDir(), log_buffer);
    assert(num_cfs ==
           static_cast<int>(job_context->superversion_contexts.size()));
    for (int i = 0; i != num_cfs; ++i) {
      if (cfds[i]->IsDropped()) {
      InstallSuperVersionAndScheduleWork(cfds[i],
                                         &job_context->superversion_contexts[i],
                                         all_mutable_cf_options[i]);
      VersionStorageInfo::LevelSummaryStorage tmp;
      ROCKS_LOG_BUFFER(log_buffer, "[%s] Level summary: %s\n",
                       cfds[i]->GetName().c_str(),
                       cfds[i]->current()->storage_info()->LevelSummary(&tmp));
    *made_progress = true;
    auto sfm = static_cast<SstFileManagerImpl*>(
        immutable_db_options_.sst_file_manager.get());
    assert(all_mutable_cf_options.size() == static_cast<size_t>(num_cfs));
    for (int i = 0; i != num_cfs; ++i) {
      if (cfds[i]->IsDropped()) {
      NotifyOnFlushCompleted(cfds[i], all_mutable_cf_options[i],
                             jobs[i]->GetCommittedFlushJobsInfo());
      std::string file_path = MakeTableFileName(
          cfds[i]->ioptions()->cf_paths[0].path, file_meta[i].fd.GetNumber());
      sfm->OnAddFile(file_path);
      if (sfm->IsMaxAllowedSpaceReached() &&
          error_handler_.GetBGError().ok()) {
        Status new_bg_error =
            Status::SpaceLimit("Max allowed space was reached");
        error_handler_.SetBGError(new_bg_error,
                                  BackgroundErrorReason::kFlush);
#endif  // ROCKSDB_LITE
  if (!s.ok() && !s.IsShutdownInProgress()) {
    Status new_bg_error = s;
    error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush);
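// Hedged overview of the atomic flush protocol above (added commentary, based
// only on the code that survives in this excerpt): every FlushJob runs first;
// results are rolled back with RollbackMemtableFlush() if any job fails for a
// reason other than shutdown or CF drop; and only one thread at a time
// installs results via InstallMemtableAtomicFlushResults(), gated by
// wait_to_install_func() and atomic_flush_install_cv_. This is what makes the
// flush "atomic": the MANIFEST commit happens once, for all column families,
// or not at all.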
void DBImpl::NotifyOnFlushBegin(ColumnFamilyData* cfd, FileMetaData* file_meta,
                                const MutableCFOptions& mutable_cf_options,
  if (immutable_db_options_.listeners.size() == 0U) {
  if (shutting_down_.load(std::memory_order_acquire)) {
  bool triggered_writes_slowdown =
      (cfd->current()->storage_info()->NumLevelFiles(0) >=
       mutable_cf_options.level0_slowdown_writes_trigger);
  bool triggered_writes_stop =
      (cfd->current()->storage_info()->NumLevelFiles(0) >=
       mutable_cf_options.level0_stop_writes_trigger);
  // release lock while notifying events
    info.cf_id = cfd->GetID();
    info.cf_name = cfd->GetName();
    // TODO(yhchiang): make db_paths dynamic in case flush does not
    //                 go to L0 in the future.
    const uint64_t file_number = file_meta->fd.GetNumber();
        MakeTableFileName(cfd->ioptions()->cf_paths[0].path, file_number);
    info.file_number = file_number;
    info.thread_id = env_->GetThreadID();
    info.job_id = job_id;
    info.triggered_writes_slowdown = triggered_writes_slowdown;
    info.triggered_writes_stop = triggered_writes_stop;
    info.smallest_seqno = file_meta->fd.smallest_seqno;
    info.largest_seqno = file_meta->fd.largest_seqno;
    info.flush_reason = cfd->GetFlushReason();
    for (auto listener : immutable_db_options_.listeners) {
      listener->OnFlushBegin(this, info);
  // no need to signal bg_cv_ as it will be signaled at the end of the
  (void)mutable_cf_options;
#endif  // ROCKSDB_LITE
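// A minimal listener sketch (illustrative; MyListener is hypothetical, while
// EventListener and FlushJobInfo are the public RocksDB API exercised by the
// notification code above):
//
//   class MyListener : public EventListener {
//     void OnFlushBegin(DB* /*db*/, const FlushJobInfo& info) override {
//       // info.cf_name, info.triggered_writes_slowdown, etc. are populated
//       // by DBImpl::NotifyOnFlushBegin() before this callback runs.
//     }
//   };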
void DBImpl::NotifyOnFlushCompleted(
    ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options,
    std::list<std::unique_ptr<FlushJobInfo>>* flush_jobs_info) {
  assert(flush_jobs_info != nullptr);
  if (immutable_db_options_.listeners.size() == 0U) {
  if (shutting_down_.load(std::memory_order_acquire)) {
  bool triggered_writes_slowdown =
      (cfd->current()->storage_info()->NumLevelFiles(0) >=
       mutable_cf_options.level0_slowdown_writes_trigger);
  bool triggered_writes_stop =
      (cfd->current()->storage_info()->NumLevelFiles(0) >=
       mutable_cf_options.level0_stop_writes_trigger);
  // release lock while notifying events
    for (auto& info : *flush_jobs_info) {
      info->triggered_writes_slowdown = triggered_writes_slowdown;
      info->triggered_writes_stop = triggered_writes_stop;
      for (auto listener : immutable_db_options_.listeners) {
        listener->OnFlushCompleted(this, *info);
    flush_jobs_info->clear();
  // no need to signal bg_cv_ as it will be signaled at the end of the
  (void)mutable_cf_options;
  (void)flush_jobs_info;
#endif  // ROCKSDB_LITE
Status DBImpl::CompactRange(const CompactRangeOptions& options,
                            ColumnFamilyHandle* column_family,
                            const Slice* begin, const Slice* end) {
  auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
  auto cfd = cfh->cfd();
  if (options.target_path_id >= cfd->ioptions()->cf_paths.size()) {
    return Status::InvalidArgument("Invalid target path ID");
  bool exclusive = options.exclusive_manual_compaction;
  bool flush_needed = true;
  if (begin != nullptr && end != nullptr) {
    // TODO(ajkr): We could also optimize away the flush in certain cases where
    // one/both sides of the interval are unbounded. But it requires more
    // changes to RangesOverlapWithMemtables.
    Range range(*begin, *end);
    SuperVersion* super_version = cfd->GetReferencedSuperVersion(this);
    cfd->RangesOverlapWithMemtables({range}, super_version, &flush_needed);
    CleanupSuperVersion(super_version);
    FlushOptions fo;
    fo.allow_write_stall = options.allow_write_stall;
    if (immutable_db_options_.atomic_flush) {
      autovector<ColumnFamilyData*> cfds;
      SelectColumnFamiliesForAtomicFlush(&cfds);
      s = AtomicFlushMemTables(cfds, fo, FlushReason::kManualCompaction,
                               false /* writes_stopped */);
      s = FlushMemTable(cfd, fo, FlushReason::kManualCompaction,
                        false /* writes_stopped */);
      LogFlush(immutable_db_options_.info_log);
  int max_level_with_files = 0;
  // max_file_num_to_ignore can be used to filter out newly created SST files,
  // useful for bottom level compaction in a manual compaction
  uint64_t max_file_num_to_ignore = port::kMaxUint64;
  uint64_t next_file_number = port::kMaxUint64;
    InstrumentedMutexLock l(&mutex_);
    Version* base = cfd->current();
    for (int level = 1; level < base->storage_info()->num_non_empty_levels();
      if (base->storage_info()->OverlapInLevel(level, begin, end)) {
        max_level_with_files = level;
    next_file_number = versions_->current_next_file_number();
  int final_output_level = 0;
  if (cfd->ioptions()->compaction_style == kCompactionStyleUniversal &&
      cfd->NumberLevels() > 1) {
    // Always compact all files together.
    final_output_level = cfd->NumberLevels() - 1;
    // if bottom most level is reserved
    if (immutable_db_options_.allow_ingest_behind) {
      final_output_level--;
    s = RunManualCompaction(cfd, ColumnFamilyData::kCompactAllLevels,
                            final_output_level, options, begin, end, exclusive,
                            false, max_file_num_to_ignore);
    for (int level = 0; level <= max_level_with_files; level++) {
      int output_level;
      // in case the compaction is universal or if we're compacting the
      // bottom-most level, the output level will be the same as input one.
      // level 0 can never be the bottommost level (i.e. if all files are in
      // level 0, we will compact to level 1)
      if (cfd->ioptions()->compaction_style == kCompactionStyleUniversal ||
          cfd->ioptions()->compaction_style == kCompactionStyleFIFO) {
        output_level = level;
      } else if (level == max_level_with_files && level > 0) {
        if (options.bottommost_level_compaction ==
            BottommostLevelCompaction::kSkip) {
          // Skip bottommost level compaction
        } else if (options.bottommost_level_compaction ==
                       BottommostLevelCompaction::kIfHaveCompactionFilter &&
                   cfd->ioptions()->compaction_filter == nullptr &&
                   cfd->ioptions()->compaction_filter_factory == nullptr) {
          // Skip bottommost level compaction since we don't have a compaction
        output_level = level;
        // update max_file_num_to_ignore only for bottom level compaction
        // because data in newly compacted files in middle levels may still need
        max_file_num_to_ignore = next_file_number;
        output_level = level + 1;
        if (cfd->ioptions()->compaction_style == kCompactionStyleLevel &&
            cfd->ioptions()->level_compaction_dynamic_level_bytes &&
          output_level = ColumnFamilyData::kCompactToBaseLevel;
      s = RunManualCompaction(cfd, level, output_level, options, begin, end,
                              exclusive, false, max_file_num_to_ignore);
      if (output_level == ColumnFamilyData::kCompactToBaseLevel) {
        final_output_level = cfd->NumberLevels() - 1;
      } else if (output_level > final_output_level) {
        final_output_level = output_level;
      TEST_SYNC_POINT("DBImpl::RunManualCompaction()::1");
      TEST_SYNC_POINT("DBImpl::RunManualCompaction()::2");
    LogFlush(immutable_db_options_.info_log);
  if (options.change_level) {
    ROCKS_LOG_INFO(immutable_db_options_.info_log,
                   "[RefitLevel] waiting for background threads to stop");
    s = PauseBackgroundWork();
      s = ReFitLevel(cfd, final_output_level, options.target_level);
      ContinueBackgroundWork();
    LogFlush(immutable_db_options_.info_log);
    InstrumentedMutexLock l(&mutex_);
    // an automatic compaction that has been scheduled might have been
    // preempted by the manual compactions. Need to schedule it back.
    MaybeScheduleFlushOrCompaction();
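// Caller-side sketch (illustrative only; `db` is assumed to be an open DB*):
// a full-range manual compaction that exercises the path above, including
// the optional ReFitLevel() step guarded by options.change_level.
//
//   CompactRangeOptions cro;
//   cro.change_level = true;  // move the output to cro.target_level
//   cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
//   Status s = db->CompactRange(cro, /*begin=*/nullptr, /*end=*/nullptr);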
Status DBImpl::CompactFiles(const CompactionOptions& compact_options,
                            ColumnFamilyHandle* column_family,
                            const std::vector<std::string>& input_file_names,
                            const int output_level, const int output_path_id,
                            std::vector<std::string>* const output_file_names,
                            CompactionJobInfo* compaction_job_info) {
  (void)compact_options;
  (void)input_file_names;
  (void)output_path_id;
  (void)output_file_names;
  (void)compaction_job_info;
  // not supported in lite version
  return Status::NotSupported("Not supported in ROCKSDB LITE");
  if (column_family == nullptr) {
    return Status::InvalidArgument("ColumnFamilyHandle must be non-null.");
  auto cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
  JobContext job_context(0, true);
  LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL,
                       immutable_db_options_.info_log.get());
  // Perform CompactFiles
  TEST_SYNC_POINT("TestCompactFiles::IngestExternalFile2");
    InstrumentedMutexLock l(&mutex_);
    // This call will unlock/lock the mutex to wait for current running
    // IngestExternalFile() calls to finish.
    // We need to get current after `WaitForIngestFile`, because
    // `IngestExternalFile` may add files that overlap with `input_file_names`
    auto* current = cfd->current();
    s = CompactFilesImpl(compact_options, cfd, current, input_file_names,
                         output_file_names, output_level, output_path_id,
                         &job_context, &log_buffer, compaction_job_info);
  // Find and delete obsolete files
    InstrumentedMutexLock l(&mutex_);
    // If !s.ok(), this means that Compaction failed. In that case, we want
    // to delete all obsolete files we might have created and we force
    // FindObsoleteFiles(). This is because job_context does not
    // catch all created files if compaction failed.
    FindObsoleteFiles(&job_context, !s.ok());
  }  // release the mutex
  // delete unnecessary files if any, this is done outside the mutex
  if (job_context.HaveSomethingToClean() ||
      job_context.HaveSomethingToDelete() || !log_buffer.IsEmpty()) {
    // Have to flush the info logs before bg_compaction_scheduled_--
    // because if bg_flush_scheduled_ becomes 0 and the lock is
    // released, the destructor of DB can kick in and destroy all the
    // states of DB so info_log might not be available after that point.
    // The same applies to accessing other states that DB owns.
    log_buffer.FlushBufferToLog();
    if (job_context.HaveSomethingToDelete()) {
      // no mutex is locked here. No need to Unlock() and Lock() here.
      PurgeObsoleteFiles(job_context);
#endif  // ROCKSDB_LITE
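// Usage sketch (illustrative only; `db` is an open DB* and the file list is
// hypothetical): CompactFiles() compacts an explicit set of SST files into
// `output_level`, unlike CompactRange(), which selects files by key range.
//
//   ColumnFamilyMetaData meta;
//   db->GetColumnFamilyMetaData(&meta);
//   std::vector<std::string> inputs;
//   for (const auto& f : meta.levels[0].files) {
//     inputs.push_back(f.name);
//   }
//   Status s = db->CompactFiles(CompactionOptions(), inputs,
//                               /*output_level=*/1);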
Status DBImpl::CompactFilesImpl(
    const CompactionOptions& compact_options, ColumnFamilyData* cfd,
    Version* version, const std::vector<std::string>& input_file_names,
    std::vector<std::string>* const output_file_names, const int output_level,
    int output_path_id, JobContext* job_context, LogBuffer* log_buffer,
    CompactionJobInfo* compaction_job_info) {
  if (shutting_down_.load(std::memory_order_acquire)) {
    return Status::ShutdownInProgress();
  if (manual_compaction_paused_.load(std::memory_order_acquire)) {
    return Status::Incomplete(Status::SubCode::kManualCompactionPaused);
  std::unordered_set<uint64_t> input_set;
  for (const auto& file_name : input_file_names) {
    input_set.insert(TableFileNameToNumber(file_name));
  ColumnFamilyMetaData cf_meta;
  // TODO(yhchiang): can directly use version here if none of the
  // following function calls is pluggable to external developers.
  version->GetColumnFamilyMetaData(&cf_meta);
  if (output_path_id < 0) {
    if (cfd->ioptions()->cf_paths.size() == 1U) {
      return Status::NotSupported(
          "Automatic output path selection is not "
          "yet supported in CompactFiles()");
  Status s = cfd->compaction_picker()->SanitizeCompactionInputFiles(
      &input_set, cf_meta, output_level);
  std::vector<CompactionInputFiles> input_files;
  s = cfd->compaction_picker()->GetCompactionInputsFromFileNumbers(
      &input_files, &input_set, version->storage_info(), compact_options);
  for (const auto& inputs : input_files) {
    if (cfd->compaction_picker()->AreFilesInCompaction(inputs.files)) {
      return Status::Aborted(
          "Some of the necessary compaction input "
          "files are already being compacted");
  bool sfm_reserved_compact_space = false;
  // First check if we have enough room to do the compaction
  bool enough_room = EnoughRoomForCompaction(
      cfd, input_files, &sfm_reserved_compact_space, log_buffer);
    // m's vars will get set properly at the end of this function,
    // as long as status == CompactionTooLarge
    return Status::CompactionTooLarge();
  // At this point, CompactFiles will be run.
  bg_compaction_scheduled_++;
  std::unique_ptr<Compaction> c;
  assert(cfd->compaction_picker());
  c.reset(cfd->compaction_picker()->CompactFiles(
      compact_options, input_files, output_level, version->storage_info(),
      *cfd->GetLatestMutableCFOptions(), output_path_id));
  // we already sanitized the set of input files and checked for conflicts
  // without releasing the lock, so we're guaranteed a compaction can be formed.
  assert(c != nullptr);
  c->SetInputVersion(version);
  // deletion compaction currently not allowed in CompactFiles.
  assert(!c->deletion_compaction());
  std::vector<SequenceNumber> snapshot_seqs;
  SequenceNumber earliest_write_conflict_snapshot;
  SnapshotChecker* snapshot_checker;
  GetSnapshotContext(job_context, &snapshot_seqs,
                     &earliest_write_conflict_snapshot, &snapshot_checker);
  std::unique_ptr<std::list<uint64_t>::iterator> pending_outputs_inserted_elem(
      new std::list<uint64_t>::iterator(
          CaptureCurrentFileNumberInPendingOutputs()));
  assert(is_snapshot_supported_ || snapshots_.empty());
  CompactionJobStats compaction_job_stats;
  CompactionJob compaction_job(
      job_context->job_id, c.get(), immutable_db_options_,
      file_options_for_compaction_, versions_.get(), &shutting_down_,
      preserve_deletes_seqnum_.load(), log_buffer, directories_.GetDbDir(),
      GetDataDir(c->column_family_data(), c->output_path_id()), stats_, &mutex_,
      &error_handler_, snapshot_seqs, earliest_write_conflict_snapshot,
      snapshot_checker, table_cache_, &event_logger_,
      c->mutable_cf_options()->paranoid_file_checks,
      c->mutable_cf_options()->report_bg_io_stats, dbname_,
      &compaction_job_stats, Env::Priority::USER, &manual_compaction_paused_);
  // Creating a compaction influences the compaction score because the score
  // takes running compactions into account (by skipping files that are already
  // being compacted). Since we just changed compaction score, we recalculate it
  version->storage_info()->ComputeCompactionScore(*cfd->ioptions(),
                                                  *c->mutable_cf_options());
  compaction_job.Prepare();
  TEST_SYNC_POINT("CompactFilesImpl:0");
  TEST_SYNC_POINT("CompactFilesImpl:1");
  compaction_job.Run();
  TEST_SYNC_POINT("CompactFilesImpl:2");
  TEST_SYNC_POINT("CompactFilesImpl:3");
  Status status = compaction_job.Install(*c->mutable_cf_options());
    InstallSuperVersionAndScheduleWork(c->column_family_data(),
                                       &job_context->superversion_contexts[0],
                                       *c->mutable_cf_options());
  c->ReleaseCompactionFiles(s);
#ifndef ROCKSDB_LITE
  // Need to make sure SstFileManager does its bookkeeping
  auto sfm = static_cast<SstFileManagerImpl*>(
      immutable_db_options_.sst_file_manager.get());
  if (sfm && sfm_reserved_compact_space) {
    sfm->OnCompactionCompletion(c.get());
#endif  // ROCKSDB_LITE
  ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem);
  if (compaction_job_info != nullptr) {
    BuildCompactionJobInfo(cfd, c.get(), s, compaction_job_stats,
                           job_context->job_id, version, compaction_job_info);
  } else if (status.IsColumnFamilyDropped() || status.IsShutdownInProgress()) {
    // Ignore compaction errors found during shutting down
  } else if (status.IsManualCompactionPaused()) {
    // Don't report stopping manual compaction as error
    ROCKS_LOG_INFO(immutable_db_options_.info_log,
                   "[%s] [JOB %d] Stopping manual compaction",
                   c->column_family_data()->GetName().c_str(),
                   job_context->job_id);
    ROCKS_LOG_WARN(immutable_db_options_.info_log,
                   "[%s] [JOB %d] Compaction error: %s",
                   c->column_family_data()->GetName().c_str(),
                   job_context->job_id, status.ToString().c_str());
    error_handler_.SetBGError(status, BackgroundErrorReason::kCompaction);
  if (output_file_names != nullptr) {
    for (const auto newf : c->edit()->GetNewFiles()) {
      (*output_file_names)
          .push_back(TableFileName(c->immutable_cf_options()->cf_paths,
                                   newf.second.fd.GetNumber(),
                                   newf.second.fd.GetPathId()));
  bg_compaction_scheduled_--;
  if (bg_compaction_scheduled_ == 0) {
  MaybeScheduleFlushOrCompaction();
  TEST_SYNC_POINT("CompactFilesImpl:End");
#endif  // ROCKSDB_LITE
Status DBImpl::PauseBackgroundWork() {
  InstrumentedMutexLock guard_lock(&mutex_);
  bg_compaction_paused_++;
  while (bg_bottom_compaction_scheduled_ > 0 || bg_compaction_scheduled_ > 0 ||
         bg_flush_scheduled_ > 0) {
  return Status::OK();

Status DBImpl::ContinueBackgroundWork() {
  InstrumentedMutexLock guard_lock(&mutex_);
  if (bg_work_paused_ == 0) {
    return Status::InvalidArgument();
  assert(bg_work_paused_ > 0);
  assert(bg_compaction_paused_ > 0);
  bg_compaction_paused_--;
  // It's sufficient to check just bg_work_paused_ here since
  // bg_work_paused_ is always no greater than bg_compaction_paused_
  if (bg_work_paused_ == 0) {
    MaybeScheduleFlushOrCompaction();
  return Status::OK();
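// Pairing sketch (illustrative; `db` is an open DB*): PauseBackgroundWork()
// blocks until in-flight flushes/compactions drain, and every successful
// pause must be matched by a ContinueBackgroundWork(), as ReFitLevel()'s
// caller in CompactRange() does above.
//
//   Status s = db->PauseBackgroundWork();
//   if (s.ok()) {
//     // ... operate while background work is quiesced ...
//     db->ContinueBackgroundWork();
//   }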
void DBImpl::NotifyOnCompactionBegin(ColumnFamilyData* cfd, Compaction* c,
                                     const CompactionJobStats& job_stats,
#ifndef ROCKSDB_LITE
  if (immutable_db_options_.listeners.empty()) {
  mutex_.AssertHeld();
  if (shutting_down_.load(std::memory_order_acquire)) {
  if (c->is_manual_compaction() &&
      manual_compaction_paused_.load(std::memory_order_acquire)) {
  Version* current = cfd->current();
  // release lock while notifying events
  TEST_SYNC_POINT("DBImpl::NotifyOnCompactionBegin::UnlockMutex");
    CompactionJobInfo info{};
    info.cf_name = cfd->GetName();
    info.thread_id = env_->GetThreadID();
    info.job_id = job_id;
    info.base_input_level = c->start_level();
    info.output_level = c->output_level();
    info.stats = job_stats;
    info.table_properties = c->GetOutputTableProperties();
    info.compaction_reason = c->compaction_reason();
    info.compression = c->output_compression();
    for (size_t i = 0; i < c->num_input_levels(); ++i) {
      for (const auto fmd : *c->inputs(i)) {
        const FileDescriptor& desc = fmd->fd;
        const uint64_t file_number = desc.GetNumber();
        auto fn = TableFileName(c->immutable_cf_options()->cf_paths,
                                file_number, desc.GetPathId());
        info.input_files.push_back(fn);
        info.input_file_infos.push_back(CompactionFileInfo{
            static_cast<int>(i), file_number, fmd->oldest_blob_file_number});
        if (info.table_properties.count(fn) == 0) {
          std::shared_ptr<const TableProperties> tp;
          auto s = current->GetTableProperties(&tp, fmd, &fn);
            info.table_properties[fn] = tp;
    for (const auto newf : c->edit()->GetNewFiles()) {
      const FileMetaData& meta = newf.second;
      const FileDescriptor& desc = meta.fd;
      const uint64_t file_number = desc.GetNumber();
      info.output_files.push_back(TableFileName(
          c->immutable_cf_options()->cf_paths, file_number, desc.GetPathId()));
      info.output_file_infos.push_back(CompactionFileInfo{
          newf.first, file_number, meta.oldest_blob_file_number});
    for (auto listener : immutable_db_options_.listeners) {
      listener->OnCompactionBegin(this, info);
#endif  // ROCKSDB_LITE
void DBImpl::NotifyOnCompactionCompleted(
    ColumnFamilyData* cfd, Compaction* c, const Status& st,
    const CompactionJobStats& compaction_job_stats, const int job_id) {
#ifndef ROCKSDB_LITE
  if (immutable_db_options_.listeners.size() == 0U) {
  mutex_.AssertHeld();
  if (shutting_down_.load(std::memory_order_acquire)) {
  if (c->is_manual_compaction() &&
      manual_compaction_paused_.load(std::memory_order_acquire)) {
  Version* current = cfd->current();
  // release lock while notifying events
  TEST_SYNC_POINT("DBImpl::NotifyOnCompactionCompleted::UnlockMutex");
    CompactionJobInfo info{};
    BuildCompactionJobInfo(cfd, c, st, compaction_job_stats, job_id, current,
    for (auto listener : immutable_db_options_.listeners) {
      listener->OnCompactionCompleted(this, info);
  // no need to signal bg_cv_ as it will be signaled at the end of the
  (void)compaction_job_stats;
#endif  // ROCKSDB_LITE
// REQUIREMENT: block all background work by calling PauseBackgroundWork()
// before calling this function
Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) {
  assert(level < cfd->NumberLevels());
  if (target_level >= cfd->NumberLevels()) {
    return Status::InvalidArgument("Target level exceeds number of levels");
  SuperVersionContext sv_context(/* create_superversion */ true);
  InstrumentedMutexLock guard_lock(&mutex_);
  // only allow one thread refitting
  if (refitting_level_) {
    ROCKS_LOG_INFO(immutable_db_options_.info_log,
                   "[ReFitLevel] another thread is refitting");
    return Status::NotSupported("another thread is refitting");
  refitting_level_ = true;
  const MutableCFOptions mutable_cf_options = *cfd->GetLatestMutableCFOptions();
  // move to a smaller level
  int to_level = target_level;
  if (target_level < 0) {
    to_level = FindMinimumEmptyLevelFitting(cfd, mutable_cf_options, level);
  auto* vstorage = cfd->current()->storage_info();
  if (to_level > level) {
      return Status::NotSupported(
          "Cannot change from level 0 to other levels.");
    // Check levels are empty for a trivial move
    for (int l = level + 1; l <= to_level; l++) {
      if (vstorage->NumLevelFiles(l) > 0) {
        return Status::NotSupported(
            "Levels between source and target are not empty for a move.");
  if (to_level != level) {
    ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
                    "[%s] Before refitting:\n%s", cfd->GetName().c_str(),
                    cfd->current()->DebugString().data());
    VersionEdit edit;
    edit.SetColumnFamily(cfd->GetID());
    for (const auto& f : vstorage->LevelFiles(level)) {
      edit.DeleteFile(level, f->fd.GetNumber());
      edit.AddFile(to_level, f->fd.GetNumber(), f->fd.GetPathId(),
                   f->fd.GetFileSize(), f->smallest, f->largest,
                   f->fd.smallest_seqno, f->fd.largest_seqno,
                   f->marked_for_compaction, f->oldest_blob_file_number,
                   f->oldest_ancester_time, f->file_creation_time,
                   f->file_checksum, f->file_checksum_func_name);
    ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
                    "[%s] Apply version edit:\n%s", cfd->GetName().c_str(),
                    edit.DebugString().data());
    status = versions_->LogAndApply(cfd, mutable_cf_options, &edit, &mutex_,
                                    directories_.GetDbDir());
    InstallSuperVersionAndScheduleWork(cfd, &sv_context, mutable_cf_options);
    ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "[%s] LogAndApply: %s\n",
                    cfd->GetName().c_str(), status.ToString().data());
      ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
                      "[%s] After refitting:\n%s", cfd->GetName().c_str(),
                      cfd->current()->DebugString().data());
  refitting_level_ = false;
int DBImpl::NumberLevels(ColumnFamilyHandle* column_family) {
  auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
  return cfh->cfd()->NumberLevels();

int DBImpl::MaxMemCompactionLevel(ColumnFamilyHandle* /*column_family*/) {

int DBImpl::Level0StopWriteTrigger(ColumnFamilyHandle* column_family) {
  auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
  InstrumentedMutexLock l(&mutex_);
      ->mutable_cf_options.level0_stop_writes_trigger;
Status DBImpl::Flush(const FlushOptions& flush_options,
                     ColumnFamilyHandle* column_family) {
  auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
  ROCKS_LOG_INFO(immutable_db_options_.info_log, "[%s] Manual flush start.",
                 cfh->GetName().c_str());
  if (immutable_db_options_.atomic_flush) {
    s = AtomicFlushMemTables({cfh->cfd()}, flush_options,
                             FlushReason::kManualFlush);
    s = FlushMemTable(cfh->cfd(), flush_options, FlushReason::kManualFlush);
  ROCKS_LOG_INFO(immutable_db_options_.info_log,
                 "[%s] Manual flush finished, status: %s\n",
                 cfh->GetName().c_str(), s.ToString().c_str());
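// Usage sketch (illustrative; `db` and `cf` are assumed handles): a manual
// flush through the public API lands in this function, and is routed to
// AtomicFlushMemTables() instead when atomic_flush is enabled.
//
//   FlushOptions fo;
//   fo.wait = true;  // block until the memtable reaches an SST file
//   Status s = db->Flush(fo, cf);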
Status DBImpl::Flush(const FlushOptions& flush_options,
                     const std::vector<ColumnFamilyHandle*>& column_families) {
  if (!immutable_db_options_.atomic_flush) {
    for (auto cfh : column_families) {
      s = Flush(flush_options, cfh);
    ROCKS_LOG_INFO(immutable_db_options_.info_log,
                   "Manual atomic flush start.\n"
                   "=====Column families:=====");
    for (auto cfh : column_families) {
      auto cfhi = static_cast<ColumnFamilyHandleImpl*>(cfh);
      ROCKS_LOG_INFO(immutable_db_options_.info_log, "%s",
                     cfhi->GetName().c_str());
    ROCKS_LOG_INFO(immutable_db_options_.info_log,
                   "=====End of column families list=====");
    autovector<ColumnFamilyData*> cfds;
    std::for_each(column_families.begin(), column_families.end(),
                  [&cfds](ColumnFamilyHandle* elem) {
                    auto cfh = static_cast<ColumnFamilyHandleImpl*>(elem);
                    cfds.emplace_back(cfh->cfd());
    s = AtomicFlushMemTables(cfds, flush_options, FlushReason::kManualFlush);
    ROCKS_LOG_INFO(immutable_db_options_.info_log,
                   "Manual atomic flush finished, status: %s\n"
                   "=====Column families:=====",
                   s.ToString().c_str());
    for (auto cfh : column_families) {
      auto cfhi = static_cast<ColumnFamilyHandleImpl*>(cfh);
      ROCKS_LOG_INFO(immutable_db_options_.info_log, "%s",
                     cfhi->GetName().c_str());
    ROCKS_LOG_INFO(immutable_db_options_.info_log,
                   "=====End of column families list=====");
Status DBImpl::RunManualCompaction(
    ColumnFamilyData* cfd, int input_level, int output_level,
    const CompactRangeOptions& compact_range_options, const Slice* begin,
    const Slice* end, bool exclusive, bool disallow_trivial_move,
    uint64_t max_file_num_to_ignore) {
  assert(input_level == ColumnFamilyData::kCompactAllLevels ||
  InternalKey begin_storage, end_storage;
  bool scheduled = false;
  bool manual_conflict = false;
  ManualCompactionState manual;
  manual.input_level = input_level;
  manual.output_level = output_level;
  manual.output_path_id = compact_range_options.target_path_id;
  manual.done = false;
  manual.in_progress = false;
  manual.incomplete = false;
  manual.exclusive = exclusive;
  manual.disallow_trivial_move = disallow_trivial_move;
  // For universal compaction, we enforce every manual compaction to compact
  if (begin == nullptr ||
      cfd->ioptions()->compaction_style == kCompactionStyleUniversal ||
      cfd->ioptions()->compaction_style == kCompactionStyleFIFO) {
    manual.begin = nullptr;
    begin_storage.SetMinPossibleForUserKey(*begin);
    manual.begin = &begin_storage;
  if (end == nullptr ||
      cfd->ioptions()->compaction_style == kCompactionStyleUniversal ||
      cfd->ioptions()->compaction_style == kCompactionStyleFIFO) {
    manual.end = nullptr;
    end_storage.SetMaxPossibleForUserKey(*end);
    manual.end = &end_storage;
  TEST_SYNC_POINT("DBImpl::RunManualCompaction:0");
  TEST_SYNC_POINT("DBImpl::RunManualCompaction:1");
  InstrumentedMutexLock l(&mutex_);
  // When a manual compaction arrives, temporarily disable scheduling of
  // non-manual compactions and wait until the number of scheduled compaction
  // jobs drops to zero. This is needed to ensure that this manual compaction
  // can compact any range of keys/files.
  //
  // HasPendingManualCompaction() is true when at least one thread is inside
  // RunManualCompaction(), i.e. during that time no other compaction will
  // get scheduled (see MaybeScheduleFlushOrCompaction).
  //
  // Note that the following loop doesn't stop more than one thread calling
  // RunManualCompaction() from getting to the second while loop below.
  // However, only one of them will actually schedule compaction, while
  // others will wait on a condition variable until it completes.
  AddManualCompaction(&manual);
  TEST_SYNC_POINT_CALLBACK("DBImpl::RunManualCompaction:NotScheduled", &mutex_);
    while (bg_bottom_compaction_scheduled_ > 0 ||
           bg_compaction_scheduled_ > 0) {
      TEST_SYNC_POINT("DBImpl::RunManualCompaction:WaitScheduled");
          immutable_db_options_.info_log,
          "[%s] Manual compaction waiting for all other scheduled background "
          "compactions to finish",
          cfd->GetName().c_str());
  ROCKS_LOG_INFO(immutable_db_options_.info_log,
                 "[%s] Manual compaction starting", cfd->GetName().c_str());
  LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL,
                       immutable_db_options_.info_log.get());
  // We don't check bg_error_ here, because if we get the error in compaction,
  // the compaction will set manual.status to bg_error_ and set manual.done to
  while (!manual.done) {
    assert(HasPendingManualCompaction());
    manual_conflict = false;
    Compaction* compaction = nullptr;
    if (ShouldntRunManualCompaction(&manual) || (manual.in_progress == true) ||
        (((manual.manual_end = &manual.tmp_storage1) != nullptr) &&
         ((compaction = manual.cfd->CompactRange(
               *manual.cfd->GetLatestMutableCFOptions(), manual.input_level,
               manual.output_level, compact_range_options, manual.begin,
               manual.end, &manual.manual_end, &manual_conflict,
               max_file_num_to_ignore)) == nullptr &&
          manual_conflict))) {
      // exclusive manual compactions should not see a conflict during
      assert(!exclusive || !manual_conflict);
      // Running either this or some other manual compaction
      if (scheduled && manual.incomplete == true) {
        assert(!manual.in_progress);
        manual.incomplete = false;
    } else if (!scheduled) {
      if (compaction == nullptr) {
      ca = new CompactionArg;
      ca->prepicked_compaction = new PrepickedCompaction;
      ca->prepicked_compaction->manual_compaction_state = &manual;
      ca->prepicked_compaction->compaction = compaction;
      if (!RequestCompactionToken(
              cfd, true, &ca->prepicked_compaction->task_token, &log_buffer)) {
        // Don't throttle manual compaction, only count outstanding tasks.
      manual.incomplete = false;
      bg_compaction_scheduled_++;
      env_->Schedule(&DBImpl::BGWorkCompaction, ca, Env::Priority::LOW, this,
                     &DBImpl::UnscheduleCompactionCallback);
  log_buffer.FlushBufferToLog();
  assert(!manual.in_progress);
  assert(HasPendingManualCompaction());
  RemoveManualCompaction(&manual);
  return manual.status;
void DBImpl::GenerateFlushRequest(const autovector<ColumnFamilyData*>& cfds,
                                  FlushRequest* req) {
  assert(req != nullptr);
  req->reserve(cfds.size());
  for (const auto cfd : cfds) {
    if (nullptr == cfd) {
      // cfd may be null, see DBImpl::ScheduleFlushes
    uint64_t max_memtable_id = cfd->imm()->GetLatestMemTableID();
    req->emplace_back(cfd, max_memtable_id);
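// Hedged note (added commentary): GenerateFlushRequest() snapshots, per
// column family, the ID of the newest immutable memtable
// (imm()->GetLatestMemTableID()). A FlushRequest is therefore a list of
// (cfd, max_memtable_id) pairs, which is exactly the shape consumed through
// bg_flush_args/max_memtable_id_ in the atomic flush path above.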
Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
                             const FlushOptions& flush_options,
                             FlushReason flush_reason, bool writes_stopped) {
  uint64_t flush_memtable_id = 0;
  if (!flush_options.allow_write_stall) {
    bool flush_needed = true;
    s = WaitUntilFlushWouldNotStallWrites(cfd, &flush_needed);
    TEST_SYNC_POINT("DBImpl::FlushMemTable:StallWaitDone");
    if (!s.ok() || !flush_needed) {
  FlushRequest flush_req;
    WriteContext context;
    InstrumentedMutexLock guard_lock(&mutex_);
    WriteThread::Writer w;
    WriteThread::Writer nonmem_w;
    if (!writes_stopped) {
      write_thread_.EnterUnbatched(&w, &mutex_);
      if (two_write_queues_) {
        nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_);
    WaitForPendingWrites();
    if (!cfd->mem()->IsEmpty() || !cached_recoverable_state_empty_.load()) {
      s = SwitchMemtable(cfd, &context);
    if (cfd->imm()->NumNotFlushed() != 0 || !cfd->mem()->IsEmpty() ||
        !cached_recoverable_state_empty_.load()) {
      flush_memtable_id = cfd->imm()->GetLatestMemTableID();
      flush_req.emplace_back(cfd, flush_memtable_id);
      if (immutable_db_options_.persist_stats_to_disk) {
        ColumnFamilyData* cfd_stats =
            versions_->GetColumnFamilySet()->GetColumnFamily(
                kPersistentStatsColumnFamilyName);
        if (cfd_stats != nullptr && cfd_stats != cfd &&
            !cfd_stats->mem()->IsEmpty()) {
          // only force flush stats CF when it will be the only CF lagging
          // behind after the current flush
          bool stats_cf_flush_needed = true;
          for (auto* loop_cfd : *versions_->GetColumnFamilySet()) {
            if (loop_cfd == cfd_stats || loop_cfd == cfd) {
            if (loop_cfd->GetLogNumber() <= cfd_stats->GetLogNumber()) {
              stats_cf_flush_needed = false;
          if (stats_cf_flush_needed) {
            ROCKS_LOG_INFO(immutable_db_options_.info_log,
                           "Force flushing stats CF with manual flush of %s "
                           "to avoid holding old logs",
                           cfd->GetName().c_str());
            s = SwitchMemtable(cfd_stats, &context);
            flush_memtable_id = cfd_stats->imm()->GetLatestMemTableID();
            flush_req.emplace_back(cfd_stats, flush_memtable_id);
    if (s.ok() && !flush_req.empty()) {
      for (auto& elem : flush_req) {
        ColumnFamilyData* loop_cfd = elem.first;
        loop_cfd->imm()->FlushRequested();
      // If the caller wants to wait for this flush to complete, it indicates
      // that the caller expects the ColumnFamilyData not to be free'ed by
      // other threads which may drop the column family concurrently.
      // Therefore, we increase the cfd's ref count.
      if (flush_options.wait) {
        for (auto& elem : flush_req) {
          ColumnFamilyData* loop_cfd = elem.first;
      SchedulePendingFlush(flush_req, flush_reason);
      MaybeScheduleFlushOrCompaction();
    if (!writes_stopped) {
      write_thread_.ExitUnbatched(&w);
      if (two_write_queues_) {
        nonmem_write_thread_.ExitUnbatched(&nonmem_w);
  TEST_SYNC_POINT("DBImpl::FlushMemTable:AfterScheduleFlush");
  TEST_SYNC_POINT("DBImpl::FlushMemTable:BeforeWaitForBgFlush");
  if (s.ok() && flush_options.wait) {
    autovector<ColumnFamilyData*> cfds;
    autovector<const uint64_t*> flush_memtable_ids;
    for (auto& iter : flush_req) {
      cfds.push_back(iter.first);
      flush_memtable_ids.push_back(&(iter.second));
    s = WaitForFlushMemTables(cfds, flush_memtable_ids,
                              (flush_reason == FlushReason::kErrorRecovery));
    InstrumentedMutexLock lock_guard(&mutex_);
    for (auto* tmp_cfd : cfds) {
      tmp_cfd->UnrefAndTryDelete();
  TEST_SYNC_POINT("DBImpl::FlushMemTable:FlushMemTableFinished");
// Flush all elements in 'column_family_datas'
// and atomically record the result to the MANIFEST.
Status DBImpl::AtomicFlushMemTables(
    const autovector<ColumnFamilyData*>& column_family_datas,
    const FlushOptions& flush_options, FlushReason flush_reason,
    bool writes_stopped) {
  if (!flush_options.allow_write_stall) {
    int num_cfs_to_flush = 0;
    for (auto cfd : column_family_datas) {
      bool flush_needed = true;
      s = WaitUntilFlushWouldNotStallWrites(cfd, &flush_needed);
      } else if (flush_needed) {
    if (0 == num_cfs_to_flush) {
  FlushRequest flush_req;
  autovector<ColumnFamilyData*> cfds;
    WriteContext context;
    InstrumentedMutexLock guard_lock(&mutex_);
    WriteThread::Writer w;
    WriteThread::Writer nonmem_w;
    if (!writes_stopped) {
      write_thread_.EnterUnbatched(&w, &mutex_);
      if (two_write_queues_) {
        nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_);
    WaitForPendingWrites();
    for (auto cfd : column_family_datas) {
      if (cfd->IsDropped()) {
      if (cfd->imm()->NumNotFlushed() != 0 || !cfd->mem()->IsEmpty() ||
          !cached_recoverable_state_empty_.load()) {
        cfds.emplace_back(cfd);
    for (auto cfd : cfds) {
      if (cfd->mem()->IsEmpty() && cached_recoverable_state_empty_.load()) {
      s = SwitchMemtable(cfd, &context);
      cfd->UnrefAndTryDelete();
    AssignAtomicFlushSeq(cfds);
    for (auto cfd : cfds) {
      cfd->imm()->FlushRequested();
    // If the caller wants to wait for this flush to complete, it indicates
    // that the caller expects the ColumnFamilyData not to be free'ed by
    // other threads which may drop the column family concurrently.
    // Therefore, we increase the cfd's ref count.
    if (flush_options.wait) {
      for (auto cfd : cfds) {
    GenerateFlushRequest(cfds, &flush_req);
    SchedulePendingFlush(flush_req, flush_reason);
    MaybeScheduleFlushOrCompaction();
  if (!writes_stopped) {
    write_thread_.ExitUnbatched(&w);
    if (two_write_queues_) {
      nonmem_write_thread_.ExitUnbatched(&nonmem_w);
  TEST_SYNC_POINT("DBImpl::AtomicFlushMemTables:AfterScheduleFlush");
  TEST_SYNC_POINT("DBImpl::AtomicFlushMemTables:BeforeWaitForBgFlush");
  if (s.ok() && flush_options.wait) {
    autovector<const uint64_t*> flush_memtable_ids;
    for (auto& iter : flush_req) {
      flush_memtable_ids.push_back(&(iter.second));
    s = WaitForFlushMemTables(cfds, flush_memtable_ids,
                              (flush_reason == FlushReason::kErrorRecovery));
    InstrumentedMutexLock lock_guard(&mutex_);
    for (auto* cfd : cfds) {
      cfd->UnrefAndTryDelete();
// Calling FlushMemTable(), whether from DB::Flush() or from Backup Engine, can
// cause a write stall, for example if one memtable is being flushed already.
// This method tries to avoid write stall (similar to CompactRange() behavior):
// it emulates how the SuperVersion / LSM would change if flush happens, checks
// it against various constraints and delays flush if it'd cause write stall.
// Callers should check status and flush_needed to see if flush already happened.
Status DBImpl::WaitUntilFlushWouldNotStallWrites(ColumnFamilyData* cfd,
                                                 bool* flush_needed) {
  *flush_needed = true;
    InstrumentedMutexLock l(&mutex_);
    uint64_t orig_active_memtable_id = cfd->mem()->GetID();
    WriteStallCondition write_stall_condition = WriteStallCondition::kNormal;
      if (write_stall_condition != WriteStallCondition::kNormal) {
        // Same error handling as user writes: Don't wait if there's a
        // background error, even if it's a soft error. We might wait here
        // indefinitely as the pending flushes/compactions may never finish
        // successfully, resulting in the stall condition lasting indefinitely
        if (error_handler_.IsBGWorkStopped()) {
          return error_handler_.GetBGError();
        TEST_SYNC_POINT("DBImpl::WaitUntilFlushWouldNotStallWrites:StallWait");
        ROCKS_LOG_INFO(immutable_db_options_.info_log,
                       "[%s] WaitUntilFlushWouldNotStallWrites"
                       " waiting on stall conditions to clear",
                       cfd->GetName().c_str());
      if (cfd->IsDropped()) {
        return Status::ColumnFamilyDropped();
      if (shutting_down_.load(std::memory_order_acquire)) {
        return Status::ShutdownInProgress();
      uint64_t earliest_memtable_id =
          std::min(cfd->mem()->GetID(), cfd->imm()->GetEarliestMemTableID());
      if (earliest_memtable_id > orig_active_memtable_id) {
        // We waited so long that the memtable we were originally waiting on was
        *flush_needed = false;
        return Status::OK();
      const auto& mutable_cf_options = *cfd->GetLatestMutableCFOptions();
      const auto* vstorage = cfd->current()->storage_info();
      // Skip stalling check if we're below auto-flush and auto-compaction
      // triggers. If it stalled in these conditions, that'd mean the stall
      // triggers are so low that stalling is needed for any background work. In
      // that case we shouldn't wait since background work won't be scheduled.
      if (cfd->imm()->NumNotFlushed() <
              cfd->ioptions()->min_write_buffer_number_to_merge &&
          vstorage->l0_delay_trigger_count() <
              mutable_cf_options.level0_file_num_compaction_trigger) {
      // check whether one extra immutable memtable or an extra L0 file would
      // cause write stalling mode to be entered. It could still enter stall
      // mode due to pending compaction bytes, but that's less common
      write_stall_condition =
          ColumnFamilyData::GetWriteStallConditionAndCause(
              cfd->imm()->NumNotFlushed() + 1,
              vstorage->l0_delay_trigger_count() + 1,
              vstorage->estimated_compaction_needed_bytes(), mutable_cf_options)
    } while (write_stall_condition != WriteStallCondition::kNormal);
  return Status::OK();
// Wait for memtables to be flushed for multiple column families.
// let N = cfds.size()
// for i in [0, N),
//  1) if flush_memtable_ids[i] is not null, then the memtables with lower
//     IDs have to be flushed for THIS column family;
//  2) if flush_memtable_ids[i] is null, then all memtables in THIS column
//     family have to be flushed.
// Finish waiting when ALL column families finish flushing memtables.
// resuming_from_bg_err indicates whether the caller is trying to resume from
// background error or in normal processing.
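// Example: with cfds = {cf1, cf2} and flush_memtable_ids = {&id1, nullptr},
// the wait ends once cf1 has flushed every immutable memtable with ID <= id1
// and cf2 has flushed all of its immutable memtables (or earlier if a column
// family is dropped or the DB is shutting down, per the checks below).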
Status DBImpl::WaitForFlushMemTables(
    const autovector<ColumnFamilyData*>& cfds,
    const autovector<const uint64_t*>& flush_memtable_ids,
    bool resuming_from_bg_err) {
  int num = static_cast<int>(cfds.size());
  // Wait until the flush completes
  InstrumentedMutexLock l(&mutex_);
  // If the caller is trying to resume from bg error, then
  // error_handler_.IsDBStopped() is true.
  while (resuming_from_bg_err || !error_handler_.IsDBStopped()) {
    if (shutting_down_.load(std::memory_order_acquire)) {
      return Status::ShutdownInProgress();
    }
    // If an error has occurred during resumption, then no need to wait.
    if (!error_handler_.GetRecoveryError().ok()) {
      break;
    }
    // Number of column families that have been dropped.
    int num_dropped = 0;
    // Number of column families that have finished flush.
    int num_finished = 0;
    for (int i = 0; i < num; ++i) {
      if (cfds[i]->IsDropped()) {
        ++num_dropped;
      } else if (cfds[i]->imm()->NumNotFlushed() == 0 ||
                 (flush_memtable_ids[i] != nullptr &&
                  cfds[i]->imm()->GetEarliestMemTableID() >
                      *flush_memtable_ids[i])) {
        ++num_finished;
      }
    }
    if (1 == num_dropped && 1 == num) {
      return Status::InvalidArgument("Cannot flush a dropped CF");
    }
    // Column families involved in this flush request have either been dropped
    // or finished flush. Then it's time to finish waiting.
    if (num_dropped + num_finished == num) {
      break;
    }
    bg_cv_.Wait();
  }
  Status s;
  // If not resuming from bg error, and an error has caused the DB to stop,
  // then report the bg error to caller.
  if (!resuming_from_bg_err && error_handler_.IsDBStopped()) {
    s = error_handler_.GetBGError();
  }
  return s;
}

Status DBImpl::EnableAutoCompaction(
    const std::vector<ColumnFamilyHandle*>& column_family_handles) {
  Status s;
  for (auto cf_ptr : column_family_handles) {
    Status status =
        this->SetOptions(cf_ptr, {{"disable_auto_compactions", "false"}});
    if (!status.ok()) {
      s = status;
    }
  }
  return s;
}

void DBImpl::DisableManualCompaction() {
  manual_compaction_paused_.store(true, std::memory_order_release);
}

void DBImpl::EnableManualCompaction() {
  manual_compaction_paused_.store(false, std::memory_order_release);
}

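// Scheduling order below: flushes are dispatched first, to the HIGH-priority
// pool when it has threads (falling back to the LOW-priority pool when the
// HIGH pool is empty), then compactions are dispatched to the LOW-priority
// pool. Both are capped by GetBGJobLimits() and skipped when background work
// is paused, stopped by a hard error outside recovery, or the DB is shutting
// down; compactions are additionally gated by bg_compaction_paused_ and by
// exclusive manual compactions.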
void DBImpl::MaybeScheduleFlushOrCompaction() {
  mutex_.AssertHeld();
  if (!opened_successfully_) {
    // Compaction may introduce data race to DB open
    return;
  }
  if (bg_work_paused_ > 0) {
    // we paused the background work
    return;
  } else if (error_handler_.IsBGWorkStopped() &&
             !error_handler_.IsRecoveryInProgress()) {
    // There has been a hard error and this call is not part of the recovery
    // sequence. Bail out here so we don't get into an endless loop of
    // scheduling BG work which will again call this function
    return;
  } else if (shutting_down_.load(std::memory_order_acquire)) {
    // DB is being deleted; no more background compactions
    return;
  }
  auto bg_job_limits = GetBGJobLimits();
  bool is_flush_pool_empty =
      env_->GetBackgroundThreads(Env::Priority::HIGH) == 0;
  while (!is_flush_pool_empty && unscheduled_flushes_ > 0 &&
         bg_flush_scheduled_ < bg_job_limits.max_flushes) {
    bg_flush_scheduled_++;
    FlushThreadArg* fta = new FlushThreadArg;
    fta->db_ = this;
    fta->thread_pri_ = Env::Priority::HIGH;
    env_->Schedule(&DBImpl::BGWorkFlush, fta, Env::Priority::HIGH, this,
                   &DBImpl::UnscheduleFlushCallback);
    --unscheduled_flushes_;
    TEST_SYNC_POINT_CALLBACK(
        "DBImpl::MaybeScheduleFlushOrCompaction:AfterSchedule:0",
        &unscheduled_flushes_);
  }

  // special case -- if high-pri (flush) thread pool is empty, then schedule
  // flushes in low-pri (compaction) thread pool.
  if (is_flush_pool_empty) {
    while (unscheduled_flushes_ > 0 &&
           bg_flush_scheduled_ + bg_compaction_scheduled_ <
               bg_job_limits.max_flushes) {
      bg_flush_scheduled_++;
      FlushThreadArg* fta = new FlushThreadArg;
      fta->db_ = this;
      fta->thread_pri_ = Env::Priority::LOW;
      env_->Schedule(&DBImpl::BGWorkFlush, fta, Env::Priority::LOW, this,
                     &DBImpl::UnscheduleFlushCallback);
      --unscheduled_flushes_;
    }
  }

  if (bg_compaction_paused_ > 0) {
    // we paused the background compaction
    return;
  } else if (error_handler_.IsBGWorkStopped()) {
    // Compaction is not part of the recovery sequence from a hard error. We
    // might get here because recovery might do a flush and install a new
    // super version, which will try to schedule pending compactions. Bail
    // out here and let the higher level recovery handle compactions
    return;
  }

  if (HasExclusiveManualCompaction()) {
    // only manual compactions are allowed to run. don't schedule automatic
    // compactions
    TEST_SYNC_POINT("DBImpl::MaybeScheduleFlushOrCompaction:Conflict");
    return;
  }

  while (bg_compaction_scheduled_ < bg_job_limits.max_compactions &&
         unscheduled_compactions_ > 0) {
    CompactionArg* ca = new CompactionArg;
    ca->db = this;
    ca->prepicked_compaction = nullptr;
    bg_compaction_scheduled_++;
    unscheduled_compactions_--;
    env_->Schedule(&DBImpl::BGWorkCompaction, ca, Env::Priority::LOW, this,
                   &DBImpl::UnscheduleCompactionCallback);
  }
}

DBImpl::BGJobLimits DBImpl::GetBGJobLimits() const {
  mutex_.AssertHeld();
  return GetBGJobLimits(immutable_db_options_.max_background_flushes,
                        mutable_db_options_.max_background_compactions,
                        mutable_db_options_.max_background_jobs,
                        write_controller_.NeedSpeedupCompaction());
}

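// Worked example for the overload below: with max_background_flushes == -1,
// max_background_compactions == -1, and max_background_jobs == 8,
// max_flushes == std::max(1, 8 / 4) == 2 and
// max_compactions == std::max(1, 8 - 2) == 6. If parallelize_compactions is
// false, max_compactions is then clamped to 1.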
DBImpl::BGJobLimits DBImpl::GetBGJobLimits(int max_background_flushes,
                                           int max_background_compactions,
                                           int max_background_jobs,
                                           bool parallelize_compactions) {
  BGJobLimits res;
  if (max_background_flushes == -1 && max_background_compactions == -1) {
    // for our first stab implementing max_background_jobs, simply allocate a
    // quarter of the threads to flushes.
    res.max_flushes = std::max(1, max_background_jobs / 4);
    res.max_compactions = std::max(1, max_background_jobs - res.max_flushes);
  } else {
    // compatibility code in case users haven't migrated to
    // max_background_jobs, which automatically computes flush/compaction
    // limits
    res.max_flushes = std::max(1, max_background_flushes);
    res.max_compactions = std::max(1, max_background_compactions);
  }
  if (!parallelize_compactions) {
    // throttle background compactions until we deem necessary
    res.max_compactions = 1;
  }
  return res;
}

void DBImpl::AddToCompactionQueue(ColumnFamilyData* cfd) {
  assert(!cfd->queued_for_compaction());
  cfd->Ref();
  compaction_queue_.push_back(cfd);
  cfd->set_queued_for_compaction(true);
}

ColumnFamilyData* DBImpl::PopFirstFromCompactionQueue() {
  assert(!compaction_queue_.empty());
  auto cfd = *compaction_queue_.begin();
  compaction_queue_.pop_front();
  assert(cfd->queued_for_compaction());
  cfd->set_queued_for_compaction(false);
  return cfd;
}

DBImpl::FlushRequest DBImpl::PopFirstFromFlushQueue() {
  assert(!flush_queue_.empty());
  FlushRequest flush_req = flush_queue_.front();
  flush_queue_.pop_front();
  // TODO: need to unset flush reason?
  return flush_req;
}

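// PickCompactionFromQueue scans the compaction queue front to back, asking
// the per-CF thread limiter for a token via RequestCompactionToken().
// Throttled candidates are set aside and pushed back to the front of the
// queue in their original order, so throttling does not reorder pending
// work.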
ColumnFamilyData* DBImpl::PickCompactionFromQueue(
    std::unique_ptr<TaskLimiterToken>* token, LogBuffer* log_buffer) {
  assert(!compaction_queue_.empty());
  assert(*token == nullptr);
  autovector<ColumnFamilyData*> throttled_candidates;
  ColumnFamilyData* cfd = nullptr;
  while (!compaction_queue_.empty()) {
    auto first_cfd = *compaction_queue_.begin();
    compaction_queue_.pop_front();
    assert(first_cfd->queued_for_compaction());
    if (!RequestCompactionToken(first_cfd, false, token, log_buffer)) {
      throttled_candidates.push_back(first_cfd);
      continue;
    }
    cfd = first_cfd;
    cfd->set_queued_for_compaction(false);
    break;
  }
  // Add throttled compaction candidates back to queue in the original order.
  for (auto iter = throttled_candidates.rbegin();
       iter != throttled_candidates.rend(); ++iter) {
    compaction_queue_.push_front(*iter);
  }
  return cfd;
}

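// SchedulePendingFlush takes one reference on every column family in the
// request and stamps it with the flush reason; the request as a whole counts
// as a single unit of work in unscheduled_flushes_, since BackgroundFlush
// pops and processes one FlushRequest at a time.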
void DBImpl::SchedulePendingFlush(const FlushRequest& flush_req,
                                  FlushReason flush_reason) {
  if (flush_req.empty()) {
    return;
  }
  for (auto& iter : flush_req) {
    ColumnFamilyData* cfd = iter.first;
    cfd->Ref();
    cfd->SetFlushReason(flush_reason);
  }
  ++unscheduled_flushes_;
  flush_queue_.push_back(flush_req);
}

void DBImpl::SchedulePendingCompaction(ColumnFamilyData* cfd) {
  if (!cfd->queued_for_compaction() && cfd->NeedsCompaction()) {
    AddToCompactionQueue(cfd);
    ++unscheduled_compactions_;
  }
}

void DBImpl::SchedulePendingPurge(std::string fname, std::string dir_to_sync,
                                  FileType type, uint64_t number, int job_id) {
  mutex_.AssertHeld();
  PurgeFileInfo file_info(fname, dir_to_sync, type, number, job_id);
  purge_files_.insert({{number, std::move(file_info)}});
}

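// The BGWork* entry points below receive a heap-allocated argument created
// by the code that scheduled them; each one copies the argument by value and
// deletes the allocation up front, so the Unschedule* callbacks only need to
// free arguments of jobs that were dropped from the thread pool before they
// ever ran.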
void DBImpl::BGWorkFlush(void* arg) {
  FlushThreadArg fta = *(reinterpret_cast<FlushThreadArg*>(arg));
  delete reinterpret_cast<FlushThreadArg*>(arg);

  IOSTATS_SET_THREAD_POOL_ID(fta.thread_pri_);
  TEST_SYNC_POINT("DBImpl::BGWorkFlush");
  static_cast_with_check<DBImpl, DB>(fta.db_)->BackgroundCallFlush(
      fta.thread_pri_);
  TEST_SYNC_POINT("DBImpl::BGWorkFlush:done");
}

void DBImpl::BGWorkCompaction(void* arg) {
  CompactionArg ca = *(reinterpret_cast<CompactionArg*>(arg));
  delete reinterpret_cast<CompactionArg*>(arg);
  IOSTATS_SET_THREAD_POOL_ID(Env::Priority::LOW);
  TEST_SYNC_POINT("DBImpl::BGWorkCompaction");
  auto prepicked_compaction =
      static_cast<PrepickedCompaction*>(ca.prepicked_compaction);
  static_cast_with_check<DBImpl, DB>(ca.db)->BackgroundCallCompaction(
      prepicked_compaction, Env::Priority::LOW);
  delete prepicked_compaction;
}

void DBImpl::BGWorkBottomCompaction(void* arg) {
  CompactionArg ca = *(static_cast<CompactionArg*>(arg));
  delete static_cast<CompactionArg*>(arg);
  IOSTATS_SET_THREAD_POOL_ID(Env::Priority::BOTTOM);
  TEST_SYNC_POINT("DBImpl::BGWorkBottomCompaction");
  auto* prepicked_compaction = ca.prepicked_compaction;
  assert(prepicked_compaction && prepicked_compaction->compaction &&
         !prepicked_compaction->manual_compaction_state);
  ca.db->BackgroundCallCompaction(prepicked_compaction, Env::Priority::BOTTOM);
  delete prepicked_compaction;
}

void DBImpl::BGWorkPurge(void* db) {
  IOSTATS_SET_THREAD_POOL_ID(Env::Priority::HIGH);
  TEST_SYNC_POINT("DBImpl::BGWorkPurge:start");
  reinterpret_cast<DBImpl*>(db)->BackgroundCallPurge();
  TEST_SYNC_POINT("DBImpl::BGWorkPurge:end");
}

void DBImpl::UnscheduleCompactionCallback(void* arg) {
  CompactionArg ca = *(reinterpret_cast<CompactionArg*>(arg));
  delete reinterpret_cast<CompactionArg*>(arg);
  if (ca.prepicked_compaction != nullptr) {
    if (ca.prepicked_compaction->compaction != nullptr) {
      delete ca.prepicked_compaction->compaction;
    }
    delete ca.prepicked_compaction;
  }
  TEST_SYNC_POINT("DBImpl::UnscheduleCompactionCallback");
}

void DBImpl::UnscheduleFlushCallback(void* arg) {
  delete reinterpret_cast<FlushThreadArg*>(arg);
  TEST_SYNC_POINT("DBImpl::UnscheduleFlushCallback");
}

Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context,
                               LogBuffer* log_buffer, FlushReason* reason,
                               Env::Priority thread_pri) {
  mutex_.AssertHeld();

  Status status;
  *reason = FlushReason::kOthers;
  // If BG work is stopped due to an error, but a recovery is in progress,
  // that means this flush is part of the recovery. So allow it to go through
  if (!error_handler_.IsBGWorkStopped()) {
    if (shutting_down_.load(std::memory_order_acquire)) {
      status = Status::ShutdownInProgress();
    }
  } else if (!error_handler_.IsRecoveryInProgress()) {
    status = error_handler_.GetBGError();
  }

  if (!status.ok()) {
    return status;
  }

  autovector<BGFlushArg> bg_flush_args;
  std::vector<SuperVersionContext>& superversion_contexts =
      job_context->superversion_contexts;
  autovector<ColumnFamilyData*> column_families_not_to_flush;
  while (!flush_queue_.empty()) {
    // This cfd is already referenced
    const FlushRequest& flush_req = PopFirstFromFlushQueue();
    superversion_contexts.clear();
    superversion_contexts.reserve(flush_req.size());

    for (const auto& iter : flush_req) {
      ColumnFamilyData* cfd = iter.first;
      if (cfd->IsDropped() || !cfd->imm()->IsFlushPending()) {
        // can't flush this CF, try next one
        column_families_not_to_flush.push_back(cfd);
        continue;
      }
      superversion_contexts.emplace_back(SuperVersionContext(true));
      bg_flush_args.emplace_back(cfd, iter.second,
                                 &(superversion_contexts.back()));
    }
    if (!bg_flush_args.empty()) {
      break;
    }
  }

  if (!bg_flush_args.empty()) {
    auto bg_job_limits = GetBGJobLimits();
    for (const auto& arg : bg_flush_args) {
      ColumnFamilyData* cfd = arg.cfd_;
      ROCKS_LOG_BUFFER(
          log_buffer,
          "Calling FlushMemTableToOutputFile with column "
          "family [%s], flush slots available %d, compaction slots available "
          "%d, "
          "flush slots scheduled %d, compaction slots scheduled %d",
          cfd->GetName().c_str(), bg_job_limits.max_flushes,
          bg_job_limits.max_compactions, bg_flush_scheduled_,
          bg_compaction_scheduled_);
    }
    status = FlushMemTablesToOutputFiles(bg_flush_args, made_progress,
                                         job_context, log_buffer, thread_pri);
    TEST_SYNC_POINT("DBImpl::BackgroundFlush:BeforeFlush");
    // All the CFDs in the FlushReq must have the same flush reason, so just
    // grab the first one
    *reason = bg_flush_args[0].cfd_->GetFlushReason();
    for (auto& arg : bg_flush_args) {
      ColumnFamilyData* cfd = arg.cfd_;
      if (cfd->UnrefAndTryDelete()) {
        arg.cfd_ = nullptr;
      }
    }
  }
  for (auto cfd : column_families_not_to_flush) {
    cfd->UnrefAndTryDelete();
  }
  return status;
}

void DBImpl::BackgroundCallFlush(Env::Priority thread_pri) {
  bool made_progress = false;
  JobContext job_context(next_job_id_.fetch_add(1), true);

  TEST_SYNC_POINT("DBImpl::BackgroundCallFlush:start");

  LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL,
                       immutable_db_options_.info_log.get());
  {
    InstrumentedMutexLock l(&mutex_);
    assert(bg_flush_scheduled_);
    num_running_flushes_++;

    std::unique_ptr<std::list<uint64_t>::iterator>
        pending_outputs_inserted_elem(new std::list<uint64_t>::iterator(
            CaptureCurrentFileNumberInPendingOutputs()));
    FlushReason reason;

    Status s = BackgroundFlush(&made_progress, &job_context, &log_buffer,
                               &reason, thread_pri);
    if (!s.ok() && !s.IsShutdownInProgress() && !s.IsColumnFamilyDropped() &&
        reason != FlushReason::kErrorRecovery) {
      // Wait a little bit before retrying background flush in
      // case this is an environmental problem and we do not want to
      // chew up resources for failed flushes for the duration of
      // the problem.
      uint64_t error_cnt =
          default_cf_internal_stats_->BumpAndGetBackgroundErrorCount();
      bg_cv_.SignalAll();  // In case a waiter can proceed despite the error
      mutex_.Unlock();
      ROCKS_LOG_ERROR(immutable_db_options_.info_log,
                      "Waiting after background flush error: %s, "
                      "Accumulated background error counts: %" PRIu64,
                      s.ToString().c_str(), error_cnt);
      log_buffer.FlushBufferToLog();
      LogFlush(immutable_db_options_.info_log);
      env_->SleepForMicroseconds(1000000);
      mutex_.Lock();
    }

    TEST_SYNC_POINT("DBImpl::BackgroundCallFlush:FlushFinish:0");
    ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem);

    // If flush failed, we want to delete all temporary files that we might
    // have created. Thus, we force full scan in FindObsoleteFiles()
    FindObsoleteFiles(&job_context, !s.ok() && !s.IsShutdownInProgress() &&
                                        !s.IsColumnFamilyDropped());
    // delete unnecessary files if any, this is done outside the mutex
    if (job_context.HaveSomethingToClean() ||
        job_context.HaveSomethingToDelete() || !log_buffer.IsEmpty()) {
      mutex_.Unlock();
      TEST_SYNC_POINT("DBImpl::BackgroundCallFlush:FilesFound");
      // Have to flush the info logs before bg_flush_scheduled_--
      // because if bg_flush_scheduled_ becomes 0 and the lock is
      // released, the destructor of DB can kick in and destroy all the
      // states of DB so info_log might not be available after that point.
      // It also applies to access other states that DB owns.
      log_buffer.FlushBufferToLog();
      if (job_context.HaveSomethingToDelete()) {
        PurgeObsoleteFiles(job_context);
      }
      job_context.Clean();
      mutex_.Lock();
    }
    TEST_SYNC_POINT("DBImpl::BackgroundCallFlush:ContextCleanedUp");

    assert(num_running_flushes_ > 0);
    num_running_flushes_--;
    bg_flush_scheduled_--;
    // See if there's more work to be done
    MaybeScheduleFlushOrCompaction();
    atomic_flush_install_cv_.SignalAll();
    bg_cv_.SignalAll();
    // IMPORTANT: there should be no code after calling SignalAll. This call
    // may signal the DB destructor that it's OK to proceed with destruction.
    // In that case, all DB variables will be deallocated and referencing them
    // will cause trouble.
  }
}

void DBImpl::BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction,
                                      Env::Priority bg_thread_pri) {
  bool made_progress = false;
  JobContext job_context(next_job_id_.fetch_add(1), true);
  TEST_SYNC_POINT("BackgroundCallCompaction:0");
  LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL,
                       immutable_db_options_.info_log.get());
  {
    InstrumentedMutexLock l(&mutex_);

    // This call will unlock/lock the mutex to wait for current running
    // IngestExternalFile() calls to finish.
    WaitForIngestFile();

    num_running_compactions_++;

    std::unique_ptr<std::list<uint64_t>::iterator>
        pending_outputs_inserted_elem(new std::list<uint64_t>::iterator(
            CaptureCurrentFileNumberInPendingOutputs()));

    assert((bg_thread_pri == Env::Priority::BOTTOM &&
            bg_bottom_compaction_scheduled_) ||
           (bg_thread_pri == Env::Priority::LOW && bg_compaction_scheduled_));
    Status s = BackgroundCompaction(&made_progress, &job_context, &log_buffer,
                                    prepicked_compaction, bg_thread_pri);
    TEST_SYNC_POINT("BackgroundCallCompaction:1");
    if (s.IsBusy()) {
      bg_cv_.SignalAll();  // In case a waiter can proceed despite the error
      mutex_.Unlock();
      env_->SleepForMicroseconds(10000);  // prevent hot loop
      mutex_.Lock();
    } else if (!s.ok() && !s.IsShutdownInProgress() &&
               !s.IsManualCompactionPaused() && !s.IsColumnFamilyDropped()) {
      // Wait a little bit before retrying background compaction in
      // case this is an environmental problem and we do not want to
      // chew up resources for failed compactions for the duration of
      // the problem.
      uint64_t error_cnt =
          default_cf_internal_stats_->BumpAndGetBackgroundErrorCount();
      bg_cv_.SignalAll();  // In case a waiter can proceed despite the error
      mutex_.Unlock();
      log_buffer.FlushBufferToLog();
      ROCKS_LOG_ERROR(immutable_db_options_.info_log,
                      "Waiting after background compaction error: %s, "
                      "Accumulated background error counts: %" PRIu64,
                      s.ToString().c_str(), error_cnt);
      LogFlush(immutable_db_options_.info_log);
      env_->SleepForMicroseconds(1000000);
      mutex_.Lock();
    } else if (s.IsManualCompactionPaused()) {
      ManualCompactionState* m = prepicked_compaction->manual_compaction_state;
      assert(m);
      ROCKS_LOG_BUFFER(&log_buffer, "[%s] [JOB %d] Manual compaction paused",
                       m->cfd->GetName().c_str(), job_context.job_id);
    }

    ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem);

    // If compaction failed, we want to delete all temporary files that we
    // might have created (they might not be all recorded in job_context in
    // case of a failure). Thus, we force full scan in FindObsoleteFiles()
    FindObsoleteFiles(&job_context, !s.ok() && !s.IsShutdownInProgress() &&
                                        !s.IsManualCompactionPaused() &&
                                        !s.IsColumnFamilyDropped());
    TEST_SYNC_POINT("DBImpl::BackgroundCallCompaction:FoundObsoleteFiles");

    // delete unnecessary files if any, this is done outside the mutex
    if (job_context.HaveSomethingToClean() ||
        job_context.HaveSomethingToDelete() || !log_buffer.IsEmpty()) {
      mutex_.Unlock();
      // Have to flush the info logs before bg_compaction_scheduled_--
      // because if bg_flush_scheduled_ becomes 0 and the lock is
      // released, the destructor of DB can kick in and destroy all the
      // states of DB so info_log might not be available after that point.
      // It also applies to access other states that DB owns.
      log_buffer.FlushBufferToLog();
      if (job_context.HaveSomethingToDelete()) {
        PurgeObsoleteFiles(job_context);
        TEST_SYNC_POINT("DBImpl::BackgroundCallCompaction:PurgedObsoleteFiles");
      }
      job_context.Clean();
      mutex_.Lock();
    }

    assert(num_running_compactions_ > 0);
    num_running_compactions_--;
    if (bg_thread_pri == Env::Priority::LOW) {
      bg_compaction_scheduled_--;
    } else {
      assert(bg_thread_pri == Env::Priority::BOTTOM);
      bg_bottom_compaction_scheduled_--;
    }

    versions_->GetColumnFamilySet()->FreeDeadColumnFamilies();

    // See if there's more work to be done
    MaybeScheduleFlushOrCompaction();
    if (made_progress ||
        (bg_compaction_scheduled_ == 0 &&
         bg_bottom_compaction_scheduled_ == 0) ||
        HasPendingManualCompaction() || unscheduled_compactions_ == 0) {
      // signal if
      // * made_progress -- need to wakeup DelayWrite
      // * bg_{bottom,}_compaction_scheduled_ == 0 -- need to wakeup ~DBImpl
      // * HasPendingManualCompaction -- need to wakeup RunManualCompaction
      // If none of this is true, there is no need to signal since nobody is
      // waiting for it
      bg_cv_.SignalAll();
    }
    // IMPORTANT: there should be no code after calling SignalAll. This call
    // may signal the DB destructor that it's OK to proceed with destruction.
    // In that case, all DB variables will be deallocated and referencing them
    // will cause trouble.
  }
}

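// BackgroundCompaction() communicates through its return status:
// Status::Busy() means every queued candidate was throttled by a per-CF
// compaction thread limiter, Status::CompactionTooLarge() means the
// SstFileManager reported insufficient free space for the picked compaction,
// and Status::Incomplete(kManualCompactionPaused) means a manual compaction
// was paused. The caller above backs off briefly on Busy and treats other
// non-OK, non-shutdown statuses as background errors worth a retry delay.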
Status DBImpl::BackgroundCompaction(bool* made_progress,
                                    JobContext* job_context,
                                    LogBuffer* log_buffer,
                                    PrepickedCompaction* prepicked_compaction,
                                    Env::Priority thread_pri) {
  ManualCompactionState* manual_compaction =
      prepicked_compaction == nullptr
          ? nullptr
          : prepicked_compaction->manual_compaction_state;
  *made_progress = false;
  mutex_.AssertHeld();
  TEST_SYNC_POINT("DBImpl::BackgroundCompaction:Start");

  bool is_manual = (manual_compaction != nullptr);
  std::unique_ptr<Compaction> c;
  if (prepicked_compaction != nullptr &&
      prepicked_compaction->compaction != nullptr) {
    c.reset(prepicked_compaction->compaction);
  }
  bool is_prepicked = is_manual || c;

  // (manual_compaction->in_progress == false);
  bool trivial_move_disallowed =
      is_manual && manual_compaction->disallow_trivial_move;

  CompactionJobStats compaction_job_stats;
  Status status;
  if (!error_handler_.IsBGWorkStopped()) {
    if (shutting_down_.load(std::memory_order_acquire)) {
      status = Status::ShutdownInProgress();
    } else if (is_manual &&
               manual_compaction_paused_.load(std::memory_order_acquire)) {
      status = Status::Incomplete(Status::SubCode::kManualCompactionPaused);
    }
  } else {
    status = error_handler_.GetBGError();
    // If we get here, it means a hard error happened after this compaction
    // was scheduled by MaybeScheduleFlushOrCompaction(), but before it got
    // a chance to execute. Since we didn't pop a cfd from the compaction
    // queue, increment unscheduled_compactions_
    unscheduled_compactions_++;
  }

  if (!status.ok()) {
    if (is_manual) {
      manual_compaction->status = status;
      manual_compaction->done = true;
      manual_compaction->in_progress = false;
      manual_compaction = nullptr;
    }
    if (c) {
      c->ReleaseCompactionFiles(status);
      c.reset();
    }
    return status;
  }

  if (is_manual) {
    // another thread cannot pick up the same work
    manual_compaction->in_progress = true;
  }

  std::unique_ptr<TaskLimiterToken> task_token;

  // InternalKey manual_end_storage;
  // InternalKey* manual_end = &manual_end_storage;
  bool sfm_reserved_compact_space = false;
  if (is_manual) {
    ManualCompactionState* m = manual_compaction;
    assert(m->in_progress);
    if (!c) {
      m->done = true;
      m->manual_end = nullptr;
      ROCKS_LOG_BUFFER(log_buffer,
                       "[%s] Manual compaction from level-%d from %s .. "
                       "%s; nothing to do\n",
                       m->cfd->GetName().c_str(), m->input_level,
                       (m->begin ? m->begin->DebugString().c_str() : "(begin)"),
                       (m->end ? m->end->DebugString().c_str() : "(end)"));
    } else {
      // First check if we have enough room to do the compaction
      bool enough_room = EnoughRoomForCompaction(
          m->cfd, *(c->inputs()), &sfm_reserved_compact_space, log_buffer);

      if (!enough_room) {
        // Then don't do the compaction
        c->ReleaseCompactionFiles(status);
        c.reset();
        // m's vars will get set properly at the end of this function,
        // as long as status == CompactionTooLarge
        status = Status::CompactionTooLarge();
      } else {
        ROCKS_LOG_BUFFER(
            log_buffer,
            "[%s] Manual compaction from level-%d to level-%d from %s .. "
            "%s; will stop at %s\n",
            m->cfd->GetName().c_str(), m->input_level, c->output_level(),
            (m->begin ? m->begin->DebugString().c_str() : "(begin)"),
            (m->end ? m->end->DebugString().c_str() : "(end)"),
            ((m->done || m->manual_end == nullptr)
                 ? "(end)"
                 : m->manual_end->DebugString().c_str()));
      }
    }
  } else if (!is_prepicked && !compaction_queue_.empty()) {
    if (HasExclusiveManualCompaction()) {
      // Can't compact right now, but try again later
      TEST_SYNC_POINT("DBImpl::BackgroundCompaction()::Conflict");

      // Stay in the compaction queue.
      unscheduled_compactions_++;

      return Status::OK();
    }

    auto cfd = PickCompactionFromQueue(&task_token, log_buffer);
    if (cfd == nullptr) {
      // Can't find any executable task from the compaction queue.
      // All tasks have been throttled by compaction thread limiter.
      ++unscheduled_compactions_;
      return Status::Busy();
    }

    // We unreference here because the following code will take a Ref() on
    // this cfd if it is going to use it (Compaction class holds a
    // reference).
    // This will all happen under a mutex so we don't have to be afraid of
    // somebody else deleting it.
    if (cfd->UnrefAndTryDelete()) {
      // This was the last reference of the column family, so no need to
      // compact.
      return Status::OK();
    }

    // Pick up latest mutable CF Options and use it throughout the
    // compaction job
    // Compaction makes a copy of the latest MutableCFOptions. It should be
    // used throughout the compaction procedure to make sure consistency. It
    // will eventually be installed into SuperVersion
    auto* mutable_cf_options = cfd->GetLatestMutableCFOptions();
    if (!mutable_cf_options->disable_auto_compactions && !cfd->IsDropped()) {
      // NOTE: try to avoid unnecessary copy of MutableCFOptions if
      // compaction is not necessary. Need to make sure mutex is held
      // until we make a copy in the following code
      TEST_SYNC_POINT("DBImpl::BackgroundCompaction():BeforePickCompaction");
      c.reset(cfd->PickCompaction(*mutable_cf_options, log_buffer));
      TEST_SYNC_POINT("DBImpl::BackgroundCompaction():AfterPickCompaction");

      if (c != nullptr) {
        bool enough_room = EnoughRoomForCompaction(
            cfd, *(c->inputs()), &sfm_reserved_compact_space, log_buffer);

        if (!enough_room) {
          // Then don't do the compaction
          c->ReleaseCompactionFiles(status);
          c->column_family_data()
              ->current()
              ->storage_info()
              ->ComputeCompactionScore(*(c->immutable_cf_options()),
                                       *(c->mutable_cf_options()));
          AddToCompactionQueue(cfd);
          ++unscheduled_compactions_;

          c.reset();
          // Don't need to sleep here, because BackgroundCallCompaction
          // will sleep if !s.ok()
          status = Status::CompactionTooLarge();
        } else {
          // update statistics
          RecordInHistogram(stats_, NUM_FILES_IN_SINGLE_COMPACTION,
                            c->inputs(0)->size());
          // There are three things that can change compaction score:
          // 1) When flush or compaction finish. This case is covered by
          //    InstallSuperVersionAndScheduleWork
          // 2) When MutableCFOptions changes. This case is also covered by
          //    InstallSuperVersionAndScheduleWork, because this is when the
          //    new options take effect.
          // 3) When we Pick a new compaction, we "remove" those files being
          //    compacted from the calculation, which then influences
          //    compaction score. Here we check if we need the new compaction
          //    even without the files that are currently being compacted. If
          //    we need another compaction, we might be able to execute it in
          //    parallel, so we add it to the queue and schedule a new thread.
          if (cfd->NeedsCompaction()) {
            // Yes, we need more compactions!
            AddToCompactionQueue(cfd);
            ++unscheduled_compactions_;
            MaybeScheduleFlushOrCompaction();
          }
        }
      }
    }
  }

  if (!c) {
    // Nothing to do
    ROCKS_LOG_BUFFER(log_buffer, "Compaction nothing to do");
  } else if (c->deletion_compaction()) {
    // TODO(icanadi) Do we want to honor snapshots here? i.e. not delete old
    // file if there is alive snapshot pointing to it
    TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:BeforeCompaction",
                             c->column_family_data());
    assert(c->num_input_files(1) == 0);
    assert(c->level() == 0);
    assert(c->column_family_data()->ioptions()->compaction_style ==
           kCompactionStyleFIFO);

    compaction_job_stats.num_input_files = c->num_input_files(0);

    NotifyOnCompactionBegin(c->column_family_data(), c.get(), status,
                            compaction_job_stats, job_context->job_id);

    for (const auto& f : *c->inputs(0)) {
      c->edit()->DeleteFile(c->level(), f->fd.GetNumber());
    }
    status = versions_->LogAndApply(c->column_family_data(),
                                    *c->mutable_cf_options(), c->edit(),
                                    &mutex_, directories_.GetDbDir());
    InstallSuperVersionAndScheduleWork(c->column_family_data(),
                                       &job_context->superversion_contexts[0],
                                       *c->mutable_cf_options());
    ROCKS_LOG_BUFFER(log_buffer, "[%s] Deleted %d files\n",
                     c->column_family_data()->GetName().c_str(),
                     c->num_input_files(0));
    *made_progress = true;
    TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:AfterCompaction",
                             c->column_family_data());
  } else if (!trivial_move_disallowed && c->IsTrivialMove()) {
    TEST_SYNC_POINT("DBImpl::BackgroundCompaction:TrivialMove");
    TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:BeforeCompaction",
                             c->column_family_data());
    // Instrument for event update
    // TODO(yhchiang): add op details for showing trivial-move.
    ThreadStatusUtil::SetColumnFamily(
        c->column_family_data(), c->column_family_data()->ioptions()->env,
        immutable_db_options_.enable_thread_tracking);
    ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_COMPACTION);

    compaction_job_stats.num_input_files = c->num_input_files(0);

    NotifyOnCompactionBegin(c->column_family_data(), c.get(), status,
                            compaction_job_stats, job_context->job_id);

    // Move files to next level
    int32_t moved_files = 0;
    int64_t moved_bytes = 0;
    for (unsigned int l = 0; l < c->num_input_levels(); l++) {
      if (c->level(l) == c->output_level()) {
        continue;
      }
      for (size_t i = 0; i < c->num_input_files(l); i++) {
        FileMetaData* f = c->input(l, i);
        c->edit()->DeleteFile(c->level(l), f->fd.GetNumber());
        c->edit()->AddFile(c->output_level(), f->fd.GetNumber(),
                           f->fd.GetPathId(), f->fd.GetFileSize(), f->smallest,
                           f->largest, f->fd.smallest_seqno,
                           f->fd.largest_seqno, f->marked_for_compaction,
                           f->oldest_blob_file_number, f->oldest_ancester_time,
                           f->file_creation_time, f->file_checksum,
                           f->file_checksum_func_name);

        ROCKS_LOG_BUFFER(
            log_buffer,
            "[%s] Moving #%" PRIu64 " to level-%d %" PRIu64 " bytes\n",
            c->column_family_data()->GetName().c_str(), f->fd.GetNumber(),
            c->output_level(), f->fd.GetFileSize());
        ++moved_files;
        moved_bytes += f->fd.GetFileSize();
      }
    }

    status = versions_->LogAndApply(c->column_family_data(),
                                    *c->mutable_cf_options(), c->edit(),
                                    &mutex_, directories_.GetDbDir());
    // Use latest MutableCFOptions
    InstallSuperVersionAndScheduleWork(c->column_family_data(),
                                       &job_context->superversion_contexts[0],
                                       *c->mutable_cf_options());

    VersionStorageInfo::LevelSummaryStorage tmp;
    c->column_family_data()->internal_stats()->IncBytesMoved(c->output_level(),
                                                             moved_bytes);
    {
      event_logger_.LogToBuffer(log_buffer)
          << "job" << job_context->job_id << "event"
          << "trivial_move"
          << "destination_level" << c->output_level() << "files" << moved_files
          << "total_files_size" << moved_bytes;
    }
    ROCKS_LOG_BUFFER(
        log_buffer,
        "[%s] Moved #%d files to level-%d %" PRIu64 " bytes %s: %s\n",
        c->column_family_data()->GetName().c_str(), moved_files,
        c->output_level(), moved_bytes, status.ToString().c_str(),
        c->column_family_data()->current()->storage_info()->LevelSummary(&tmp));
    *made_progress = true;

    // Clear Instrument
    ThreadStatusUtil::ResetThreadStatus();
    TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:AfterCompaction",
                             c->column_family_data());
  } else if (!is_prepicked && c->output_level() > 0 &&
             c->output_level() ==
                 c->column_family_data()
                     ->current()
                     ->storage_info()
                     ->MaxOutputLevel(
                         immutable_db_options_.allow_ingest_behind) &&
             env_->GetBackgroundThreads(Env::Priority::BOTTOM) > 0) {
    // Forward compactions involving last level to the bottom pool if it
    // exists, such that compactions unlikely to contribute to write stalls
    // can be delayed or deprioritized.
    TEST_SYNC_POINT("DBImpl::BackgroundCompaction:ForwardToBottomPriPool");
    CompactionArg* ca = new CompactionArg;
    ca->db = this;
    ca->prepicked_compaction = new PrepickedCompaction;
    ca->prepicked_compaction->compaction = c.release();
    ca->prepicked_compaction->manual_compaction_state = nullptr;
    // Transfer requested token, so it doesn't need to do it again.
    ca->prepicked_compaction->task_token = std::move(task_token);
    ++bg_bottom_compaction_scheduled_;
    env_->Schedule(&DBImpl::BGWorkBottomCompaction, ca, Env::Priority::BOTTOM,
                   this, &DBImpl::UnscheduleCompactionCallback);
  } else {
    TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:BeforeCompaction",
                             c->column_family_data());
    int output_level __attribute__((__unused__));
    output_level = c->output_level();
    TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:NonTrivial",
                             &output_level);
    std::vector<SequenceNumber> snapshot_seqs;
    SequenceNumber earliest_write_conflict_snapshot;
    SnapshotChecker* snapshot_checker;
    GetSnapshotContext(job_context, &snapshot_seqs,
                       &earliest_write_conflict_snapshot, &snapshot_checker);
    assert(is_snapshot_supported_ || snapshots_.empty());
    CompactionJob compaction_job(
        job_context->job_id, c.get(), immutable_db_options_,
        file_options_for_compaction_, versions_.get(), &shutting_down_,
        preserve_deletes_seqnum_.load(), log_buffer, directories_.GetDbDir(),
        GetDataDir(c->column_family_data(), c->output_path_id()), stats_,
        &mutex_, &error_handler_, snapshot_seqs,
        earliest_write_conflict_snapshot, snapshot_checker, table_cache_,
        &event_logger_, c->mutable_cf_options()->paranoid_file_checks,
        c->mutable_cf_options()->report_bg_io_stats, dbname_,
        &compaction_job_stats, thread_pri,
        is_manual ? &manual_compaction_paused_ : nullptr);
    compaction_job.Prepare();

    NotifyOnCompactionBegin(c->column_family_data(), c.get(), status,
                            compaction_job_stats, job_context->job_id);

    mutex_.Unlock();
    TEST_SYNC_POINT_CALLBACK(
        "DBImpl::BackgroundCompaction:NonTrivial:BeforeRun", nullptr);
    compaction_job.Run();
    TEST_SYNC_POINT("DBImpl::BackgroundCompaction:NonTrivial:AfterRun");
    mutex_.Lock();

    status = compaction_job.Install(*c->mutable_cf_options());
    if (status.ok()) {
      InstallSuperVersionAndScheduleWork(
          c->column_family_data(), &job_context->superversion_contexts[0],
          *c->mutable_cf_options());
    }
    *made_progress = true;
    TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:AfterCompaction",
                             c->column_family_data());
  }

  if (c != nullptr) {
    c->ReleaseCompactionFiles(status);
    *made_progress = true;

#ifndef ROCKSDB_LITE
    // Need to make sure SstFileManager does its bookkeeping
    auto sfm = static_cast<SstFileManagerImpl*>(
        immutable_db_options_.sst_file_manager.get());
    if (sfm && sfm_reserved_compact_space) {
      sfm->OnCompactionCompletion(c.get());
    }
#endif  // ROCKSDB_LITE

    NotifyOnCompactionCompleted(c->column_family_data(), c.get(), status,
                                compaction_job_stats, job_context->job_id);
  }

  if (status.ok() || status.IsCompactionTooLarge() ||
      status.IsManualCompactionPaused()) {
    // Done
  } else if (status.IsColumnFamilyDropped() || status.IsShutdownInProgress()) {
    // Ignore compaction errors found during shutting down
  } else {
    ROCKS_LOG_WARN(immutable_db_options_.info_log, "Compaction error: %s",
                   status.ToString().c_str());
    error_handler_.SetBGError(status, BackgroundErrorReason::kCompaction);
    if (c != nullptr && !is_manual && !error_handler_.IsBGWorkStopped()) {
      // Put this cfd back in the compaction queue so we can retry after some
      // time
      auto cfd = c->column_family_data();
      assert(cfd != nullptr);
      // Since this compaction failed, we need to recompute the score so it
      // takes the original input files into account
      c->column_family_data()
          ->current()
          ->storage_info()
          ->ComputeCompactionScore(*(c->immutable_cf_options()),
                                   *(c->mutable_cf_options()));
      if (!cfd->queued_for_compaction()) {
        AddToCompactionQueue(cfd);
        ++unscheduled_compactions_;
      }
    }
  }
  // this will unref its input_version and column_family_data
  c.reset();

  if (is_manual) {
    ManualCompactionState* m = manual_compaction;
    if (!status.ok()) {
      m->status = status;
      m->done = true;
    }
    // For universal compaction:
    //   Because universal compaction always happens at level 0, so one
    //   compaction will pick up all overlapped files. No files will be
    //   filtered out due to size limit and left for a successive compaction.
    //   So we can safely conclude the current compaction.
    //
    //   Also note that, if we don't stop here, then the current compaction
    //   writes a new file back to level 0, which will be used in successive
    //   compaction. Hence the manual compaction will never finish.
    //
    // Stop the compaction if manual_end points to nullptr -- this means
    // that we compacted the whole range. manual_end should always point
    // to nullptr in case of universal compaction
    if (m->manual_end == nullptr) {
      m->done = true;
    }
    if (!m->done) {
      // We only compacted part of the requested range. Update *m
      // to the range that is left to be compacted.
      // Universal and FIFO compactions should always compact the whole range
      assert(m->cfd->ioptions()->compaction_style !=
                 kCompactionStyleUniversal ||
             m->cfd->ioptions()->num_levels > 1);
      assert(m->cfd->ioptions()->compaction_style != kCompactionStyleFIFO);
      m->tmp_storage = *m->manual_end;
      m->begin = &m->tmp_storage;
      m->incomplete = true;
    }
    m->in_progress = false;  // not being processed anymore
  }
  TEST_SYNC_POINT("DBImpl::BackgroundCompaction:Finish");
  return status;
}

bool DBImpl::HasPendingManualCompaction() {
  return (!manual_compaction_dequeue_.empty());
}

void DBImpl::AddManualCompaction(DBImpl::ManualCompactionState* m) {
  manual_compaction_dequeue_.push_back(m);
}

void DBImpl::RemoveManualCompaction(DBImpl::ManualCompactionState* m) {
  // Remove from queue
  std::deque<ManualCompactionState*>::iterator it =
      manual_compaction_dequeue_.begin();
  while (it != manual_compaction_dequeue_.end()) {
    if (m == (*it)) {
      it = manual_compaction_dequeue_.erase(it);
      return;
    }
    ++it;
  }
  assert(false);
  return;
}

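// ShouldntRunManualCompaction returns true while the manual compaction has
// to keep waiting: external file ingestion is in flight, an exclusive
// request must wait for all scheduled background compactions to drain, or an
// overlapping manual compaction is ahead of this one in the queue and has
// not started yet.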
bool DBImpl::ShouldntRunManualCompaction(ManualCompactionState* m) {
  if (num_running_ingest_file_ > 0) {
    // We need to wait for other IngestExternalFile() calls to finish
    // before running a manual compaction.
    return true;
  }
  if (m->exclusive) {
    return (bg_bottom_compaction_scheduled_ > 0 ||
            bg_compaction_scheduled_ > 0);
  }
  std::deque<ManualCompactionState*>::iterator it =
      manual_compaction_dequeue_.begin();
  bool seen = false;
  while (it != manual_compaction_dequeue_.end()) {
    if (m == (*it)) {
      ++it;
      seen = true;
      continue;
    } else if (MCOverlap(m, (*it)) && (!seen && !(*it)->in_progress)) {
      // The other manual compaction *it conflicts: it overlaps with m, is
      // ahead of m in the queue, and is not yet in progress.
      return true;
    }
    ++it;
  }
  return false;
}

bool DBImpl::HaveManualCompaction(ColumnFamilyData* cfd) {
  // Scan the manual compaction queue
  std::deque<ManualCompactionState*>::iterator it =
      manual_compaction_dequeue_.begin();
  while (it != manual_compaction_dequeue_.end()) {
    if ((*it)->exclusive) {
      return true;
    }
    if ((cfd == (*it)->cfd) && (!((*it)->in_progress || (*it)->done))) {
      // A manual compaction for this column family is queued but has not
      // started yet.
      return true;
    }
    ++it;
  }
  return false;
}

bool DBImpl::HasExclusiveManualCompaction() {
  // Scan the manual compaction queue
  std::deque<ManualCompactionState*>::iterator it =
      manual_compaction_dequeue_.begin();
  while (it != manual_compaction_dequeue_.end()) {
    if ((*it)->exclusive) {
      return true;
    }
    ++it;
  }
  return false;
}

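// MCOverlap: two manual compactions conflict if either one is exclusive, or
// if both target the same column family; manual compactions on different
// column families never overlap.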
bool DBImpl::MCOverlap(ManualCompactionState* m, ManualCompactionState* m1) {
  if ((m->exclusive) || (m1->exclusive)) {
    return true;
  }
  if (m->cfd != m1->cfd) {
    return false;
  }
  return true;
}

#ifndef ROCKSDB_LITE
void DBImpl::BuildCompactionJobInfo(
    const ColumnFamilyData* cfd, Compaction* c, const Status& st,
    const CompactionJobStats& compaction_job_stats, const int job_id,
    const Version* current, CompactionJobInfo* compaction_job_info) const {
  assert(compaction_job_info != nullptr);
  compaction_job_info->cf_id = cfd->GetID();
  compaction_job_info->cf_name = cfd->GetName();
  compaction_job_info->status = st;
  compaction_job_info->thread_id = env_->GetThreadID();
  compaction_job_info->job_id = job_id;
  compaction_job_info->base_input_level = c->start_level();
  compaction_job_info->output_level = c->output_level();
  compaction_job_info->stats = compaction_job_stats;
  compaction_job_info->table_properties = c->GetOutputTableProperties();
  compaction_job_info->compaction_reason = c->compaction_reason();
  compaction_job_info->compression = c->output_compression();
  for (size_t i = 0; i < c->num_input_levels(); ++i) {
    for (const auto fmd : *c->inputs(i)) {
      const FileDescriptor& desc = fmd->fd;
      const uint64_t file_number = desc.GetNumber();
      auto fn = TableFileName(c->immutable_cf_options()->cf_paths, file_number,
                              desc.GetPathId());
      compaction_job_info->input_files.push_back(fn);
      compaction_job_info->input_file_infos.push_back(CompactionFileInfo{
          static_cast<int>(i), file_number, fmd->oldest_blob_file_number});
      if (compaction_job_info->table_properties.count(fn) == 0) {
        std::shared_ptr<const TableProperties> tp;
        auto s = current->GetTableProperties(&tp, fmd, &fn);
        if (s.ok()) {
          compaction_job_info->table_properties[fn] = tp;
        }
      }
    }
  }
  for (const auto& newf : c->edit()->GetNewFiles()) {
    const FileMetaData& meta = newf.second;
    const FileDescriptor& desc = meta.fd;
    const uint64_t file_number = desc.GetNumber();
    compaction_job_info->output_files.push_back(TableFileName(
        c->immutable_cf_options()->cf_paths, file_number, desc.GetPathId()));
    compaction_job_info->output_file_infos.push_back(CompactionFileInfo{
        newf.first, file_number, meta.oldest_blob_file_number});
  }
}
#endif  // ROCKSDB_LITE

// SuperVersionContext gets created and destructed outside of the lock --
// we use this conveniently to:
// * malloc one SuperVersion() outside of the lock -- new_superversion
// * delete SuperVersion()s outside of the lock -- superversions_to_free
//
// However, if InstallSuperVersionAndScheduleWork() gets called twice with the
// same sv_context, we can't reuse the SuperVersion() that got malloc'ed,
// because the first call already used it. In that rare case, we take a hit
// and create a new SuperVersion() inside of the mutex. We do a similar thing
// for superversion_to_free.
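//
// Accounting example for the max_total_in_memory_state_ update at the end of
// this function (illustrative values, not defaults asserted by this code):
// if the old SuperVersion accounted for write_buffer_size 64MB *
// max_write_buffer_number 2 == 128MB and the new mutable options use
// 128MB * 2 == 256MB, the running total grows by 128MB.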
void DBImpl::InstallSuperVersionAndScheduleWork(
    ColumnFamilyData* cfd, SuperVersionContext* sv_context,
    const MutableCFOptions& mutable_cf_options) {
  mutex_.AssertHeld();

  // Update max_total_in_memory_state_
  size_t old_memtable_size = 0;
  auto* old_sv = cfd->GetSuperVersion();
  if (old_sv) {
    old_memtable_size = old_sv->mutable_cf_options.write_buffer_size *
                        old_sv->mutable_cf_options.max_write_buffer_number;
  }

  // this branch is rarely taken
  if (UNLIKELY(sv_context->new_superversion == nullptr)) {
    sv_context->NewSuperVersion();
  }
  cfd->InstallSuperVersion(sv_context, &mutex_, mutable_cf_options);

  // There may be a small data race here. The snapshot holding back the
  // bottommost compaction may already have been released by this point. But
  // assuming newer snapshots are always created and released frequently, the
  // compaction will be triggered soon anyway.
  bottommost_files_mark_threshold_ = kMaxSequenceNumber;
  for (auto* my_cfd : *versions_->GetColumnFamilySet()) {
    bottommost_files_mark_threshold_ = std::min(
        bottommost_files_mark_threshold_,
        my_cfd->current()->storage_info()->bottommost_files_mark_threshold());
  }

  // Whenever we install a new SuperVersion, we might need to issue new
  // flushes or compactions.
  SchedulePendingCompaction(cfd);
  MaybeScheduleFlushOrCompaction();

  // Update max_total_in_memory_state_
  max_total_in_memory_state_ = max_total_in_memory_state_ - old_memtable_size +
                               mutable_cf_options.write_buffer_size *
                                   mutable_cf_options.max_write_buffer_number;
}

// ShouldPurge is called by FindObsoleteFiles when doing a full scan,
// and db mutex (mutex_) should already be held.
// Note that the current implementation of FindObsoleteFiles with
// full_scan=true can issue I/O requests to obtain the list of files in
// directories, e.g. env_->GetChildren(), while holding the db mutex.
bool DBImpl::ShouldPurge(uint64_t file_number) const {
  return files_grabbed_for_purge_.find(file_number) ==
             files_grabbed_for_purge_.end() &&
         purge_files_.find(file_number) == purge_files_.end();
}

// MarkAsGrabbedForPurge is called by FindObsoleteFiles, and db mutex
// (mutex_) should already be held.
void DBImpl::MarkAsGrabbedForPurge(uint64_t file_number) {
  files_grabbed_for_purge_.insert(file_number);
}

void DBImpl::SetSnapshotChecker(SnapshotChecker* snapshot_checker) {
  InstrumentedMutexLock l(&mutex_);
  // snapshot_checker_ should only be set once. If we need to set it multiple
  // times, we need to make sure the old one is not deleted while it is still
  // in use by a compaction job.
  assert(!snapshot_checker_);
  snapshot_checker_.reset(snapshot_checker);
}

void DBImpl::GetSnapshotContext(
    JobContext* job_context, std::vector<SequenceNumber>* snapshot_seqs,
    SequenceNumber* earliest_write_conflict_snapshot,
    SnapshotChecker** snapshot_checker_ptr) {
  mutex_.AssertHeld();
  assert(job_context != nullptr);
  assert(snapshot_seqs != nullptr);
  assert(earliest_write_conflict_snapshot != nullptr);
  assert(snapshot_checker_ptr != nullptr);

  *snapshot_checker_ptr = snapshot_checker_.get();
  if (use_custom_gc_ && *snapshot_checker_ptr == nullptr) {
    *snapshot_checker_ptr = DisableGCSnapshotChecker::Instance();
  }
  if (*snapshot_checker_ptr != nullptr) {
    // If snapshot_checker is used, that means the flush/compaction may
    // contain values not visible to snapshots taken after the
    // flush/compaction job starts. Take a snapshot here; it will appear in
    // snapshot_seqs and force the compaction iterator to consider such
    // snapshots.
    const Snapshot* job_snapshot =
        GetSnapshotImpl(false /*write_conflict_boundary*/, false /*lock*/);
    job_context->job_snapshot.reset(new ManagedSnapshot(this, job_snapshot));
  }
  *snapshot_seqs = snapshots_.GetAll(earliest_write_conflict_snapshot);
}

}  // namespace ROCKSDB_NAMESPACE