// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include <cinttypes>
#include <deque>

#include "db/builder.h"
#include "db/db_impl/db_impl.h"
#include "db/error_handler.h"
#include "db/event_helpers.h"
#include "file/sst_file_manager_impl.h"
#include "logging/logging.h"
#include "monitoring/iostats_context_imp.h"
#include "monitoring/perf_context_imp.h"
#include "monitoring/thread_status_updater.h"
#include "monitoring/thread_status_util.h"
#include "test_util/sync_point.h"
#include "util/cast_util.h"
#include "util/concurrent_task_limiter_impl.h"

namespace ROCKSDB_NAMESPACE {

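// Checks with the SstFileManager (when one is configured and this is not a
// ROCKSDB_LITE build) whether there is enough free disk space to run the
// compaction described by `inputs`. When there is not, logs the cancellation
// and bumps the COMPACTION_CANCELLED tick.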
bool DBImpl::EnoughRoomForCompaction(
    ColumnFamilyData* cfd, const std::vector<CompactionInputFiles>& inputs,
    bool* sfm_reserved_compact_space, LogBuffer* log_buffer) {
  // Check if we have enough room to do the compaction
  bool enough_room = true;
#ifndef ROCKSDB_LITE
  auto sfm = static_cast<SstFileManagerImpl*>(
      immutable_db_options_.sst_file_manager.get());
  if (sfm) {
    // Pass the current bg_error_ to SFM so it can decide what checks to
    // perform. If this DB instance hasn't seen any error yet, the SFM can be
    // optimistic and not do disk space checks
    Status bg_error = error_handler_.GetBGError();
    enough_room = sfm->EnoughRoomForCompaction(cfd, inputs, bg_error);
    bg_error.PermitUncheckedError();  // bg_error is just a copy of the Status
                                      // from the error_handler_
    if (enough_room) {
      *sfm_reserved_compact_space = true;
    }
  }
#else
  (void)cfd;
  (void)inputs;
  (void)sfm_reserved_compact_space;
#endif  // ROCKSDB_LITE
  if (!enough_room) {
    // Just in case tests want to change the value of enough_room
    TEST_SYNC_POINT_CALLBACK(
        "DBImpl::BackgroundCompaction():CancelledCompaction", &enough_room);
    ROCKS_LOG_BUFFER(log_buffer,
                     "Cancelled compaction because not enough room");
    RecordTick(stats_, COMPACTION_CANCELLED, 1);
  }
  return enough_room;
}

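// Tries to acquire a task token from the column family's compaction thread
// limiter. Returns true if no limiter is configured or a token was granted
// (stored in `*token`); returns false if the limiter is saturated.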
bool DBImpl::RequestCompactionToken(ColumnFamilyData* cfd, bool force,
                                    std::unique_ptr<TaskLimiterToken>* token,
                                    LogBuffer* log_buffer) {
  assert(*token == nullptr);
  auto limiter = static_cast<ConcurrentTaskLimiterImpl*>(
      cfd->ioptions()->compaction_thread_limiter.get());
  if (limiter == nullptr) {
    return true;
  }
  *token = limiter->GetToken(force);
  if (*token != nullptr) {
    ROCKS_LOG_BUFFER(log_buffer,
                     "Thread limiter [%s] increase [%s] compaction task, "
                     "force: %s, tasks after: %d",
                     limiter->GetName().c_str(), cfd->GetName().c_str(),
                     force ? "true" : "false", limiter->GetOutstandingTask());
    return true;
  }
  return false;
}

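// Syncs all closed WALs, i.e. those with a log number lower than the current
// one, then fsyncs the WAL directory and records the synced WALs in
// `*synced_wals`. Temporarily releases log_write_mutex_ while the file syncs
// run.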
IOStatus DBImpl::SyncClosedLogs(JobContext* job_context,
                                VersionEdit* synced_wals) {
  TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Start");
  InstrumentedMutexLock l(&log_write_mutex_);
  autovector<log::Writer*, 1> logs_to_sync;
  uint64_t current_log_number = logfile_number_;
  while (logs_.front().number < current_log_number &&
         logs_.front().IsSyncing()) {
    log_sync_cv_.Wait();
  }
  for (auto it = logs_.begin();
       it != logs_.end() && it->number < current_log_number; ++it) {
    auto& log = *it;
    log.PrepareForSync();
    logs_to_sync.push_back(log.writer);
  }

  IOStatus io_s;
  if (!logs_to_sync.empty()) {
    log_write_mutex_.Unlock();

    assert(job_context);

    for (log::Writer* log : logs_to_sync) {
      ROCKS_LOG_INFO(immutable_db_options_.info_log,
                     "[JOB %d] Syncing log #%" PRIu64, job_context->job_id,
                     log->get_log_number());
      if (error_handler_.IsRecoveryInProgress()) {
        log->file()->reset_seen_error();
      }
      io_s = log->file()->Sync(immutable_db_options_.use_fsync);
      if (!io_s.ok()) {
        break;
      }

      if (immutable_db_options_.recycle_log_file_num > 0) {
        if (error_handler_.IsRecoveryInProgress()) {
          log->file()->reset_seen_error();
        }
        io_s = log->Close();
        if (!io_s.ok()) {
          break;
        }
      }
    }
    if (io_s.ok()) {
      io_s = directories_.GetWalDir()->FsyncWithDirOptions(
          IOOptions(), nullptr,
          DirFsyncOptions(DirFsyncOptions::FsyncReason::kNewFileSynced));
    }

    TEST_SYNC_POINT_CALLBACK("DBImpl::SyncClosedLogs:BeforeReLock",
                             /*arg=*/nullptr);
    log_write_mutex_.Lock();

    // "number <= current_log_number - 1" is equivalent to
    // "number < current_log_number".
    if (io_s.ok()) {
      MarkLogsSynced(current_log_number - 1, true, synced_wals);
    } else {
      MarkLogsNotSynced(current_log_number - 1);
    }
    if (!io_s.ok()) {
      TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Failed");
      return io_s;
    }
  }
  TEST_SYNC_POINT("DBImpl::SyncClosedLogs:end");
  return io_s;
}

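// Flushes the pending immutable memtables of a single column family to a new
// L0 SST file, syncing closed WALs first when more than one column family
// exists. Called with mutex_ held; may release and re-acquire it.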
Status DBImpl::FlushMemTableToOutputFile(
    ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options,
    bool* made_progress, JobContext* job_context,
    SuperVersionContext* superversion_context,
    std::vector<SequenceNumber>& snapshot_seqs,
    SequenceNumber earliest_write_conflict_snapshot,
    SnapshotChecker* snapshot_checker, LogBuffer* log_buffer,
    Env::Priority thread_pri) {
  mutex_.AssertHeld();
  assert(cfd);
  assert(cfd->imm());
  assert(cfd->imm()->NumNotFlushed() != 0);
  assert(cfd->imm()->IsFlushPending());
  assert(versions_);
  assert(versions_->GetColumnFamilySet());
  // If there is more than one column family, we need to make sure that
  // all the log files except the most recent one are synced. Otherwise, if
  // the host crashes after flushing and before the WAL is persistent, the
  // flushed SST may contain data from write batches whose updates to
  // other (unflushed) column families are missing.
  const bool needs_to_sync_closed_wals =
      logfile_number_ > 0 &&
      versions_->GetColumnFamilySet()->NumberOfColumnFamilies() > 1;

  // If needs_to_sync_closed_wals is true, we need to record the current
  // maximum memtable ID of this column family so that a later PickMemtables()
  // call will not pick memtables whose IDs are higher. This is due to the fact
  // that SyncClosedLogs() may release the db mutex, and a memtable switch can
  // happen for this column family in the meantime. The newly created memtables
  // have their data backed by unsynced WALs, thus they cannot be included in
  // this flush job.
  // Another reason why we must record the current maximum memtable ID of this
  // column family: SyncClosedLogs() may release the db mutex, thus it's
  // possible for the application to continue to insert into memtables,
  // increasing the db's sequence number. The application may take a snapshot,
  // but this snapshot is not included in `snapshot_seqs`, which will be passed
  // to the flush job, because `snapshot_seqs` has already been computed before
  // this function starts. Recording the max memtable ID ensures that the flush
  // job does not flush a memtable without knowing such snapshot(s).
  uint64_t max_memtable_id = needs_to_sync_closed_wals
                                 ? cfd->imm()->GetLatestMemTableID()
                                 : std::numeric_limits<uint64_t>::max();

  // If needs_to_sync_closed_wals is false, then the flush job will pick ALL
  // existing memtables of the column family when PickMemTable() is called
  // later. Although we won't call SyncClosedLogs() in this case, we may still
  // call the callbacks of the listeners, i.e. NotifyOnFlushBegin(), which also
  // releases and re-acquires the db mutex. In the meantime, the application
  // can still insert into the memtables and increase the db's sequence number.
  // The application can take a snapshot, hoping that the latest visible state
  // to this snapshot is preserved. This is hard to guarantee since the db
  // mutex is not held. This newly-created snapshot is not included in
  // `snapshot_seqs` and the flush job is unaware of its presence.
  // Consequently, the flush job may drop certain keys when generating the L0,
  // causing incorrect data to be returned for snapshot reads using this
  // snapshot.
  // To address this, we make sure NotifyOnFlushBegin() executes after memtable
  // picking so that no new snapshot can be taken between the two functions.

  FlushJob flush_job(
      dbname_, cfd, immutable_db_options_, mutable_cf_options, max_memtable_id,
      file_options_for_compaction_, versions_.get(), &mutex_, &shutting_down_,
      snapshot_seqs, earliest_write_conflict_snapshot, snapshot_checker,
      job_context, log_buffer, directories_.GetDbDir(), GetDataDir(cfd, 0U),
      GetCompressionFlush(*cfd->ioptions(), mutable_cf_options), stats_,
      &event_logger_, mutable_cf_options.report_bg_io_stats,
      true /* sync_output_directory */, true /* write_manifest */, thread_pri,
      io_tracer_, seqno_time_mapping_, db_id_, db_session_id_,
      cfd->GetFullHistoryTsLow(), &blob_callback_);
  FileMetaData file_meta;

  Status s;
  bool need_cancel = false;
  IOStatus log_io_s = IOStatus::OK();
  if (needs_to_sync_closed_wals) {
    // SyncClosedLogs() may unlock and re-lock the log_write_mutex multiple
    // times.
    VersionEdit synced_wals;
    mutex_.Unlock();
    log_io_s = SyncClosedLogs(job_context, &synced_wals);
    mutex_.Lock();
    if (log_io_s.ok() && synced_wals.IsWalAddition()) {
      log_io_s = status_to_io_status(ApplyWALToManifest(&synced_wals));
      TEST_SYNC_POINT_CALLBACK("DBImpl::FlushMemTableToOutputFile:CommitWal:1",
                               nullptr);
    }

    if (!log_io_s.ok() && !log_io_s.IsShutdownInProgress() &&
        !log_io_s.IsColumnFamilyDropped()) {
      error_handler_.SetBGError(log_io_s, BackgroundErrorReason::kFlush);
    }
  } else {
    TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Skip");
  }
  s = log_io_s;

  // If the log sync failed, we do not need to pick a memtable. Otherwise,
  // num_flush_not_started_ needs to be rolled back.
  TEST_SYNC_POINT("DBImpl::FlushMemTableToOutputFile:BeforePickMemtables");
  if (s.ok()) {
    flush_job.PickMemTable();
    need_cancel = true;
  }
  TEST_SYNC_POINT_CALLBACK(
      "DBImpl::FlushMemTableToOutputFile:AfterPickMemtables", &flush_job);

#ifndef ROCKSDB_LITE
  // may temporarily unlock and lock the mutex.
  NotifyOnFlushBegin(cfd, &file_meta, mutable_cf_options, job_context->job_id);
#endif  // ROCKSDB_LITE

  bool switched_to_mempurge = false;
  // Within flush_job.Run, rocksdb may call event listener to notify
  // file creation and deletion.
  //
  // Note that flush_job.Run will unlock and lock the db_mutex,
  // and EventListener callback will be called when the db_mutex
  // is unlocked by the current thread.
  if (s.ok()) {
    s = flush_job.Run(&logs_with_prep_tracker_, &file_meta,
                      &switched_to_mempurge);
    need_cancel = false;
  }

  if (!s.ok() && need_cancel) {
    flush_job.Cancel();
  }

  if (s.ok()) {
    InstallSuperVersionAndScheduleWork(cfd, superversion_context,
                                       mutable_cf_options);
    if (made_progress) {
      *made_progress = true;
    }

    const std::string& column_family_name = cfd->GetName();

    Version* const current = cfd->current();
    assert(current);

    const VersionStorageInfo* const storage_info = current->storage_info();
    assert(storage_info);

    VersionStorageInfo::LevelSummaryStorage tmp;
    ROCKS_LOG_BUFFER(log_buffer, "[%s] Level summary: %s\n",
                     column_family_name.c_str(),
                     storage_info->LevelSummary(&tmp));

    const auto& blob_files = storage_info->GetBlobFiles();
    if (!blob_files.empty()) {
      assert(blob_files.front());
      assert(blob_files.back());

      ROCKS_LOG_BUFFER(
          log_buffer,
          "[%s] Blob file summary: head=%" PRIu64 ", tail=%" PRIu64 "\n",
          column_family_name.c_str(), blob_files.front()->GetBlobFileNumber(),
          blob_files.back()->GetBlobFileNumber());
    }
  }

  if (!s.ok() && !s.IsShutdownInProgress() && !s.IsColumnFamilyDropped()) {
    if (log_io_s.ok()) {
      // Error while writing to MANIFEST.
      // In fact, versions_->io_status() can also be the result of renaming
      // the CURRENT file. With the current code, it's just difficult to tell.
      // So just be pessimistic and try to write to a new MANIFEST.
      // TODO: distinguish between MANIFEST write and CURRENT renaming
      if (!versions_->io_status().ok()) {
        // If the WAL sync is successful (either the WAL size is 0 or there is
        // no IO error), all MANIFEST write errors will be mapped to soft
        // errors.
        // TODO: kManifestWriteNoWAL and kFlushNoWAL are misleading. Refactor
        // is needed.
        error_handler_.SetBGError(s,
                                  BackgroundErrorReason::kManifestWriteNoWAL);
      } else {
        // If the WAL sync is successful (either the WAL size is 0 or there is
        // no IO error), all the other SST file write errors will be set as
        // kFlushNoWAL.
        error_handler_.SetBGError(s, BackgroundErrorReason::kFlushNoWAL);
      }
    } else {
      assert(s == log_io_s);
      Status new_bg_error = s;
      error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush);
    }
  }
  // If the flush ran smoothly and no mempurge happened,
  // install the new SST file path.
  if (s.ok() && (!switched_to_mempurge)) {
#ifndef ROCKSDB_LITE
    // may temporarily unlock and lock the mutex.
    NotifyOnFlushCompleted(cfd, mutable_cf_options,
                           flush_job.GetCommittedFlushJobsInfo());
    auto sfm = static_cast<SstFileManagerImpl*>(
        immutable_db_options_.sst_file_manager.get());
    if (sfm) {
      // Notify sst_file_manager that a new file was added
      std::string file_path = MakeTableFileName(
          cfd->ioptions()->cf_paths[0].path, file_meta.fd.GetNumber());
      // TODO (PR7798). We should only add the file to the FileManager if it
      // exists. Otherwise, some tests may fail. Ignore the error in the
      // interim.
      sfm->OnAddFile(file_path).PermitUncheckedError();
      if (sfm->IsMaxAllowedSpaceReached()) {
        Status new_bg_error =
            Status::SpaceLimit("Max allowed space was reached");
        TEST_SYNC_POINT_CALLBACK(
            "DBImpl::FlushMemTableToOutputFile:MaxAllowedSpaceReached",
            &new_bg_error);
        error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush);
      }
    }
#endif  // ROCKSDB_LITE
  }
  TEST_SYNC_POINT("DBImpl::FlushMemTableToOutputFile:Finish");
  return s;
}

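// Entry point for background flushes: dispatches to
// AtomicFlushMemTablesToOutputFiles() when atomic_flush is enabled,
// otherwise flushes the single column family in `bg_flush_args`.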
Status DBImpl::FlushMemTablesToOutputFiles(
    const autovector<BGFlushArg>& bg_flush_args, bool* made_progress,
    JobContext* job_context, LogBuffer* log_buffer, Env::Priority thread_pri) {
  if (immutable_db_options_.atomic_flush) {
    return AtomicFlushMemTablesToOutputFiles(
        bg_flush_args, made_progress, job_context, log_buffer, thread_pri);
  }
  assert(bg_flush_args.size() == 1);
  std::vector<SequenceNumber> snapshot_seqs;
  SequenceNumber earliest_write_conflict_snapshot;
  SnapshotChecker* snapshot_checker;
  GetSnapshotContext(job_context, &snapshot_seqs,
                     &earliest_write_conflict_snapshot, &snapshot_checker);
  const auto& bg_flush_arg = bg_flush_args[0];
  ColumnFamilyData* cfd = bg_flush_arg.cfd_;
  // intentional infrequent copy for each flush
  MutableCFOptions mutable_cf_options_copy = *cfd->GetLatestMutableCFOptions();
  SuperVersionContext* superversion_context =
      bg_flush_arg.superversion_context_;
  Status s = FlushMemTableToOutputFile(
      cfd, mutable_cf_options_copy, made_progress, job_context,
      superversion_context, snapshot_seqs, earliest_write_conflict_snapshot,
      snapshot_checker, log_buffer, thread_pri);
  return s;
}

/*
 * Atomically flushes multiple column families.
 *
 * For each column family, all memtables with ID smaller than or equal to the
 * ID specified in bg_flush_args will be flushed. Only after all column
 * families finish flushing will this function commit to the MANIFEST. If any
 * of the column families are not flushed successfully, this function does not
 * have any side effects on the state of the database.
 */
Status DBImpl::AtomicFlushMemTablesToOutputFiles(
    const autovector<BGFlushArg>& bg_flush_args, bool* made_progress,
    JobContext* job_context, LogBuffer* log_buffer, Env::Priority thread_pri) {
  mutex_.AssertHeld();

  autovector<ColumnFamilyData*> cfds;
  for (const auto& arg : bg_flush_args) {
    cfds.emplace_back(arg.cfd_);
  }

#ifndef NDEBUG
  for (const auto cfd : cfds) {
    assert(cfd->imm()->NumNotFlushed() != 0);
    assert(cfd->imm()->IsFlushPending());
    assert(cfd->GetFlushReason() == cfds[0]->GetFlushReason());
  }
#endif /* !NDEBUG */

  std::vector<SequenceNumber> snapshot_seqs;
  SequenceNumber earliest_write_conflict_snapshot;
  SnapshotChecker* snapshot_checker;
  GetSnapshotContext(job_context, &snapshot_seqs,
                     &earliest_write_conflict_snapshot, &snapshot_checker);

  autovector<FSDirectory*> distinct_output_dirs;
  autovector<std::string> distinct_output_dir_paths;
  std::vector<std::unique_ptr<FlushJob>> jobs;
  std::vector<MutableCFOptions> all_mutable_cf_options;
  int num_cfs = static_cast<int>(cfds.size());
  all_mutable_cf_options.reserve(num_cfs);
  for (int i = 0; i < num_cfs; ++i) {
    auto cfd = cfds[i];
    FSDirectory* data_dir = GetDataDir(cfd, 0U);
    const std::string& curr_path = cfd->ioptions()->cf_paths[0].path;

    // Add to distinct output directories if eligible. Use linear search. Since
    // the number of elements in the vector is not large, performance should be
    // tolerable.
    bool found = false;
    for (const auto& path : distinct_output_dir_paths) {
      if (path == curr_path) {
        found = true;
        break;
      }
    }
    if (!found) {
      distinct_output_dir_paths.emplace_back(curr_path);
      distinct_output_dirs.emplace_back(data_dir);
    }

    all_mutable_cf_options.emplace_back(*cfd->GetLatestMutableCFOptions());
    const MutableCFOptions& mutable_cf_options = all_mutable_cf_options.back();
    uint64_t max_memtable_id = bg_flush_args[i].max_memtable_id_;
    jobs.emplace_back(new FlushJob(
        dbname_, cfd, immutable_db_options_, mutable_cf_options,
        max_memtable_id, file_options_for_compaction_, versions_.get(),
        &mutex_, &shutting_down_, snapshot_seqs,
        earliest_write_conflict_snapshot, snapshot_checker, job_context,
        log_buffer, directories_.GetDbDir(), data_dir,
        GetCompressionFlush(*cfd->ioptions(), mutable_cf_options), stats_,
        &event_logger_, mutable_cf_options.report_bg_io_stats,
        false /* sync_output_directory */, false /* write_manifest */,
        thread_pri, io_tracer_, seqno_time_mapping_, db_id_, db_session_id_,
        cfd->GetFullHistoryTsLow(), &blob_callback_));
  }

  std::vector<FileMetaData> file_meta(num_cfs);
  // Use std::deque<bool> because std::vector<bool> is specialized and does
  // not allow taking the address of an element, i.e. &v[i].
  std::deque<bool> switched_to_mempurge(num_cfs, false);
  Status s;
  IOStatus log_io_s = IOStatus::OK();
  assert(num_cfs == static_cast<int>(jobs.size()));

#ifndef ROCKSDB_LITE
  for (int i = 0; i != num_cfs; ++i) {
    const MutableCFOptions& mutable_cf_options = all_mutable_cf_options.at(i);
    // may temporarily unlock and lock the mutex.
    NotifyOnFlushBegin(cfds[i], &file_meta[i], mutable_cf_options,
                       job_context->job_id);
  }
#endif /* !ROCKSDB_LITE */

  if (logfile_number_ > 0) {
    // TODO (yanqin) investigate whether we should sync the closed logs for
    // single column family case.
    VersionEdit synced_wals;
    mutex_.Unlock();
    log_io_s = SyncClosedLogs(job_context, &synced_wals);
    mutex_.Lock();
    if (log_io_s.ok() && synced_wals.IsWalAddition()) {
      log_io_s = status_to_io_status(ApplyWALToManifest(&synced_wals));
    }

    if (!log_io_s.ok() && !log_io_s.IsShutdownInProgress() &&
        !log_io_s.IsColumnFamilyDropped()) {
      if (total_log_size_ > 0) {
        error_handler_.SetBGError(log_io_s, BackgroundErrorReason::kFlush);
      } else {
        // If the WAL is empty, we use a different error reason
        error_handler_.SetBGError(log_io_s,
                                  BackgroundErrorReason::kFlushNoWAL);
      }
    }
  }
  s = log_io_s;

  // exec_status stores the execution status of flush_jobs as
  // <bool /* executed */, Status /* status code */>
  autovector<std::pair<bool, Status>> exec_status;
  std::vector<bool> pick_status;
  for (int i = 0; i != num_cfs; ++i) {
    // Initially all jobs are not executed, with status OK.
    exec_status.emplace_back(false, Status::OK());
    pick_status.push_back(false);
  }

  if (s.ok()) {
    for (int i = 0; i != num_cfs; ++i) {
      jobs[i]->PickMemTable();
      pick_status[i] = true;
    }
  }

  if (s.ok()) {
    assert(switched_to_mempurge.size() ==
           static_cast<long unsigned int>(num_cfs));
    // TODO (yanqin): parallelize jobs with threads.
    for (int i = 1; i != num_cfs; ++i) {
      exec_status[i].second =
          jobs[i]->Run(&logs_with_prep_tracker_, &file_meta[i],
                       &(switched_to_mempurge.at(i)));
      exec_status[i].first = true;
    }
    if (num_cfs > 1) {
      TEST_SYNC_POINT(
          "DBImpl::AtomicFlushMemTablesToOutputFiles:SomeFlushJobsComplete:1");
      TEST_SYNC_POINT(
          "DBImpl::AtomicFlushMemTablesToOutputFiles:SomeFlushJobsComplete:2");
    }
    assert(exec_status.size() > 0);
    assert(!file_meta.empty());
    exec_status[0].second = jobs[0]->Run(
        &logs_with_prep_tracker_, file_meta.data() /* &file_meta[0] */,
        switched_to_mempurge.empty() ? nullptr
                                     : &(switched_to_mempurge.at(0)));
    exec_status[0].first = true;

    Status error_status;
    for (const auto& e : exec_status) {
      if (!e.second.ok()) {
        s = e.second;
        if (!e.second.IsShutdownInProgress() &&
            !e.second.IsColumnFamilyDropped()) {
          // If a flush job did not return OK, and the CF is not dropped, and
          // the DB is not shutting down, then we have to return this result
          // to the caller later.
          error_status = e.second;
        }
      }
    }

    s = error_status.ok() ? s : error_status;
  }

  if (s.IsColumnFamilyDropped()) {
    s = Status::OK();
  }

  if (s.ok() || s.IsShutdownInProgress()) {
    // Sync on all distinct output directories.
    for (auto dir : distinct_output_dirs) {
      if (dir != nullptr) {
        Status error_status = dir->FsyncWithDirOptions(
            IOOptions(), nullptr,
            DirFsyncOptions(DirFsyncOptions::FsyncReason::kNewFileSynced));
        if (!error_status.ok()) {
          s = error_status;
          break;
        }
      }
    }
  } else {
    // Need to undo the atomic flush if something went wrong, i.e. s is not OK
    // and it is not because of a CF drop.
    // Have to cancel the flush jobs that have NOT executed because we need to
    // unref the versions.
    for (int i = 0; i != num_cfs; ++i) {
      if (pick_status[i] && !exec_status[i].first) {
        jobs[i]->Cancel();
      }
    }
    for (int i = 0; i != num_cfs; ++i) {
      if (exec_status[i].second.ok() && exec_status[i].first) {
        auto& mems = jobs[i]->GetMemTables();
        cfds[i]->imm()->RollbackMemtableFlush(mems,
                                              file_meta[i].fd.GetNumber());
      }
    }
  }

  if (s.ok()) {
    const auto wait_to_install_func =
        [&]() -> std::pair<Status, bool /*continue to wait*/> {
      if (!versions_->io_status().ok()) {
        // Something went wrong elsewhere, we cannot count on waiting for our
        // turn to write/sync to MANIFEST or CURRENT. Just return.
        return std::make_pair(versions_->io_status(), false);
      } else if (shutting_down_.load(std::memory_order_acquire)) {
        return std::make_pair(Status::ShutdownInProgress(), false);
      }
      bool ready = true;
      for (size_t i = 0; i != cfds.size(); ++i) {
        const auto& mems = jobs[i]->GetMemTables();
        if (cfds[i]->IsDropped()) {
          // If the column family is dropped, then do not wait.
          continue;
        } else if (!mems.empty() &&
                   cfds[i]->imm()->GetEarliestMemTableID() <
                       mems[0]->GetID()) {
          // If a flush job needs to install the flush result for mems and
          // mems[0] is not the earliest memtable, it means another thread
          // must be installing flush results for the same column family, so
          // the current thread needs to wait.
          ready = false;
          break;
        } else if (mems.empty() && cfds[i]->imm()->GetEarliestMemTableID() <=
                                       bg_flush_args[i].max_memtable_id_) {
          // If a flush job does not need to install flush results, then it
          // has to wait until all memtables up to max_memtable_id_
          // (inclusive) are installed.
          ready = false;
          break;
        }
      }
      return std::make_pair(Status::OK(), !ready);
    };

    bool resuming_from_bg_err =
        error_handler_.IsDBStopped() ||
        (cfds[0]->GetFlushReason() == FlushReason::kErrorRecovery ||
         cfds[0]->GetFlushReason() == FlushReason::kErrorRecoveryRetryFlush);
    while ((!resuming_from_bg_err || error_handler_.GetRecoveryError().ok())) {
      std::pair<Status, bool> res = wait_to_install_func();

      TEST_SYNC_POINT_CALLBACK(
          "DBImpl::AtomicFlushMemTablesToOutputFiles:WaitToCommit", &res);

      if (!res.first.ok()) {
        s = res.first;
        break;
      } else if (!res.second) {
        break;
      }
      atomic_flush_install_cv_.Wait();

      resuming_from_bg_err =
          error_handler_.IsDBStopped() ||
          (cfds[0]->GetFlushReason() == FlushReason::kErrorRecovery ||
           cfds[0]->GetFlushReason() == FlushReason::kErrorRecoveryRetryFlush);
    }

    if (!resuming_from_bg_err) {
      // If not resuming from a bg error, then we determine future action
      // based on whether we hit a background error.
      if (s.ok()) {
        s = error_handler_.GetBGError();
      }
    } else if (s.ok()) {
      // If resuming from a bg error, we still rely on wait_to_install_func()'s
      // result to determine future action. If wait_to_install_func() returns
      // non-ok already, then we should not proceed to flush result
      // installation.
      s = error_handler_.GetRecoveryError();
    }
  }

  if (s.ok()) {
    autovector<ColumnFamilyData*> tmp_cfds;
    autovector<const autovector<MemTable*>*> mems_list;
    autovector<const MutableCFOptions*> mutable_cf_options_list;
    autovector<FileMetaData*> tmp_file_meta;
    autovector<std::list<std::unique_ptr<FlushJobInfo>>*>
        committed_flush_jobs_info;
    for (int i = 0; i != num_cfs; ++i) {
      const auto& mems = jobs[i]->GetMemTables();
      if (!cfds[i]->IsDropped() && !mems.empty()) {
        tmp_cfds.emplace_back(cfds[i]);
        mems_list.emplace_back(&mems);
        mutable_cf_options_list.emplace_back(&all_mutable_cf_options[i]);
        tmp_file_meta.emplace_back(&file_meta[i]);
#ifndef ROCKSDB_LITE
        committed_flush_jobs_info.emplace_back(
            jobs[i]->GetCommittedFlushJobsInfo());
#endif  //! ROCKSDB_LITE
      }
    }

    s = InstallMemtableAtomicFlushResults(
        nullptr /* imm_lists */, tmp_cfds, mutable_cf_options_list, mems_list,
        versions_.get(), &logs_with_prep_tracker_, &mutex_, tmp_file_meta,
        committed_flush_jobs_info, &job_context->memtables_to_free,
        directories_.GetDbDir(), log_buffer);
  }

  if (s.ok()) {
    assert(num_cfs ==
           static_cast<int>(job_context->superversion_contexts.size()));
    for (int i = 0; i != num_cfs; ++i) {
      assert(cfds[i]);

      if (cfds[i]->IsDropped()) {
        continue;
      }
      InstallSuperVersionAndScheduleWork(
          cfds[i], &job_context->superversion_contexts[i],
          all_mutable_cf_options[i]);

      const std::string& column_family_name = cfds[i]->GetName();

      Version* const current = cfds[i]->current();
      assert(current);

      const VersionStorageInfo* const storage_info = current->storage_info();
      assert(storage_info);

      VersionStorageInfo::LevelSummaryStorage tmp;
      ROCKS_LOG_BUFFER(log_buffer, "[%s] Level summary: %s\n",
                       column_family_name.c_str(),
                       storage_info->LevelSummary(&tmp));

      const auto& blob_files = storage_info->GetBlobFiles();
      if (!blob_files.empty()) {
        assert(blob_files.front());
        assert(blob_files.back());

        ROCKS_LOG_BUFFER(
            log_buffer,
            "[%s] Blob file summary: head=%" PRIu64 ", tail=%" PRIu64 "\n",
            column_family_name.c_str(),
            blob_files.front()->GetBlobFileNumber(),
            blob_files.back()->GetBlobFileNumber());
      }
    }
    if (made_progress) {
      *made_progress = true;
    }
#ifndef ROCKSDB_LITE
    auto sfm = static_cast<SstFileManagerImpl*>(
        immutable_db_options_.sst_file_manager.get());
    assert(all_mutable_cf_options.size() == static_cast<size_t>(num_cfs));
    for (int i = 0; s.ok() && i != num_cfs; ++i) {
      // If a mempurge happened instead of a flush, there is no
      // NotifyOnFlushCompleted call (no SST file was created).
      if (switched_to_mempurge[i]) {
        continue;
      }
      if (cfds[i]->IsDropped()) {
        continue;
      }
      NotifyOnFlushCompleted(cfds[i], all_mutable_cf_options[i],
                             jobs[i]->GetCommittedFlushJobsInfo());
      if (sfm) {
        std::string file_path = MakeTableFileName(
            cfds[i]->ioptions()->cf_paths[0].path,
            file_meta[i].fd.GetNumber());
        // TODO (PR7798). We should only add the file to the FileManager if it
        // exists. Otherwise, some tests may fail. Ignore the error in the
        // interim.
        sfm->OnAddFile(file_path).PermitUncheckedError();
        if (sfm->IsMaxAllowedSpaceReached() &&
            error_handler_.GetBGError().ok()) {
          Status new_bg_error =
              Status::SpaceLimit("Max allowed space was reached");
          error_handler_.SetBGError(new_bg_error,
                                    BackgroundErrorReason::kFlush);
        }
      }
    }
#endif  // ROCKSDB_LITE
  }

  // Need to undo the atomic flush if something went wrong, i.e. s is not OK
  // and it is not because of a CF drop.
  if (!s.ok() && !s.IsColumnFamilyDropped()) {
    if (log_io_s.ok()) {
      // Error while writing to MANIFEST.
      // In fact, versions_->io_status() can also be the result of renaming
      // the CURRENT file. With the current code, it's just difficult to tell.
      // So just be pessimistic and try to write to a new MANIFEST.
      // TODO: distinguish between MANIFEST write and CURRENT renaming
      if (!versions_->io_status().ok()) {
        // If the WAL sync is successful (either the WAL size is 0 or there is
        // no IO error), all MANIFEST write errors will be mapped to soft
        // errors.
        // TODO: kManifestWriteNoWAL and kFlushNoWAL are misleading. Refactor
        // is needed.
        error_handler_.SetBGError(s,
                                  BackgroundErrorReason::kManifestWriteNoWAL);
      } else {
        // If the WAL sync is successful (either the WAL size is 0 or there is
        // no IO error), all the other SST file write errors will be set as
        // kFlushNoWAL.
        error_handler_.SetBGError(s, BackgroundErrorReason::kFlushNoWAL);
      }
    } else {
      assert(s == log_io_s);
      Status new_bg_error = s;
      error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush);
    }
  }

  return s;
}

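// Invokes the OnFlushBegin() callback of all registered event listeners.
// Releases mutex_ while the listeners run and re-acquires it afterwards.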
void DBImpl::NotifyOnFlushBegin(ColumnFamilyData* cfd, FileMetaData* file_meta,
                                const MutableCFOptions& mutable_cf_options,
                                int job_id) {
#ifndef ROCKSDB_LITE
  if (immutable_db_options_.listeners.size() == 0U) {
    return;
  }
  mutex_.AssertHeld();
  if (shutting_down_.load(std::memory_order_acquire)) {
    return;
  }
  bool triggered_writes_slowdown =
      (cfd->current()->storage_info()->NumLevelFiles(0) >=
       mutable_cf_options.level0_slowdown_writes_trigger);
  bool triggered_writes_stop =
      (cfd->current()->storage_info()->NumLevelFiles(0) >=
       mutable_cf_options.level0_stop_writes_trigger);
  // release lock while notifying events
  mutex_.Unlock();
  {
    FlushJobInfo info{};
    info.cf_id = cfd->GetID();
    info.cf_name = cfd->GetName();
    // TODO(yhchiang): make db_paths dynamic in case flush does not
    // go to L0 in the future.
    const uint64_t file_number = file_meta->fd.GetNumber();
    info.file_path =
        MakeTableFileName(cfd->ioptions()->cf_paths[0].path, file_number);
    info.file_number = file_number;
    info.thread_id = env_->GetThreadID();
    info.job_id = job_id;
    info.triggered_writes_slowdown = triggered_writes_slowdown;
    info.triggered_writes_stop = triggered_writes_stop;
    info.smallest_seqno = file_meta->fd.smallest_seqno;
    info.largest_seqno = file_meta->fd.largest_seqno;
    info.flush_reason = cfd->GetFlushReason();
    for (auto listener : immutable_db_options_.listeners) {
      listener->OnFlushBegin(this, info);
    }
  }
  mutex_.Lock();
  // no need to signal bg_cv_ as it will be signaled at the end of the
  // flush process.
#else
  (void)cfd;
  (void)file_meta;
  (void)mutable_cf_options;
  (void)job_id;
#endif  // ROCKSDB_LITE
}

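// Invokes the OnFlushCompleted() callback of all registered event listeners
// for each committed flush job. Releases mutex_ while the listeners run.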
void DBImpl::NotifyOnFlushCompleted(
    ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options,
    std::list<std::unique_ptr<FlushJobInfo>>* flush_jobs_info) {
#ifndef ROCKSDB_LITE
  assert(flush_jobs_info != nullptr);
  if (immutable_db_options_.listeners.size() == 0U) {
    return;
  }
  mutex_.AssertHeld();
  if (shutting_down_.load(std::memory_order_acquire)) {
    return;
  }
  bool triggered_writes_slowdown =
      (cfd->current()->storage_info()->NumLevelFiles(0) >=
       mutable_cf_options.level0_slowdown_writes_trigger);
  bool triggered_writes_stop =
      (cfd->current()->storage_info()->NumLevelFiles(0) >=
       mutable_cf_options.level0_stop_writes_trigger);
  // release lock while notifying events
  mutex_.Unlock();
  {
    for (auto& info : *flush_jobs_info) {
      info->triggered_writes_slowdown = triggered_writes_slowdown;
      info->triggered_writes_stop = triggered_writes_stop;
      for (auto listener : immutable_db_options_.listeners) {
        listener->OnFlushCompleted(this, *info);
      }
      TEST_SYNC_POINT(
          "DBImpl::NotifyOnFlushCompleted::PostAllOnFlushCompleted");
    }
    flush_jobs_info->clear();
  }
  mutex_.Lock();
  // no need to signal bg_cv_ as it will be signaled at the end of the
  // flush process.
#else
  (void)cfd;
  (void)mutable_cf_options;
  (void)flush_jobs_info;
#endif  // ROCKSDB_LITE
}

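// Public CompactRange entry point. For column families with user-defined
// timestamps, widens the user keys with the maximum/minimum timestamp so the
// range [begin, end] is fully covered, then delegates to
// CompactRangeInternal().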
Status DBImpl::CompactRange(const CompactRangeOptions& options,
                            ColumnFamilyHandle* column_family,
                            const Slice* begin_without_ts,
                            const Slice* end_without_ts) {
  if (manual_compaction_paused_.load(std::memory_order_acquire) > 0) {
    return Status::Incomplete(Status::SubCode::kManualCompactionPaused);
  }

  if (options.canceled && options.canceled->load(std::memory_order_acquire)) {
    return Status::Incomplete(Status::SubCode::kManualCompactionPaused);
  }

  const Comparator* const ucmp = column_family->GetComparator();
  assert(ucmp);
  size_t ts_sz = ucmp->timestamp_size();
  if (ts_sz == 0) {
    return CompactRangeInternal(options, column_family, begin_without_ts,
                                end_without_ts, "" /*trim_ts*/);
  }

  std::string begin_str;
  std::string end_str;

  // CompactRange compacts all keys in [begin, end] inclusively. Add the
  // maximum timestamp to include all `begin` keys, and add the minimum
  // timestamp to include all `end` keys.
  if (begin_without_ts != nullptr) {
    AppendKeyWithMaxTimestamp(&begin_str, *begin_without_ts, ts_sz);
  }
  if (end_without_ts != nullptr) {
    AppendKeyWithMinTimestamp(&end_str, *end_without_ts, ts_sz);
  }
  Slice begin(begin_str);
  Slice end(end_str);

  Slice* begin_with_ts = begin_without_ts ? &begin : nullptr;
  Slice* end_with_ts = end_without_ts ? &end : nullptr;

  return CompactRangeInternal(options, column_family, begin_with_ts,
                              end_with_ts, "" /*trim_ts*/);
}

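// Validates that the column family has user-defined timestamps of the
// expected size before raising full_history_ts_low via the Impl variant
// below.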
Status DBImpl::IncreaseFullHistoryTsLow(ColumnFamilyHandle* column_family,
                                        std::string ts_low) {
  ColumnFamilyData* cfd = nullptr;
  if (column_family == nullptr) {
    cfd = default_cf_handle_->cfd();
  } else {
    auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
    assert(cfh != nullptr);
    cfd = cfh->cfd();
  }
  assert(cfd != nullptr && cfd->user_comparator() != nullptr);
  if (cfd->user_comparator()->timestamp_size() == 0) {
    return Status::InvalidArgument(
        "Timestamp is not enabled in this column family");
  }
  if (cfd->user_comparator()->timestamp_size() != ts_low.size()) {
    return Status::InvalidArgument("ts_low size mismatch");
  }
  return IncreaseFullHistoryTsLowImpl(cfd, ts_low);
}

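// Persists the new full_history_ts_low through a VersionEdit. Rejects
// attempts to move the value backwards and returns TryAgain if a concurrent
// update already advanced it past the requested timestamp.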
Status DBImpl::IncreaseFullHistoryTsLowImpl(ColumnFamilyData* cfd,
                                            std::string ts_low) {
  VersionEdit edit;
  edit.SetColumnFamily(cfd->GetID());
  edit.SetFullHistoryTsLow(ts_low);
  TEST_SYNC_POINT_CALLBACK("DBImpl::IncreaseFullHistoryTsLowImpl:BeforeEdit",
                           &edit);

  InstrumentedMutexLock l(&mutex_);
  std::string current_ts_low = cfd->GetFullHistoryTsLow();
  const Comparator* ucmp = cfd->user_comparator();
  assert(ucmp->timestamp_size() == ts_low.size() && !ts_low.empty());
  if (!current_ts_low.empty() &&
      ucmp->CompareTimestamp(ts_low, current_ts_low) < 0) {
    return Status::InvalidArgument("Cannot decrease full_history_ts_low");
  }

  Status s = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(),
                                    &edit, &mutex_, directories_.GetDbDir());
  if (!s.ok()) {
    return s;
  }
  current_ts_low = cfd->GetFullHistoryTsLow();
  if (!current_ts_low.empty() &&
      ucmp->CompareTimestamp(current_ts_low, ts_low) > 0) {
    std::stringstream oss;
    oss << "full_history_ts_low: " << Slice(current_ts_low).ToString(true)
        << " is set to be higher than the requested "
           "timestamp: "
        << Slice(ts_low).ToString(true) << std::endl;
    return Status::TryAgain(oss.str());
  }
  return Status::OK();
}

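// Implements CompactRange(): flushes overlapping memtables if needed, then
// runs manual compactions level by level over the overlapped range, and
// optionally refits the result to `options.target_level`.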
Status DBImpl::CompactRangeInternal(const CompactRangeOptions& options,
                                    ColumnFamilyHandle* column_family,
                                    const Slice* begin, const Slice* end,
                                    const std::string& trim_ts) {
  auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
  auto cfd = cfh->cfd();

  if (options.target_path_id >= cfd->ioptions()->cf_paths.size()) {
    return Status::InvalidArgument("Invalid target path ID");
  }

  bool flush_needed = true;

  // Update full_history_ts_low if it's set
  if (options.full_history_ts_low != nullptr &&
      !options.full_history_ts_low->empty()) {
    std::string ts_low = options.full_history_ts_low->ToString();
    if (begin != nullptr || end != nullptr) {
      return Status::InvalidArgument(
          "Cannot specify compaction range with full_history_ts_low");
    }
    Status s = IncreaseFullHistoryTsLowImpl(cfd, ts_low);
    if (!s.ok()) {
      LogFlush(immutable_db_options_.info_log);
      return s;
    }
  }

  Status s;
  if (begin != nullptr && end != nullptr) {
    // TODO(ajkr): We could also optimize away the flush in certain cases where
    // one/both sides of the interval are unbounded. But it requires more
    // changes to RangesOverlapWithMemtables.
    Range range(*begin, *end);
    SuperVersion* super_version = cfd->GetReferencedSuperVersion(this);
    s = cfd->RangesOverlapWithMemtables(
        {range}, super_version, immutable_db_options_.allow_data_in_errors,
        &flush_needed);
    CleanupSuperVersion(super_version);
  }

  if (s.ok() && flush_needed) {
    FlushOptions fo;
    fo.allow_write_stall = options.allow_write_stall;
    if (immutable_db_options_.atomic_flush) {
      autovector<ColumnFamilyData*> cfds;
      mutex_.Lock();
      SelectColumnFamiliesForAtomicFlush(&cfds);
      mutex_.Unlock();
      s = AtomicFlushMemTables(cfds, fo, FlushReason::kManualCompaction,
                               false /* entered_write_thread */);
    } else {
      s = FlushMemTable(cfd, fo, FlushReason::kManualCompaction,
                        false /* entered_write_thread */);
    }
    if (!s.ok()) {
      LogFlush(immutable_db_options_.info_log);
      return s;
    }
  }

  constexpr int kInvalidLevel = -1;
  int final_output_level = kInvalidLevel;
  bool exclusive = options.exclusive_manual_compaction;
  if (cfd->ioptions()->compaction_style == kCompactionStyleUniversal &&
      cfd->NumberLevels() > 1) {
    // Always compact all files together.
    final_output_level = cfd->NumberLevels() - 1;
    // if the bottommost level is reserved
    if (immutable_db_options_.allow_ingest_behind) {
      final_output_level--;
    }
    s = RunManualCompaction(cfd, ColumnFamilyData::kCompactAllLevels,
                            final_output_level, options, begin, end, exclusive,
                            false, std::numeric_limits<uint64_t>::max(),
                            trim_ts);
  } else {
    int first_overlapped_level = kInvalidLevel;
    int max_overlapped_level = kInvalidLevel;
    {
      SuperVersion* super_version = cfd->GetReferencedSuperVersion(this);
      Version* current_version = super_version->current;
      ReadOptions ro;
      ro.total_order_seek = true;
      bool overlap;
      for (int level = 0;
           level < current_version->storage_info()->num_non_empty_levels();
           level++) {
        overlap = true;
        if (begin != nullptr && end != nullptr) {
          Status status = current_version->OverlapWithLevelIterator(
              ro, file_options_, *begin, *end, level, &overlap);
          if (!status.ok()) {
            overlap = current_version->storage_info()->OverlapInLevel(
                level, begin, end);
          }
        } else {
          overlap = current_version->storage_info()->OverlapInLevel(
              level, begin, end);
        }
        if (overlap) {
          if (first_overlapped_level == kInvalidLevel) {
            first_overlapped_level = level;
          }
          max_overlapped_level = level;
        }
      }
      CleanupSuperVersion(super_version);
    }
    if (s.ok() && first_overlapped_level != kInvalidLevel) {
      // max_file_num_to_ignore can be used to filter out newly created SST
      // files, useful for bottom level compaction in a manual compaction
      uint64_t max_file_num_to_ignore = std::numeric_limits<uint64_t>::max();
      uint64_t next_file_number = versions_->current_next_file_number();
      final_output_level = max_overlapped_level;
      int output_level;
      for (int level = first_overlapped_level; level <= max_overlapped_level;
           level++) {
        bool disallow_trivial_move = false;
        // in case the compaction is universal or if we're compacting the
        // bottom-most level, the output level will be the same as the input
        // one. level 0 can never be the bottommost level (i.e. if all files
        // are in level 0, we will compact to level 1)
        if (cfd->ioptions()->compaction_style == kCompactionStyleUniversal ||
            cfd->ioptions()->compaction_style == kCompactionStyleFIFO) {
          output_level = level;
        } else if (level == max_overlapped_level && level > 0) {
          if (options.bottommost_level_compaction ==
              BottommostLevelCompaction::kSkip) {
            // Skip bottommost level compaction
            continue;
          } else if (options.bottommost_level_compaction ==
                         BottommostLevelCompaction::kIfHaveCompactionFilter &&
                     cfd->ioptions()->compaction_filter == nullptr &&
                     cfd->ioptions()->compaction_filter_factory == nullptr) {
            // Skip bottommost level compaction since we don't have a
            // compaction filter
            continue;
          }
          output_level = level;
          // update max_file_num_to_ignore only for bottom level compaction
          // because data in newly compacted files in middle levels may still
          // need to be pushed down
          max_file_num_to_ignore = next_file_number;
        } else {
          output_level = level + 1;
          if (cfd->ioptions()->compaction_style == kCompactionStyleLevel &&
              cfd->ioptions()->level_compaction_dynamic_level_bytes &&
              level == 0) {
            output_level = ColumnFamilyData::kCompactToBaseLevel;
          }
          // if it's a bottommost level compaction and `kForce*` compaction is
          // set, disallow trivial move
          if (level == max_overlapped_level &&
              (options.bottommost_level_compaction ==
                   BottommostLevelCompaction::kForce ||
               options.bottommost_level_compaction ==
                   BottommostLevelCompaction::kForceOptimized)) {
            disallow_trivial_move = true;
          }
        }
        // trim_ts needs a real compaction to remove the latest records
        if (!trim_ts.empty()) {
          disallow_trivial_move = true;
        }
        s = RunManualCompaction(cfd, level, output_level, options, begin, end,
                                exclusive, disallow_trivial_move,
                                max_file_num_to_ignore, trim_ts);
        if (!s.ok()) {
          break;
        }
        if (output_level == ColumnFamilyData::kCompactToBaseLevel) {
          final_output_level = cfd->NumberLevels() - 1;
        } else if (output_level > final_output_level) {
          final_output_level = output_level;
        }
        TEST_SYNC_POINT("DBImpl::RunManualCompaction()::1");
        TEST_SYNC_POINT("DBImpl::RunManualCompaction()::2");
      }
    }
  }
  if (!s.ok() || final_output_level == kInvalidLevel) {
    LogFlush(immutable_db_options_.info_log);
    return s;
  }

  if (options.change_level) {
    TEST_SYNC_POINT("DBImpl::CompactRange:BeforeRefit:1");
    TEST_SYNC_POINT("DBImpl::CompactRange:BeforeRefit:2");

    ROCKS_LOG_INFO(immutable_db_options_.info_log,
                   "[RefitLevel] waiting for background threads to stop");
    DisableManualCompaction();
    s = PauseBackgroundWork();
    if (s.ok()) {
      TEST_SYNC_POINT("DBImpl::CompactRange:PreRefitLevel");
      s = ReFitLevel(cfd, final_output_level, options.target_level);
      TEST_SYNC_POINT("DBImpl::CompactRange:PostRefitLevel");
      // ContinueBackgroundWork always returns Status::OK().
      Status temp_s = ContinueBackgroundWork();
      assert(temp_s.ok());
    }
    EnableManualCompaction();
    TEST_SYNC_POINT(
        "DBImpl::CompactRange:PostRefitLevel:ManualCompactionEnabled");
  }
  LogFlush(immutable_db_options_.info_log);

  {
    InstrumentedMutexLock l(&mutex_);
    // an automatic compaction that has been scheduled might have been
    // preempted by the manual compactions. Need to schedule it back.
    MaybeScheduleFlushOrCompaction();
  }

  return s;
}

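// Compacts a caller-specified set of input files into `output_level`.
// Not supported in ROCKSDB_LITE builds.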
Status DBImpl::CompactFiles(const CompactionOptions& compact_options,
                            ColumnFamilyHandle* column_family,
                            const std::vector<std::string>& input_file_names,
                            const int output_level, const int output_path_id,
                            std::vector<std::string>* const output_file_names,
                            CompactionJobInfo* compaction_job_info) {
#ifdef ROCKSDB_LITE
  (void)compact_options;
  (void)column_family;
  (void)input_file_names;
  (void)output_level;
  (void)output_path_id;
  (void)output_file_names;
  (void)compaction_job_info;
  // not supported in lite version
  return Status::NotSupported("Not supported in ROCKSDB LITE");
#else
  if (column_family == nullptr) {
    return Status::InvalidArgument("ColumnFamilyHandle must be non-null.");
  }

  auto cfd =
      static_cast_with_check<ColumnFamilyHandleImpl>(column_family)->cfd();
  assert(cfd);

  Status s;
  JobContext job_context(next_job_id_.fetch_add(1), true);
  LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL,
                       immutable_db_options_.info_log.get());

  // Perform CompactFiles
  TEST_SYNC_POINT("TestCompactFiles::IngestExternalFile2");
  TEST_SYNC_POINT_CALLBACK(
      "TestCompactFiles:PausingManualCompaction:3",
      reinterpret_cast<void*>(
          const_cast<std::atomic<int>*>(&manual_compaction_paused_)));
  {
    InstrumentedMutexLock l(&mutex_);

    // This call will unlock/lock the mutex to wait for current running
    // IngestExternalFile() calls to finish.
    WaitForIngestFile();

    // We need to get current after `WaitForIngestFile`, because
    // `IngestExternalFile` may add files that overlap with `input_file_names`
    auto* current = cfd->current();
    current->Ref();

    s = CompactFilesImpl(compact_options, cfd, current, input_file_names,
                         output_file_names, output_level, output_path_id,
                         &job_context, &log_buffer, compaction_job_info);

    current->Unref();
  }

  // Find and delete obsolete files
  {
    InstrumentedMutexLock l(&mutex_);
    // If !s.ok(), this means that Compaction failed. In that case, we want
    // to delete all obsolete files we might have created and we force
    // FindObsoleteFiles(). This is because job_context does not
    // catch all created files if compaction failed.
    FindObsoleteFiles(&job_context, !s.ok());
  }  // release the mutex

  // delete unnecessary files if any, this is done outside the mutex
  if (job_context.HaveSomethingToClean() ||
      job_context.HaveSomethingToDelete() || !log_buffer.IsEmpty()) {
    // Have to flush the info logs before bg_compaction_scheduled_--
    // because if bg_flush_scheduled_ becomes 0 and the lock is
    // released, the destructor of DB can kick in and destroy all the
    // state of DB, so info_log might not be available after that point.
    // The same applies to accessing other state that DB owns.
    log_buffer.FlushBufferToLog();
    if (job_context.HaveSomethingToDelete()) {
      // no mutex is locked here. No need to Unlock() and Lock() here.
      PurgeObsoleteFiles(job_context);
    }
    job_context.Clean();
  }

  return s;
#endif  // ROCKSDB_LITE
}

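// Overview of CompactFilesImpl() below (descriptive summary only): with
// mutex_ held, it (1) sanitizes the requested input file set, (2) checks
// available disk space via EnoughRoomForCompaction(), (3) builds a
// CompactionJob, (4) runs the job with mutex_ released, and (5) installs the
// result, reporting failures through SetBGError() and the info log.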
#ifndef ROCKSDB_LITE
Status DBImpl::CompactFilesImpl(
    const CompactionOptions& compact_options, ColumnFamilyData* cfd,
    Version* version, const std::vector<std::string>& input_file_names,
    std::vector<std::string>* const output_file_names, const int output_level,
    int output_path_id, JobContext* job_context, LogBuffer* log_buffer,
    CompactionJobInfo* compaction_job_info) {
  mutex_.AssertHeld();

  if (shutting_down_.load(std::memory_order_acquire)) {
    return Status::ShutdownInProgress();
  }
  if (manual_compaction_paused_.load(std::memory_order_acquire) > 0) {
    return Status::Incomplete(Status::SubCode::kManualCompactionPaused);
  }

  std::unordered_set<uint64_t> input_set;
  for (const auto& file_name : input_file_names) {
    input_set.insert(TableFileNameToNumber(file_name));
  }

  ColumnFamilyMetaData cf_meta;
  // TODO(yhchiang): can directly use version here if none of the
  // following function calls is pluggable to external developers.
  version->GetColumnFamilyMetaData(&cf_meta);

  if (output_path_id < 0) {
    if (cfd->ioptions()->cf_paths.size() == 1U) {
      output_path_id = 0;
    } else {
      return Status::NotSupported(
          "Automatic output path selection is not "
          "yet supported in CompactFiles()");
    }
  }

  Status s = cfd->compaction_picker()->SanitizeCompactionInputFiles(
      &input_set, cf_meta, output_level);
  if (!s.ok()) {
    return s;
  }

  std::vector<CompactionInputFiles> input_files;
  s = cfd->compaction_picker()->GetCompactionInputsFromFileNumbers(
      &input_files, &input_set, version->storage_info(), compact_options);
  if (!s.ok()) {
    return s;
  }

  for (const auto& inputs : input_files) {
    if (cfd->compaction_picker()->AreFilesInCompaction(inputs.files)) {
      return Status::Aborted(
          "Some of the necessary compaction input "
          "files are already being compacted");
    }
  }
  bool sfm_reserved_compact_space = false;
  // First check if we have enough room to do the compaction
  bool enough_room = EnoughRoomForCompaction(
      cfd, input_files, &sfm_reserved_compact_space, log_buffer);

  if (!enough_room) {
    // m's vars will get set properly at the end of this function,
    // as long as status == CompactionTooLarge
    return Status::CompactionTooLarge();
  }

  // At this point, CompactFiles will be run.
  bg_compaction_scheduled_++;

  std::unique_ptr<Compaction> c;
  assert(cfd->compaction_picker());
  c.reset(cfd->compaction_picker()->CompactFiles(
      compact_options, input_files, output_level, version->storage_info(),
      *cfd->GetLatestMutableCFOptions(), mutable_db_options_, output_path_id));
  // we already sanitized the set of input files and checked for conflicts
  // without releasing the lock, so we're guaranteed a compaction can be
  // formed.
  assert(c != nullptr);

  c->SetInputVersion(version);
  // deletion compaction currently not allowed in CompactFiles.
  assert(!c->deletion_compaction());

  std::vector<SequenceNumber> snapshot_seqs;
  SequenceNumber earliest_write_conflict_snapshot;
  SnapshotChecker* snapshot_checker;
  GetSnapshotContext(job_context, &snapshot_seqs,
                     &earliest_write_conflict_snapshot, &snapshot_checker);

  std::unique_ptr<std::list<uint64_t>::iterator> pending_outputs_inserted_elem(
      new std::list<uint64_t>::iterator(
          CaptureCurrentFileNumberInPendingOutputs()));

  assert(is_snapshot_supported_ || snapshots_.empty());
  CompactionJobStats compaction_job_stats;
  CompactionJob compaction_job(
      job_context->job_id, c.get(), immutable_db_options_, mutable_db_options_,
      file_options_for_compaction_, versions_.get(), &shutting_down_,
      log_buffer, directories_.GetDbDir(),
      GetDataDir(c->column_family_data(), c->output_path_id()),
      GetDataDir(c->column_family_data(), 0), stats_, &mutex_, &error_handler_,
      snapshot_seqs, earliest_write_conflict_snapshot, snapshot_checker,
      job_context, table_cache_, &event_logger_,
      c->mutable_cf_options()->paranoid_file_checks,
      c->mutable_cf_options()->report_bg_io_stats, dbname_,
      &compaction_job_stats, Env::Priority::USER, io_tracer_,
      kManualCompactionCanceledFalse_, db_id_, db_session_id_,
      c->column_family_data()->GetFullHistoryTsLow(), c->trim_ts(),
      &blob_callback_, &bg_compaction_scheduled_,
      &bg_bottom_compaction_scheduled_);

  // Creating a compaction influences the compaction score because the score
  // takes running compactions into account (by skipping files that are
  // already being compacted). Since we just changed the compaction score, we
  // recalculate it here.
  version->storage_info()->ComputeCompactionScore(*cfd->ioptions(),
                                                  *c->mutable_cf_options());

  compaction_job.Prepare();

  mutex_.Unlock();
  TEST_SYNC_POINT("CompactFilesImpl:0");
  TEST_SYNC_POINT("CompactFilesImpl:1");
  // Ignore the status here, as it will be checked in the Install down below...
  compaction_job.Run().PermitUncheckedError();
  TEST_SYNC_POINT("CompactFilesImpl:2");
  TEST_SYNC_POINT("CompactFilesImpl:3");
  mutex_.Lock();

  Status status = compaction_job.Install(*c->mutable_cf_options());
  if (status.ok()) {
    assert(compaction_job.io_status().ok());
    InstallSuperVersionAndScheduleWork(c->column_family_data(),
                                       &job_context->superversion_contexts[0],
                                       *c->mutable_cf_options());
  }
  // status above captures any error during compaction_job.Install, so it's ok
  // not to check compaction_job.io_status() explicitly if we're not calling
  // SetBGError
  compaction_job.io_status().PermitUncheckedError();
  c->ReleaseCompactionFiles(s);
#ifndef ROCKSDB_LITE
  // Need to make sure SstFileManager does its bookkeeping
  auto sfm = static_cast<SstFileManagerImpl*>(
      immutable_db_options_.sst_file_manager.get());
  if (sfm && sfm_reserved_compact_space) {
    sfm->OnCompactionCompletion(c.get());
  }
#endif  // ROCKSDB_LITE

  ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem);

  if (compaction_job_info != nullptr) {
    BuildCompactionJobInfo(cfd, c.get(), s, compaction_job_stats,
                           job_context->job_id, version, compaction_job_info);
  }

  if (status.ok()) {
    // Done
  } else if (status.IsColumnFamilyDropped() || status.IsShutdownInProgress()) {
    // Ignore compaction errors found during shutting down
  } else if (status.IsManualCompactionPaused()) {
    // Don't report stopping manual compaction as error
    ROCKS_LOG_INFO(immutable_db_options_.info_log,
                   "[%s] [JOB %d] Stopping manual compaction",
                   c->column_family_data()->GetName().c_str(),
                   job_context->job_id);
  } else {
    ROCKS_LOG_WARN(immutable_db_options_.info_log,
                   "[%s] [JOB %d] Compaction error: %s",
                   c->column_family_data()->GetName().c_str(),
                   job_context->job_id, status.ToString().c_str());
    IOStatus io_s = compaction_job.io_status();
    if (!io_s.ok()) {
      error_handler_.SetBGError(io_s, BackgroundErrorReason::kCompaction);
    } else {
      error_handler_.SetBGError(status, BackgroundErrorReason::kCompaction);
    }
  }

  if (output_file_names != nullptr) {
    for (const auto& newf : c->edit()->GetNewFiles()) {
      output_file_names->push_back(TableFileName(
          c->immutable_options()->cf_paths, newf.second.fd.GetNumber(),
          newf.second.fd.GetPathId()));
    }

    for (const auto& blob_file : c->edit()->GetBlobFileAdditions()) {
      output_file_names->push_back(
          BlobFileName(c->immutable_options()->cf_paths.front().path,
                       blob_file.GetBlobFileNumber()));
    }
  }

  c.reset();

  bg_compaction_scheduled_--;
  if (bg_compaction_scheduled_ == 0) {
    bg_cv_.SignalAll();
  }
  MaybeScheduleFlushOrCompaction();
  TEST_SYNC_POINT("CompactFilesImpl:End");

  return status;
}
#endif  // ROCKSDB_LITE

Status DBImpl::PauseBackgroundWork() {
  InstrumentedMutexLock guard_lock(&mutex_);
  bg_compaction_paused_++;
  while (bg_bottom_compaction_scheduled_ > 0 || bg_compaction_scheduled_ > 0 ||
         bg_flush_scheduled_ > 0) {
    bg_cv_.Wait();
  }
  bg_work_paused_++;
  return Status::OK();
}

Status DBImpl::ContinueBackgroundWork() {
  InstrumentedMutexLock guard_lock(&mutex_);
  if (bg_work_paused_ == 0) {
    return Status::InvalidArgument();
  }
  assert(bg_work_paused_ > 0);
  assert(bg_compaction_paused_ > 0);
  bg_compaction_paused_--;
  bg_work_paused_--;
  // It's sufficient to check just bg_work_paused_ here since
  // bg_work_paused_ is always no greater than bg_compaction_paused_
  if (bg_work_paused_ == 0) {
    MaybeScheduleFlushOrCompaction();
  }
  return Status::OK();
}

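// Example (illustrative sketch, not part of this file): the two calls above
// nest and must be paired exactly by callers:
//
//   Status s = db->PauseBackgroundWork();  // waits for BG jobs to drain
//   if (s.ok()) {
//     // ... work that must not race with flushes/compactions ...
//     db->ContinueBackgroundWork();  // reschedules pending BG work
//   }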
void DBImpl::NotifyOnCompactionBegin(ColumnFamilyData* cfd, Compaction* c,
                                     const Status& st,
                                     const CompactionJobStats& job_stats,
                                     int job_id) {
#ifndef ROCKSDB_LITE
  if (immutable_db_options_.listeners.empty()) {
    return;
  }
  mutex_.AssertHeld();
  if (shutting_down_.load(std::memory_order_acquire)) {
    return;
  }
  if (c->is_manual_compaction() &&
      manual_compaction_paused_.load(std::memory_order_acquire) > 0) {
    return;
  }

  c->SetNotifyOnCompactionCompleted();
  Version* current = cfd->current();
  current->Ref();
  // release lock while notifying events
  mutex_.Unlock();
  TEST_SYNC_POINT("DBImpl::NotifyOnCompactionBegin::UnlockMutex");
  {
    CompactionJobInfo info{};
    BuildCompactionJobInfo(cfd, c, st, job_stats, job_id, current, &info);
    for (auto listener : immutable_db_options_.listeners) {
      listener->OnCompactionBegin(this, info);
    }
    info.status.PermitUncheckedError();
  }
  mutex_.Lock();
  current->Unref();
#else
  (void)cfd;
  (void)c;
  (void)st;
  (void)job_stats;
  (void)job_id;
#endif  // ROCKSDB_LITE
}

void DBImpl::NotifyOnCompactionCompleted(
    ColumnFamilyData* cfd, Compaction* c, const Status& st,
    const CompactionJobStats& compaction_job_stats, const int job_id) {
#ifndef ROCKSDB_LITE
  if (immutable_db_options_.listeners.size() == 0U) {
    return;
  }
  mutex_.AssertHeld();
  if (shutting_down_.load(std::memory_order_acquire)) {
    return;
  }

  if (c->ShouldNotifyOnCompactionCompleted() == false) {
    return;
  }

  Version* current = cfd->current();
  current->Ref();
  // release lock while notifying events
  mutex_.Unlock();
  TEST_SYNC_POINT("DBImpl::NotifyOnCompactionCompleted::UnlockMutex");
  {
    CompactionJobInfo info{};
    BuildCompactionJobInfo(cfd, c, st, compaction_job_stats, job_id, current,
                           &info);
    for (auto listener : immutable_db_options_.listeners) {
      listener->OnCompactionCompleted(this, info);
    }
  }
  mutex_.Lock();
  current->Unref();
  // no need to signal bg_cv_ as it will be signaled at the end of the
  // flush process.
#else
  (void)cfd;
  (void)c;
  (void)st;
  (void)compaction_job_stats;
  (void)job_id;
#endif  // ROCKSDB_LITE
}

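// Example (illustrative sketch, not part of this file): the listeners
// notified above are registered through DBOptions::listeners, e.g.:
//
//   class CompactionLogger : public EventListener {
//    public:
//     void OnCompactionCompleted(DB* /*db*/,
//                                const CompactionJobInfo& info) override {
//       fprintf(stderr, "compaction done: level %d -> level %d\n",
//               info.base_input_level, info.output_level);
//     }
//   };
//   options.listeners.emplace_back(std::make_shared<CompactionLogger>());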
// REQUIREMENT: block all background work by calling PauseBackgroundWork()
// before calling this function
Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) {
  assert(level < cfd->NumberLevels());
  if (target_level >= cfd->NumberLevels()) {
    return Status::InvalidArgument("Target level exceeds number of levels");
  }

  SuperVersionContext sv_context(/* create_superversion */ true);

  InstrumentedMutexLock guard_lock(&mutex_);

  // only allow one thread refitting
  if (refitting_level_) {
    ROCKS_LOG_INFO(immutable_db_options_.info_log,
                   "[ReFitLevel] another thread is refitting");
    return Status::NotSupported("another thread is refitting");
  }
  refitting_level_ = true;

  const MutableCFOptions mutable_cf_options =
      *cfd->GetLatestMutableCFOptions();
  // move to a smaller level
  int to_level = target_level;
  if (target_level < 0) {
    to_level = FindMinimumEmptyLevelFitting(cfd, mutable_cf_options, level);
  }

  auto* vstorage = cfd->current()->storage_info();
  if (to_level != level) {
    if (to_level > level) {
      if (level == 0) {
        refitting_level_ = false;
        return Status::NotSupported(
            "Cannot change from level 0 to other levels.");
      }
      // Check levels are empty for a trivial move
      for (int l = level + 1; l <= to_level; l++) {
        if (vstorage->NumLevelFiles(l) > 0) {
          refitting_level_ = false;
          return Status::NotSupported(
              "Levels between source and target are not empty for a move.");
        }
      }
    } else {
      // to_level < level
      // Check levels are empty for a trivial move
      for (int l = to_level; l < level; l++) {
        if (vstorage->NumLevelFiles(l) > 0) {
          refitting_level_ = false;
          return Status::NotSupported(
              "Levels between source and target are not empty for a move.");
        }
      }
    }
    ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
                    "[%s] Before refitting:\n%s", cfd->GetName().c_str(),
                    cfd->current()->DebugString().data());

    VersionEdit edit;
    edit.SetColumnFamily(cfd->GetID());
    for (const auto& f : vstorage->LevelFiles(level)) {
      edit.DeleteFile(level, f->fd.GetNumber());
      edit.AddFile(
          to_level, f->fd.GetNumber(), f->fd.GetPathId(), f->fd.GetFileSize(),
          f->smallest, f->largest, f->fd.smallest_seqno, f->fd.largest_seqno,
          f->marked_for_compaction, f->temperature, f->oldest_blob_file_number,
          f->oldest_ancester_time, f->file_creation_time, f->file_checksum,
          f->file_checksum_func_name, f->unique_id);
    }
    ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
                    "[%s] Apply version edit:\n%s", cfd->GetName().c_str(),
                    edit.DebugString().data());

    Status status = versions_->LogAndApply(cfd, mutable_cf_options, &edit,
                                           &mutex_, directories_.GetDbDir());

    InstallSuperVersionAndScheduleWork(cfd, &sv_context, mutable_cf_options);

    ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "[%s] LogAndApply: %s\n",
                    cfd->GetName().c_str(), status.ToString().data());

    if (status.ok()) {
      ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
                      "[%s] After refitting:\n%s", cfd->GetName().c_str(),
                      cfd->current()->DebugString().data());
    }
    sv_context.Clean();
    refitting_level_ = false;

    return status;
  }

  refitting_level_ = false;
  return Status::OK();
}

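// Example (illustrative sketch, not part of this file): ReFitLevel() above is
// reached through CompactRange() with change_level set, e.g. to move the
// compacted output to a specific level:
//
//   CompactRangeOptions cro;
//   cro.change_level = true;
//   cro.target_level = 3;  // hypothetical; -1 picks the minimum fitting level
//   Status s = db->CompactRange(cro, /*begin=*/nullptr, /*end=*/nullptr);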
int DBImpl::NumberLevels(ColumnFamilyHandle* column_family) {
  auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
  return cfh->cfd()->NumberLevels();
}

int DBImpl::MaxMemCompactionLevel(ColumnFamilyHandle* /*column_family*/) {
  return 0;
}

int DBImpl::Level0StopWriteTrigger(ColumnFamilyHandle* column_family) {
  auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
  InstrumentedMutexLock l(&mutex_);
  return cfh->cfd()
      ->GetSuperVersion()
      ->mutable_cf_options.level0_stop_writes_trigger;
}

Status DBImpl::Flush(const FlushOptions& flush_options,
                     ColumnFamilyHandle* column_family) {
  auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
  ROCKS_LOG_INFO(immutable_db_options_.info_log, "[%s] Manual flush start.",
                 cfh->GetName().c_str());
  Status s;
  if (immutable_db_options_.atomic_flush) {
    s = AtomicFlushMemTables({cfh->cfd()}, flush_options,
                             FlushReason::kManualFlush);
  } else {
    s = FlushMemTable(cfh->cfd(), flush_options, FlushReason::kManualFlush);
  }

  ROCKS_LOG_INFO(immutable_db_options_.info_log,
                 "[%s] Manual flush finished, status: %s\n",
                 cfh->GetName().c_str(), s.ToString().c_str());
  return s;
}

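// Example (illustrative sketch, not part of this file): a manual flush that
// blocks until the memtable has been persisted:
//
//   FlushOptions fo;
//   fo.wait = true;                // block until the flush finishes
//   fo.allow_write_stall = false;  // wait out stall conditions first
//   Status s = db->Flush(fo);      // flushes the default column family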
Status DBImpl::Flush(const FlushOptions& flush_options,
                     const std::vector<ColumnFamilyHandle*>& column_families) {
  Status s;
  if (!immutable_db_options_.atomic_flush) {
    for (auto cfh : column_families) {
      s = Flush(flush_options, cfh);
      if (!s.ok()) {
        break;
      }
    }
  } else {
    ROCKS_LOG_INFO(immutable_db_options_.info_log,
                   "Manual atomic flush start.\n"
                   "=====Column families:=====");
    for (auto cfh : column_families) {
      auto cfhi = static_cast<ColumnFamilyHandleImpl*>(cfh);
      ROCKS_LOG_INFO(immutable_db_options_.info_log, "%s",
                     cfhi->GetName().c_str());
    }
    ROCKS_LOG_INFO(immutable_db_options_.info_log,
                   "=====End of column families list=====");
    autovector<ColumnFamilyData*> cfds;
    std::for_each(column_families.begin(), column_families.end(),
                  [&cfds](ColumnFamilyHandle* elem) {
                    auto cfh = static_cast<ColumnFamilyHandleImpl*>(elem);
                    cfds.emplace_back(cfh->cfd());
                  });
    s = AtomicFlushMemTables(cfds, flush_options, FlushReason::kManualFlush);
    ROCKS_LOG_INFO(immutable_db_options_.info_log,
                   "Manual atomic flush finished, status: %s\n"
                   "=====Column families:=====",
                   s.ToString().c_str());
    for (auto cfh : column_families) {
      auto cfhi = static_cast<ColumnFamilyHandleImpl*>(cfh);
      ROCKS_LOG_INFO(immutable_db_options_.info_log, "%s",
                     cfhi->GetName().c_str());
    }
    ROCKS_LOG_INFO(immutable_db_options_.info_log,
                   "=====End of column families list=====");
  }
  return s;
}

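// Example (illustrative sketch, not part of this file): with
// DBOptions::atomic_flush set at open time, the multi-CF overload above
// records the flush result atomically in the MANIFEST:
//
//   Options opts;
//   opts.atomic_flush = true;
//   // ... open the DB, obtaining column family handles cf1 and cf2 ...
//   FlushOptions fo;
//   Status s = db->Flush(fo, {cf1, cf2});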
Status DBImpl::RunManualCompaction(
    ColumnFamilyData* cfd, int input_level, int output_level,
    const CompactRangeOptions& compact_range_options, const Slice* begin,
    const Slice* end, bool exclusive, bool disallow_trivial_move,
    uint64_t max_file_num_to_ignore, const std::string& trim_ts) {
  assert(input_level == ColumnFamilyData::kCompactAllLevels ||
         input_level >= 0);

  InternalKey begin_storage, end_storage;
  CompactionArg* ca = nullptr;

  bool scheduled = false;
  bool unscheduled = false;
  Env::Priority thread_pool_priority = Env::Priority::TOTAL;
  bool manual_conflict = false;

  ManualCompactionState manual(
      cfd, input_level, output_level, compact_range_options.target_path_id,
      exclusive, disallow_trivial_move, compact_range_options.canceled);
  // For universal compaction, we enforce every manual compaction to compact
  // all files.
  if (begin == nullptr ||
      cfd->ioptions()->compaction_style == kCompactionStyleUniversal ||
      cfd->ioptions()->compaction_style == kCompactionStyleFIFO) {
    manual.begin = nullptr;
  } else {
    begin_storage.SetMinPossibleForUserKey(*begin);
    manual.begin = &begin_storage;
  }
  if (end == nullptr ||
      cfd->ioptions()->compaction_style == kCompactionStyleUniversal ||
      cfd->ioptions()->compaction_style == kCompactionStyleFIFO) {
    manual.end = nullptr;
  } else {
    end_storage.SetMaxPossibleForUserKey(*end);
    manual.end = &end_storage;
  }

  TEST_SYNC_POINT("DBImpl::RunManualCompaction:0");
  TEST_SYNC_POINT("DBImpl::RunManualCompaction:1");
  InstrumentedMutexLock l(&mutex_);

  if (manual_compaction_paused_ > 0) {
    // Does not make sense to `AddManualCompaction()` in this scenario since
    // `DisableManualCompaction()` just waited for the manual compaction queue
    // to drain. So return immediately.
    TEST_SYNC_POINT("DBImpl::RunManualCompaction:PausedAtStart");
    manual.status =
        Status::Incomplete(Status::SubCode::kManualCompactionPaused);
    manual.done = true;
    return manual.status;
  }

  // When a manual compaction arrives, temporarily disable scheduling of
  // non-manual compactions and wait until the number of scheduled compaction
  // jobs drops to zero. This used to be needed to ensure that this manual
  // compaction can compact any range of keys/files. Now it is optional
  // (see `CompactRangeOptions::exclusive_manual_compaction`). The use case
  // for `exclusive_manual_compaction=true` is unclear beyond not trusting the
  // code.
  //
  // HasPendingManualCompaction() is true when at least one thread is inside
  // RunManualCompaction(), i.e. during that time no other compaction will
  // get scheduled (see MaybeScheduleFlushOrCompaction).
  //
  // Note that the following loop doesn't stop more than one thread calling
  // RunManualCompaction() from getting to the second while loop below.
  // However, only one of them will actually schedule compaction, while
  // others will wait on a condition variable until it completes.

  AddManualCompaction(&manual);
  TEST_SYNC_POINT_CALLBACK("DBImpl::RunManualCompaction:NotScheduled", &mutex_);
  if (exclusive) {
    // Limitation: there's no way to wake up the below loop when user sets
    // `*manual.canceled`. So `CompactRangeOptions::exclusive_manual_compaction`
    // and `CompactRangeOptions::canceled` might not work well together.
    while (bg_bottom_compaction_scheduled_ > 0 ||
           bg_compaction_scheduled_ > 0) {
      if (manual_compaction_paused_ > 0 || manual.canceled == true) {
        // Pretend the error came from compaction so the below cleanup/error
        // handling code can process it.
        manual.done = true;
        manual.status =
            Status::Incomplete(Status::SubCode::kManualCompactionPaused);
        break;
      }
      TEST_SYNC_POINT("DBImpl::RunManualCompaction:WaitScheduled");
      ROCKS_LOG_INFO(
          immutable_db_options_.info_log,
          "[%s] Manual compaction waiting for all other scheduled background "
          "compactions to finish",
          cfd->GetName().c_str());
      bg_cv_.Wait();
    }
  }

  LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL,
                       immutable_db_options_.info_log.get());

  ROCKS_LOG_BUFFER(&log_buffer, "[%s] Manual compaction starting",
                   cfd->GetName().c_str());

  // We don't check bg_error_ here, because if we get the error in compaction,
  // the compaction will set manual.status to bg_error_ and set manual.done to
  // true.
  while (!manual.done) {
    assert(HasPendingManualCompaction());
    manual_conflict = false;
    Compaction* compaction = nullptr;
    if (ShouldntRunManualCompaction(&manual) || (manual.in_progress == true) ||
        scheduled ||
        (((manual.manual_end = &manual.tmp_storage1) != nullptr) &&
         ((compaction = manual.cfd->CompactRange(
               *manual.cfd->GetLatestMutableCFOptions(), mutable_db_options_,
               manual.input_level, manual.output_level, compact_range_options,
               manual.begin, manual.end, &manual.manual_end, &manual_conflict,
               max_file_num_to_ignore, trim_ts)) == nullptr &&
          manual_conflict))) {
      // exclusive manual compactions should not see a conflict during
      // CompactRange
      assert(!exclusive || !manual_conflict);
      // Running either this or some other manual compaction
      bg_cv_.Wait();
      if (manual_compaction_paused_ > 0 && scheduled && !unscheduled) {
        assert(thread_pool_priority != Env::Priority::TOTAL);
        // unschedule all manual compactions
        auto unscheduled_task_num = env_->UnSchedule(
            GetTaskTag(TaskType::kManualCompaction), thread_pool_priority);
        if (unscheduled_task_num > 0) {
          ROCKS_LOG_INFO(
              immutable_db_options_.info_log,
              "[%s] Unscheduled %d manual compactions from the thread-pool",
              cfd->GetName().c_str(), unscheduled_task_num);
          // it may unschedule other manual compactions, notify others.
          bg_cv_.SignalAll();
        }
        unscheduled = true;
        TEST_SYNC_POINT("DBImpl::RunManualCompaction:Unscheduled");
      }
      if (scheduled && manual.incomplete == true) {
        assert(!manual.in_progress);
        scheduled = false;
        manual.incomplete = false;
      }
    } else if (!scheduled) {
      if (compaction == nullptr) {
        manual.done = true;
        bg_cv_.SignalAll();
        continue;
      }
      ca = new CompactionArg;
      ca->db = this;
      ca->prepicked_compaction = new PrepickedCompaction;
      ca->prepicked_compaction->manual_compaction_state = &manual;
      ca->prepicked_compaction->compaction = compaction;
      if (!RequestCompactionToken(
              cfd, true, &ca->prepicked_compaction->task_token, &log_buffer)) {
        // Don't throttle manual compaction, only count outstanding tasks.
        assert(false);
      }
      manual.incomplete = false;
      if (compaction->bottommost_level() &&
          env_->GetBackgroundThreads(Env::Priority::BOTTOM) > 0) {
        bg_bottom_compaction_scheduled_++;
        ca->compaction_pri_ = Env::Priority::BOTTOM;
        env_->Schedule(&DBImpl::BGWorkBottomCompaction, ca,
                       Env::Priority::BOTTOM,
                       GetTaskTag(TaskType::kManualCompaction),
                       &DBImpl::UnscheduleCompactionCallback);
        thread_pool_priority = Env::Priority::BOTTOM;
      } else {
        bg_compaction_scheduled_++;
        ca->compaction_pri_ = Env::Priority::LOW;
        env_->Schedule(&DBImpl::BGWorkCompaction, ca, Env::Priority::LOW,
                       GetTaskTag(TaskType::kManualCompaction),
                       &DBImpl::UnscheduleCompactionCallback);
        thread_pool_priority = Env::Priority::LOW;
      }
      scheduled = true;
      TEST_SYNC_POINT("DBImpl::RunManualCompaction:Scheduled");
    }
  }

  log_buffer.FlushBufferToLog();
  assert(!manual.in_progress);
  assert(HasPendingManualCompaction());
  RemoveManualCompaction(&manual);
  // if the manual job is unscheduled, try to schedule other jobs in case
  // there's any unscheduled compaction job which was blocked by exclusive
  // manual compaction.
  if (manual.status.IsIncomplete() &&
      manual.status.subcode() == Status::SubCode::kManualCompactionPaused) {
    MaybeScheduleFlushOrCompaction();
  }
  bg_cv_.SignalAll();
  return manual.status;
}

void DBImpl::GenerateFlushRequest(const autovector<ColumnFamilyData*>& cfds,
                                  FlushRequest* req) {
  assert(req != nullptr);
  req->reserve(cfds.size());
  for (const auto cfd : cfds) {
    if (nullptr == cfd) {
      // cfd may be null, see DBImpl::ScheduleFlushes
      continue;
    }
    uint64_t max_memtable_id = cfd->imm()->GetLatestMemTableID();
    req->emplace_back(cfd, max_memtable_id);
  }
}

Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
                             const FlushOptions& flush_options,
                             FlushReason flush_reason,
                             bool entered_write_thread) {
  // This method should not be called if atomic_flush is true.
  assert(!immutable_db_options_.atomic_flush);
  if (!flush_options.wait && write_controller_.IsStopped()) {
    std::ostringstream oss;
    oss << "Writes have been stopped, thus unable to perform manual flush. "
           "Please try again later after writes are resumed";
    return Status::TryAgain(oss.str());
  }
  Status s;
  if (!flush_options.allow_write_stall) {
    bool flush_needed = true;
    s = WaitUntilFlushWouldNotStallWrites(cfd, &flush_needed);
    TEST_SYNC_POINT("DBImpl::FlushMemTable:StallWaitDone");
    if (!s.ok() || !flush_needed) {
      return s;
    }
  }

  const bool needs_to_join_write_thread = !entered_write_thread;
  autovector<FlushRequest> flush_reqs;
  autovector<uint64_t> memtable_ids_to_wait;
  {
    WriteContext context;
    InstrumentedMutexLock guard_lock(&mutex_);

    WriteThread::Writer w;
    WriteThread::Writer nonmem_w;
    if (needs_to_join_write_thread) {
      write_thread_.EnterUnbatched(&w, &mutex_);
      if (two_write_queues_) {
        nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_);
      }
    }
    WaitForPendingWrites();

    if (flush_reason != FlushReason::kErrorRecoveryRetryFlush &&
        (!cfd->mem()->IsEmpty() || !cached_recoverable_state_empty_.load())) {
      // Note that, when the flush reason is kErrorRecoveryRetryFlush, during
      // the auto retry resume, we want to avoid creating new small memtables.
      // Therefore, SwitchMemtable will not be called. Also, since ResumeImpl
      // will iterate through all the CFs and call FlushMemtable during auto
      // retry resume, it is possible that in some CFs,
      // cfd->imm()->NumNotFlushed() == 0. In that case, no flush request will
      // be created and scheduled, and Status::OK() will be returned.
      s = SwitchMemtable(cfd, &context);
    }
    const uint64_t flush_memtable_id = std::numeric_limits<uint64_t>::max();
    if (s.ok()) {
      if (cfd->imm()->NumNotFlushed() != 0 || !cfd->mem()->IsEmpty() ||
          !cached_recoverable_state_empty_.load()) {
        FlushRequest req{{cfd, flush_memtable_id}};
        flush_reqs.emplace_back(std::move(req));
        memtable_ids_to_wait.emplace_back(cfd->imm()->GetLatestMemTableID());
      }
      if (immutable_db_options_.persist_stats_to_disk &&
          flush_reason != FlushReason::kErrorRecoveryRetryFlush) {
        ColumnFamilyData* cfd_stats =
            versions_->GetColumnFamilySet()->GetColumnFamily(
                kPersistentStatsColumnFamilyName);
        if (cfd_stats != nullptr && cfd_stats != cfd &&
            !cfd_stats->mem()->IsEmpty()) {
          // only force flush stats CF when it will be the only CF lagging
          // behind after the current flush
          bool stats_cf_flush_needed = true;
          for (auto* loop_cfd : *versions_->GetColumnFamilySet()) {
            if (loop_cfd == cfd_stats || loop_cfd == cfd) {
              continue;
            }
            if (loop_cfd->GetLogNumber() <= cfd_stats->GetLogNumber()) {
              stats_cf_flush_needed = false;
            }
          }
          if (stats_cf_flush_needed) {
            ROCKS_LOG_INFO(immutable_db_options_.info_log,
                           "Force flushing stats CF with manual flush of %s "
                           "to avoid holding old logs",
                           cfd->GetName().c_str());
            s = SwitchMemtable(cfd_stats, &context);
            FlushRequest req{{cfd_stats, flush_memtable_id}};
            flush_reqs.emplace_back(std::move(req));
            memtable_ids_to_wait.emplace_back(
                cfd->imm()->GetLatestMemTableID());
          }
        }
      }
    }

    if (s.ok() && !flush_reqs.empty()) {
      for (const auto& req : flush_reqs) {
        assert(req.size() == 1);
        ColumnFamilyData* loop_cfd = req[0].first;
        loop_cfd->imm()->FlushRequested();
      }
      // If the caller wants to wait for this flush to complete, it indicates
      // that the caller expects the ColumnFamilyData not to be freed by
      // other threads which may drop the column family concurrently.
      // Therefore, we increase the cfd's ref count.
      if (flush_options.wait) {
        for (const auto& req : flush_reqs) {
          assert(req.size() == 1);
          ColumnFamilyData* loop_cfd = req[0].first;
          loop_cfd->Ref();
        }
      }
      for (const auto& req : flush_reqs) {
        SchedulePendingFlush(req, flush_reason);
      }
      MaybeScheduleFlushOrCompaction();
    }

    if (needs_to_join_write_thread) {
      write_thread_.ExitUnbatched(&w);
      if (two_write_queues_) {
        nonmem_write_thread_.ExitUnbatched(&nonmem_w);
      }
    }
  }
  TEST_SYNC_POINT("DBImpl::FlushMemTable:AfterScheduleFlush");
  TEST_SYNC_POINT("DBImpl::FlushMemTable:BeforeWaitForBgFlush");
  if (s.ok() && flush_options.wait) {
    autovector<ColumnFamilyData*> cfds;
    autovector<const uint64_t*> flush_memtable_ids;
    assert(flush_reqs.size() == memtable_ids_to_wait.size());
    for (size_t i = 0; i < flush_reqs.size(); ++i) {
      assert(flush_reqs[i].size() == 1);
      cfds.push_back(flush_reqs[i][0].first);
      flush_memtable_ids.push_back(&(memtable_ids_to_wait[i]));
    }
    s = WaitForFlushMemTables(
        cfds, flush_memtable_ids,
        (flush_reason == FlushReason::kErrorRecovery ||
         flush_reason == FlushReason::kErrorRecoveryRetryFlush));
    InstrumentedMutexLock lock_guard(&mutex_);
    for (auto* tmp_cfd : cfds) {
      tmp_cfd->UnrefAndTryDelete();
    }
  }
  TEST_SYNC_POINT("DBImpl::FlushMemTable:FlushMemTableFinished");
  return s;
}

// Flush all elements in 'column_family_datas'
// and atomically record the result to the MANIFEST.
Status DBImpl::AtomicFlushMemTables(
    const autovector<ColumnFamilyData*>& column_family_datas,
    const FlushOptions& flush_options, FlushReason flush_reason,
    bool entered_write_thread) {
  assert(immutable_db_options_.atomic_flush);
  if (!flush_options.wait && write_controller_.IsStopped()) {
    std::ostringstream oss;
    oss << "Writes have been stopped, thus unable to perform manual flush. "
           "Please try again later after writes are resumed";
    return Status::TryAgain(oss.str());
  }
  Status s;
  if (!flush_options.allow_write_stall) {
    int num_cfs_to_flush = 0;
    for (auto cfd : column_family_datas) {
      bool flush_needed = true;
      s = WaitUntilFlushWouldNotStallWrites(cfd, &flush_needed);
      if (!s.ok()) {
        return s;
      } else if (flush_needed) {
        ++num_cfs_to_flush;
      }
    }
    if (0 == num_cfs_to_flush) {
      return s;
    }
  }
  const bool needs_to_join_write_thread = !entered_write_thread;
  FlushRequest flush_req;
  autovector<ColumnFamilyData*> cfds;
  {
    WriteContext context;
    InstrumentedMutexLock guard_lock(&mutex_);

    WriteThread::Writer w;
    WriteThread::Writer nonmem_w;
    if (needs_to_join_write_thread) {
      write_thread_.EnterUnbatched(&w, &mutex_);
      if (two_write_queues_) {
        nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_);
      }
    }
    WaitForPendingWrites();

    for (auto cfd : column_family_datas) {
      if (cfd->IsDropped()) {
        continue;
      }
      if (cfd->imm()->NumNotFlushed() != 0 || !cfd->mem()->IsEmpty() ||
          !cached_recoverable_state_empty_.load()) {
        cfds.emplace_back(cfd);
      }
    }
    for (auto cfd : cfds) {
      if ((cfd->mem()->IsEmpty() && cached_recoverable_state_empty_.load()) ||
          flush_reason == FlushReason::kErrorRecoveryRetryFlush) {
        continue;
      }
      cfd->Ref();
      s = SwitchMemtable(cfd, &context);
      cfd->UnrefAndTryDelete();
      if (!s.ok()) {
        break;
      }
    }
    if (s.ok()) {
      AssignAtomicFlushSeq(cfds);
      for (auto cfd : cfds) {
        cfd->imm()->FlushRequested();
      }
      // If the caller wants to wait for this flush to complete, it indicates
      // that the caller expects the ColumnFamilyData not to be freed by
      // other threads which may drop the column family concurrently.
      // Therefore, we increase the cfd's ref count.
      if (flush_options.wait) {
        for (auto cfd : cfds) {
          cfd->Ref();
        }
      }
      GenerateFlushRequest(cfds, &flush_req);
      SchedulePendingFlush(flush_req, flush_reason);
      MaybeScheduleFlushOrCompaction();
    }

    if (needs_to_join_write_thread) {
      write_thread_.ExitUnbatched(&w);
      if (two_write_queues_) {
        nonmem_write_thread_.ExitUnbatched(&nonmem_w);
      }
    }
  }
  TEST_SYNC_POINT("DBImpl::AtomicFlushMemTables:AfterScheduleFlush");
  TEST_SYNC_POINT("DBImpl::AtomicFlushMemTables:BeforeWaitForBgFlush");
  if (s.ok() && flush_options.wait) {
    autovector<const uint64_t*> flush_memtable_ids;
    for (auto& iter : flush_req) {
      flush_memtable_ids.push_back(&(iter.second));
    }
    s = WaitForFlushMemTables(
        cfds, flush_memtable_ids,
        (flush_reason == FlushReason::kErrorRecovery ||
         flush_reason == FlushReason::kErrorRecoveryRetryFlush));
    InstrumentedMutexLock lock_guard(&mutex_);
    for (auto* cfd : cfds) {
      cfd->UnrefAndTryDelete();
    }
  }
  return s;
}

// Calling FlushMemTable(), whether from DB::Flush() or from Backup Engine,
// can cause a write stall, for example if one memtable is being flushed
// already. This method tries to avoid the write stall (similar to
// CompactRange() behavior): it emulates how the SuperVersion / LSM would
// change if the flush happens, checks it against various constraints, and
// delays the flush if it would cause a write stall.
// The caller should check status and flush_needed to see if a flush already
// happened.
Status DBImpl::WaitUntilFlushWouldNotStallWrites(ColumnFamilyData* cfd,
                                                 bool* flush_needed) {
  {
    *flush_needed = true;
    InstrumentedMutexLock l(&mutex_);
    uint64_t orig_active_memtable_id = cfd->mem()->GetID();
    WriteStallCondition write_stall_condition = WriteStallCondition::kNormal;
    do {
      if (write_stall_condition != WriteStallCondition::kNormal) {
        // Same error handling as user writes: Don't wait if there's a
        // background error, even if it's a soft error. We might wait here
        // indefinitely as the pending flushes/compactions may never finish
        // successfully, resulting in the stall condition lasting indefinitely
        if (error_handler_.IsBGWorkStopped()) {
          return error_handler_.GetBGError();
        }

        TEST_SYNC_POINT("DBImpl::WaitUntilFlushWouldNotStallWrites:StallWait");
        ROCKS_LOG_INFO(immutable_db_options_.info_log,
                       "[%s] WaitUntilFlushWouldNotStallWrites"
                       " waiting on stall conditions to clear",
                       cfd->GetName().c_str());
        bg_cv_.Wait();
      }
      if (cfd->IsDropped()) {
        return Status::ColumnFamilyDropped();
      }
      if (shutting_down_.load(std::memory_order_acquire)) {
        return Status::ShutdownInProgress();
      }

      uint64_t earliest_memtable_id =
          std::min(cfd->mem()->GetID(), cfd->imm()->GetEarliestMemTableID());
      if (earliest_memtable_id > orig_active_memtable_id) {
        // We waited so long that the memtable we were originally waiting on
        // was flushed.
        *flush_needed = false;
        return Status::OK();
      }

      const auto& mutable_cf_options = *cfd->GetLatestMutableCFOptions();
      const auto* vstorage = cfd->current()->storage_info();

      // Skip the stalling check if we're below the auto-flush and
      // auto-compaction triggers. If it stalled in these conditions, that'd
      // mean the stall triggers are so low that stalling is needed for any
      // background work. In that case we shouldn't wait since background work
      // won't be scheduled.
      if (cfd->imm()->NumNotFlushed() <
              cfd->ioptions()->min_write_buffer_number_to_merge &&
          vstorage->l0_delay_trigger_count() <
              mutable_cf_options.level0_file_num_compaction_trigger) {
        break;
      }

      // check whether one extra immutable memtable or an extra L0 file would
      // cause write stalling mode to be entered. It could still enter stall
      // mode due to pending compaction bytes, but that's less common
      write_stall_condition = ColumnFamilyData::GetWriteStallConditionAndCause(
                                  cfd->imm()->NumNotFlushed() + 1,
                                  vstorage->l0_delay_trigger_count() + 1,
                                  vstorage->estimated_compaction_needed_bytes(),
                                  mutable_cf_options, *cfd->ioptions())
                                  .first;
    } while (write_stall_condition != WriteStallCondition::kNormal);
  }
  return Status::OK();
}

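// Illustration (assumed numbers, not from this file): with
// min_write_buffer_number_to_merge = 1 and level0_slowdown_writes_trigger = 4,
// the loop above asks whether NumNotFlushed() + 1 immutable memtables or
// l0_delay_trigger_count() + 1 L0 files would push the column family into a
// delayed/stopped write state, and waits on bg_cv_ until the answer is no.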
// Wait for memtables to be flushed for multiple column families.
// let N = cfds.size()
// for i in [0, N),
//  1) if flush_memtable_ids[i] is not null, then the memtables with lower IDs
//     have to be flushed for THIS column family;
//  2) if flush_memtable_ids[i] is null, then all memtables in THIS column
//     family have to be flushed.
// Finish waiting when ALL column families finish flushing memtables.
// resuming_from_bg_err indicates whether the caller is trying to resume from
// background error or in normal processing.
Status DBImpl::WaitForFlushMemTables(
    const autovector<ColumnFamilyData*>& cfds,
    const autovector<const uint64_t*>& flush_memtable_ids,
    bool resuming_from_bg_err) {
  int num = static_cast<int>(cfds.size());
  // Wait until the flush completes
  InstrumentedMutexLock l(&mutex_);
  Status s;
  // If the caller is trying to resume from bg error, then
  // error_handler_.IsDBStopped() is true.
  while (resuming_from_bg_err || !error_handler_.IsDBStopped()) {
    if (shutting_down_.load(std::memory_order_acquire)) {
      s = Status::ShutdownInProgress();
      return s;
    }
    // If an error has occurred during resumption, then there is no need to
    // wait. But the flush operation may fail because of this error, so we
    // need to return the status.
    if (!error_handler_.GetRecoveryError().ok()) {
      s = error_handler_.GetRecoveryError();
      break;
    }
    // If BG work is stopped, there is a BG error and either 1) it is a soft
    // error that requires no BG work, or 2) auto recovery is not in progress
    if (!resuming_from_bg_err && error_handler_.IsBGWorkStopped() &&
        error_handler_.GetBGError().severity() <
            Status::Severity::kHardError) {
      s = error_handler_.GetBGError();
      return s;
    }

    // Number of column families that have been dropped.
    int num_dropped = 0;
    // Number of column families that have finished flush.
    int num_finished = 0;
    for (int i = 0; i < num; ++i) {
      if (cfds[i]->IsDropped()) {
        ++num_dropped;
      } else if (cfds[i]->imm()->NumNotFlushed() == 0 ||
                 (flush_memtable_ids[i] != nullptr &&
                  cfds[i]->imm()->GetEarliestMemTableID() >
                      *flush_memtable_ids[i])) {
        ++num_finished;
      }
    }
    if (1 == num_dropped && 1 == num) {
      s = Status::ColumnFamilyDropped();
      return s;
    }
    // Column families involved in this flush request have either been dropped
    // or finished flush. Then it's time to finish waiting.
    if (num_dropped + num_finished == num) {
      break;
    }
    bg_cv_.Wait();
  }
  // If not resuming from bg error, and an error has caused the DB to stop,
  // then report the bg error to the caller.
  if (!resuming_from_bg_err && error_handler_.IsDBStopped()) {
    s = error_handler_.GetBGError();
  }
  return s;
}

Status DBImpl::EnableAutoCompaction(
    const std::vector<ColumnFamilyHandle*>& column_family_handles) {
  Status s;
  for (auto cf_ptr : column_family_handles) {
    Status status =
        this->SetOptions(cf_ptr, {{"disable_auto_compactions", "false"}});
    if (!status.ok()) {
      s = status;
    }
  }

  return s;
}

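// Example (illustrative sketch, not part of this file): a common pattern is
// to open with auto compactions disabled, bulk-load, then re-enable:
//
//   Options opts;
//   opts.disable_auto_compactions = true;
//   // ... open the DB and bulk-load data ...
//   Status s = db->EnableAutoCompaction({db->DefaultColumnFamily()});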
// NOTE: Calling DisableManualCompaction() may overwrite the
// user-provided canceled variable in CompactRangeOptions
void DBImpl::DisableManualCompaction() {
  InstrumentedMutexLock l(&mutex_);
  manual_compaction_paused_.fetch_add(1, std::memory_order_release);

  // Mark canceled as true when the cancellation is triggered by
  // manual_compaction_paused (may overwrite the user-provided `canceled`)
  for (const auto& manual_compaction : manual_compaction_dequeue_) {
    manual_compaction->canceled = true;
  }

  // Wake up manual compactions waiting to start.
  bg_cv_.SignalAll();

  // Wait for any pending manual compactions to finish (typically through
  // failing with `Status::Incomplete`) prior to returning. This way we are
  // guaranteed no pending manual compaction will commit while manual
  // compactions are "disabled".
  while (HasPendingManualCompaction()) {
    bg_cv_.Wait();
  }
}

1e59de90 TL |
2459 | // NOTE: In contrast to DisableManualCompaction(), calling |
2460 | // EnableManualCompaction() does NOT overwrite the user-provided *canceled | |
2461 | // variable to false, since there is NO CHANCE a canceled compaction |
2462 | // is ever uncanceled. In other words, a canceled compaction must already |
2463 | // have been dropped out of the manual compaction queue when it was disabled. |
f67539c2 | 2464 | void DBImpl::EnableManualCompaction() { |
20effc67 TL |
2465 | InstrumentedMutexLock l(&mutex_); |
2466 | assert(manual_compaction_paused_ > 0); | |
2467 | manual_compaction_paused_.fetch_sub(1, std::memory_order_release); | |
f67539c2 TL |
2468 | } |
2469 | ||
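// Usage sketch (editor's addition, simplified threading): a CompactRange()
// in flight on another thread is expected to come back with
// Status::Incomplete(kManualCompactionPaused) once DisableManualCompaction()
// is called; EnableManualCompaction() lifts the pause afterwards.
#include <thread>
#include <rocksdb/db.h>

void CancelInflightManualCompaction(rocksdb::DB* db) {
  std::thread worker([db]() {
    rocksdb::CompactRangeOptions cro;
    // May fail with kManualCompactionPaused; ignored in this sketch.
    db->CompactRange(cro, /*begin=*/nullptr, /*end=*/nullptr)
        .PermitUncheckedError();
  });
  db->DisableManualCompaction();  // waits for pending manuals to drain
  worker.join();
  db->EnableManualCompaction();   // later CompactRange() calls may run again
}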
7c673cae FG |
2470 | void DBImpl::MaybeScheduleFlushOrCompaction() { |
2471 | mutex_.AssertHeld(); | |
2472 | if (!opened_successfully_) { | |
2473 | // Compaction may introduce a data race with DB open |
2474 | return; | |
2475 | } | |
2476 | if (bg_work_paused_ > 0) { | |
2477 | // we paused the background work | |
2478 | return; | |
11fdf7f2 | 2479 | } else if (error_handler_.IsBGWorkStopped() && |
494da23a | 2480 | !error_handler_.IsRecoveryInProgress()) { |
11fdf7f2 TL |
2481 | // There has been a hard error and this call is not part of the recovery |
2482 | // sequence. Bail out here so we don't get into an endless loop of | |
2483 | // scheduling BG work which will again call this function | |
2484 | return; | |
7c673cae FG |
2485 | } else if (shutting_down_.load(std::memory_order_acquire)) { |
2486 | // DB is being deleted; no more background compactions | |
2487 | return; | |
2488 | } | |
11fdf7f2 TL |
2489 | auto bg_job_limits = GetBGJobLimits(); |
2490 | bool is_flush_pool_empty = | |
2491 | env_->GetBackgroundThreads(Env::Priority::HIGH) == 0; | |
2492 | while (!is_flush_pool_empty && unscheduled_flushes_ > 0 && | |
2493 | bg_flush_scheduled_ < bg_job_limits.max_flushes) { | |
7c673cae | 2494 | bg_flush_scheduled_++; |
494da23a TL |
2495 | FlushThreadArg* fta = new FlushThreadArg; |
2496 | fta->db_ = this; | |
2497 | fta->thread_pri_ = Env::Priority::HIGH; | |
2498 | env_->Schedule(&DBImpl::BGWorkFlush, fta, Env::Priority::HIGH, this, | |
2499 | &DBImpl::UnscheduleFlushCallback); | |
f67539c2 TL |
2500 | --unscheduled_flushes_; |
2501 | TEST_SYNC_POINT_CALLBACK( | |
2502 | "DBImpl::MaybeScheduleFlushOrCompaction:AfterSchedule:0", | |
2503 | &unscheduled_flushes_); | |
7c673cae FG |
2504 | } |
2505 | ||
11fdf7f2 TL |
2506 | // special case -- if high-pri (flush) thread pool is empty, then schedule |
2507 | // flushes in low-pri (compaction) thread pool. | |
2508 | if (is_flush_pool_empty) { | |
7c673cae FG |
2509 | while (unscheduled_flushes_ > 0 && |
2510 | bg_flush_scheduled_ + bg_compaction_scheduled_ < | |
11fdf7f2 | 2511 | bg_job_limits.max_flushes) { |
7c673cae | 2512 | bg_flush_scheduled_++; |
494da23a TL |
2513 | FlushThreadArg* fta = new FlushThreadArg; |
2514 | fta->db_ = this; | |
2515 | fta->thread_pri_ = Env::Priority::LOW; | |
2516 | env_->Schedule(&DBImpl::BGWorkFlush, fta, Env::Priority::LOW, this, | |
2517 | &DBImpl::UnscheduleFlushCallback); | |
f67539c2 | 2518 | --unscheduled_flushes_; |
7c673cae FG |
2519 | } |
2520 | } | |
2521 | ||
2522 | if (bg_compaction_paused_ > 0) { | |
2523 | // we paused the background compaction | |
2524 | return; | |
11fdf7f2 TL |
2525 | } else if (error_handler_.IsBGWorkStopped()) { |
2526 | // Compaction is not part of the recovery sequence from a hard error. We | |
2527 | // might get here because recovery might do a flush and install a new | |
2528 | // super version, which will try to schedule pending compactions. Bail | |
2529 | // out here and let the higher level recovery handle compactions | |
2530 | return; | |
7c673cae FG |
2531 | } |
2532 | ||
2533 | if (HasExclusiveManualCompaction()) { | |
2534 | // only manual compactions are allowed to run. don't schedule automatic | |
2535 | // compactions | |
11fdf7f2 | 2536 | TEST_SYNC_POINT("DBImpl::MaybeScheduleFlushOrCompaction:Conflict"); |
7c673cae FG |
2537 | return; |
2538 | } | |
2539 | ||
1e59de90 TL |
2540 | while (bg_compaction_scheduled_ + bg_bottom_compaction_scheduled_ < |
2541 | bg_job_limits.max_compactions && | |
7c673cae FG |
2542 | unscheduled_compactions_ > 0) { |
2543 | CompactionArg* ca = new CompactionArg; | |
2544 | ca->db = this; | |
1e59de90 | 2545 | ca->compaction_pri_ = Env::Priority::LOW; |
11fdf7f2 | 2546 | ca->prepicked_compaction = nullptr; |
7c673cae FG |
2547 | bg_compaction_scheduled_++; |
2548 | unscheduled_compactions_--; | |
2549 | env_->Schedule(&DBImpl::BGWorkCompaction, ca, Env::Priority::LOW, this, | |
494da23a | 2550 | &DBImpl::UnscheduleCompactionCallback); |
7c673cae FG |
2551 | } |
2552 | } | |
2553 | ||
11fdf7f2 | 2554 | DBImpl::BGJobLimits DBImpl::GetBGJobLimits() const { |
7c673cae | 2555 | mutex_.AssertHeld(); |
20effc67 | 2556 | return GetBGJobLimits(mutable_db_options_.max_background_flushes, |
11fdf7f2 TL |
2557 | mutable_db_options_.max_background_compactions, |
2558 | mutable_db_options_.max_background_jobs, | |
2559 | write_controller_.NeedSpeedupCompaction()); | |
2560 | } | |
2561 | ||
2562 | DBImpl::BGJobLimits DBImpl::GetBGJobLimits(int max_background_flushes, | |
2563 | int max_background_compactions, | |
2564 | int max_background_jobs, | |
2565 | bool parallelize_compactions) { | |
2566 | BGJobLimits res; | |
2567 | if (max_background_flushes == -1 && max_background_compactions == -1) { | |
2568 | // for our first stab implementing max_background_jobs, simply allocate a | |
2569 | // quarter of the threads to flushes. | |
2570 | res.max_flushes = std::max(1, max_background_jobs / 4); | |
2571 | res.max_compactions = std::max(1, max_background_jobs - res.max_flushes); | |
7c673cae | 2572 | } else { |
11fdf7f2 TL |
2573 | // compatibility code in case users haven't migrated to max_background_jobs, |
2574 | // which automatically computes flush/compaction limits | |
2575 | res.max_flushes = std::max(1, max_background_flushes); | |
2576 | res.max_compactions = std::max(1, max_background_compactions); | |
7c673cae | 2577 | } |
11fdf7f2 TL |
2578 | if (!parallelize_compactions) { |
2579 | // throttle background compactions until we deem it necessary |
2580 | res.max_compactions = 1; | |
2581 | } | |
2582 | return res; | |
7c673cae FG |
2583 | } |
2584 | ||
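// Worked example (editor's addition; the struct and function are stand-ins
// for DBImpl::BGJobLimits / GetBGJobLimits above): with only
// max_background_jobs set, a quarter of the budget goes to flushes and the
// rest to compactions, each floored at 1; without the speedup signal,
// compactions are capped at 1.
#include <algorithm>
#include <cassert>

struct Limits { int max_flushes; int max_compactions; };

Limits ComputeLimits(int max_bg_flushes, int max_bg_compactions,
                     int max_bg_jobs, bool parallelize) {
  Limits res;
  if (max_bg_flushes == -1 && max_bg_compactions == -1) {
    res.max_flushes = std::max(1, max_bg_jobs / 4);
    res.max_compactions = std::max(1, max_bg_jobs - res.max_flushes);
  } else {
    res.max_flushes = std::max(1, max_bg_flushes);
    res.max_compactions = std::max(1, max_bg_compactions);
  }
  if (!parallelize) {
    res.max_compactions = 1;
  }
  return res;
}

void CheckExamples() {
  // max_background_jobs=8 -> 2 flush slots, 6 compaction slots.
  Limits a = ComputeLimits(-1, -1, 8, /*parallelize=*/true);
  assert(a.max_flushes == 2 && a.max_compactions == 6);
  // Same budget without the speedup signal throttles compactions to 1.
  Limits b = ComputeLimits(-1, -1, 8, /*parallelize=*/false);
  assert(b.max_flushes == 2 && b.max_compactions == 1);
}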
2585 | void DBImpl::AddToCompactionQueue(ColumnFamilyData* cfd) { | |
11fdf7f2 | 2586 | assert(!cfd->queued_for_compaction()); |
7c673cae FG |
2587 | cfd->Ref(); |
2588 | compaction_queue_.push_back(cfd); | |
11fdf7f2 | 2589 | cfd->set_queued_for_compaction(true); |
7c673cae FG |
2590 | } |
2591 | ||
2592 | ColumnFamilyData* DBImpl::PopFirstFromCompactionQueue() { | |
2593 | assert(!compaction_queue_.empty()); | |
2594 | auto cfd = *compaction_queue_.begin(); | |
2595 | compaction_queue_.pop_front(); | |
11fdf7f2 TL |
2596 | assert(cfd->queued_for_compaction()); |
2597 | cfd->set_queued_for_compaction(false); | |
7c673cae FG |
2598 | return cfd; |
2599 | } | |
2600 | ||
11fdf7f2 | 2601 | DBImpl::FlushRequest DBImpl::PopFirstFromFlushQueue() { |
7c673cae | 2602 | assert(!flush_queue_.empty()); |
11fdf7f2 | 2603 | FlushRequest flush_req = flush_queue_.front(); |
7c673cae | 2604 | flush_queue_.pop_front(); |
1e59de90 TL |
2605 | if (!immutable_db_options_.atomic_flush) { |
2606 | assert(flush_req.size() == 1); | |
2607 | } | |
2608 | for (const auto& elem : flush_req) { | |
2609 | if (!immutable_db_options_.atomic_flush) { | |
2610 | ColumnFamilyData* cfd = elem.first; | |
2611 | assert(cfd); | |
2612 | assert(cfd->queued_for_flush()); | |
2613 | cfd->set_queued_for_flush(false); | |
2614 | } | |
2615 | } | |
11fdf7f2 TL |
2616 | // TODO: need to unset flush reason? |
2617 | return flush_req; | |
7c673cae FG |
2618 | } |
2619 | ||
494da23a TL |
2620 | ColumnFamilyData* DBImpl::PickCompactionFromQueue( |
2621 | std::unique_ptr<TaskLimiterToken>* token, LogBuffer* log_buffer) { | |
2622 | assert(!compaction_queue_.empty()); | |
2623 | assert(*token == nullptr); | |
2624 | autovector<ColumnFamilyData*> throttled_candidates; | |
2625 | ColumnFamilyData* cfd = nullptr; | |
2626 | while (!compaction_queue_.empty()) { | |
2627 | auto first_cfd = *compaction_queue_.begin(); | |
2628 | compaction_queue_.pop_front(); | |
2629 | assert(first_cfd->queued_for_compaction()); | |
2630 | if (!RequestCompactionToken(first_cfd, false, token, log_buffer)) { | |
2631 | throttled_candidates.push_back(first_cfd); | |
2632 | continue; | |
2633 | } | |
2634 | cfd = first_cfd; | |
2635 | cfd->set_queued_for_compaction(false); | |
2636 | break; | |
2637 | } | |
2638 | // Add throttled compaction candidates back to queue in the original order. | |
2639 | for (auto iter = throttled_candidates.rbegin(); | |
2640 | iter != throttled_candidates.rend(); ++iter) { | |
2641 | compaction_queue_.push_front(*iter); | |
2642 | } | |
2643 | return cfd; | |
2644 | } | |
2645 | ||
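// Sketch of the queue discipline above (editor's addition, stand-in types):
// throttled candidates are popped, remembered, and pushed back to the front
// in reverse so the queue keeps its original FIFO order minus the pick.
#include <deque>
#include <vector>

struct Candidate { int id; bool throttled; };

Candidate* PickFirstRunnable(std::deque<Candidate*>& queue) {
  std::vector<Candidate*> throttled;
  Candidate* picked = nullptr;
  while (!queue.empty()) {
    Candidate* first = queue.front();
    queue.pop_front();
    if (first->throttled) {  // stands in for a failed GetToken()
      throttled.push_back(first);
      continue;
    }
    picked = first;
    break;
  }
  for (auto it = throttled.rbegin(); it != throttled.rend(); ++it) {
    queue.push_front(*it);  // restore skipped items in original order
  }
  return picked;  // nullptr if every candidate was throttled
}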
11fdf7f2 TL |
2646 | void DBImpl::SchedulePendingFlush(const FlushRequest& flush_req, |
2647 | FlushReason flush_reason) { | |
1e59de90 | 2648 | mutex_.AssertHeld(); |
11fdf7f2 TL |
2649 | if (flush_req.empty()) { |
2650 | return; | |
2651 | } | |
1e59de90 TL |
2652 | if (!immutable_db_options_.atomic_flush) { |
2653 | // For the non-atomic flush case, we never schedule multiple column | |
2654 | // families in the same flush request. | |
2655 | assert(flush_req.size() == 1); | |
2656 | ColumnFamilyData* cfd = flush_req[0].first; | |
2657 | assert(cfd); | |
2658 | ||
2659 | if (!cfd->queued_for_flush() && cfd->imm()->IsFlushPending()) { | |
2660 | cfd->Ref(); | |
2661 | cfd->set_queued_for_flush(true); | |
2662 | cfd->SetFlushReason(flush_reason); | |
2663 | ++unscheduled_flushes_; | |
2664 | flush_queue_.push_back(flush_req); | |
2665 | } | |
2666 | } else { | |
2667 | for (auto& iter : flush_req) { | |
2668 | ColumnFamilyData* cfd = iter.first; | |
2669 | cfd->Ref(); | |
2670 | cfd->SetFlushReason(flush_reason); | |
2671 | } | |
2672 | ++unscheduled_flushes_; | |
2673 | flush_queue_.push_back(flush_req); | |
7c673cae FG |
2674 | } |
2675 | } | |
2676 | ||
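// Sketch of the two queueing shapes above (editor's addition, stand-in
// types): without atomic_flush each request holds exactly one
// (column family, max memtable id) pair; with atomic_flush a single request
// carries all participating families so they get flushed together.
#include <cstdint>
#include <deque>
#include <string>
#include <utility>
#include <vector>

using FakeFlushRequest = std::vector<std::pair<std::string, uint64_t>>;

void EnqueueExamples(std::deque<FakeFlushRequest>& flush_queue,
                     bool atomic_flush) {
  if (!atomic_flush) {
    flush_queue.push_back({{"cf_a", /*max_memtable_id=*/7}});  // one CF each
    flush_queue.push_back({{"cf_b", 3}});
  } else {
    flush_queue.push_back({{"cf_a", 7}, {"cf_b", 3}});  // flushed atomically
  }
}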
2677 | void DBImpl::SchedulePendingCompaction(ColumnFamilyData* cfd) { | |
1e59de90 | 2678 | mutex_.AssertHeld(); |
11fdf7f2 | 2679 | if (!cfd->queued_for_compaction() && cfd->NeedsCompaction()) { |
7c673cae FG |
2680 | AddToCompactionQueue(cfd); |
2681 | ++unscheduled_compactions_; | |
2682 | } | |
2683 | } | |
2684 | ||
11fdf7f2 TL |
2685 | void DBImpl::SchedulePendingPurge(std::string fname, std::string dir_to_sync, |
2686 | FileType type, uint64_t number, int job_id) { | |
7c673cae | 2687 | mutex_.AssertHeld(); |
11fdf7f2 | 2688 | PurgeFileInfo file_info(fname, dir_to_sync, type, number, job_id); |
f67539c2 | 2689 | purge_files_.insert({{number, std::move(file_info)}}); |
7c673cae FG |
2690 | } |
2691 | ||
494da23a TL |
2692 | void DBImpl::BGWorkFlush(void* arg) { |
2693 | FlushThreadArg fta = *(reinterpret_cast<FlushThreadArg*>(arg)); | |
2694 | delete reinterpret_cast<FlushThreadArg*>(arg); | |
2695 | ||
2696 | IOSTATS_SET_THREAD_POOL_ID(fta.thread_pri_); | |
7c673cae | 2697 | TEST_SYNC_POINT("DBImpl::BGWorkFlush"); |
20effc67 | 2698 | static_cast_with_check<DBImpl>(fta.db_)->BackgroundCallFlush(fta.thread_pri_); |
7c673cae FG |
2699 | TEST_SYNC_POINT("DBImpl::BGWorkFlush:done"); |
2700 | } | |
2701 | ||
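// Sketch of the trampoline pattern used by BGWorkFlush above (editor's
// addition; MiniPool is a toy stand-in for Env's thread pool): the scheduler
// only understands void(void*), so the argument is heap-allocated by the
// scheduling side and freed by whichever function ends up running.
#include <utility>
#include <vector>

struct MiniPool {
  using Fn = void (*)(void*);
  std::vector<std::pair<Fn, void*>> queue;
  void Schedule(Fn fn, void* arg) { queue.emplace_back(fn, arg); }
  void RunAll() {
    for (auto& job : queue) {
      job.first(job.second);
    }
    queue.clear();
  }
};

struct WorkArg { int job_id; };

void DoWork(void* arg) {
  WorkArg copy = *static_cast<WorkArg*>(arg);  // copy out, then free
  delete static_cast<WorkArg*>(arg);
  (void)copy.job_id;  // ... the real work would use the copy here ...
}

void ScheduleExample(MiniPool& pool) {
  pool.Schedule(&DoWork, new WorkArg{42});  // ownership passes to the pool
  pool.RunAll();
}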
2702 | void DBImpl::BGWorkCompaction(void* arg) { | |
2703 | CompactionArg ca = *(reinterpret_cast<CompactionArg*>(arg)); | |
2704 | delete reinterpret_cast<CompactionArg*>(arg); | |
2705 | IOSTATS_SET_THREAD_POOL_ID(Env::Priority::LOW); | |
2706 | TEST_SYNC_POINT("DBImpl::BGWorkCompaction"); | |
11fdf7f2 TL |
2707 | auto prepicked_compaction = |
2708 | static_cast<PrepickedCompaction*>(ca.prepicked_compaction); | |
20effc67 | 2709 | static_cast_with_check<DBImpl>(ca.db)->BackgroundCallCompaction( |
11fdf7f2 TL |
2710 | prepicked_compaction, Env::Priority::LOW); |
2711 | delete prepicked_compaction; | |
2712 | } | |
2713 | ||
2714 | void DBImpl::BGWorkBottomCompaction(void* arg) { | |
2715 | CompactionArg ca = *(static_cast<CompactionArg*>(arg)); | |
2716 | delete static_cast<CompactionArg*>(arg); | |
2717 | IOSTATS_SET_THREAD_POOL_ID(Env::Priority::BOTTOM); | |
2718 | TEST_SYNC_POINT("DBImpl::BGWorkBottomCompaction"); | |
2719 | auto* prepicked_compaction = ca.prepicked_compaction; | |
1e59de90 | 2720 | assert(prepicked_compaction && prepicked_compaction->compaction); |
11fdf7f2 TL |
2721 | ca.db->BackgroundCallCompaction(prepicked_compaction, Env::Priority::BOTTOM); |
2722 | delete prepicked_compaction; | |
7c673cae FG |
2723 | } |
2724 | ||
2725 | void DBImpl::BGWorkPurge(void* db) { | |
2726 | IOSTATS_SET_THREAD_POOL_ID(Env::Priority::HIGH); | |
2727 | TEST_SYNC_POINT("DBImpl::BGWorkPurge:start"); | |
2728 | reinterpret_cast<DBImpl*>(db)->BackgroundCallPurge(); | |
2729 | TEST_SYNC_POINT("DBImpl::BGWorkPurge:end"); | |
2730 | } | |
2731 | ||
494da23a | 2732 | void DBImpl::UnscheduleCompactionCallback(void* arg) { |
1e59de90 TL |
2733 | CompactionArg* ca_ptr = reinterpret_cast<CompactionArg*>(arg); |
2734 | Env::Priority compaction_pri = ca_ptr->compaction_pri_; | |
2735 | if (Env::Priority::BOTTOM == compaction_pri) { | |
2736 | // Decrement bg_bottom_compaction_scheduled_ if priority is BOTTOM | |
2737 | ca_ptr->db->bg_bottom_compaction_scheduled_--; | |
2738 | } else if (Env::Priority::LOW == compaction_pri) { | |
2739 | // Decrement bg_compaction_scheduled_ if priority is LOW | |
2740 | ca_ptr->db->bg_compaction_scheduled_--; | |
2741 | } | |
2742 | CompactionArg ca = *(ca_ptr); | |
7c673cae | 2743 | delete reinterpret_cast<CompactionArg*>(arg); |
11fdf7f2 | 2744 | if (ca.prepicked_compaction != nullptr) { |
1e59de90 TL |
2745 | // if it's a manual compaction, set status to ManualCompactionPaused |
2746 | if (ca.prepicked_compaction->manual_compaction_state) { | |
2747 | ca.prepicked_compaction->manual_compaction_state->done = true; | |
2748 | ca.prepicked_compaction->manual_compaction_state->status = | |
2749 | Status::Incomplete(Status::SubCode::kManualCompactionPaused); | |
2750 | } | |
11fdf7f2 | 2751 | if (ca.prepicked_compaction->compaction != nullptr) { |
1e59de90 TL |
2752 | ca.prepicked_compaction->compaction->ReleaseCompactionFiles( |
2753 | Status::Incomplete(Status::SubCode::kManualCompactionPaused)); | |
11fdf7f2 TL |
2754 | delete ca.prepicked_compaction->compaction; |
2755 | } | |
2756 | delete ca.prepicked_compaction; | |
7c673cae | 2757 | } |
494da23a TL |
2758 | TEST_SYNC_POINT("DBImpl::UnscheduleCompactionCallback"); |
2759 | } | |
2760 | ||
2761 | void DBImpl::UnscheduleFlushCallback(void* arg) { | |
1e59de90 TL |
2762 | // Decrement bg_flush_scheduled_ in flush callback |
2763 | reinterpret_cast<FlushThreadArg*>(arg)->db_->bg_flush_scheduled_--; | |
2764 | Env::Priority flush_pri = reinterpret_cast<FlushThreadArg*>(arg)->thread_pri_; | |
2765 | if (Env::Priority::LOW == flush_pri) { | |
2766 | TEST_SYNC_POINT("DBImpl::UnscheduleLowFlushCallback"); | |
2767 | } else if (Env::Priority::HIGH == flush_pri) { | |
2768 | TEST_SYNC_POINT("DBImpl::UnscheduleHighFlushCallback"); | |
2769 | } | |
494da23a TL |
2770 | delete reinterpret_cast<FlushThreadArg*>(arg); |
2771 | TEST_SYNC_POINT("DBImpl::UnscheduleFlushCallback"); | |
7c673cae FG |
2772 | } |
2773 | ||
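// Sketch of the unschedule contract above (editor's addition, toy types): if
// a queued job is dropped before it runs, its callback must free the
// heap-allocated argument and undo the "scheduled" counter, otherwise the
// destructor would wait forever on a job that never executes.
struct ToyDb {
  int bg_flush_scheduled = 0;
};

struct ToyFlushArg {
  ToyDb* db;
};

void ToyUnscheduleFlush(void* arg) {
  ToyFlushArg* fta = static_cast<ToyFlushArg*>(arg);
  fta->db->bg_flush_scheduled--;  // mirrors bg_flush_scheduled_--
  delete fta;
}

void DropQueuedFlush(ToyDb& db) {
  db.bg_flush_scheduled++;            // job scheduled...
  void* arg = new ToyFlushArg{&db};
  ToyUnscheduleFlush(arg);            // ...but dropped before running
  // db.bg_flush_scheduled is back to 0, so a destructor-style wait can end.
}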
2774 | Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context, | |
494da23a TL |
2775 | LogBuffer* log_buffer, FlushReason* reason, |
2776 | Env::Priority thread_pri) { | |
7c673cae FG |
2777 | mutex_.AssertHeld(); |
2778 | ||
11fdf7f2 TL |
2779 | Status status; |
2780 | *reason = FlushReason::kOthers; | |
2781 | // If BG work is stopped due to an error, but a recovery is in progress, | |
2782 | // that means this flush is part of the recovery. So allow it to go through | |
2783 | if (!error_handler_.IsBGWorkStopped()) { | |
2784 | if (shutting_down_.load(std::memory_order_acquire)) { | |
2785 | status = Status::ShutdownInProgress(); | |
2786 | } | |
2787 | } else if (!error_handler_.IsRecoveryInProgress()) { | |
2788 | status = error_handler_.GetBGError(); | |
7c673cae FG |
2789 | } |
2790 | ||
2791 | if (!status.ok()) { | |
2792 | return status; | |
2793 | } | |
2794 | ||
11fdf7f2 TL |
2795 | autovector<BGFlushArg> bg_flush_args; |
2796 | std::vector<SuperVersionContext>& superversion_contexts = | |
2797 | job_context->superversion_contexts; | |
f67539c2 | 2798 | autovector<ColumnFamilyData*> column_families_not_to_flush; |
7c673cae FG |
2799 | while (!flush_queue_.empty()) { |
2800 | // This cfd is already referenced | |
11fdf7f2 TL |
2801 | const FlushRequest& flush_req = PopFirstFromFlushQueue(); |
2802 | superversion_contexts.clear(); | |
2803 | superversion_contexts.reserve(flush_req.size()); | |
2804 | ||
2805 | for (const auto& iter : flush_req) { | |
2806 | ColumnFamilyData* cfd = iter.first; | |
1e59de90 TL |
2807 | if (cfd->GetMempurgeUsed()) { |
2808 | // If imm() contains silent memtables (e.g., because |
2809 | // MemPurge was activated), requesting a flush will |
2810 | // mark imm_needed as true. |
2811 | cfd->imm()->FlushRequested(); | |
2812 | } | |
2813 | ||
11fdf7f2 TL |
2814 | if (cfd->IsDropped() || !cfd->imm()->IsFlushPending()) { |
2815 | // can't flush this CF, try next one | |
f67539c2 | 2816 | column_families_not_to_flush.push_back(cfd); |
11fdf7f2 | 2817 | continue; |
7c673cae | 2818 | } |
11fdf7f2 TL |
2819 | superversion_contexts.emplace_back(SuperVersionContext(true)); |
2820 | bg_flush_args.emplace_back(cfd, iter.second, | |
2821 | &(superversion_contexts.back())); | |
2822 | } | |
2823 | if (!bg_flush_args.empty()) { | |
2824 | break; | |
7c673cae | 2825 | } |
7c673cae FG |
2826 | } |
2827 | ||
11fdf7f2 TL |
2828 | if (!bg_flush_args.empty()) { |
2829 | auto bg_job_limits = GetBGJobLimits(); | |
2830 | for (const auto& arg : bg_flush_args) { | |
2831 | ColumnFamilyData* cfd = arg.cfd_; | |
2832 | ROCKS_LOG_BUFFER( | |
2833 | log_buffer, | |
2834 | "Calling FlushMemTableToOutputFile with column " | |
2835 | "family [%s], flush slots available %d, compaction slots available " | |
2836 | "%d, " | |
2837 | "flush slots scheduled %d, compaction slots scheduled %d", | |
2838 | cfd->GetName().c_str(), bg_job_limits.max_flushes, | |
2839 | bg_job_limits.max_compactions, bg_flush_scheduled_, | |
2840 | bg_compaction_scheduled_); | |
2841 | } | |
2842 | status = FlushMemTablesToOutputFiles(bg_flush_args, made_progress, | |
494da23a | 2843 | job_context, log_buffer, thread_pri); |
f67539c2 | 2844 | TEST_SYNC_POINT("DBImpl::BackgroundFlush:BeforeFlush"); |
11fdf7f2 TL |
2845 | // All the CFDs in the FlushReq must have the same flush reason, so just |
2846 | // grab the first one | |
2847 | *reason = bg_flush_args[0].cfd_->GetFlushReason(); | |
2848 | for (auto& arg : bg_flush_args) { | |
2849 | ColumnFamilyData* cfd = arg.cfd_; | |
f67539c2 | 2850 | if (cfd->UnrefAndTryDelete()) { |
11fdf7f2 TL |
2851 | arg.cfd_ = nullptr; |
2852 | } | |
7c673cae FG |
2853 | } |
2854 | } | |
f67539c2 TL |
2855 | for (auto cfd : column_families_not_to_flush) { |
2856 | cfd->UnrefAndTryDelete(); | |
2857 | } | |
7c673cae FG |
2858 | return status; |
2859 | } | |
2860 | ||
494da23a | 2861 | void DBImpl::BackgroundCallFlush(Env::Priority thread_pri) { |
7c673cae FG |
2862 | bool made_progress = false; |
2863 | JobContext job_context(next_job_id_.fetch_add(1), true); | |
7c673cae | 2864 | |
1e59de90 | 2865 | TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCallFlush:start", nullptr); |
7c673cae FG |
2866 | |
2867 | LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, | |
2868 | immutable_db_options_.info_log.get()); | |
1e59de90 TL |
2869 | TEST_SYNC_POINT("DBImpl::BackgroundCallFlush:Start:1"); |
2870 | TEST_SYNC_POINT("DBImpl::BackgroundCallFlush:Start:2"); | |
7c673cae FG |
2871 | { |
2872 | InstrumentedMutexLock l(&mutex_); | |
11fdf7f2 | 2873 | assert(bg_flush_scheduled_); |
7c673cae FG |
2874 | num_running_flushes_++; |
2875 | ||
f67539c2 TL |
2876 | std::unique_ptr<std::list<uint64_t>::iterator> |
2877 | pending_outputs_inserted_elem(new std::list<uint64_t>::iterator( | |
2878 | CaptureCurrentFileNumberInPendingOutputs())); | |
11fdf7f2 | 2879 | FlushReason reason; |
7c673cae | 2880 | |
494da23a TL |
2881 | Status s = BackgroundFlush(&made_progress, &job_context, &log_buffer, |
2882 | &reason, thread_pri); | |
f67539c2 | 2883 | if (!s.ok() && !s.IsShutdownInProgress() && !s.IsColumnFamilyDropped() && |
11fdf7f2 | 2884 | reason != FlushReason::kErrorRecovery) { |
7c673cae FG |
2885 | // Wait a little bit before retrying background flush in |
2886 | // case this is an environmental problem and we do not want to | |
2887 | // chew up resources for failed flushes for the duration of | |
2888 | // the problem. | |
2889 | uint64_t error_cnt = | |
11fdf7f2 | 2890 | default_cf_internal_stats_->BumpAndGetBackgroundErrorCount(); |
7c673cae FG |
2891 | bg_cv_.SignalAll(); // In case a waiter can proceed despite the error |
2892 | mutex_.Unlock(); | |
2893 | ROCKS_LOG_ERROR(immutable_db_options_.info_log, | |
2894 | "Waiting after background flush error: %s, " |
2895 | "Accumulated background error counts: %" PRIu64, | |
2896 | s.ToString().c_str(), error_cnt); | |
2897 | log_buffer.FlushBufferToLog(); | |
2898 | LogFlush(immutable_db_options_.info_log); | |
1e59de90 | 2899 | immutable_db_options_.clock->SleepForMicroseconds(1000000); |
7c673cae FG |
2900 | mutex_.Lock(); |
2901 | } | |
2902 | ||
494da23a | 2903 | TEST_SYNC_POINT("DBImpl::BackgroundCallFlush:FlushFinish:0"); |
7c673cae FG |
2904 | ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem); |
2905 | ||
2906 | // If flush failed, we want to delete all temporary files that we might have | |
2907 | // created. Thus, we force full scan in FindObsoleteFiles() | |
f67539c2 TL |
2908 | FindObsoleteFiles(&job_context, !s.ok() && !s.IsShutdownInProgress() && |
2909 | !s.IsColumnFamilyDropped()); | |
7c673cae | 2910 | // delete unnecessary files if any, this is done outside the mutex |
11fdf7f2 TL |
2911 | if (job_context.HaveSomethingToClean() || |
2912 | job_context.HaveSomethingToDelete() || !log_buffer.IsEmpty()) { | |
7c673cae | 2913 | mutex_.Unlock(); |
11fdf7f2 | 2914 | TEST_SYNC_POINT("DBImpl::BackgroundCallFlush:FilesFound"); |
7c673cae FG |
2915 | // Have to flush the info logs before bg_flush_scheduled_-- |
2916 | // because if bg_flush_scheduled_ becomes 0 and the lock is | |
2917 | // released, the destructor of DB can kick in and destroy all the |
2918 | // state of DB, so info_log might not be available after that point. |
2919 | // The same applies to accessing any other state that DB owns. |
2920 | log_buffer.FlushBufferToLog(); | |
2921 | if (job_context.HaveSomethingToDelete()) { | |
2922 | PurgeObsoleteFiles(job_context); | |
2923 | } | |
2924 | job_context.Clean(); | |
2925 | mutex_.Lock(); | |
2926 | } | |
494da23a | 2927 | TEST_SYNC_POINT("DBImpl::BackgroundCallFlush:ContextCleanedUp"); |
7c673cae FG |
2928 | |
2929 | assert(num_running_flushes_ > 0); | |
2930 | num_running_flushes_--; | |
2931 | bg_flush_scheduled_--; | |
2932 | // See if there's more work to be done | |
2933 | MaybeScheduleFlushOrCompaction(); | |
494da23a | 2934 | atomic_flush_install_cv_.SignalAll(); |
7c673cae FG |
2935 | bg_cv_.SignalAll(); |
2936 | // IMPORTANT: there should be no code after calling SignalAll. This call may | |
2937 | // signal the DB destructor that it's OK to proceed with destruction. In | |
2938 | // that case, all DB variables will be deallocated and referencing them |
2939 | // will cause trouble. | |
2940 | } | |
2941 | } | |
2942 | ||
11fdf7f2 TL |
2943 | void DBImpl::BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction, |
2944 | Env::Priority bg_thread_pri) { | |
7c673cae | 2945 | bool made_progress = false; |
7c673cae FG |
2946 | JobContext job_context(next_job_id_.fetch_add(1), true); |
2947 | TEST_SYNC_POINT("BackgroundCallCompaction:0"); | |
7c673cae FG |
2948 | LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, |
2949 | immutable_db_options_.info_log.get()); | |
2950 | { | |
2951 | InstrumentedMutexLock l(&mutex_); | |
2952 | ||
2953 | // This call will unlock/lock the mutex to wait for current running | |
2954 | // IngestExternalFile() calls to finish. | |
2955 | WaitForIngestFile(); | |
2956 | ||
2957 | num_running_compactions_++; | |
2958 | ||
f67539c2 TL |
2959 | std::unique_ptr<std::list<uint64_t>::iterator> |
2960 | pending_outputs_inserted_elem(new std::list<uint64_t>::iterator( | |
2961 | CaptureCurrentFileNumberInPendingOutputs())); | |
7c673cae | 2962 | |
11fdf7f2 TL |
2963 | assert((bg_thread_pri == Env::Priority::BOTTOM && |
2964 | bg_bottom_compaction_scheduled_) || | |
2965 | (bg_thread_pri == Env::Priority::LOW && bg_compaction_scheduled_)); | |
2966 | Status s = BackgroundCompaction(&made_progress, &job_context, &log_buffer, | |
494da23a | 2967 | prepicked_compaction, bg_thread_pri); |
7c673cae | 2968 | TEST_SYNC_POINT("BackgroundCallCompaction:1"); |
494da23a TL |
2969 | if (s.IsBusy()) { |
2970 | bg_cv_.SignalAll(); // In case a waiter can proceed despite the error | |
2971 | mutex_.Unlock(); | |
1e59de90 TL |
2972 | immutable_db_options_.clock->SleepForMicroseconds( |
2973 | 10000); // prevent hot loop | |
494da23a | 2974 | mutex_.Lock(); |
f67539c2 TL |
2975 | } else if (!s.ok() && !s.IsShutdownInProgress() && |
2976 | !s.IsManualCompactionPaused() && !s.IsColumnFamilyDropped()) { | |
7c673cae FG |
2977 | // Wait a little bit before retrying background compaction in |
2978 | // case this is an environmental problem and we do not want to | |
2979 | // chew up resources for failed compactions for the duration of | |
2980 | // the problem. | |
2981 | uint64_t error_cnt = | |
2982 | default_cf_internal_stats_->BumpAndGetBackgroundErrorCount(); | |
2983 | bg_cv_.SignalAll(); // In case a waiter can proceed despite the error | |
2984 | mutex_.Unlock(); | |
2985 | log_buffer.FlushBufferToLog(); | |
2986 | ROCKS_LOG_ERROR(immutable_db_options_.info_log, | |
2987 | "Waiting after background compaction error: %s, " | |
2988 | "Accumulated background error counts: %" PRIu64, | |
2989 | s.ToString().c_str(), error_cnt); | |
2990 | LogFlush(immutable_db_options_.info_log); | |
1e59de90 | 2991 | immutable_db_options_.clock->SleepForMicroseconds(1000000); |
7c673cae | 2992 | mutex_.Lock(); |
f67539c2 | 2993 | } else if (s.IsManualCompactionPaused()) { |
1e59de90 | 2994 | assert(prepicked_compaction); |
f67539c2 TL |
2995 | ManualCompactionState* m = prepicked_compaction->manual_compaction_state; |
2996 | assert(m); | |
2997 | ROCKS_LOG_BUFFER(&log_buffer, "[%s] [JOB %d] Manual compaction paused", | |
2998 | m->cfd->GetName().c_str(), job_context.job_id); | |
7c673cae FG |
2999 | } |
3000 | ||
3001 | ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem); | |
3002 | ||
1e59de90 TL |
3003 | // If compaction failed, we want to delete all temporary files that we |
3004 | // might have created (they might not all be recorded in job_context in |
3005 | // case of a failure). Thus, we force full scan in FindObsoleteFiles() | |
f67539c2 TL |
3006 | FindObsoleteFiles(&job_context, !s.ok() && !s.IsShutdownInProgress() && |
3007 | !s.IsManualCompactionPaused() && | |
1e59de90 TL |
3008 | !s.IsColumnFamilyDropped() && |
3009 | !s.IsBusy()); | |
11fdf7f2 | 3010 | TEST_SYNC_POINT("DBImpl::BackgroundCallCompaction:FoundObsoleteFiles"); |
7c673cae FG |
3011 | |
3012 | // delete unnecessary files if any, this is done outside the mutex | |
11fdf7f2 TL |
3013 | if (job_context.HaveSomethingToClean() || |
3014 | job_context.HaveSomethingToDelete() || !log_buffer.IsEmpty()) { | |
7c673cae FG |
3015 | mutex_.Unlock(); |
3016 | // Have to flush the info logs before bg_compaction_scheduled_-- | |
3017 | // because if bg_compaction_scheduled_ becomes 0 and the lock is |
3018 | // released, the destructor of DB can kick in and destroy all the |
3019 | // state of DB, so info_log might not be available after that point. |
3020 | // The same applies to accessing any other state that DB owns. |
3021 | log_buffer.FlushBufferToLog(); | |
3022 | if (job_context.HaveSomethingToDelete()) { | |
3023 | PurgeObsoleteFiles(job_context); | |
11fdf7f2 | 3024 | TEST_SYNC_POINT("DBImpl::BackgroundCallCompaction:PurgedObsoleteFiles"); |
7c673cae FG |
3025 | } |
3026 | job_context.Clean(); | |
3027 | mutex_.Lock(); | |
3028 | } | |
3029 | ||
3030 | assert(num_running_compactions_ > 0); | |
3031 | num_running_compactions_--; | |
1e59de90 | 3032 | |
11fdf7f2 TL |
3033 | if (bg_thread_pri == Env::Priority::LOW) { |
3034 | bg_compaction_scheduled_--; | |
3035 | } else { | |
3036 | assert(bg_thread_pri == Env::Priority::BOTTOM); | |
3037 | bg_bottom_compaction_scheduled_--; | |
3038 | } | |
7c673cae | 3039 | |
7c673cae FG |
3040 | // See if there's more work to be done |
3041 | MaybeScheduleFlushOrCompaction(); | |
1e59de90 TL |
3042 | |
3043 | if (prepicked_compaction != nullptr && | |
3044 | prepicked_compaction->task_token != nullptr) { | |
3045 | // Releasing task tokens affects (and asserts on) the DB state, so | |
3046 | // must be done before we potentially signal the DB close process to | |
3047 | // proceed below. | |
3048 | prepicked_compaction->task_token.reset(); | |
3049 | } | |
3050 | ||
11fdf7f2 TL |
3051 | if (made_progress || |
3052 | (bg_compaction_scheduled_ == 0 && | |
3053 | bg_bottom_compaction_scheduled_ == 0) || | |
3054 | HasPendingManualCompaction() || unscheduled_compactions_ == 0) { | |
7c673cae FG |
3055 | // signal if |
3056 | // * made_progress -- need to wakeup DelayWrite | |
11fdf7f2 | 3057 | // * bg_{bottom,}_compaction_scheduled_ == 0 -- need to wakeup ~DBImpl |
7c673cae FG |
3058 | // * HasPendingManualCompaction -- need to wakeup RunManualCompaction |
3059 | // If none of this is true, there is no need to signal since nobody is | |
3060 | // waiting for it | |
3061 | bg_cv_.SignalAll(); | |
3062 | } | |
3063 | // IMPORTANT: there should be no code after calling SignalAll. This call may | |
3064 | // signal the DB destructor that it's OK to proceed with destruction. In | |
3065 | // that case, all DB variables will be deallocated and referencing them |
3066 | // will cause trouble. | |
3067 | } | |
3068 | } | |
3069 | ||
3070 | Status DBImpl::BackgroundCompaction(bool* made_progress, | |
3071 | JobContext* job_context, | |
11fdf7f2 | 3072 | LogBuffer* log_buffer, |
494da23a TL |
3073 | PrepickedCompaction* prepicked_compaction, |
3074 | Env::Priority thread_pri) { | |
11fdf7f2 TL |
3075 | ManualCompactionState* manual_compaction = |
3076 | prepicked_compaction == nullptr | |
3077 | ? nullptr | |
3078 | : prepicked_compaction->manual_compaction_state; | |
7c673cae FG |
3079 | *made_progress = false; |
3080 | mutex_.AssertHeld(); | |
3081 | TEST_SYNC_POINT("DBImpl::BackgroundCompaction:Start"); | |
3082 | ||
3083 | bool is_manual = (manual_compaction != nullptr); | |
494da23a | 3084 | std::unique_ptr<Compaction> c; |
11fdf7f2 TL |
3085 | if (prepicked_compaction != nullptr && |
3086 | prepicked_compaction->compaction != nullptr) { | |
3087 | c.reset(prepicked_compaction->compaction); | |
3088 | } | |
3089 | bool is_prepicked = is_manual || c; | |
7c673cae FG |
3090 | |
3091 | // (manual_compaction->in_progress == false); | |
3092 | bool trivial_move_disallowed = | |
3093 | is_manual && manual_compaction->disallow_trivial_move; | |
3094 | ||
3095 | CompactionJobStats compaction_job_stats; | |
11fdf7f2 TL |
3096 | Status status; |
3097 | if (!error_handler_.IsBGWorkStopped()) { | |
3098 | if (shutting_down_.load(std::memory_order_acquire)) { | |
3099 | status = Status::ShutdownInProgress(); | |
f67539c2 | 3100 | } else if (is_manual && |
1e59de90 | 3101 | manual_compaction->canceled.load(std::memory_order_acquire)) { |
f67539c2 | 3102 | status = Status::Incomplete(Status::SubCode::kManualCompactionPaused); |
11fdf7f2 TL |
3103 | } |
3104 | } else { | |
3105 | status = error_handler_.GetBGError(); | |
3106 | // If we get here, it means a hard error happened after this compaction | |
3107 | // was scheduled by MaybeScheduleFlushOrCompaction(), but before it got | |
3108 | // a chance to execute. Since we didn't pop a cfd from the compaction | |
3109 | // queue, increment unscheduled_compactions_ | |
3110 | unscheduled_compactions_++; | |
7c673cae FG |
3111 | } |
3112 | ||
3113 | if (!status.ok()) { | |
3114 | if (is_manual) { | |
3115 | manual_compaction->status = status; | |
3116 | manual_compaction->done = true; | |
3117 | manual_compaction->in_progress = false; | |
7c673cae FG |
3118 | manual_compaction = nullptr; |
3119 | } | |
494da23a TL |
3120 | if (c) { |
3121 | c->ReleaseCompactionFiles(status); | |
3122 | c.reset(); | |
3123 | } | |
7c673cae FG |
3124 | return status; |
3125 | } | |
3126 | ||
3127 | if (is_manual) { | |
3128 | // another thread cannot pick up the same work | |
3129 | manual_compaction->in_progress = true; | |
3130 | } | |
3131 | ||
1e59de90 TL |
3132 | TEST_SYNC_POINT("DBImpl::BackgroundCompaction:InProgress"); |
3133 | ||
494da23a TL |
3134 | std::unique_ptr<TaskLimiterToken> task_token; |
3135 | ||
7c673cae FG |
3136 | // InternalKey manual_end_storage; |
3137 | // InternalKey* manual_end = &manual_end_storage; | |
11fdf7f2 | 3138 | bool sfm_reserved_compact_space = false; |
7c673cae | 3139 | if (is_manual) { |
11fdf7f2 | 3140 | ManualCompactionState* m = manual_compaction; |
7c673cae | 3141 | assert(m->in_progress); |
7c673cae FG |
3142 | if (!c) { |
3143 | m->done = true; | |
3144 | m->manual_end = nullptr; | |
20effc67 TL |
3145 | ROCKS_LOG_BUFFER( |
3146 | log_buffer, | |
3147 | "[%s] Manual compaction from level-%d from %s .. " | |
3148 | "%s; nothing to do\n", | |
3149 | m->cfd->GetName().c_str(), m->input_level, | |
3150 | (m->begin ? m->begin->DebugString(true).c_str() : "(begin)"), | |
3151 | (m->end ? m->end->DebugString(true).c_str() : "(end)")); | |
7c673cae | 3152 | } else { |
11fdf7f2 TL |
3153 | // First check if we have enough room to do the compaction |
3154 | bool enough_room = EnoughRoomForCompaction( | |
3155 | m->cfd, *(c->inputs()), &sfm_reserved_compact_space, log_buffer); | |
3156 | ||
3157 | if (!enough_room) { | |
3158 | // Then don't do the compaction | |
3159 | c->ReleaseCompactionFiles(status); | |
3160 | c.reset(); | |
3161 | // m's vars will get set properly at the end of this function, | |
3162 | // as long as status == CompactionTooLarge | |
3163 | status = Status::CompactionTooLarge(); | |
3164 | } else { | |
3165 | ROCKS_LOG_BUFFER( | |
3166 | log_buffer, | |
3167 | "[%s] Manual compaction from level-%d to level-%d from %s .. " | |
3168 | "%s; will stop at %s\n", | |
3169 | m->cfd->GetName().c_str(), m->input_level, c->output_level(), | |
20effc67 TL |
3170 | (m->begin ? m->begin->DebugString(true).c_str() : "(begin)"), |
3171 | (m->end ? m->end->DebugString(true).c_str() : "(end)"), | |
11fdf7f2 TL |
3172 | ((m->done || m->manual_end == nullptr) |
3173 | ? "(end)" | |
20effc67 | 3174 | : m->manual_end->DebugString(true).c_str())); |
11fdf7f2 TL |
3175 | } |
3176 | } | |
3177 | } else if (!is_prepicked && !compaction_queue_.empty()) { | |
3178 | if (HasExclusiveManualCompaction()) { | |
3179 | // Can't compact right now, but try again later | |
3180 | TEST_SYNC_POINT("DBImpl::BackgroundCompaction()::Conflict"); | |
3181 | ||
3182 | // Stay in the compaction queue. | |
3183 | unscheduled_compactions_++; | |
3184 | ||
3185 | return Status::OK(); | |
3186 | } | |
3187 | ||
494da23a TL |
3188 | auto cfd = PickCompactionFromQueue(&task_token, log_buffer); |
3189 | if (cfd == nullptr) { | |
3190 | // Can't find any executable task from the compaction queue. | |
3191 | // All tasks have been throttled by compaction thread limiter. | |
3192 | ++unscheduled_compactions_; | |
3193 | return Status::Busy(); | |
3194 | } | |
3195 | ||
7c673cae FG |
3196 | // We unreference here because the following code will take a Ref() on |
3197 | // this cfd if it is going to use it (Compaction class holds a | |
3198 | // reference). | |
3199 | // This will all happen under a mutex so we don't have to be afraid of | |
3200 | // somebody else deleting it. | |
f67539c2 | 3201 | if (cfd->UnrefAndTryDelete()) { |
7c673cae FG |
3202 | // This was the last reference of the column family, so no need to |
3203 | // compact. | |
3204 | return Status::OK(); | |
3205 | } | |
3206 | ||
7c673cae FG |
3207 | // Pick up latest mutable CF Options and use it throughout the |
3208 | // compaction job | |
3209 | // Compaction makes a copy of the latest MutableCFOptions. It should be used | |
3210 | // throughout the compaction procedure to ensure consistency. It will |
3211 | // eventually be installed into SuperVersion | |
3212 | auto* mutable_cf_options = cfd->GetLatestMutableCFOptions(); | |
3213 | if (!mutable_cf_options->disable_auto_compactions && !cfd->IsDropped()) { | |
3214 | // NOTE: try to avoid unnecessary copy of MutableCFOptions if | |
3215 | // compaction is not necessary. Need to make sure mutex is held | |
3216 | // until we make a copy in the following code | |
3217 | TEST_SYNC_POINT("DBImpl::BackgroundCompaction():BeforePickCompaction"); | |
20effc67 TL |
3218 | c.reset(cfd->PickCompaction(*mutable_cf_options, mutable_db_options_, |
3219 | log_buffer)); | |
7c673cae | 3220 | TEST_SYNC_POINT("DBImpl::BackgroundCompaction():AfterPickCompaction"); |
11fdf7f2 | 3221 | |
7c673cae | 3222 | if (c != nullptr) { |
11fdf7f2 TL |
3223 | bool enough_room = EnoughRoomForCompaction( |
3224 | cfd, *(c->inputs()), &sfm_reserved_compact_space, log_buffer); | |
3225 | ||
3226 | if (!enough_room) { | |
3227 | // Then don't do the compaction | |
3228 | c->ReleaseCompactionFiles(status); | |
3229 | c->column_family_data() | |
3230 | ->current() | |
3231 | ->storage_info() | |
1e59de90 | 3232 | ->ComputeCompactionScore(*(c->immutable_options()), |
11fdf7f2 | 3233 | *(c->mutable_cf_options())); |
7c673cae FG |
3234 | AddToCompactionQueue(cfd); |
3235 | ++unscheduled_compactions_; | |
11fdf7f2 TL |
3236 | |
3237 | c.reset(); | |
3238 | // Don't need to sleep here, because BackgroundCallCompaction | |
3239 | // will sleep if !s.ok() | |
3240 | status = Status::CompactionTooLarge(); | |
3241 | } else { | |
3242 | // update statistics | |
1e59de90 TL |
3243 | size_t num_files = 0; |
3244 | for (auto& each_level : *c->inputs()) { | |
3245 | num_files += each_level.files.size(); | |
3246 | } | |
3247 | RecordInHistogram(stats_, NUM_FILES_IN_SINGLE_COMPACTION, num_files); | |
3248 | ||
11fdf7f2 TL |
3249 | // There are three things that can change compaction score: |
3250 | // 1) When flush or compaction finish. This case is covered by | |
3251 | // InstallSuperVersionAndScheduleWork | |
3252 | // 2) When MutableCFOptions changes. This case is also covered by | |
3253 | // InstallSuperVersionAndScheduleWork, because this is when the new | |
3254 | // options take effect. | |
3255 | // 3) When we Pick a new compaction, we "remove" those files being | |
3256 | // compacted from the calculation, which then influences compaction | |
3257 | // score. Here we check if we need the new compaction even without the | |
3258 | // files that are currently being compacted. If we need another | |
3259 | // compaction, we might be able to execute it in parallel, so we add | |
3260 | // it to the queue and schedule a new thread. | |
3261 | if (cfd->NeedsCompaction()) { | |
3262 | // Yes, we need more compactions! | |
3263 | AddToCompactionQueue(cfd); | |
3264 | ++unscheduled_compactions_; | |
3265 | MaybeScheduleFlushOrCompaction(); | |
3266 | } | |
7c673cae FG |
3267 | } |
3268 | } | |
3269 | } | |
3270 | } | |
3271 | ||
20effc67 | 3272 | IOStatus io_s; |
7c673cae FG |
3273 | if (!c) { |
3274 | // Nothing to do | |
3275 | ROCKS_LOG_BUFFER(log_buffer, "Compaction nothing to do"); | |
3276 | } else if (c->deletion_compaction()) { | |
3277 | // TODO(icanadi) Do we want to honor snapshots here? i.e. not delete old | |
3278 | // file if there is an alive snapshot pointing to it |
494da23a TL |
3279 | TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:BeforeCompaction", |
3280 | c->column_family_data()); | |
7c673cae | 3281 | assert(c->num_input_files(1) == 0); |
7c673cae FG |
3282 | assert(c->column_family_data()->ioptions()->compaction_style == |
3283 | kCompactionStyleFIFO); | |
3284 | ||
3285 | compaction_job_stats.num_input_files = c->num_input_files(0); | |
3286 | ||
494da23a TL |
3287 | NotifyOnCompactionBegin(c->column_family_data(), c.get(), status, |
3288 | compaction_job_stats, job_context->job_id); | |
3289 | ||
7c673cae FG |
3290 | for (const auto& f : *c->inputs(0)) { |
3291 | c->edit()->DeleteFile(c->level(), f->fd.GetNumber()); | |
3292 | } | |
3293 | status = versions_->LogAndApply(c->column_family_data(), | |
3294 | *c->mutable_cf_options(), c->edit(), | |
3295 | &mutex_, directories_.GetDbDir()); | |
20effc67 | 3296 | io_s = versions_->io_status(); |
494da23a TL |
3297 | InstallSuperVersionAndScheduleWork(c->column_family_data(), |
3298 | &job_context->superversion_contexts[0], | |
3299 | *c->mutable_cf_options()); | |
7c673cae FG |
3300 | ROCKS_LOG_BUFFER(log_buffer, "[%s] Deleted %d files\n", |
3301 | c->column_family_data()->GetName().c_str(), | |
3302 | c->num_input_files(0)); | |
3303 | *made_progress = true; | |
494da23a TL |
3304 | TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:AfterCompaction", |
3305 | c->column_family_data()); | |
7c673cae FG |
3306 | } else if (!trivial_move_disallowed && c->IsTrivialMove()) { |
3307 | TEST_SYNC_POINT("DBImpl::BackgroundCompaction:TrivialMove"); | |
494da23a TL |
3308 | TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:BeforeCompaction", |
3309 | c->column_family_data()); | |
7c673cae FG |
3310 | // Instrument for event update |
3311 | // TODO(yhchiang): add op details for showing trivial-move. | |
3312 | ThreadStatusUtil::SetColumnFamily( | |
3313 | c->column_family_data(), c->column_family_data()->ioptions()->env, | |
3314 | immutable_db_options_.enable_thread_tracking); | |
3315 | ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_COMPACTION); | |
3316 | ||
3317 | compaction_job_stats.num_input_files = c->num_input_files(0); | |
3318 | ||
494da23a TL |
3319 | NotifyOnCompactionBegin(c->column_family_data(), c.get(), status, |
3320 | compaction_job_stats, job_context->job_id); | |
3321 | ||
7c673cae FG |
3322 | // Move files to next level |
3323 | int32_t moved_files = 0; | |
3324 | int64_t moved_bytes = 0; | |
3325 | for (unsigned int l = 0; l < c->num_input_levels(); l++) { | |
3326 | if (c->level(l) == c->output_level()) { | |
3327 | continue; | |
3328 | } | |
3329 | for (size_t i = 0; i < c->num_input_files(l); i++) { | |
3330 | FileMetaData* f = c->input(l, i); | |
3331 | c->edit()->DeleteFile(c->level(l), f->fd.GetNumber()); | |
1e59de90 TL |
3332 | c->edit()->AddFile( |
3333 | c->output_level(), f->fd.GetNumber(), f->fd.GetPathId(), | |
3334 | f->fd.GetFileSize(), f->smallest, f->largest, f->fd.smallest_seqno, | |
3335 | f->fd.largest_seqno, f->marked_for_compaction, f->temperature, | |
3336 | f->oldest_blob_file_number, f->oldest_ancester_time, | |
3337 | f->file_creation_time, f->file_checksum, f->file_checksum_func_name, | |
3338 | f->unique_id); | |
11fdf7f2 TL |
3339 | |
3340 | ROCKS_LOG_BUFFER( | |
3341 | log_buffer, | |
3342 | "[%s] Moving #%" PRIu64 " to level-%d %" PRIu64 " bytes\n", | |
3343 | c->column_family_data()->GetName().c_str(), f->fd.GetNumber(), | |
3344 | c->output_level(), f->fd.GetFileSize()); | |
7c673cae FG |
3345 | ++moved_files; |
3346 | moved_bytes += f->fd.GetFileSize(); | |
3347 | } | |
3348 | } | |
1e59de90 TL |
3349 | if (c->compaction_reason() == CompactionReason::kLevelMaxLevelSize && |
3350 | c->immutable_options()->compaction_pri == kRoundRobin) { | |
3351 | int start_level = c->start_level(); | |
3352 | if (start_level > 0) { | |
3353 | auto vstorage = c->input_version()->storage_info(); | |
3354 | c->edit()->AddCompactCursor( | |
3355 | start_level, | |
3356 | vstorage->GetNextCompactCursor(start_level, c->num_input_files(0))); | |
3357 | } | |
3358 | } | |
7c673cae FG |
3359 | status = versions_->LogAndApply(c->column_family_data(), |
3360 | *c->mutable_cf_options(), c->edit(), | |
3361 | &mutex_, directories_.GetDbDir()); | |
20effc67 | 3362 | io_s = versions_->io_status(); |
7c673cae | 3363 | // Use latest MutableCFOptions |
494da23a TL |
3364 | InstallSuperVersionAndScheduleWork(c->column_family_data(), |
3365 | &job_context->superversion_contexts[0], | |
3366 | *c->mutable_cf_options()); | |
7c673cae FG |
3367 | |
3368 | VersionStorageInfo::LevelSummaryStorage tmp; | |
3369 | c->column_family_data()->internal_stats()->IncBytesMoved(c->output_level(), | |
3370 | moved_bytes); | |
3371 | { | |
3372 | event_logger_.LogToBuffer(log_buffer) | |
3373 | << "job" << job_context->job_id << "event" | |
3374 | << "trivial_move" | |
3375 | << "destination_level" << c->output_level() << "files" << moved_files | |
3376 | << "total_files_size" << moved_bytes; | |
3377 | } | |
3378 | ROCKS_LOG_BUFFER( | |
3379 | log_buffer, | |
3380 | "[%s] Moved #%d files to level-%d %" PRIu64 " bytes %s: %s\n", | |
3381 | c->column_family_data()->GetName().c_str(), moved_files, | |
3382 | c->output_level(), moved_bytes, status.ToString().c_str(), | |
3383 | c->column_family_data()->current()->storage_info()->LevelSummary(&tmp)); | |
3384 | *made_progress = true; | |
3385 | ||
3386 | // Clear Instrument | |
3387 | ThreadStatusUtil::ResetThreadStatus(); | |
494da23a TL |
3388 | TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:AfterCompaction", |
3389 | c->column_family_data()); | |
11fdf7f2 TL |
3390 | } else if (!is_prepicked && c->output_level() > 0 && |
3391 | c->output_level() == | |
3392 | c->column_family_data() | |
3393 | ->current() | |
3394 | ->storage_info() | |
3395 | ->MaxOutputLevel( | |
3396 | immutable_db_options_.allow_ingest_behind) && | |
3397 | env_->GetBackgroundThreads(Env::Priority::BOTTOM) > 0) { | |
3398 | // Forward compactions involving the last level to the bottom pool if it exists, |
3399 | // such that compactions unlikely to contribute to write stalls can be | |
3400 | // delayed or deprioritized. | |
3401 | TEST_SYNC_POINT("DBImpl::BackgroundCompaction:ForwardToBottomPriPool"); | |
3402 | CompactionArg* ca = new CompactionArg; | |
3403 | ca->db = this; | |
1e59de90 | 3404 | ca->compaction_pri_ = Env::Priority::BOTTOM; |
11fdf7f2 TL |
3405 | ca->prepicked_compaction = new PrepickedCompaction; |
3406 | ca->prepicked_compaction->compaction = c.release(); | |
3407 | ca->prepicked_compaction->manual_compaction_state = nullptr; | |
494da23a TL |
3408 | // Transfer the requested token, so the bottom-pool job doesn't request one again. |
3409 | ca->prepicked_compaction->task_token = std::move(task_token); | |
11fdf7f2 TL |
3410 | ++bg_bottom_compaction_scheduled_; |
3411 | env_->Schedule(&DBImpl::BGWorkBottomCompaction, ca, Env::Priority::BOTTOM, | |
494da23a | 3412 | this, &DBImpl::UnscheduleCompactionCallback); |
7c673cae | 3413 | } else { |
494da23a TL |
3414 | TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:BeforeCompaction", |
3415 | c->column_family_data()); | |
11fdf7f2 TL |
3416 | int output_level __attribute__((__unused__)); |
3417 | output_level = c->output_level(); | |
7c673cae FG |
3418 | TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:NonTrivial", |
3419 | &output_level); | |
494da23a | 3420 | std::vector<SequenceNumber> snapshot_seqs; |
7c673cae | 3421 | SequenceNumber earliest_write_conflict_snapshot; |
494da23a TL |
3422 | SnapshotChecker* snapshot_checker; |
3423 | GetSnapshotContext(job_context, &snapshot_seqs, | |
3424 | &earliest_write_conflict_snapshot, &snapshot_checker); | |
7c673cae | 3425 | assert(is_snapshot_supported_ || snapshots_.empty()); |
1e59de90 | 3426 | |
7c673cae | 3427 | CompactionJob compaction_job( |
11fdf7f2 | 3428 | job_context->job_id, c.get(), immutable_db_options_, |
1e59de90 TL |
3429 | mutable_db_options_, file_options_for_compaction_, versions_.get(), |
3430 | &shutting_down_, log_buffer, directories_.GetDbDir(), | |
20effc67 TL |
3431 | GetDataDir(c->column_family_data(), c->output_path_id()), |
3432 | GetDataDir(c->column_family_data(), 0), stats_, &mutex_, | |
3433 | &error_handler_, snapshot_seqs, earliest_write_conflict_snapshot, | |
1e59de90 | 3434 | snapshot_checker, job_context, table_cache_, &event_logger_, |
20effc67 | 3435 | c->mutable_cf_options()->paranoid_file_checks, |
7c673cae | 3436 | c->mutable_cf_options()->report_bg_io_stats, dbname_, |
20effc67 | 3437 | &compaction_job_stats, thread_pri, io_tracer_, |
1e59de90 TL |
3438 | is_manual ? manual_compaction->canceled |
3439 | : kManualCompactionCanceledFalse_, | |
3440 | db_id_, db_session_id_, c->column_family_data()->GetFullHistoryTsLow(), | |
3441 | c->trim_ts(), &blob_callback_, &bg_compaction_scheduled_, | |
3442 | &bg_bottom_compaction_scheduled_); | |
7c673cae FG |
3443 | compaction_job.Prepare(); |
3444 | ||
494da23a TL |
3445 | NotifyOnCompactionBegin(c->column_family_data(), c.get(), status, |
3446 | compaction_job_stats, job_context->job_id); | |
7c673cae | 3447 | mutex_.Unlock(); |
f67539c2 TL |
3448 | TEST_SYNC_POINT_CALLBACK( |
3449 | "DBImpl::BackgroundCompaction:NonTrivial:BeforeRun", nullptr); | |
20effc67 TL |
3450 | // Should we handle this error? |
3451 | compaction_job.Run().PermitUncheckedError(); | |
7c673cae FG |
3452 | TEST_SYNC_POINT("DBImpl::BackgroundCompaction:NonTrivial:AfterRun"); |
3453 | mutex_.Lock(); | |
3454 | ||
3455 | status = compaction_job.Install(*c->mutable_cf_options()); | |
20effc67 | 3456 | io_s = compaction_job.io_status(); |
7c673cae | 3457 | if (status.ok()) { |
494da23a TL |
3458 | InstallSuperVersionAndScheduleWork(c->column_family_data(), |
3459 | &job_context->superversion_contexts[0], | |
3460 | *c->mutable_cf_options()); | |
7c673cae FG |
3461 | } |
3462 | *made_progress = true; | |
494da23a TL |
3463 | TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:AfterCompaction", |
3464 | c->column_family_data()); | |
7c673cae | 3465 | } |
20effc67 TL |
3466 | |
3467 | if (status.ok() && !io_s.ok()) { | |
3468 | status = io_s; | |
3469 | } else { | |
3470 | io_s.PermitUncheckedError(); | |
3471 | } | |
3472 | ||
7c673cae FG |
3473 | if (c != nullptr) { |
3474 | c->ReleaseCompactionFiles(status); | |
3475 | *made_progress = true; | |
11fdf7f2 TL |
3476 | |
3477 | #ifndef ROCKSDB_LITE | |
3478 | // Need to make sure SstFileManager does its bookkeeping | |
3479 | auto sfm = static_cast<SstFileManagerImpl*>( | |
3480 | immutable_db_options_.sst_file_manager.get()); | |
3481 | if (sfm && sfm_reserved_compact_space) { | |
3482 | sfm->OnCompactionCompletion(c.get()); | |
3483 | } | |
3484 | #endif // ROCKSDB_LITE | |
3485 | ||
3486 | NotifyOnCompactionCompleted(c->column_family_data(), c.get(), status, | |
3487 | compaction_job_stats, job_context->job_id); | |
7c673cae | 3488 | } |
7c673cae | 3489 | |
f67539c2 TL |
3490 | if (status.ok() || status.IsCompactionTooLarge() || |
3491 | status.IsManualCompactionPaused()) { | |
7c673cae | 3492 | // Done |
f67539c2 | 3493 | } else if (status.IsColumnFamilyDropped() || status.IsShutdownInProgress()) { |
7c673cae FG |
3494 | // Ignore compaction errors found during shutting down |
3495 | } else { | |
3496 | ROCKS_LOG_WARN(immutable_db_options_.info_log, "Compaction error: %s", | |
3497 | status.ToString().c_str()); | |
20effc67 TL |
3498 | if (!io_s.ok()) { |
3499 | // Error while writing to MANIFEST. | |
3500 | // In fact, versions_->io_status() can also be the result of renaming the |
3501 | // CURRENT file. With the current code, it's just difficult to tell, so just |
3502 | // be pessimistic and try writing to a new MANIFEST. |
3503 | // TODO: distinguish between MANIFEST write and CURRENT renaming | |
3504 | auto err_reason = versions_->io_status().ok() | |
3505 | ? BackgroundErrorReason::kCompaction | |
3506 | : BackgroundErrorReason::kManifestWrite; | |
1e59de90 | 3507 | error_handler_.SetBGError(io_s, err_reason); |
20effc67 | 3508 | } else { |
1e59de90 | 3509 | error_handler_.SetBGError(status, BackgroundErrorReason::kCompaction); |
20effc67 | 3510 | } |
11fdf7f2 TL |
3511 | if (c != nullptr && !is_manual && !error_handler_.IsBGWorkStopped()) { |
3512 | // Put this cfd back in the compaction queue so we can retry after some | |
3513 | // time | |
3514 | auto cfd = c->column_family_data(); | |
3515 | assert(cfd != nullptr); | |
3516 | // Since this compaction failed, we need to recompute the score so it | |
3517 | // takes the original input files into account | |
3518 | c->column_family_data() | |
3519 | ->current() | |
3520 | ->storage_info() | |
1e59de90 | 3521 | ->ComputeCompactionScore(*(c->immutable_options()), |
11fdf7f2 TL |
3522 | *(c->mutable_cf_options())); |
3523 | if (!cfd->queued_for_compaction()) { | |
3524 | AddToCompactionQueue(cfd); | |
3525 | ++unscheduled_compactions_; | |
3526 | } | |
7c673cae FG |
3527 | } |
3528 | } | |
11fdf7f2 TL |
3529 | // this will unref its input_version and column_family_data |
3530 | c.reset(); | |
7c673cae FG |
3531 | |
3532 | if (is_manual) { | |
11fdf7f2 | 3533 | ManualCompactionState* m = manual_compaction; |
7c673cae FG |
3534 | if (!status.ok()) { |
3535 | m->status = status; | |
3536 | m->done = true; | |
3537 | } | |
3538 | // For universal compaction: | |
3539 | // Because universal compaction always happens at level 0, one |
3540 | // compaction will pick up all overlapped files. No files will be | |
3541 | // filtered out due to size limit and left for a successive compaction. | |
3542 | // So we can safely conclude the current compaction. | |
3543 | // | |
3544 | // Also note that, if we don't stop here, then the current compaction | |
3545 | // writes a new file back to level 0, which will be used in successive | |
3546 | // compaction. Hence the manual compaction will never finish. | |
3547 | // | |
3548 | // Stop the compaction if manual_end points to nullptr -- this means | |
3549 | // that we compacted the whole range. manual_end should always point | |
3550 | // to nullptr in case of universal compaction | |
3551 | if (m->manual_end == nullptr) { | |
3552 | m->done = true; | |
3553 | } | |
3554 | if (!m->done) { | |
3555 | // We only compacted part of the requested range. Update *m | |
3556 | // to the range that is left to be compacted. | |
3557 | // Universal and FIFO compactions should always compact the whole range | |
3558 | assert(m->cfd->ioptions()->compaction_style != | |
3559 | kCompactionStyleUniversal || | |
3560 | m->cfd->ioptions()->num_levels > 1); | |
3561 | assert(m->cfd->ioptions()->compaction_style != kCompactionStyleFIFO); | |
3562 | m->tmp_storage = *m->manual_end; | |
3563 | m->begin = &m->tmp_storage; | |
3564 | m->incomplete = true; | |
3565 | } | |
11fdf7f2 | 3566 | m->in_progress = false; // not being processed anymore |
7c673cae FG |
3567 | } |
3568 | TEST_SYNC_POINT("DBImpl::BackgroundCompaction:Finish"); | |
3569 | return status; | |
3570 | } | |
3571 | ||
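// Sketch of the partial-range continuation above (editor's addition,
// stand-in types): when only part of the requested range was compacted, the
// begin key is advanced to manual_end and the caller loops until done.
#include <optional>
#include <string>

struct FakeManualState {
  std::optional<std::string> begin;       // nullopt = from the first key
  std::optional<std::string> manual_end;  // nullopt = compacted to the end
  bool done = false;
  bool incomplete = false;
};

void AdvanceAfterOneRound(FakeManualState& m) {
  if (!m.manual_end.has_value()) {
    m.done = true;  // whole range covered (always the case for universal)
  }
  if (!m.done) {
    m.begin = m.manual_end;  // resume where the last round stopped
    m.incomplete = true;
  }
}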
3572 | bool DBImpl::HasPendingManualCompaction() { | |
3573 | return (!manual_compaction_dequeue_.empty()); | |
3574 | } | |
3575 | ||
11fdf7f2 | 3576 | void DBImpl::AddManualCompaction(DBImpl::ManualCompactionState* m) { |
1e59de90 | 3577 | assert(manual_compaction_paused_ == 0); |
7c673cae FG |
3578 | manual_compaction_dequeue_.push_back(m); |
3579 | } | |
3580 | ||
11fdf7f2 | 3581 | void DBImpl::RemoveManualCompaction(DBImpl::ManualCompactionState* m) { |
7c673cae | 3582 | // Remove from queue |
11fdf7f2 | 3583 | std::deque<ManualCompactionState*>::iterator it = |
7c673cae FG |
3584 | manual_compaction_dequeue_.begin(); |
3585 | while (it != manual_compaction_dequeue_.end()) { | |
3586 | if (m == (*it)) { | |
3587 | it = manual_compaction_dequeue_.erase(it); | |
3588 | return; | |
3589 | } | |
f67539c2 | 3590 | ++it; |
7c673cae FG |
3591 | } |
3592 | assert(false); | |
3593 | return; | |
3594 | } | |

bool DBImpl::ShouldntRunManualCompaction(ManualCompactionState* m) {
  if (num_running_ingest_file_ > 0) {
    // We need to wait for other IngestExternalFile() calls to finish
    // before running a manual compaction.
    return true;
  }
  if (m->exclusive) {
    return (bg_bottom_compaction_scheduled_ > 0 ||
            bg_compaction_scheduled_ > 0);
  }
  std::deque<ManualCompactionState*>::iterator it =
      manual_compaction_dequeue_.begin();
  bool seen = false;
  while (it != manual_compaction_dequeue_.end()) {
    if (m == (*it)) {
      ++it;
      seen = true;
      continue;
    } else if (MCOverlap(m, (*it)) && (!seen && !(*it)->in_progress)) {
      // The other manual compaction *it conflicts with m if it overlaps
      // with m, is ahead of m in the queue, and is not yet in progress.
      return true;
    }
    ++it;
  }
  return false;
}
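
// Example (illustrative): if two overlapping, non-exclusive manual
// compactions are queued, the one queued second is held back until the
// first one either starts running (in_progress) or finishes and is removed
// from the queue.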

bool DBImpl::HaveManualCompaction(ColumnFamilyData* cfd) {
  // Scan the queue.
  std::deque<ManualCompactionState*>::iterator it =
      manual_compaction_dequeue_.begin();
  while (it != manual_compaction_dequeue_.end()) {
    if ((*it)->exclusive) {
      return true;
    }
    if ((cfd == (*it)->cfd) && (!((*it)->in_progress || (*it)->done))) {
      // A manual compaction for this column family is queued but not yet
      // running. (Automatic compaction is allowed once the manual
      // compaction is in progress.)
      return true;
    }
    ++it;
  }
  return false;
}

bool DBImpl::HasExclusiveManualCompaction() {
  // Scan the queue.
  std::deque<ManualCompactionState*>::iterator it =
      manual_compaction_dequeue_.begin();
  while (it != manual_compaction_dequeue_.end()) {
    if ((*it)->exclusive) {
      return true;
    }
    ++it;
  }
  return false;
}

bool DBImpl::MCOverlap(ManualCompactionState* m, ManualCompactionState* m1) {
  if ((m->exclusive) || (m1->exclusive)) {
    return true;
  }
  if (m->cfd != m1->cfd) {
    return false;
  }
  return false;
}

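// Note: as written, two non-exclusive manual compactions never overlap, even
// on the same column family; only an exclusive manual compaction conflicts
// with everything else.
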
#ifndef ROCKSDB_LITE
void DBImpl::BuildCompactionJobInfo(
    const ColumnFamilyData* cfd, Compaction* c, const Status& st,
    const CompactionJobStats& compaction_job_stats, const int job_id,
    const Version* current, CompactionJobInfo* compaction_job_info) const {
  assert(compaction_job_info != nullptr);
  compaction_job_info->cf_id = cfd->GetID();
  compaction_job_info->cf_name = cfd->GetName();
  compaction_job_info->status = st;
  compaction_job_info->thread_id = env_->GetThreadID();
  compaction_job_info->job_id = job_id;
  compaction_job_info->base_input_level = c->start_level();
  compaction_job_info->output_level = c->output_level();
  compaction_job_info->stats = compaction_job_stats;
  compaction_job_info->table_properties = c->GetOutputTableProperties();
  compaction_job_info->compaction_reason = c->compaction_reason();
  compaction_job_info->compression = c->output_compression();
  for (size_t i = 0; i < c->num_input_levels(); ++i) {
    for (const auto fmd : *c->inputs(i)) {
      const FileDescriptor& desc = fmd->fd;
      const uint64_t file_number = desc.GetNumber();
      auto fn = TableFileName(c->immutable_options()->cf_paths, file_number,
                              desc.GetPathId());
      compaction_job_info->input_files.push_back(fn);
      compaction_job_info->input_file_infos.push_back(CompactionFileInfo{
          static_cast<int>(i), file_number, fmd->oldest_blob_file_number});
      if (compaction_job_info->table_properties.count(fn) == 0) {
        std::shared_ptr<const TableProperties> tp;
        auto s = current->GetTableProperties(&tp, fmd, &fn);
        if (s.ok()) {
          compaction_job_info->table_properties[fn] = tp;
        }
      }
    }
  }
  for (const auto& newf : c->edit()->GetNewFiles()) {
    const FileMetaData& meta = newf.second;
    const FileDescriptor& desc = meta.fd;
    const uint64_t file_number = desc.GetNumber();
    compaction_job_info->output_files.push_back(TableFileName(
        c->immutable_options()->cf_paths, file_number, desc.GetPathId()));
    compaction_job_info->output_file_infos.push_back(CompactionFileInfo{
        newf.first, file_number, meta.oldest_blob_file_number});
  }
  compaction_job_info->blob_compression_type =
      c->mutable_cf_options()->blob_compression_type;

  // Update BlobFilesInfo.
  for (const auto& blob_file : c->edit()->GetBlobFileAdditions()) {
    BlobFileAdditionInfo blob_file_addition_info(
        BlobFileName(c->immutable_options()->cf_paths.front().path,
                     blob_file.GetBlobFileNumber()) /*blob_file_path*/,
        blob_file.GetBlobFileNumber(), blob_file.GetTotalBlobCount(),
        blob_file.GetTotalBlobBytes());
    compaction_job_info->blob_file_addition_infos.emplace_back(
        std::move(blob_file_addition_info));
  }

  // Update BlobFilesGarbageInfo.
  for (const auto& blob_file : c->edit()->GetBlobFileGarbages()) {
    BlobFileGarbageInfo blob_file_garbage_info(
        BlobFileName(c->immutable_options()->cf_paths.front().path,
                     blob_file.GetBlobFileNumber()) /*blob_file_path*/,
        blob_file.GetBlobFileNumber(), blob_file.GetGarbageBlobCount(),
        blob_file.GetGarbageBlobBytes());
    compaction_job_info->blob_file_garbage_infos.emplace_back(
        std::move(blob_file_garbage_info));
  }
}
#endif

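// Usage sketch (illustrative; MyListener is a hypothetical subclass): the
// CompactionJobInfo built above is typically delivered to applications
// through an EventListener registered in DBOptions::listeners:
//
//   class MyListener : public EventListener {
//    public:
//     void OnCompactionCompleted(DB* /*db*/,
//                                const CompactionJobInfo& info) override {
//       // e.g. inspect info.input_files, info.output_files, info.stats.
//     }
//   };
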
// SuperVersionContext gets created and destructed outside of the lock --
// we use this conveniently to:
// * malloc one SuperVersion() outside of the lock -- new_superversion
// * delete SuperVersion()s outside of the lock -- superversions_to_free
//
// However, if InstallSuperVersionAndScheduleWork() gets called twice with
// the same sv_context, we can't reuse the SuperVersion() that was malloced,
// because the first call already used it. In that rare case, we take a hit
// and create a new SuperVersion() inside the mutex. We do the same for
// superversions_to_free.

void DBImpl::InstallSuperVersionAndScheduleWork(
    ColumnFamilyData* cfd, SuperVersionContext* sv_context,
    const MutableCFOptions& mutable_cf_options) {
  mutex_.AssertHeld();

  // Update max_total_in_memory_state_
  size_t old_memtable_size = 0;
  auto* old_sv = cfd->GetSuperVersion();
  if (old_sv) {
    old_memtable_size = old_sv->mutable_cf_options.write_buffer_size *
                        old_sv->mutable_cf_options.max_write_buffer_number;
  }

  // This branch is rarely taken.
  if (UNLIKELY(sv_context->new_superversion == nullptr)) {
    sv_context->NewSuperVersion();
  }
  cfd->InstallSuperVersion(sv_context, mutable_cf_options);

  // There may be a small data race here. The snapshot triggering bottommost
  // compaction may already have been released by this point. But assuming
  // newer snapshots keep being created and released frequently, the
  // compaction will be triggered soon anyway.
  bottommost_files_mark_threshold_ = kMaxSequenceNumber;
  for (auto* my_cfd : *versions_->GetColumnFamilySet()) {
    if (!my_cfd->ioptions()->allow_ingest_behind) {
      bottommost_files_mark_threshold_ = std::min(
          bottommost_files_mark_threshold_,
          my_cfd->current()->storage_info()->bottommost_files_mark_threshold());
    }
  }

  // Whenever we install a new SuperVersion, we might need to issue new
  // flushes or compactions.
  SchedulePendingCompaction(cfd);
  MaybeScheduleFlushOrCompaction();

  // Update max_total_in_memory_state_
  max_total_in_memory_state_ = max_total_in_memory_state_ - old_memtable_size +
                               mutable_cf_options.write_buffer_size *
                                   mutable_cf_options.max_write_buffer_number;
}
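
// Example of the bookkeeping above (illustrative numbers): if the old
// SuperVersion allowed 2 write buffers of 64 MB (128 MB total) and the new
// options allow 4 write buffers of 32 MB (128 MB total), then
// max_total_in_memory_state_ is decreased by 128 MB and increased by 128 MB,
// i.e. left unchanged.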

// ShouldPurge is called by FindObsoleteFiles when doing a full scan,
// and the db mutex (mutex_) should already be held.
// Note that the current implementation of FindObsoleteFiles with
// full_scan=true can issue I/O requests to obtain the list of files in
// directories, e.g. env_->GetChildren, while holding the db mutex.
bool DBImpl::ShouldPurge(uint64_t file_number) const {
  return files_grabbed_for_purge_.find(file_number) ==
             files_grabbed_for_purge_.end() &&
         purge_files_.find(file_number) == purge_files_.end();
}

// MarkAsGrabbedForPurge is called by FindObsoleteFiles, and the db mutex
// (mutex_) should already be held.
void DBImpl::MarkAsGrabbedForPurge(uint64_t file_number) {
  files_grabbed_for_purge_.insert(file_number);
}

void DBImpl::SetSnapshotChecker(SnapshotChecker* snapshot_checker) {
  InstrumentedMutexLock l(&mutex_);
  // snapshot_checker_ should only be set once. If we needed to set it
  // multiple times, we would have to make sure the old one is not deleted
  // while it is still in use by a compaction job.
  assert(!snapshot_checker_);
  snapshot_checker_.reset(snapshot_checker);
}

void DBImpl::GetSnapshotContext(
    JobContext* job_context, std::vector<SequenceNumber>* snapshot_seqs,
    SequenceNumber* earliest_write_conflict_snapshot,
    SnapshotChecker** snapshot_checker_ptr) {
  mutex_.AssertHeld();
  assert(job_context != nullptr);
  assert(snapshot_seqs != nullptr);
  assert(earliest_write_conflict_snapshot != nullptr);
  assert(snapshot_checker_ptr != nullptr);

  *snapshot_checker_ptr = snapshot_checker_.get();
  if (use_custom_gc_ && *snapshot_checker_ptr == nullptr) {
    *snapshot_checker_ptr = DisableGCSnapshotChecker::Instance();
  }
  if (*snapshot_checker_ptr != nullptr) {
    // If a snapshot_checker is used, the flush/compaction may contain values
    // that are not visible to snapshots taken after the flush/compaction job
    // starts. Take a snapshot here so that it appears in snapshot_seqs and
    // forces the compaction iterator to consider such snapshots.
    const Snapshot* job_snapshot =
        GetSnapshotImpl(false /*write_conflict_boundary*/, false /*lock*/);
    job_context->job_snapshot.reset(new ManagedSnapshot(this, job_snapshot));
  }
  *snapshot_seqs = snapshots_.GetAll(earliest_write_conflict_snapshot);
}
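
// Caller sketch (simplified): flush and compaction jobs capture a consistent
// snapshot view while holding mutex_, roughly like:
//
//   std::vector<SequenceNumber> snapshot_seqs;
//   SequenceNumber earliest_write_conflict_snapshot;
//   SnapshotChecker* snapshot_checker = nullptr;
//   GetSnapshotContext(job_context, &snapshot_seqs,
//                      &earliest_write_conflict_snapshot, &snapshot_checker);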

Status DBImpl::WaitForCompact(bool wait_unscheduled) {
  // Wait until all scheduled background flushes and compactions (and,
  // optionally, the ones not yet scheduled) complete.
  InstrumentedMutexLock l(&mutex_);
  while ((bg_bottom_compaction_scheduled_ || bg_compaction_scheduled_ ||
          bg_flush_scheduled_ ||
          (wait_unscheduled && unscheduled_compactions_)) &&
         (error_handler_.GetBGError().ok())) {
    bg_cv_.Wait();
  }
  return error_handler_.GetBGError();
}
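
// Usage note (illustrative): tests commonly call this to drain background
// work before asserting on the LSM state, e.g.
//
//   ASSERT_OK(dbfull()->WaitForCompact(true /*wait_unscheduled*/));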

}  // namespace ROCKSDB_NAMESPACE