]>
Commit | Line | Data |
---|---|---|
7c673cae | 1 | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
11fdf7f2 TL |
2 | // This source code is licensed under both the GPLv2 (found in the |
3 | // COPYING file in the root directory) and Apache 2.0 License | |
4 | // (found in the LICENSE.Apache file in the root directory). | |
7c673cae FG |
5 | // |
6 | // Copyright (c) 2011 The LevelDB Authors. All rights reserved. | |
7 | // Use of this source code is governed by a BSD-style license that can be | |
8 | // found in the LICENSE file. See the AUTHORS file for names of contributors. | |
9 | #include "db/db_impl.h" | |
10 | ||
11 | #ifndef __STDC_FORMAT_MACROS | |
12 | #define __STDC_FORMAT_MACROS | |
13 | #endif | |
14 | #include <inttypes.h> | |
15 | ||
16 | #include "db/builder.h" | |
11fdf7f2 TL |
17 | #include "db/error_handler.h" |
18 | #include "db/event_helpers.h" | |
7c673cae FG |
19 | #include "monitoring/iostats_context_imp.h" |
20 | #include "monitoring/perf_context_imp.h" | |
21 | #include "monitoring/thread_status_updater.h" | |
22 | #include "monitoring/thread_status_util.h" | |
494da23a | 23 | #include "util/concurrent_task_limiter_impl.h" |
7c673cae FG |
24 | #include "util/sst_file_manager_impl.h" |
25 | #include "util/sync_point.h" | |
26 | ||
27 | namespace rocksdb { | |
11fdf7f2 TL |
28 | |
29 | bool DBImpl::EnoughRoomForCompaction( | |
30 | ColumnFamilyData* cfd, const std::vector<CompactionInputFiles>& inputs, | |
31 | bool* sfm_reserved_compact_space, LogBuffer* log_buffer) { | |
32 | // Check if we have enough room to do the compaction | |
33 | bool enough_room = true; | |
34 | #ifndef ROCKSDB_LITE | |
35 | auto sfm = static_cast<SstFileManagerImpl*>( | |
36 | immutable_db_options_.sst_file_manager.get()); | |
37 | if (sfm) { | |
38 | // Pass the current bg_error_ to SFM so it can decide what checks to | |
39 | // perform. If this DB instance hasn't seen any error yet, the SFM can be | |
40 | // optimistic and not do disk space checks | |
41 | enough_room = | |
42 | sfm->EnoughRoomForCompaction(cfd, inputs, error_handler_.GetBGError()); | |
43 | if (enough_room) { | |
44 | *sfm_reserved_compact_space = true; | |
45 | } | |
46 | } | |
47 | #else | |
48 | (void)cfd; | |
49 | (void)inputs; | |
50 | (void)sfm_reserved_compact_space; | |
51 | #endif // ROCKSDB_LITE | |
52 | if (!enough_room) { | |
53 | // Just in case tests want to change the value of enough_room | |
54 | TEST_SYNC_POINT_CALLBACK( | |
55 | "DBImpl::BackgroundCompaction():CancelledCompaction", &enough_room); | |
56 | ROCKS_LOG_BUFFER(log_buffer, | |
57 | "Cancelled compaction because not enough room"); | |
58 | RecordTick(stats_, COMPACTION_CANCELLED, 1); | |
59 | } | |
60 | return enough_room; | |
61 | } | |
62 | ||
494da23a TL |
63 | bool DBImpl::RequestCompactionToken(ColumnFamilyData* cfd, bool force, |
64 | std::unique_ptr<TaskLimiterToken>* token, | |
65 | LogBuffer* log_buffer) { | |
66 | assert(*token == nullptr); | |
67 | auto limiter = static_cast<ConcurrentTaskLimiterImpl*>( | |
68 | cfd->ioptions()->compaction_thread_limiter.get()); | |
69 | if (limiter == nullptr) { | |
70 | return true; | |
71 | } | |
72 | *token = limiter->GetToken(force); | |
73 | if (*token != nullptr) { | |
74 | ROCKS_LOG_BUFFER(log_buffer, | |
75 | "Thread limiter [%s] increase [%s] compaction task, " | |
76 | "force: %s, tasks after: %d", | |
77 | limiter->GetName().c_str(), cfd->GetName().c_str(), | |
78 | force ? "true" : "false", limiter->GetOutstandingTask()); | |
79 | return true; | |
80 | } | |
81 | return false; | |
82 | } | |
83 | ||
7c673cae FG |
84 | Status DBImpl::SyncClosedLogs(JobContext* job_context) { |
85 | TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Start"); | |
86 | mutex_.AssertHeld(); | |
87 | autovector<log::Writer*, 1> logs_to_sync; | |
88 | uint64_t current_log_number = logfile_number_; | |
89 | while (logs_.front().number < current_log_number && | |
90 | logs_.front().getting_synced) { | |
91 | log_sync_cv_.Wait(); | |
92 | } | |
93 | for (auto it = logs_.begin(); | |
94 | it != logs_.end() && it->number < current_log_number; ++it) { | |
95 | auto& log = *it; | |
96 | assert(!log.getting_synced); | |
97 | log.getting_synced = true; | |
98 | logs_to_sync.push_back(log.writer); | |
99 | } | |
100 | ||
101 | Status s; | |
102 | if (!logs_to_sync.empty()) { | |
103 | mutex_.Unlock(); | |
104 | ||
105 | for (log::Writer* log : logs_to_sync) { | |
106 | ROCKS_LOG_INFO(immutable_db_options_.info_log, | |
107 | "[JOB %d] Syncing log #%" PRIu64, job_context->job_id, | |
108 | log->get_log_number()); | |
109 | s = log->file()->Sync(immutable_db_options_.use_fsync); | |
11fdf7f2 TL |
110 | if (!s.ok()) { |
111 | break; | |
112 | } | |
7c673cae FG |
113 | } |
114 | if (s.ok()) { | |
115 | s = directories_.GetWalDir()->Fsync(); | |
116 | } | |
117 | ||
118 | mutex_.Lock(); | |
119 | ||
120 | // "number <= current_log_number - 1" is equivalent to | |
121 | // "number < current_log_number". | |
122 | MarkLogsSynced(current_log_number - 1, true, s); | |
123 | if (!s.ok()) { | |
11fdf7f2 | 124 | error_handler_.SetBGError(s, BackgroundErrorReason::kFlush); |
7c673cae FG |
125 | TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Failed"); |
126 | return s; | |
127 | } | |
128 | } | |
129 | return s; | |
130 | } | |
131 | ||
132 | Status DBImpl::FlushMemTableToOutputFile( | |
133 | ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options, | |
11fdf7f2 | 134 | bool* made_progress, JobContext* job_context, |
494da23a TL |
135 | SuperVersionContext* superversion_context, |
136 | std::vector<SequenceNumber>& snapshot_seqs, | |
137 | SequenceNumber earliest_write_conflict_snapshot, | |
138 | SnapshotChecker* snapshot_checker, LogBuffer* log_buffer, | |
139 | Env::Priority thread_pri) { | |
7c673cae FG |
140 | mutex_.AssertHeld(); |
141 | assert(cfd->imm()->NumNotFlushed() != 0); | |
142 | assert(cfd->imm()->IsFlushPending()); | |
143 | ||
7c673cae | 144 | FlushJob flush_job( |
11fdf7f2 | 145 | dbname_, cfd, immutable_db_options_, mutable_cf_options, |
494da23a TL |
146 | nullptr /* memtable_id */, env_options_for_compaction_, versions_.get(), |
147 | &mutex_, &shutting_down_, snapshot_seqs, earliest_write_conflict_snapshot, | |
148 | snapshot_checker, job_context, log_buffer, directories_.GetDbDir(), | |
149 | GetDataDir(cfd, 0U), | |
7c673cae | 150 | GetCompressionFlush(*cfd->ioptions(), mutable_cf_options), stats_, |
494da23a TL |
151 | &event_logger_, mutable_cf_options.report_bg_io_stats, |
152 | true /* sync_output_directory */, true /* write_manifest */, thread_pri); | |
7c673cae FG |
153 | |
154 | FileMetaData file_meta; | |
155 | ||
494da23a | 156 | TEST_SYNC_POINT("DBImpl::FlushMemTableToOutputFile:BeforePickMemtables"); |
7c673cae | 157 | flush_job.PickMemTable(); |
494da23a | 158 | TEST_SYNC_POINT("DBImpl::FlushMemTableToOutputFile:AfterPickMemtables"); |
7c673cae FG |
159 | |
160 | #ifndef ROCKSDB_LITE | |
161 | // may temporarily unlock and lock the mutex. | |
162 | NotifyOnFlushBegin(cfd, &file_meta, mutable_cf_options, job_context->job_id, | |
163 | flush_job.GetTableProperties()); | |
164 | #endif // ROCKSDB_LITE | |
165 | ||
166 | Status s; | |
167 | if (logfile_number_ > 0 && | |
494da23a | 168 | versions_->GetColumnFamilySet()->NumberOfColumnFamilies() > 1) { |
7c673cae FG |
169 | // If there are more than one column families, we need to make sure that |
170 | // all the log files except the most recent one are synced. Otherwise if | |
171 | // the host crashes after flushing and before WAL is persistent, the | |
172 | // flushed SST may contain data from write batches whose updates to | |
173 | // other column families are missing. | |
174 | // SyncClosedLogs() may unlock and re-lock the db_mutex. | |
175 | s = SyncClosedLogs(job_context); | |
494da23a TL |
176 | } else { |
177 | TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Skip"); | |
7c673cae FG |
178 | } |
179 | ||
180 | // Within flush_job.Run, rocksdb may call event listener to notify | |
181 | // file creation and deletion. | |
182 | // | |
183 | // Note that flush_job.Run will unlock and lock the db_mutex, | |
184 | // and EventListener callback will be called when the db_mutex | |
185 | // is unlocked by the current thread. | |
186 | if (s.ok()) { | |
11fdf7f2 | 187 | s = flush_job.Run(&logs_with_prep_tracker_, &file_meta); |
7c673cae FG |
188 | } else { |
189 | flush_job.Cancel(); | |
190 | } | |
191 | ||
192 | if (s.ok()) { | |
11fdf7f2 TL |
193 | InstallSuperVersionAndScheduleWork(cfd, superversion_context, |
194 | mutable_cf_options); | |
7c673cae | 195 | if (made_progress) { |
494da23a | 196 | *made_progress = true; |
7c673cae FG |
197 | } |
198 | VersionStorageInfo::LevelSummaryStorage tmp; | |
199 | ROCKS_LOG_BUFFER(log_buffer, "[%s] Level summary: %s\n", | |
200 | cfd->GetName().c_str(), | |
201 | cfd->current()->storage_info()->LevelSummary(&tmp)); | |
202 | } | |
203 | ||
11fdf7f2 TL |
204 | if (!s.ok() && !s.IsShutdownInProgress()) { |
205 | Status new_bg_error = s; | |
206 | error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush); | |
7c673cae FG |
207 | } |
208 | if (s.ok()) { | |
209 | #ifndef ROCKSDB_LITE | |
210 | // may temporarily unlock and lock the mutex. | |
211 | NotifyOnFlushCompleted(cfd, &file_meta, mutable_cf_options, | |
212 | job_context->job_id, flush_job.GetTableProperties()); | |
213 | auto sfm = static_cast<SstFileManagerImpl*>( | |
214 | immutable_db_options_.sst_file_manager.get()); | |
215 | if (sfm) { | |
216 | // Notify sst_file_manager that a new file was added | |
217 | std::string file_path = MakeTableFileName( | |
11fdf7f2 | 218 | cfd->ioptions()->cf_paths[0].path, file_meta.fd.GetNumber()); |
7c673cae | 219 | sfm->OnAddFile(file_path); |
11fdf7f2 | 220 | if (sfm->IsMaxAllowedSpaceReached()) { |
494da23a TL |
221 | Status new_bg_error = |
222 | Status::SpaceLimit("Max allowed space was reached"); | |
7c673cae FG |
223 | TEST_SYNC_POINT_CALLBACK( |
224 | "DBImpl::FlushMemTableToOutputFile:MaxAllowedSpaceReached", | |
11fdf7f2 TL |
225 | &new_bg_error); |
226 | error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush); | |
7c673cae FG |
227 | } |
228 | } | |
229 | #endif // ROCKSDB_LITE | |
230 | } | |
231 | return s; | |
232 | } | |
233 | ||
11fdf7f2 TL |
234 | Status DBImpl::FlushMemTablesToOutputFiles( |
235 | const autovector<BGFlushArg>& bg_flush_args, bool* made_progress, | |
494da23a TL |
236 | JobContext* job_context, LogBuffer* log_buffer, Env::Priority thread_pri) { |
237 | if (immutable_db_options_.atomic_flush) { | |
238 | return AtomicFlushMemTablesToOutputFiles( | |
239 | bg_flush_args, made_progress, job_context, log_buffer, thread_pri); | |
240 | } | |
241 | std::vector<SequenceNumber> snapshot_seqs; | |
242 | SequenceNumber earliest_write_conflict_snapshot; | |
243 | SnapshotChecker* snapshot_checker; | |
244 | GetSnapshotContext(job_context, &snapshot_seqs, | |
245 | &earliest_write_conflict_snapshot, &snapshot_checker); | |
246 | Status status; | |
11fdf7f2 TL |
247 | for (auto& arg : bg_flush_args) { |
248 | ColumnFamilyData* cfd = arg.cfd_; | |
494da23a | 249 | MutableCFOptions mutable_cf_options = *cfd->GetLatestMutableCFOptions(); |
11fdf7f2 | 250 | SuperVersionContext* superversion_context = arg.superversion_context_; |
494da23a TL |
251 | Status s = FlushMemTableToOutputFile( |
252 | cfd, mutable_cf_options, made_progress, job_context, | |
253 | superversion_context, snapshot_seqs, earliest_write_conflict_snapshot, | |
254 | snapshot_checker, log_buffer, thread_pri); | |
11fdf7f2 | 255 | if (!s.ok()) { |
494da23a TL |
256 | status = s; |
257 | if (!s.IsShutdownInProgress()) { | |
258 | // At this point, DB is not shutting down, nor is cfd dropped. | |
259 | // Something is wrong, thus we break out of the loop. | |
260 | break; | |
261 | } | |
262 | } | |
263 | } | |
264 | return status; | |
265 | } | |
266 | ||
267 | /* | |
268 | * Atomically flushes multiple column families. | |
269 | * | |
270 | * For each column family, all memtables with ID smaller than or equal to the | |
271 | * ID specified in bg_flush_args will be flushed. Only after all column | |
272 | * families finish flush will this function commit to MANIFEST. If any of the | |
273 | * column families are not flushed successfully, this function does not have | |
274 | * any side-effect on the state of the database. | |
275 | */ | |
276 | Status DBImpl::AtomicFlushMemTablesToOutputFiles( | |
277 | const autovector<BGFlushArg>& bg_flush_args, bool* made_progress, | |
278 | JobContext* job_context, LogBuffer* log_buffer, Env::Priority thread_pri) { | |
279 | mutex_.AssertHeld(); | |
280 | ||
281 | autovector<ColumnFamilyData*> cfds; | |
282 | for (const auto& arg : bg_flush_args) { | |
283 | cfds.emplace_back(arg.cfd_); | |
284 | } | |
285 | ||
286 | #ifndef NDEBUG | |
287 | for (const auto cfd : cfds) { | |
288 | assert(cfd->imm()->NumNotFlushed() != 0); | |
289 | assert(cfd->imm()->IsFlushPending()); | |
290 | } | |
291 | #endif /* !NDEBUG */ | |
292 | ||
293 | std::vector<SequenceNumber> snapshot_seqs; | |
294 | SequenceNumber earliest_write_conflict_snapshot; | |
295 | SnapshotChecker* snapshot_checker; | |
296 | GetSnapshotContext(job_context, &snapshot_seqs, | |
297 | &earliest_write_conflict_snapshot, &snapshot_checker); | |
298 | ||
299 | autovector<Directory*> distinct_output_dirs; | |
300 | autovector<std::string> distinct_output_dir_paths; | |
301 | std::vector<FlushJob> jobs; | |
302 | std::vector<MutableCFOptions> all_mutable_cf_options; | |
303 | int num_cfs = static_cast<int>(cfds.size()); | |
304 | all_mutable_cf_options.reserve(num_cfs); | |
305 | for (int i = 0; i < num_cfs; ++i) { | |
306 | auto cfd = cfds[i]; | |
307 | Directory* data_dir = GetDataDir(cfd, 0U); | |
308 | const std::string& curr_path = cfd->ioptions()->cf_paths[0].path; | |
309 | ||
310 | // Add to distinct output directories if eligible. Use linear search. Since | |
311 | // the number of elements in the vector is not large, performance should be | |
312 | // tolerable. | |
313 | bool found = false; | |
314 | for (const auto& path : distinct_output_dir_paths) { | |
315 | if (path == curr_path) { | |
316 | found = true; | |
317 | break; | |
318 | } | |
319 | } | |
320 | if (!found) { | |
321 | distinct_output_dir_paths.emplace_back(curr_path); | |
322 | distinct_output_dirs.emplace_back(data_dir); | |
323 | } | |
324 | ||
325 | all_mutable_cf_options.emplace_back(*cfd->GetLatestMutableCFOptions()); | |
326 | const MutableCFOptions& mutable_cf_options = all_mutable_cf_options.back(); | |
327 | const uint64_t* max_memtable_id = &(bg_flush_args[i].max_memtable_id_); | |
328 | jobs.emplace_back( | |
329 | dbname_, cfd, immutable_db_options_, mutable_cf_options, | |
330 | max_memtable_id, env_options_for_compaction_, versions_.get(), &mutex_, | |
331 | &shutting_down_, snapshot_seqs, earliest_write_conflict_snapshot, | |
332 | snapshot_checker, job_context, log_buffer, directories_.GetDbDir(), | |
333 | data_dir, GetCompressionFlush(*cfd->ioptions(), mutable_cf_options), | |
334 | stats_, &event_logger_, mutable_cf_options.report_bg_io_stats, | |
335 | false /* sync_output_directory */, false /* write_manifest */, | |
336 | thread_pri); | |
337 | jobs.back().PickMemTable(); | |
338 | } | |
339 | ||
340 | std::vector<FileMetaData> file_meta(num_cfs); | |
341 | Status s; | |
342 | assert(num_cfs == static_cast<int>(jobs.size())); | |
343 | ||
344 | #ifndef ROCKSDB_LITE | |
345 | for (int i = 0; i != num_cfs; ++i) { | |
346 | const MutableCFOptions& mutable_cf_options = all_mutable_cf_options.at(i); | |
347 | // may temporarily unlock and lock the mutex. | |
348 | NotifyOnFlushBegin(cfds[i], &file_meta[i], mutable_cf_options, | |
349 | job_context->job_id, jobs[i].GetTableProperties()); | |
350 | } | |
351 | #endif /* !ROCKSDB_LITE */ | |
352 | ||
353 | if (logfile_number_ > 0) { | |
354 | // TODO (yanqin) investigate whether we should sync the closed logs for | |
355 | // single column family case. | |
356 | s = SyncClosedLogs(job_context); | |
357 | } | |
358 | ||
359 | // exec_status stores the execution status of flush_jobs as | |
360 | // <bool /* executed */, Status /* status code */> | |
361 | autovector<std::pair<bool, Status>> exec_status; | |
362 | for (int i = 0; i != num_cfs; ++i) { | |
363 | // Initially all jobs are not executed, with status OK. | |
364 | exec_status.emplace_back(false, Status::OK()); | |
365 | } | |
366 | ||
367 | if (s.ok()) { | |
368 | // TODO (yanqin): parallelize jobs with threads. | |
369 | for (int i = 1; i != num_cfs; ++i) { | |
370 | exec_status[i].second = | |
371 | jobs[i].Run(&logs_with_prep_tracker_, &file_meta[i]); | |
372 | exec_status[i].first = true; | |
373 | } | |
374 | if (num_cfs > 1) { | |
375 | TEST_SYNC_POINT( | |
376 | "DBImpl::AtomicFlushMemTablesToOutputFiles:SomeFlushJobsComplete:1"); | |
377 | TEST_SYNC_POINT( | |
378 | "DBImpl::AtomicFlushMemTablesToOutputFiles:SomeFlushJobsComplete:2"); | |
379 | } | |
380 | exec_status[0].second = | |
381 | jobs[0].Run(&logs_with_prep_tracker_, &file_meta[0]); | |
382 | exec_status[0].first = true; | |
383 | ||
384 | Status error_status; | |
385 | for (const auto& e : exec_status) { | |
386 | if (!e.second.ok()) { | |
387 | s = e.second; | |
388 | if (!e.second.IsShutdownInProgress()) { | |
389 | // If a flush job did not return OK, and the CF is not dropped, and | |
390 | // the DB is not shutting down, then we have to return this result to | |
391 | // caller later. | |
392 | error_status = e.second; | |
393 | } | |
394 | } | |
395 | } | |
396 | ||
397 | s = error_status.ok() ? s : error_status; | |
398 | } | |
399 | ||
400 | if (s.ok() || s.IsShutdownInProgress()) { | |
401 | // Sync on all distinct output directories. | |
402 | for (auto dir : distinct_output_dirs) { | |
403 | if (dir != nullptr) { | |
404 | s = dir->Fsync(); | |
405 | if (!s.ok()) { | |
406 | break; | |
407 | } | |
408 | } | |
409 | } | |
410 | } | |
411 | ||
412 | if (s.ok()) { | |
413 | auto wait_to_install_func = [&]() { | |
414 | bool ready = true; | |
415 | for (size_t i = 0; i != cfds.size(); ++i) { | |
416 | const auto& mems = jobs[i].GetMemTables(); | |
417 | if (cfds[i]->IsDropped()) { | |
418 | // If the column family is dropped, then do not wait. | |
419 | continue; | |
420 | } else if (!mems.empty() && | |
421 | cfds[i]->imm()->GetEarliestMemTableID() < mems[0]->GetID()) { | |
422 | // If a flush job needs to install the flush result for mems and | |
423 | // mems[0] is not the earliest memtable, it means another thread must | |
424 | // be installing flush results for the same column family, then the | |
425 | // current thread needs to wait. | |
426 | ready = false; | |
427 | break; | |
428 | } else if (mems.empty() && cfds[i]->imm()->GetEarliestMemTableID() <= | |
429 | bg_flush_args[i].max_memtable_id_) { | |
430 | // If a flush job does not need to install flush results, then it has | |
431 | // to wait until all memtables up to max_memtable_id_ (inclusive) are | |
432 | // installed. | |
433 | ready = false; | |
434 | break; | |
435 | } | |
436 | } | |
437 | return ready; | |
438 | }; | |
439 | ||
440 | bool resuming_from_bg_err = error_handler_.IsDBStopped(); | |
441 | while ((!error_handler_.IsDBStopped() || | |
442 | error_handler_.GetRecoveryError().ok()) && | |
443 | !wait_to_install_func()) { | |
444 | atomic_flush_install_cv_.Wait(); | |
445 | } | |
446 | ||
447 | s = resuming_from_bg_err ? error_handler_.GetRecoveryError() | |
448 | : error_handler_.GetBGError(); | |
449 | } | |
450 | ||
451 | if (s.ok()) { | |
452 | autovector<ColumnFamilyData*> tmp_cfds; | |
453 | autovector<const autovector<MemTable*>*> mems_list; | |
454 | autovector<const MutableCFOptions*> mutable_cf_options_list; | |
455 | autovector<FileMetaData*> tmp_file_meta; | |
456 | for (int i = 0; i != num_cfs; ++i) { | |
457 | const auto& mems = jobs[i].GetMemTables(); | |
458 | if (!cfds[i]->IsDropped() && !mems.empty()) { | |
459 | tmp_cfds.emplace_back(cfds[i]); | |
460 | mems_list.emplace_back(&mems); | |
461 | mutable_cf_options_list.emplace_back(&all_mutable_cf_options[i]); | |
462 | tmp_file_meta.emplace_back(&file_meta[i]); | |
463 | } | |
464 | } | |
465 | ||
466 | s = InstallMemtableAtomicFlushResults( | |
467 | nullptr /* imm_lists */, tmp_cfds, mutable_cf_options_list, mems_list, | |
468 | versions_.get(), &mutex_, tmp_file_meta, | |
469 | &job_context->memtables_to_free, directories_.GetDbDir(), log_buffer); | |
470 | } | |
471 | ||
472 | if (s.ok() || s.IsShutdownInProgress()) { | |
473 | assert(num_cfs == | |
474 | static_cast<int>(job_context->superversion_contexts.size())); | |
475 | for (int i = 0; i != num_cfs; ++i) { | |
476 | if (cfds[i]->IsDropped()) { | |
477 | continue; | |
478 | } | |
479 | InstallSuperVersionAndScheduleWork(cfds[i], | |
480 | &job_context->superversion_contexts[i], | |
481 | all_mutable_cf_options[i]); | |
482 | VersionStorageInfo::LevelSummaryStorage tmp; | |
483 | ROCKS_LOG_BUFFER(log_buffer, "[%s] Level summary: %s\n", | |
484 | cfds[i]->GetName().c_str(), | |
485 | cfds[i]->current()->storage_info()->LevelSummary(&tmp)); | |
486 | } | |
487 | if (made_progress) { | |
488 | *made_progress = true; | |
489 | } | |
490 | #ifndef ROCKSDB_LITE | |
491 | auto sfm = static_cast<SstFileManagerImpl*>( | |
492 | immutable_db_options_.sst_file_manager.get()); | |
493 | for (int i = 0; i != num_cfs; ++i) { | |
494 | if (cfds[i]->IsDropped()) { | |
495 | continue; | |
496 | } | |
497 | NotifyOnFlushCompleted(cfds[i], &file_meta[i], all_mutable_cf_options[i], | |
498 | job_context->job_id, jobs[i].GetTableProperties()); | |
499 | if (sfm) { | |
500 | std::string file_path = MakeTableFileName( | |
501 | cfds[i]->ioptions()->cf_paths[0].path, file_meta[i].fd.GetNumber()); | |
502 | sfm->OnAddFile(file_path); | |
503 | if (sfm->IsMaxAllowedSpaceReached() && | |
504 | error_handler_.GetBGError().ok()) { | |
505 | Status new_bg_error = | |
506 | Status::SpaceLimit("Max allowed space was reached"); | |
507 | error_handler_.SetBGError(new_bg_error, | |
508 | BackgroundErrorReason::kFlush); | |
509 | } | |
510 | } | |
11fdf7f2 | 511 | } |
494da23a TL |
512 | #endif // ROCKSDB_LITE |
513 | } | |
514 | ||
515 | // Need to undo atomic flush if something went wrong, i.e. s is not OK and | |
516 | // it is not because of CF drop. | |
517 | if (!s.ok() && !s.IsShutdownInProgress()) { | |
518 | // Have to cancel the flush jobs that have NOT executed because we need to | |
519 | // unref the versions. | |
520 | for (int i = 0; i != num_cfs; ++i) { | |
521 | if (!exec_status[i].first) { | |
522 | jobs[i].Cancel(); | |
523 | } | |
524 | } | |
525 | for (int i = 0; i != num_cfs; ++i) { | |
526 | if (exec_status[i].first && exec_status[i].second.ok()) { | |
527 | auto& mems = jobs[i].GetMemTables(); | |
528 | cfds[i]->imm()->RollbackMemtableFlush(mems, | |
529 | file_meta[i].fd.GetNumber()); | |
530 | } | |
531 | } | |
532 | Status new_bg_error = s; | |
533 | error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush); | |
11fdf7f2 | 534 | } |
494da23a | 535 | |
11fdf7f2 TL |
536 | return s; |
537 | } | |
538 | ||
7c673cae FG |
539 | void DBImpl::NotifyOnFlushBegin(ColumnFamilyData* cfd, FileMetaData* file_meta, |
540 | const MutableCFOptions& mutable_cf_options, | |
541 | int job_id, TableProperties prop) { | |
542 | #ifndef ROCKSDB_LITE | |
543 | if (immutable_db_options_.listeners.size() == 0U) { | |
544 | return; | |
545 | } | |
546 | mutex_.AssertHeld(); | |
547 | if (shutting_down_.load(std::memory_order_acquire)) { | |
548 | return; | |
549 | } | |
550 | bool triggered_writes_slowdown = | |
551 | (cfd->current()->storage_info()->NumLevelFiles(0) >= | |
552 | mutable_cf_options.level0_slowdown_writes_trigger); | |
553 | bool triggered_writes_stop = | |
554 | (cfd->current()->storage_info()->NumLevelFiles(0) >= | |
555 | mutable_cf_options.level0_stop_writes_trigger); | |
556 | // release lock while notifying events | |
557 | mutex_.Unlock(); | |
558 | { | |
559 | FlushJobInfo info; | |
494da23a | 560 | info.cf_id = cfd->GetID(); |
7c673cae FG |
561 | info.cf_name = cfd->GetName(); |
562 | // TODO(yhchiang): make db_paths dynamic in case flush does not | |
563 | // go to L0 in the future. | |
11fdf7f2 | 564 | info.file_path = MakeTableFileName(cfd->ioptions()->cf_paths[0].path, |
7c673cae FG |
565 | file_meta->fd.GetNumber()); |
566 | info.thread_id = env_->GetThreadID(); | |
567 | info.job_id = job_id; | |
568 | info.triggered_writes_slowdown = triggered_writes_slowdown; | |
569 | info.triggered_writes_stop = triggered_writes_stop; | |
11fdf7f2 TL |
570 | info.smallest_seqno = file_meta->fd.smallest_seqno; |
571 | info.largest_seqno = file_meta->fd.largest_seqno; | |
7c673cae | 572 | info.table_properties = prop; |
11fdf7f2 | 573 | info.flush_reason = cfd->GetFlushReason(); |
7c673cae FG |
574 | for (auto listener : immutable_db_options_.listeners) { |
575 | listener->OnFlushBegin(this, info); | |
576 | } | |
577 | } | |
578 | mutex_.Lock(); | |
579 | // no need to signal bg_cv_ as it will be signaled at the end of the | |
580 | // flush process. | |
11fdf7f2 TL |
581 | #else |
582 | (void)cfd; | |
583 | (void)file_meta; | |
584 | (void)mutable_cf_options; | |
585 | (void)job_id; | |
586 | (void)prop; | |
7c673cae FG |
587 | #endif // ROCKSDB_LITE |
588 | } | |
589 | ||
590 | void DBImpl::NotifyOnFlushCompleted(ColumnFamilyData* cfd, | |
591 | FileMetaData* file_meta, | |
592 | const MutableCFOptions& mutable_cf_options, | |
593 | int job_id, TableProperties prop) { | |
594 | #ifndef ROCKSDB_LITE | |
595 | if (immutable_db_options_.listeners.size() == 0U) { | |
596 | return; | |
597 | } | |
598 | mutex_.AssertHeld(); | |
599 | if (shutting_down_.load(std::memory_order_acquire)) { | |
600 | return; | |
601 | } | |
602 | bool triggered_writes_slowdown = | |
603 | (cfd->current()->storage_info()->NumLevelFiles(0) >= | |
604 | mutable_cf_options.level0_slowdown_writes_trigger); | |
605 | bool triggered_writes_stop = | |
606 | (cfd->current()->storage_info()->NumLevelFiles(0) >= | |
607 | mutable_cf_options.level0_stop_writes_trigger); | |
608 | // release lock while notifying events | |
609 | mutex_.Unlock(); | |
610 | { | |
611 | FlushJobInfo info; | |
494da23a | 612 | info.cf_id = cfd->GetID(); |
7c673cae FG |
613 | info.cf_name = cfd->GetName(); |
614 | // TODO(yhchiang): make db_paths dynamic in case flush does not | |
615 | // go to L0 in the future. | |
11fdf7f2 | 616 | info.file_path = MakeTableFileName(cfd->ioptions()->cf_paths[0].path, |
7c673cae FG |
617 | file_meta->fd.GetNumber()); |
618 | info.thread_id = env_->GetThreadID(); | |
619 | info.job_id = job_id; | |
620 | info.triggered_writes_slowdown = triggered_writes_slowdown; | |
621 | info.triggered_writes_stop = triggered_writes_stop; | |
11fdf7f2 TL |
622 | info.smallest_seqno = file_meta->fd.smallest_seqno; |
623 | info.largest_seqno = file_meta->fd.largest_seqno; | |
7c673cae | 624 | info.table_properties = prop; |
11fdf7f2 | 625 | info.flush_reason = cfd->GetFlushReason(); |
7c673cae FG |
626 | for (auto listener : immutable_db_options_.listeners) { |
627 | listener->OnFlushCompleted(this, info); | |
628 | } | |
629 | } | |
630 | mutex_.Lock(); | |
631 | // no need to signal bg_cv_ as it will be signaled at the end of the | |
632 | // flush process. | |
11fdf7f2 TL |
633 | #else |
634 | (void)cfd; | |
635 | (void)file_meta; | |
636 | (void)mutable_cf_options; | |
637 | (void)job_id; | |
638 | (void)prop; | |
7c673cae FG |
639 | #endif // ROCKSDB_LITE |
640 | } | |
641 | ||
642 | Status DBImpl::CompactRange(const CompactRangeOptions& options, | |
643 | ColumnFamilyHandle* column_family, | |
644 | const Slice* begin, const Slice* end) { | |
11fdf7f2 TL |
645 | auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family); |
646 | auto cfd = cfh->cfd(); | |
647 | ||
648 | if (options.target_path_id >= cfd->ioptions()->cf_paths.size()) { | |
7c673cae FG |
649 | return Status::InvalidArgument("Invalid target path ID"); |
650 | } | |
651 | ||
7c673cae FG |
652 | bool exclusive = options.exclusive_manual_compaction; |
653 | ||
11fdf7f2 TL |
654 | bool flush_needed = true; |
655 | if (begin != nullptr && end != nullptr) { | |
656 | // TODO(ajkr): We could also optimize away the flush in certain cases where | |
657 | // one/both sides of the interval are unbounded. But it requires more | |
658 | // changes to RangesOverlapWithMemtables. | |
659 | Range range(*begin, *end); | |
660 | SuperVersion* super_version = cfd->GetReferencedSuperVersion(&mutex_); | |
661 | cfd->RangesOverlapWithMemtables({range}, super_version, &flush_needed); | |
662 | CleanupSuperVersion(super_version); | |
663 | } | |
664 | ||
665 | Status s; | |
666 | if (flush_needed) { | |
667 | FlushOptions fo; | |
668 | fo.allow_write_stall = options.allow_write_stall; | |
494da23a TL |
669 | if (immutable_db_options_.atomic_flush) { |
670 | autovector<ColumnFamilyData*> cfds; | |
671 | mutex_.Lock(); | |
672 | SelectColumnFamiliesForAtomicFlush(&cfds); | |
673 | mutex_.Unlock(); | |
674 | s = AtomicFlushMemTables(cfds, fo, FlushReason::kManualCompaction, | |
675 | false /* writes_stopped */); | |
676 | } else { | |
677 | s = FlushMemTable(cfd, fo, FlushReason::kManualCompaction, | |
678 | false /* writes_stopped*/); | |
679 | } | |
11fdf7f2 TL |
680 | if (!s.ok()) { |
681 | LogFlush(immutable_db_options_.info_log); | |
682 | return s; | |
683 | } | |
7c673cae FG |
684 | } |
685 | ||
686 | int max_level_with_files = 0; | |
687 | { | |
688 | InstrumentedMutexLock l(&mutex_); | |
689 | Version* base = cfd->current(); | |
690 | for (int level = 1; level < base->storage_info()->num_non_empty_levels(); | |
691 | level++) { | |
692 | if (base->storage_info()->OverlapInLevel(level, begin, end)) { | |
693 | max_level_with_files = level; | |
694 | } | |
695 | } | |
696 | } | |
697 | ||
698 | int final_output_level = 0; | |
699 | if (cfd->ioptions()->compaction_style == kCompactionStyleUniversal && | |
700 | cfd->NumberLevels() > 1) { | |
701 | // Always compact all files together. | |
7c673cae | 702 | final_output_level = cfd->NumberLevels() - 1; |
11fdf7f2 TL |
703 | // if bottom most level is reserved |
704 | if (immutable_db_options_.allow_ingest_behind) { | |
705 | final_output_level--; | |
706 | } | |
707 | s = RunManualCompaction(cfd, ColumnFamilyData::kCompactAllLevels, | |
708 | final_output_level, options.target_path_id, | |
709 | options.max_subcompactions, begin, end, exclusive); | |
7c673cae FG |
710 | } else { |
711 | for (int level = 0; level <= max_level_with_files; level++) { | |
712 | int output_level; | |
713 | // in case the compaction is universal or if we're compacting the | |
714 | // bottom-most level, the output level will be the same as input one. | |
715 | // level 0 can never be the bottommost level (i.e. if all files are in | |
716 | // level 0, we will compact to level 1) | |
717 | if (cfd->ioptions()->compaction_style == kCompactionStyleUniversal || | |
718 | cfd->ioptions()->compaction_style == kCompactionStyleFIFO) { | |
719 | output_level = level; | |
720 | } else if (level == max_level_with_files && level > 0) { | |
721 | if (options.bottommost_level_compaction == | |
722 | BottommostLevelCompaction::kSkip) { | |
723 | // Skip bottommost level compaction | |
724 | continue; | |
725 | } else if (options.bottommost_level_compaction == | |
726 | BottommostLevelCompaction::kIfHaveCompactionFilter && | |
727 | cfd->ioptions()->compaction_filter == nullptr && | |
728 | cfd->ioptions()->compaction_filter_factory == nullptr) { | |
729 | // Skip bottommost level compaction since we don't have a compaction | |
730 | // filter | |
731 | continue; | |
732 | } | |
733 | output_level = level; | |
734 | } else { | |
735 | output_level = level + 1; | |
736 | if (cfd->ioptions()->compaction_style == kCompactionStyleLevel && | |
737 | cfd->ioptions()->level_compaction_dynamic_level_bytes && | |
738 | level == 0) { | |
739 | output_level = ColumnFamilyData::kCompactToBaseLevel; | |
740 | } | |
741 | } | |
742 | s = RunManualCompaction(cfd, level, output_level, options.target_path_id, | |
494da23a TL |
743 | options.max_subcompactions, begin, end, |
744 | exclusive); | |
7c673cae FG |
745 | if (!s.ok()) { |
746 | break; | |
747 | } | |
748 | if (output_level == ColumnFamilyData::kCompactToBaseLevel) { | |
749 | final_output_level = cfd->NumberLevels() - 1; | |
750 | } else if (output_level > final_output_level) { | |
751 | final_output_level = output_level; | |
752 | } | |
753 | TEST_SYNC_POINT("DBImpl::RunManualCompaction()::1"); | |
754 | TEST_SYNC_POINT("DBImpl::RunManualCompaction()::2"); | |
755 | } | |
756 | } | |
757 | if (!s.ok()) { | |
758 | LogFlush(immutable_db_options_.info_log); | |
759 | return s; | |
760 | } | |
761 | ||
762 | if (options.change_level) { | |
763 | ROCKS_LOG_INFO(immutable_db_options_.info_log, | |
764 | "[RefitLevel] waiting for background threads to stop"); | |
765 | s = PauseBackgroundWork(); | |
766 | if (s.ok()) { | |
767 | s = ReFitLevel(cfd, final_output_level, options.target_level); | |
768 | } | |
769 | ContinueBackgroundWork(); | |
770 | } | |
771 | LogFlush(immutable_db_options_.info_log); | |
772 | ||
773 | { | |
774 | InstrumentedMutexLock l(&mutex_); | |
775 | // an automatic compaction that has been scheduled might have been | |
776 | // preempted by the manual compactions. Need to schedule it back. | |
777 | MaybeScheduleFlushOrCompaction(); | |
778 | } | |
779 | ||
780 | return s; | |
781 | } | |
782 | ||
11fdf7f2 TL |
783 | Status DBImpl::CompactFiles(const CompactionOptions& compact_options, |
784 | ColumnFamilyHandle* column_family, | |
785 | const std::vector<std::string>& input_file_names, | |
786 | const int output_level, const int output_path_id, | |
494da23a TL |
787 | std::vector<std::string>* const output_file_names, |
788 | CompactionJobInfo* compaction_job_info) { | |
7c673cae | 789 | #ifdef ROCKSDB_LITE |
11fdf7f2 TL |
790 | (void)compact_options; |
791 | (void)column_family; | |
792 | (void)input_file_names; | |
793 | (void)output_level; | |
794 | (void)output_path_id; | |
795 | (void)output_file_names; | |
494da23a | 796 | (void)compaction_job_info; |
11fdf7f2 | 797 | // not supported in lite version |
7c673cae FG |
798 | return Status::NotSupported("Not supported in ROCKSDB LITE"); |
799 | #else | |
800 | if (column_family == nullptr) { | |
801 | return Status::InvalidArgument("ColumnFamilyHandle must be non-null."); | |
802 | } | |
803 | ||
804 | auto cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd(); | |
805 | assert(cfd); | |
806 | ||
807 | Status s; | |
808 | JobContext job_context(0, true); | |
809 | LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, | |
810 | immutable_db_options_.info_log.get()); | |
811 | ||
812 | // Perform CompactFiles | |
494da23a | 813 | TEST_SYNC_POINT("TestCompactFiles::IngestExternalFile2"); |
7c673cae FG |
814 | { |
815 | InstrumentedMutexLock l(&mutex_); | |
816 | ||
817 | // This call will unlock/lock the mutex to wait for current running | |
818 | // IngestExternalFile() calls to finish. | |
819 | WaitForIngestFile(); | |
820 | ||
494da23a TL |
821 | // We need to get current after `WaitForIngestFile`, because |
822 | // `IngestExternalFile` may add files that overlap with `input_file_names` | |
823 | auto* current = cfd->current(); | |
824 | current->Ref(); | |
825 | ||
826 | s = CompactFilesImpl(compact_options, cfd, current, input_file_names, | |
11fdf7f2 | 827 | output_file_names, output_level, output_path_id, |
494da23a TL |
828 | &job_context, &log_buffer, compaction_job_info); |
829 | ||
830 | current->Unref(); | |
7c673cae FG |
831 | } |
832 | ||
833 | // Find and delete obsolete files | |
834 | { | |
835 | InstrumentedMutexLock l(&mutex_); | |
836 | // If !s.ok(), this means that Compaction failed. In that case, we want | |
837 | // to delete all obsolete files we might have created and we force | |
838 | // FindObsoleteFiles(). This is because job_context does not | |
839 | // catch all created files if compaction failed. | |
840 | FindObsoleteFiles(&job_context, !s.ok()); | |
841 | } // release the mutex | |
842 | ||
843 | // delete unnecessary files if any, this is done outside the mutex | |
11fdf7f2 TL |
844 | if (job_context.HaveSomethingToClean() || |
845 | job_context.HaveSomethingToDelete() || !log_buffer.IsEmpty()) { | |
7c673cae FG |
846 | // Have to flush the info logs before bg_compaction_scheduled_-- |
847 | // because if bg_flush_scheduled_ becomes 0 and the lock is | |
848 | // released, the deconstructor of DB can kick in and destroy all the | |
849 | // states of DB so info_log might not be available after that point. | |
850 | // It also applies to access other states that DB owns. | |
851 | log_buffer.FlushBufferToLog(); | |
852 | if (job_context.HaveSomethingToDelete()) { | |
853 | // no mutex is locked here. No need to Unlock() and Lock() here. | |
854 | PurgeObsoleteFiles(job_context); | |
855 | } | |
856 | job_context.Clean(); | |
857 | } | |
858 | ||
859 | return s; | |
860 | #endif // ROCKSDB_LITE | |
861 | } | |
862 | ||
863 | #ifndef ROCKSDB_LITE | |
864 | Status DBImpl::CompactFilesImpl( | |
865 | const CompactionOptions& compact_options, ColumnFamilyData* cfd, | |
866 | Version* version, const std::vector<std::string>& input_file_names, | |
11fdf7f2 | 867 | std::vector<std::string>* const output_file_names, const int output_level, |
494da23a TL |
868 | int output_path_id, JobContext* job_context, LogBuffer* log_buffer, |
869 | CompactionJobInfo* compaction_job_info) { | |
7c673cae FG |
870 | mutex_.AssertHeld(); |
871 | ||
872 | if (shutting_down_.load(std::memory_order_acquire)) { | |
873 | return Status::ShutdownInProgress(); | |
874 | } | |
875 | ||
876 | std::unordered_set<uint64_t> input_set; | |
494da23a | 877 | for (const auto& file_name : input_file_names) { |
7c673cae FG |
878 | input_set.insert(TableFileNameToNumber(file_name)); |
879 | } | |
880 | ||
881 | ColumnFamilyMetaData cf_meta; | |
882 | // TODO(yhchiang): can directly use version here if none of the | |
883 | // following functions call is pluggable to external developers. | |
884 | version->GetColumnFamilyMetaData(&cf_meta); | |
885 | ||
886 | if (output_path_id < 0) { | |
11fdf7f2 | 887 | if (cfd->ioptions()->cf_paths.size() == 1U) { |
7c673cae FG |
888 | output_path_id = 0; |
889 | } else { | |
890 | return Status::NotSupported( | |
891 | "Automatic output path selection is not " | |
892 | "yet supported in CompactFiles()"); | |
893 | } | |
894 | } | |
895 | ||
896 | Status s = cfd->compaction_picker()->SanitizeCompactionInputFiles( | |
897 | &input_set, cf_meta, output_level); | |
898 | if (!s.ok()) { | |
899 | return s; | |
900 | } | |
901 | ||
902 | std::vector<CompactionInputFiles> input_files; | |
903 | s = cfd->compaction_picker()->GetCompactionInputsFromFileNumbers( | |
904 | &input_files, &input_set, version->storage_info(), compact_options); | |
905 | if (!s.ok()) { | |
906 | return s; | |
907 | } | |
908 | ||
494da23a | 909 | for (const auto& inputs : input_files) { |
7c673cae FG |
910 | if (cfd->compaction_picker()->AreFilesInCompaction(inputs.files)) { |
911 | return Status::Aborted( | |
912 | "Some of the necessary compaction input " | |
913 | "files are already being compacted"); | |
914 | } | |
915 | } | |
11fdf7f2 TL |
916 | bool sfm_reserved_compact_space = false; |
917 | // First check if we have enough room to do the compaction | |
918 | bool enough_room = EnoughRoomForCompaction( | |
919 | cfd, input_files, &sfm_reserved_compact_space, log_buffer); | |
920 | ||
921 | if (!enough_room) { | |
922 | // m's vars will get set properly at the end of this function, | |
923 | // as long as status == CompactionTooLarge | |
924 | return Status::CompactionTooLarge(); | |
925 | } | |
7c673cae FG |
926 | |
927 | // At this point, CompactFiles will be run. | |
928 | bg_compaction_scheduled_++; | |
929 | ||
494da23a | 930 | std::unique_ptr<Compaction> c; |
7c673cae FG |
931 | assert(cfd->compaction_picker()); |
932 | c.reset(cfd->compaction_picker()->CompactFiles( | |
933 | compact_options, input_files, output_level, version->storage_info(), | |
934 | *cfd->GetLatestMutableCFOptions(), output_path_id)); | |
11fdf7f2 TL |
935 | // we already sanitized the set of input files and checked for conflicts |
936 | // without releasing the lock, so we're guaranteed a compaction can be formed. | |
937 | assert(c != nullptr); | |
938 | ||
7c673cae FG |
939 | c->SetInputVersion(version); |
940 | // deletion compaction currently not allowed in CompactFiles. | |
941 | assert(!c->deletion_compaction()); | |
942 | ||
494da23a | 943 | std::vector<SequenceNumber> snapshot_seqs; |
7c673cae | 944 | SequenceNumber earliest_write_conflict_snapshot; |
494da23a TL |
945 | SnapshotChecker* snapshot_checker; |
946 | GetSnapshotContext(job_context, &snapshot_seqs, | |
947 | &earliest_write_conflict_snapshot, &snapshot_checker); | |
7c673cae FG |
948 | |
949 | auto pending_outputs_inserted_elem = | |
950 | CaptureCurrentFileNumberInPendingOutputs(); | |
951 | ||
952 | assert(is_snapshot_supported_ || snapshots_.empty()); | |
494da23a | 953 | CompactionJobStats compaction_job_stats; |
7c673cae | 954 | CompactionJob compaction_job( |
11fdf7f2 TL |
955 | job_context->job_id, c.get(), immutable_db_options_, |
956 | env_options_for_compaction_, versions_.get(), &shutting_down_, | |
957 | preserve_deletes_seqnum_.load(), log_buffer, directories_.GetDbDir(), | |
958 | GetDataDir(c->column_family_data(), c->output_path_id()), stats_, &mutex_, | |
959 | &error_handler_, snapshot_seqs, earliest_write_conflict_snapshot, | |
960 | snapshot_checker, table_cache_, &event_logger_, | |
961 | c->mutable_cf_options()->paranoid_file_checks, | |
7c673cae | 962 | c->mutable_cf_options()->report_bg_io_stats, dbname_, |
494da23a | 963 | &compaction_job_stats, Env::Priority::USER); |
7c673cae FG |
964 | |
965 | // Creating a compaction influences the compaction score because the score | |
966 | // takes running compactions into account (by skipping files that are already | |
967 | // being compacted). Since we just changed compaction score, we recalculate it | |
968 | // here. | |
969 | version->storage_info()->ComputeCompactionScore(*cfd->ioptions(), | |
970 | *c->mutable_cf_options()); | |
971 | ||
972 | compaction_job.Prepare(); | |
973 | ||
974 | mutex_.Unlock(); | |
975 | TEST_SYNC_POINT("CompactFilesImpl:0"); | |
976 | TEST_SYNC_POINT("CompactFilesImpl:1"); | |
977 | compaction_job.Run(); | |
978 | TEST_SYNC_POINT("CompactFilesImpl:2"); | |
979 | TEST_SYNC_POINT("CompactFilesImpl:3"); | |
980 | mutex_.Lock(); | |
981 | ||
982 | Status status = compaction_job.Install(*c->mutable_cf_options()); | |
983 | if (status.ok()) { | |
494da23a TL |
984 | InstallSuperVersionAndScheduleWork(c->column_family_data(), |
985 | &job_context->superversion_contexts[0], | |
986 | *c->mutable_cf_options()); | |
7c673cae FG |
987 | } |
988 | c->ReleaseCompactionFiles(s); | |
11fdf7f2 TL |
989 | #ifndef ROCKSDB_LITE |
990 | // Need to make sure SstFileManager does its bookkeeping | |
991 | auto sfm = static_cast<SstFileManagerImpl*>( | |
992 | immutable_db_options_.sst_file_manager.get()); | |
993 | if (sfm && sfm_reserved_compact_space) { | |
994 | sfm->OnCompactionCompletion(c.get()); | |
995 | } | |
996 | #endif // ROCKSDB_LITE | |
7c673cae FG |
997 | |
998 | ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem); | |
999 | ||
494da23a TL |
1000 | if (compaction_job_info != nullptr) { |
1001 | BuildCompactionJobInfo(cfd, c.get(), s, compaction_job_stats, | |
1002 | job_context->job_id, version, compaction_job_info); | |
1003 | } | |
1004 | ||
7c673cae FG |
1005 | if (status.ok()) { |
1006 | // Done | |
1007 | } else if (status.IsShutdownInProgress()) { | |
1008 | // Ignore compaction errors found during shutting down | |
1009 | } else { | |
1010 | ROCKS_LOG_WARN(immutable_db_options_.info_log, | |
1011 | "[%s] [JOB %d] Compaction error: %s", | |
1012 | c->column_family_data()->GetName().c_str(), | |
1013 | job_context->job_id, status.ToString().c_str()); | |
11fdf7f2 TL |
1014 | error_handler_.SetBGError(status, BackgroundErrorReason::kCompaction); |
1015 | } | |
1016 | ||
1017 | if (output_file_names != nullptr) { | |
1018 | for (const auto newf : c->edit()->GetNewFiles()) { | |
1019 | (*output_file_names) | |
1020 | .push_back(TableFileName(c->immutable_cf_options()->cf_paths, | |
1021 | newf.second.fd.GetNumber(), | |
1022 | newf.second.fd.GetPathId())); | |
7c673cae FG |
1023 | } |
1024 | } | |
1025 | ||
1026 | c.reset(); | |
1027 | ||
1028 | bg_compaction_scheduled_--; | |
1029 | if (bg_compaction_scheduled_ == 0) { | |
1030 | bg_cv_.SignalAll(); | |
1031 | } | |
494da23a | 1032 | MaybeScheduleFlushOrCompaction(); |
11fdf7f2 | 1033 | TEST_SYNC_POINT("CompactFilesImpl:End"); |
7c673cae FG |
1034 | |
1035 | return status; | |
1036 | } | |
1037 | #endif // ROCKSDB_LITE | |
1038 | ||
1039 | Status DBImpl::PauseBackgroundWork() { | |
1040 | InstrumentedMutexLock guard_lock(&mutex_); | |
1041 | bg_compaction_paused_++; | |
11fdf7f2 TL |
1042 | while (bg_bottom_compaction_scheduled_ > 0 || bg_compaction_scheduled_ > 0 || |
1043 | bg_flush_scheduled_ > 0) { | |
7c673cae FG |
1044 | bg_cv_.Wait(); |
1045 | } | |
1046 | bg_work_paused_++; | |
1047 | return Status::OK(); | |
1048 | } | |
1049 | ||
1050 | Status DBImpl::ContinueBackgroundWork() { | |
1051 | InstrumentedMutexLock guard_lock(&mutex_); | |
1052 | if (bg_work_paused_ == 0) { | |
1053 | return Status::InvalidArgument(); | |
1054 | } | |
1055 | assert(bg_work_paused_ > 0); | |
1056 | assert(bg_compaction_paused_ > 0); | |
1057 | bg_compaction_paused_--; | |
1058 | bg_work_paused_--; | |
1059 | // It's sufficient to check just bg_work_paused_ here since | |
1060 | // bg_work_paused_ is always no greater than bg_compaction_paused_ | |
1061 | if (bg_work_paused_ == 0) { | |
1062 | MaybeScheduleFlushOrCompaction(); | |
1063 | } | |
1064 | return Status::OK(); | |
1065 | } | |
1066 | ||
494da23a TL |
1067 | void DBImpl::NotifyOnCompactionBegin(ColumnFamilyData* cfd, Compaction* c, |
1068 | const Status& st, | |
1069 | const CompactionJobStats& job_stats, | |
1070 | int job_id) { | |
7c673cae | 1071 | #ifndef ROCKSDB_LITE |
494da23a | 1072 | if (immutable_db_options_.listeners.empty()) { |
7c673cae FG |
1073 | return; |
1074 | } | |
1075 | mutex_.AssertHeld(); | |
1076 | if (shutting_down_.load(std::memory_order_acquire)) { | |
1077 | return; | |
1078 | } | |
11fdf7f2 TL |
1079 | Version* current = cfd->current(); |
1080 | current->Ref(); | |
7c673cae FG |
1081 | // release lock while notifying events |
1082 | mutex_.Unlock(); | |
494da23a | 1083 | TEST_SYNC_POINT("DBImpl::NotifyOnCompactionBegin::UnlockMutex"); |
7c673cae FG |
1084 | { |
1085 | CompactionJobInfo info; | |
1086 | info.cf_name = cfd->GetName(); | |
1087 | info.status = st; | |
1088 | info.thread_id = env_->GetThreadID(); | |
1089 | info.job_id = job_id; | |
1090 | info.base_input_level = c->start_level(); | |
1091 | info.output_level = c->output_level(); | |
494da23a | 1092 | info.stats = job_stats; |
7c673cae FG |
1093 | info.table_properties = c->GetOutputTableProperties(); |
1094 | info.compaction_reason = c->compaction_reason(); | |
1095 | info.compression = c->output_compression(); | |
1096 | for (size_t i = 0; i < c->num_input_levels(); ++i) { | |
1097 | for (const auto fmd : *c->inputs(i)) { | |
11fdf7f2 | 1098 | auto fn = TableFileName(c->immutable_cf_options()->cf_paths, |
7c673cae FG |
1099 | fmd->fd.GetNumber(), fmd->fd.GetPathId()); |
1100 | info.input_files.push_back(fn); | |
1101 | if (info.table_properties.count(fn) == 0) { | |
1102 | std::shared_ptr<const TableProperties> tp; | |
11fdf7f2 | 1103 | auto s = current->GetTableProperties(&tp, fmd, &fn); |
7c673cae FG |
1104 | if (s.ok()) { |
1105 | info.table_properties[fn] = tp; | |
1106 | } | |
1107 | } | |
1108 | } | |
1109 | } | |
1110 | for (const auto newf : c->edit()->GetNewFiles()) { | |
11fdf7f2 TL |
1111 | info.output_files.push_back(TableFileName( |
1112 | c->immutable_cf_options()->cf_paths, newf.second.fd.GetNumber(), | |
1113 | newf.second.fd.GetPathId())); | |
7c673cae | 1114 | } |
494da23a TL |
1115 | for (auto listener : immutable_db_options_.listeners) { |
1116 | listener->OnCompactionBegin(this, info); | |
1117 | } | |
1118 | } | |
1119 | mutex_.Lock(); | |
1120 | current->Unref(); | |
1121 | #else | |
1122 | (void)cfd; | |
1123 | (void)c; | |
1124 | (void)st; | |
1125 | (void)job_stats; | |
1126 | (void)job_id; | |
1127 | #endif // ROCKSDB_LITE | |
1128 | } | |
1129 | ||
1130 | void DBImpl::NotifyOnCompactionCompleted( | |
1131 | ColumnFamilyData* cfd, Compaction* c, const Status& st, | |
1132 | const CompactionJobStats& compaction_job_stats, const int job_id) { | |
1133 | #ifndef ROCKSDB_LITE | |
1134 | if (immutable_db_options_.listeners.size() == 0U) { | |
1135 | return; | |
1136 | } | |
1137 | mutex_.AssertHeld(); | |
1138 | if (shutting_down_.load(std::memory_order_acquire)) { | |
1139 | return; | |
1140 | } | |
1141 | Version* current = cfd->current(); | |
1142 | current->Ref(); | |
1143 | // release lock while notifying events | |
1144 | mutex_.Unlock(); | |
1145 | TEST_SYNC_POINT("DBImpl::NotifyOnCompactionCompleted::UnlockMutex"); | |
1146 | { | |
1147 | CompactionJobInfo info; | |
1148 | BuildCompactionJobInfo(cfd, c, st, compaction_job_stats, job_id, current, | |
1149 | &info); | |
7c673cae FG |
1150 | for (auto listener : immutable_db_options_.listeners) { |
1151 | listener->OnCompactionCompleted(this, info); | |
1152 | } | |
1153 | } | |
1154 | mutex_.Lock(); | |
11fdf7f2 | 1155 | current->Unref(); |
7c673cae FG |
1156 | // no need to signal bg_cv_ as it will be signaled at the end of the |
1157 | // flush process. | |
11fdf7f2 TL |
1158 | #else |
1159 | (void)cfd; | |
1160 | (void)c; | |
1161 | (void)st; | |
1162 | (void)compaction_job_stats; | |
1163 | (void)job_id; | |
7c673cae FG |
1164 | #endif // ROCKSDB_LITE |
1165 | } | |
1166 | ||
1167 | // REQUIREMENT: block all background work by calling PauseBackgroundWork() | |
1168 | // before calling this function | |
1169 | Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) { | |
1170 | assert(level < cfd->NumberLevels()); | |
1171 | if (target_level >= cfd->NumberLevels()) { | |
1172 | return Status::InvalidArgument("Target level exceeds number of levels"); | |
1173 | } | |
1174 | ||
11fdf7f2 | 1175 | SuperVersionContext sv_context(/* create_superversion */ true); |
7c673cae FG |
1176 | |
1177 | Status status; | |
1178 | ||
1179 | InstrumentedMutexLock guard_lock(&mutex_); | |
1180 | ||
1181 | // only allow one thread refitting | |
1182 | if (refitting_level_) { | |
1183 | ROCKS_LOG_INFO(immutable_db_options_.info_log, | |
1184 | "[ReFitLevel] another thread is refitting"); | |
1185 | return Status::NotSupported("another thread is refitting"); | |
1186 | } | |
1187 | refitting_level_ = true; | |
1188 | ||
1189 | const MutableCFOptions mutable_cf_options = *cfd->GetLatestMutableCFOptions(); | |
1190 | // move to a smaller level | |
1191 | int to_level = target_level; | |
1192 | if (target_level < 0) { | |
1193 | to_level = FindMinimumEmptyLevelFitting(cfd, mutable_cf_options, level); | |
1194 | } | |
1195 | ||
1196 | auto* vstorage = cfd->current()->storage_info(); | |
1197 | if (to_level > level) { | |
1198 | if (level == 0) { | |
1199 | return Status::NotSupported( | |
1200 | "Cannot change from level 0 to other levels."); | |
1201 | } | |
1202 | // Check levels are empty for a trivial move | |
1203 | for (int l = level + 1; l <= to_level; l++) { | |
1204 | if (vstorage->NumLevelFiles(l) > 0) { | |
1205 | return Status::NotSupported( | |
1206 | "Levels between source and target are not empty for a move."); | |
1207 | } | |
1208 | } | |
1209 | } | |
1210 | if (to_level != level) { | |
1211 | ROCKS_LOG_DEBUG(immutable_db_options_.info_log, | |
1212 | "[%s] Before refitting:\n%s", cfd->GetName().c_str(), | |
1213 | cfd->current()->DebugString().data()); | |
1214 | ||
1215 | VersionEdit edit; | |
1216 | edit.SetColumnFamily(cfd->GetID()); | |
1217 | for (const auto& f : vstorage->LevelFiles(level)) { | |
1218 | edit.DeleteFile(level, f->fd.GetNumber()); | |
1219 | edit.AddFile(to_level, f->fd.GetNumber(), f->fd.GetPathId(), | |
1220 | f->fd.GetFileSize(), f->smallest, f->largest, | |
11fdf7f2 | 1221 | f->fd.smallest_seqno, f->fd.largest_seqno, |
7c673cae FG |
1222 | f->marked_for_compaction); |
1223 | } | |
1224 | ROCKS_LOG_DEBUG(immutable_db_options_.info_log, | |
1225 | "[%s] Apply version edit:\n%s", cfd->GetName().c_str(), | |
1226 | edit.DebugString().data()); | |
1227 | ||
1228 | status = versions_->LogAndApply(cfd, mutable_cf_options, &edit, &mutex_, | |
1229 | directories_.GetDbDir()); | |
11fdf7f2 | 1230 | InstallSuperVersionAndScheduleWork(cfd, &sv_context, mutable_cf_options); |
7c673cae FG |
1231 | |
1232 | ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "[%s] LogAndApply: %s\n", | |
1233 | cfd->GetName().c_str(), status.ToString().data()); | |
1234 | ||
1235 | if (status.ok()) { | |
1236 | ROCKS_LOG_DEBUG(immutable_db_options_.info_log, | |
1237 | "[%s] After refitting:\n%s", cfd->GetName().c_str(), | |
1238 | cfd->current()->DebugString().data()); | |
1239 | } | |
1240 | } | |
1241 | ||
11fdf7f2 | 1242 | sv_context.Clean(); |
7c673cae FG |
1243 | refitting_level_ = false; |
1244 | ||
1245 | return status; | |
1246 | } | |
1247 | ||
1248 | int DBImpl::NumberLevels(ColumnFamilyHandle* column_family) { | |
1249 | auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family); | |
1250 | return cfh->cfd()->NumberLevels(); | |
1251 | } | |
1252 | ||
11fdf7f2 | 1253 | int DBImpl::MaxMemCompactionLevel(ColumnFamilyHandle* /*column_family*/) { |
7c673cae FG |
1254 | return 0; |
1255 | } | |
1256 | ||
1257 | int DBImpl::Level0StopWriteTrigger(ColumnFamilyHandle* column_family) { | |
1258 | auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family); | |
1259 | InstrumentedMutexLock l(&mutex_); | |
11fdf7f2 TL |
1260 | return cfh->cfd() |
1261 | ->GetSuperVersion() | |
1262 | ->mutable_cf_options.level0_stop_writes_trigger; | |
7c673cae FG |
1263 | } |
1264 | ||
1265 | Status DBImpl::Flush(const FlushOptions& flush_options, | |
1266 | ColumnFamilyHandle* column_family) { | |
1267 | auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family); | |
11fdf7f2 TL |
1268 | ROCKS_LOG_INFO(immutable_db_options_.info_log, "[%s] Manual flush start.", |
1269 | cfh->GetName().c_str()); | |
494da23a TL |
1270 | Status s; |
1271 | if (immutable_db_options_.atomic_flush) { | |
1272 | s = AtomicFlushMemTables({cfh->cfd()}, flush_options, | |
1273 | FlushReason::kManualFlush); | |
1274 | } else { | |
1275 | s = FlushMemTable(cfh->cfd(), flush_options, FlushReason::kManualFlush); | |
1276 | } | |
1277 | ||
11fdf7f2 TL |
1278 | ROCKS_LOG_INFO(immutable_db_options_.info_log, |
1279 | "[%s] Manual flush finished, status: %s\n", | |
1280 | cfh->GetName().c_str(), s.ToString().c_str()); | |
1281 | return s; | |
1282 | } | |
1283 | ||
494da23a TL |
1284 | Status DBImpl::Flush(const FlushOptions& flush_options, |
1285 | const std::vector<ColumnFamilyHandle*>& column_families) { | |
11fdf7f2 | 1286 | Status s; |
494da23a TL |
1287 | if (!immutable_db_options_.atomic_flush) { |
1288 | for (auto cfh : column_families) { | |
1289 | s = Flush(flush_options, cfh); | |
1290 | if (!s.ok()) { | |
1291 | break; | |
1292 | } | |
11fdf7f2 | 1293 | } |
494da23a TL |
1294 | } else { |
1295 | ROCKS_LOG_INFO(immutable_db_options_.info_log, | |
1296 | "Manual atomic flush start.\n" | |
1297 | "=====Column families:====="); | |
1298 | for (auto cfh : column_families) { | |
1299 | auto cfhi = static_cast<ColumnFamilyHandleImpl*>(cfh); | |
1300 | ROCKS_LOG_INFO(immutable_db_options_.info_log, "%s", | |
1301 | cfhi->GetName().c_str()); | |
11fdf7f2 | 1302 | } |
494da23a TL |
1303 | ROCKS_LOG_INFO(immutable_db_options_.info_log, |
1304 | "=====End of column families list====="); | |
1305 | autovector<ColumnFamilyData*> cfds; | |
1306 | std::for_each(column_families.begin(), column_families.end(), | |
1307 | [&cfds](ColumnFamilyHandle* elem) { | |
1308 | auto cfh = static_cast<ColumnFamilyHandleImpl*>(elem); | |
1309 | cfds.emplace_back(cfh->cfd()); | |
1310 | }); | |
1311 | s = AtomicFlushMemTables(cfds, flush_options, FlushReason::kManualFlush); | |
1312 | ROCKS_LOG_INFO(immutable_db_options_.info_log, | |
1313 | "Manual atomic flush finished, status: %s\n" | |
1314 | "=====Column families:=====", | |
1315 | s.ToString().c_str()); | |
1316 | for (auto cfh : column_families) { | |
1317 | auto cfhi = static_cast<ColumnFamilyHandleImpl*>(cfh); | |
1318 | ROCKS_LOG_INFO(immutable_db_options_.info_log, "%s", | |
1319 | cfhi->GetName().c_str()); | |
11fdf7f2 | 1320 | } |
494da23a TL |
1321 | ROCKS_LOG_INFO(immutable_db_options_.info_log, |
1322 | "=====End of column families list====="); | |
11fdf7f2 | 1323 | } |
11fdf7f2 | 1324 | return s; |
7c673cae FG |
1325 | } |
1326 | ||
1327 | Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level, | |
1328 | int output_level, uint32_t output_path_id, | |
11fdf7f2 | 1329 | uint32_t max_subcompactions, |
7c673cae FG |
1330 | const Slice* begin, const Slice* end, |
1331 | bool exclusive, bool disallow_trivial_move) { | |
1332 | assert(input_level == ColumnFamilyData::kCompactAllLevels || | |
1333 | input_level >= 0); | |
1334 | ||
1335 | InternalKey begin_storage, end_storage; | |
1336 | CompactionArg* ca; | |
1337 | ||
1338 | bool scheduled = false; | |
1339 | bool manual_conflict = false; | |
11fdf7f2 | 1340 | ManualCompactionState manual; |
7c673cae FG |
1341 | manual.cfd = cfd; |
1342 | manual.input_level = input_level; | |
1343 | manual.output_level = output_level; | |
1344 | manual.output_path_id = output_path_id; | |
1345 | manual.done = false; | |
1346 | manual.in_progress = false; | |
1347 | manual.incomplete = false; | |
1348 | manual.exclusive = exclusive; | |
1349 | manual.disallow_trivial_move = disallow_trivial_move; | |
1350 | // For universal compaction, we enforce every manual compaction to compact | |
1351 | // all files. | |
1352 | if (begin == nullptr || | |
1353 | cfd->ioptions()->compaction_style == kCompactionStyleUniversal || | |
1354 | cfd->ioptions()->compaction_style == kCompactionStyleFIFO) { | |
1355 | manual.begin = nullptr; | |
1356 | } else { | |
11fdf7f2 | 1357 | begin_storage.SetMinPossibleForUserKey(*begin); |
7c673cae FG |
1358 | manual.begin = &begin_storage; |
1359 | } | |
1360 | if (end == nullptr || | |
1361 | cfd->ioptions()->compaction_style == kCompactionStyleUniversal || | |
1362 | cfd->ioptions()->compaction_style == kCompactionStyleFIFO) { | |
1363 | manual.end = nullptr; | |
1364 | } else { | |
11fdf7f2 | 1365 | end_storage.SetMaxPossibleForUserKey(*end); |
7c673cae FG |
1366 | manual.end = &end_storage; |
1367 | } | |
1368 | ||
1369 | TEST_SYNC_POINT("DBImpl::RunManualCompaction:0"); | |
1370 | TEST_SYNC_POINT("DBImpl::RunManualCompaction:1"); | |
1371 | InstrumentedMutexLock l(&mutex_); | |
1372 | ||
1373 | // When a manual compaction arrives, temporarily disable scheduling of | |
1374 | // non-manual compactions and wait until the number of scheduled compaction | |
1375 | // jobs drops to zero. This is needed to ensure that this manual compaction | |
1376 | // can compact any range of keys/files. | |
1377 | // | |
1378 | // HasPendingManualCompaction() is true when at least one thread is inside | |
1379 | // RunManualCompaction(), i.e. during that time no other compaction will | |
1380 | // get scheduled (see MaybeScheduleFlushOrCompaction). | |
1381 | // | |
1382 | // Note that the following loop doesn't stop more that one thread calling | |
1383 | // RunManualCompaction() from getting to the second while loop below. | |
1384 | // However, only one of them will actually schedule compaction, while | |
1385 | // others will wait on a condition variable until it completes. | |
1386 | ||
1387 | AddManualCompaction(&manual); | |
1388 | TEST_SYNC_POINT_CALLBACK("DBImpl::RunManualCompaction:NotScheduled", &mutex_); | |
1389 | if (exclusive) { | |
11fdf7f2 TL |
1390 | while (bg_bottom_compaction_scheduled_ > 0 || |
1391 | bg_compaction_scheduled_ > 0) { | |
1392 | TEST_SYNC_POINT("DBImpl::RunManualCompaction:WaitScheduled"); | |
7c673cae FG |
1393 | ROCKS_LOG_INFO( |
1394 | immutable_db_options_.info_log, | |
1395 | "[%s] Manual compaction waiting for all other scheduled background " | |
1396 | "compactions to finish", | |
1397 | cfd->GetName().c_str()); | |
1398 | bg_cv_.Wait(); | |
1399 | } | |
1400 | } | |
1401 | ||
1402 | ROCKS_LOG_INFO(immutable_db_options_.info_log, | |
1403 | "[%s] Manual compaction starting", cfd->GetName().c_str()); | |
1404 | ||
494da23a TL |
1405 | LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, |
1406 | immutable_db_options_.info_log.get()); | |
7c673cae FG |
1407 | // We don't check bg_error_ here, because if we get the error in compaction, |
1408 | // the compaction will set manual.status to bg_error_ and set manual.done to | |
1409 | // true. | |
1410 | while (!manual.done) { | |
1411 | assert(HasPendingManualCompaction()); | |
1412 | manual_conflict = false; | |
11fdf7f2 | 1413 | Compaction* compaction = nullptr; |
7c673cae FG |
1414 | if (ShouldntRunManualCompaction(&manual) || (manual.in_progress == true) || |
1415 | scheduled || | |
11fdf7f2 TL |
1416 | (((manual.manual_end = &manual.tmp_storage1) != nullptr) && |
1417 | ((compaction = manual.cfd->CompactRange( | |
1418 | *manual.cfd->GetLatestMutableCFOptions(), manual.input_level, | |
1419 | manual.output_level, manual.output_path_id, max_subcompactions, | |
1420 | manual.begin, manual.end, &manual.manual_end, | |
1421 | &manual_conflict)) == nullptr && | |
1422 | manual_conflict))) { | |
7c673cae FG |
1423 | // exclusive manual compactions should not see a conflict during |
1424 | // CompactRange | |
1425 | assert(!exclusive || !manual_conflict); | |
1426 | // Running either this or some other manual compaction | |
1427 | bg_cv_.Wait(); | |
1428 | if (scheduled && manual.incomplete == true) { | |
1429 | assert(!manual.in_progress); | |
1430 | scheduled = false; | |
1431 | manual.incomplete = false; | |
1432 | } | |
1433 | } else if (!scheduled) { | |
11fdf7f2 | 1434 | if (compaction == nullptr) { |
7c673cae FG |
1435 | manual.done = true; |
1436 | bg_cv_.SignalAll(); | |
1437 | continue; | |
1438 | } | |
1439 | ca = new CompactionArg; | |
1440 | ca->db = this; | |
11fdf7f2 TL |
1441 | ca->prepicked_compaction = new PrepickedCompaction; |
1442 | ca->prepicked_compaction->manual_compaction_state = &manual; | |
1443 | ca->prepicked_compaction->compaction = compaction; | |
494da23a TL |
1444 | if (!RequestCompactionToken( |
1445 | cfd, true, &ca->prepicked_compaction->task_token, &log_buffer)) { | |
1446 | // Don't throttle manual compaction, only count outstanding tasks. | |
1447 | assert(false); | |
1448 | } | |
7c673cae FG |
1449 | manual.incomplete = false; |
1450 | bg_compaction_scheduled_++; | |
1451 | env_->Schedule(&DBImpl::BGWorkCompaction, ca, Env::Priority::LOW, this, | |
494da23a | 1452 | &DBImpl::UnscheduleCompactionCallback); |
7c673cae FG |
1453 | scheduled = true; |
1454 | } | |
1455 | } | |
1456 | ||
494da23a | 1457 | log_buffer.FlushBufferToLog(); |
7c673cae FG |
1458 | assert(!manual.in_progress); |
1459 | assert(HasPendingManualCompaction()); | |
1460 | RemoveManualCompaction(&manual); | |
1461 | bg_cv_.SignalAll(); | |
1462 | return manual.status; | |
1463 | } | |
1464 | ||
494da23a TL |
1465 | void DBImpl::GenerateFlushRequest(const autovector<ColumnFamilyData*>& cfds, |
1466 | FlushRequest* req) { | |
1467 | assert(req != nullptr); | |
1468 | req->reserve(cfds.size()); | |
1469 | for (const auto cfd : cfds) { | |
1470 | if (nullptr == cfd) { | |
1471 | // cfd may be null, see DBImpl::ScheduleFlushes | |
1472 | continue; | |
1473 | } | |
1474 | uint64_t max_memtable_id = cfd->imm()->GetLatestMemTableID(); | |
1475 | req->emplace_back(cfd, max_memtable_id); | |
1476 | } | |
1477 | } | |
1478 | ||
// Flush the active memtable of `cfd` to an SST file.
// - If flush_options.allow_write_stall is false, first wait until a flush
//   would not push the CF into a write-stall condition.
// - If flush_options.wait is true, block until the scheduled flush finishes.
// - writes_stopped: the caller has already stopped writes, so this function
//   must not enter the write thread again.
// Returns the first error from the stall wait, the memtable switch, or the
// wait for the flush job.
Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
                             const FlushOptions& flush_options,
                             FlushReason flush_reason, bool writes_stopped) {
  Status s;
  uint64_t flush_memtable_id = 0;
  if (!flush_options.allow_write_stall) {
    bool flush_needed = true;
    s = WaitUntilFlushWouldNotStallWrites(cfd, &flush_needed);
    TEST_SYNC_POINT("DBImpl::FlushMemTable:StallWaitDone");
    if (!s.ok() || !flush_needed) {
      // Either an error occurred or the memtable we were asked to flush was
      // already flushed while waiting out the stall; nothing left to do.
      return s;
    }
  }
  FlushRequest flush_req;
  {
    WriteContext context;
    InstrumentedMutexLock guard_lock(&mutex_);

    WriteThread::Writer w;
    if (!writes_stopped) {
      // Block concurrent writers while the memtable is switched.
      write_thread_.EnterUnbatched(&w, &mutex_);
    }

    // Make the active memtable immutable so it becomes flushable; skipped
    // when there is nothing to persist.
    if (!cfd->mem()->IsEmpty() || !cached_recoverable_state_empty_.load()) {
      s = SwitchMemtable(cfd, &context);
    }

    if (s.ok()) {
      if (cfd->imm()->NumNotFlushed() != 0 || !cfd->mem()->IsEmpty() ||
          !cached_recoverable_state_empty_.load()) {
        // Record the newest immutable memtable ID; the wait below uses it to
        // tell when everything this request covers has been flushed.
        flush_memtable_id = cfd->imm()->GetLatestMemTableID();
        flush_req.emplace_back(cfd, flush_memtable_id);
      }
    }

    if (s.ok() && !flush_req.empty()) {
      for (auto& elem : flush_req) {
        ColumnFamilyData* loop_cfd = elem.first;
        loop_cfd->imm()->FlushRequested();
      }
      SchedulePendingFlush(flush_req, flush_reason);
      MaybeScheduleFlushOrCompaction();
    }

    if (!writes_stopped) {
      write_thread_.ExitUnbatched(&w);
    }
  }

  if (s.ok() && flush_options.wait) {
    autovector<ColumnFamilyData*> cfds;
    autovector<const uint64_t*> flush_memtable_ids;
    for (auto& iter : flush_req) {
      cfds.push_back(iter.first);
      flush_memtable_ids.push_back(&(iter.second));
    }
    // During error recovery we must keep waiting even though the DB is in
    // the stopped state.
    s = WaitForFlushMemTables(cfds, flush_memtable_ids,
                              (flush_reason == FlushReason::kErrorRecovery));
  }
  TEST_SYNC_POINT("FlushMemTableFinished");
  return s;
}
1541 | ||
// Flush all elements in 'column_family_datas'
// and atomically record the result to the MANIFEST.
Status DBImpl::AtomicFlushMemTables(
    const autovector<ColumnFamilyData*>& column_family_datas,
    const FlushOptions& flush_options, FlushReason flush_reason,
    bool writes_stopped) {
  Status s;
  if (!flush_options.allow_write_stall) {
    // Wait until flushing would not stall writes for any of the CFs; bail
    // out early if none of them turns out to need a flush at all.
    int num_cfs_to_flush = 0;
    for (auto cfd : column_family_datas) {
      bool flush_needed = true;
      s = WaitUntilFlushWouldNotStallWrites(cfd, &flush_needed);
      if (!s.ok()) {
        return s;
      } else if (flush_needed) {
        ++num_cfs_to_flush;
      }
    }
    if (0 == num_cfs_to_flush) {
      return s;
    }
  }
  FlushRequest flush_req;
  autovector<ColumnFamilyData*> cfds;
  {
    WriteContext context;
    InstrumentedMutexLock guard_lock(&mutex_);

    WriteThread::Writer w;
    if (!writes_stopped) {
      // Block concurrent writers while the memtables are switched.
      write_thread_.EnterUnbatched(&w, &mutex_);
    }

    // Collect the CFs that actually have data to flush; dropped CFs are
    // excluded from the atomic flush.
    for (auto cfd : column_family_datas) {
      if (cfd->IsDropped()) {
        continue;
      }
      if (cfd->imm()->NumNotFlushed() != 0 || !cfd->mem()->IsEmpty() ||
          !cached_recoverable_state_empty_.load()) {
        cfds.emplace_back(cfd);
      }
    }
    for (auto cfd : cfds) {
      if (cfd->mem()->IsEmpty() && cached_recoverable_state_empty_.load()) {
        continue;
      }
      // NOTE(review): Ref/Unref guards cfd across SwitchMemtable, which
      // presumably can release mutex_ internally — confirm against its
      // definition.
      cfd->Ref();
      s = SwitchMemtable(cfd, &context);
      cfd->Unref();
      if (!s.ok()) {
        break;
      }
    }
    if (s.ok()) {
      AssignAtomicFlushSeq(cfds);
      for (auto cfd : cfds) {
        cfd->imm()->FlushRequested();
      }
      GenerateFlushRequest(cfds, &flush_req);
      SchedulePendingFlush(flush_req, flush_reason);
      MaybeScheduleFlushOrCompaction();
    }

    if (!writes_stopped) {
      write_thread_.ExitUnbatched(&w);
    }
  }
  TEST_SYNC_POINT("DBImpl::AtomicFlushMemTables:AfterScheduleFlush");

  if (s.ok() && flush_options.wait) {
    autovector<const uint64_t*> flush_memtable_ids;
    for (auto& iter : flush_req) {
      flush_memtable_ids.push_back(&(iter.second));
    }
    // During error recovery we must keep waiting even though the DB is in
    // the stopped state.
    s = WaitForFlushMemTables(cfds, flush_memtable_ids,
                              (flush_reason == FlushReason::kErrorRecovery));
  }
  return s;
}
1621 | ||
// Calling FlushMemTable(), whether from DB::Flush() or from Backup Engine, can
// cause write stall, for example if one memtable is being flushed already.
// This method tries to avoid write stall (similar to CompactRange() behavior):
// it emulates how the SuperVersion / LSM would change if flush happens, checks
// it against various constraints and delays flush if it'd cause write stall.
// Caller should check status and *flush_needed to see if flush already
// happened (flush_needed == false means the original memtable got flushed
// while we waited).
Status DBImpl::WaitUntilFlushWouldNotStallWrites(ColumnFamilyData* cfd,
                                                 bool* flush_needed) {
  {
    *flush_needed = true;
    InstrumentedMutexLock l(&mutex_);
    uint64_t orig_active_memtable_id = cfd->mem()->GetID();
    WriteStallCondition write_stall_condition = WriteStallCondition::kNormal;
    do {
      if (write_stall_condition != WriteStallCondition::kNormal) {
        // Same error handling as user writes: Don't wait if there's a
        // background error, even if it's a soft error. We might wait here
        // indefinitely as the pending flushes/compactions may never finish
        // successfully, resulting in the stall condition lasting indefinitely
        if (error_handler_.IsBGWorkStopped()) {
          return error_handler_.GetBGError();
        }

        TEST_SYNC_POINT("DBImpl::WaitUntilFlushWouldNotStallWrites:StallWait");
        ROCKS_LOG_INFO(immutable_db_options_.info_log,
                       "[%s] WaitUntilFlushWouldNotStallWrites"
                       " waiting on stall conditions to clear",
                       cfd->GetName().c_str());
        // Woken by background work finishing (bg_cv_ is signalled whenever
        // flush/compaction state changes).
        bg_cv_.Wait();
      }
      if (cfd->IsDropped() || shutting_down_.load(std::memory_order_acquire)) {
        return Status::ShutdownInProgress();
      }

      uint64_t earliest_memtable_id =
          std::min(cfd->mem()->GetID(), cfd->imm()->GetEarliestMemTableID());
      if (earliest_memtable_id > orig_active_memtable_id) {
        // We waited so long that the memtable we were originally waiting on was
        // flushed.
        *flush_needed = false;
        return Status::OK();
      }

      const auto& mutable_cf_options = *cfd->GetLatestMutableCFOptions();
      const auto* vstorage = cfd->current()->storage_info();

      // Skip stalling check if we're below auto-flush and auto-compaction
      // triggers. If it stalled in these conditions, that'd mean the stall
      // triggers are so low that stalling is needed for any background work. In
      // that case we shouldn't wait since background work won't be scheduled.
      if (cfd->imm()->NumNotFlushed() <
              cfd->ioptions()->min_write_buffer_number_to_merge &&
          vstorage->l0_delay_trigger_count() <
              mutable_cf_options.level0_file_num_compaction_trigger) {
        break;
      }

      // check whether one extra immutable memtable or an extra L0 file would
      // cause write stalling mode to be entered. It could still enter stall
      // mode due to pending compaction bytes, but that's less common
      write_stall_condition =
          ColumnFamilyData::GetWriteStallConditionAndCause(
              cfd->imm()->NumNotFlushed() + 1,
              vstorage->l0_delay_trigger_count() + 1,
              vstorage->estimated_compaction_needed_bytes(), mutable_cf_options)
              .first;
    } while (write_stall_condition != WriteStallCondition::kNormal);
  }
  return Status::OK();
}
1692 | ||
// Wait for memtables to be flushed for multiple column families.
// let N = cfds.size()
// for i in [0, N),
//  1) if flush_memtable_ids[i] is not null, then the memtables with lower IDs
//     have to be flushed for THIS column family;
//  2) if flush_memtable_ids[i] is null, then all memtables in THIS column
//     family have to be flushed.
// Finish waiting when ALL column families finish flushing memtables.
// resuming_from_bg_err indicates whether the caller is trying to resume from
// background error or in normal processing.
Status DBImpl::WaitForFlushMemTables(
    const autovector<ColumnFamilyData*>& cfds,
    const autovector<const uint64_t*>& flush_memtable_ids,
    bool resuming_from_bg_err) {
  int num = static_cast<int>(cfds.size());
  // Wait until the compaction completes
  InstrumentedMutexLock l(&mutex_);
  // If the caller is trying to resume from bg error, then
  // error_handler_.IsDBStopped() is true. In that case we keep waiting
  // instead of returning the background error immediately.
  while (resuming_from_bg_err || !error_handler_.IsDBStopped()) {
    if (shutting_down_.load(std::memory_order_acquire)) {
      return Status::ShutdownInProgress();
    }
    // If an error has occurred during resumption, then no need to wait.
    if (!error_handler_.GetRecoveryError().ok()) {
      break;
    }
    // Number of column families that have been dropped.
    int num_dropped = 0;
    // Number of column families that have finished flush.
    int num_finished = 0;
    for (int i = 0; i < num; ++i) {
      if (cfds[i]->IsDropped()) {
        ++num_dropped;
      } else if (cfds[i]->imm()->NumNotFlushed() == 0 ||
                 (flush_memtable_ids[i] != nullptr &&
                  cfds[i]->imm()->GetEarliestMemTableID() >
                      *flush_memtable_ids[i])) {
        // Finished: either no unflushed memtables remain, or everything up
        // to the requested memtable ID has been flushed.
        ++num_finished;
      }
    }
    // Special case: waiting on exactly one CF that was dropped is an error
    // for the caller, not a successful wait.
    if (1 == num_dropped && 1 == num) {
      return Status::InvalidArgument("Cannot flush a dropped CF");
    }
    // Column families involved in this flush request have either been dropped
    // or finished flush. Then it's time to finish waiting.
    if (num_dropped + num_finished == num) {
      break;
    }
    bg_cv_.Wait();
  }
  Status s;
  // If not resuming from bg error, and an error has caused the DB to stop,
  // then report the bg error to caller.
  if (!resuming_from_bg_err && error_handler_.IsDBStopped()) {
    s = error_handler_.GetBGError();
  }
  return s;
}
1752 | ||
1753 | Status DBImpl::EnableAutoCompaction( | |
1754 | const std::vector<ColumnFamilyHandle*>& column_family_handles) { | |
1755 | Status s; | |
1756 | for (auto cf_ptr : column_family_handles) { | |
1757 | Status status = | |
1758 | this->SetOptions(cf_ptr, {{"disable_auto_compactions", "false"}}); | |
1759 | if (!status.ok()) { | |
1760 | s = status; | |
1761 | } | |
1762 | } | |
1763 | ||
1764 | return s; | |
1765 | } | |
1766 | ||
// Schedule as many pending flushes and compactions onto the background
// thread pools as the current BGJobLimits allow. Must be called with
// mutex_ held. No-op while the DB is opening, paused, shutting down, or
// stopped by a non-recovering background error.
void DBImpl::MaybeScheduleFlushOrCompaction() {
  mutex_.AssertHeld();
  if (!opened_successfully_) {
    // Compaction may introduce data race to DB open
    return;
  }
  if (bg_work_paused_ > 0) {
    // we paused the background work
    return;
  } else if (error_handler_.IsBGWorkStopped() &&
             !error_handler_.IsRecoveryInProgress()) {
    // There has been a hard error and this call is not part of the recovery
    // sequence. Bail out here so we don't get into an endless loop of
    // scheduling BG work which will again call this function
    return;
  } else if (shutting_down_.load(std::memory_order_acquire)) {
    // DB is being deleted; no more background compactions
    return;
  }
  auto bg_job_limits = GetBGJobLimits();
  bool is_flush_pool_empty =
      env_->GetBackgroundThreads(Env::Priority::HIGH) == 0;
  // Note: unscheduled_flushes_ is not decremented here; it is decremented in
  // PopFirstFromFlushQueue() when a worker actually consumes a request.
  while (!is_flush_pool_empty && unscheduled_flushes_ > 0 &&
         bg_flush_scheduled_ < bg_job_limits.max_flushes) {
    bg_flush_scheduled_++;
    // Ownership of fta transfers to BGWorkFlush (or to
    // UnscheduleFlushCallback if the job is cancelled).
    FlushThreadArg* fta = new FlushThreadArg;
    fta->db_ = this;
    fta->thread_pri_ = Env::Priority::HIGH;
    env_->Schedule(&DBImpl::BGWorkFlush, fta, Env::Priority::HIGH, this,
                   &DBImpl::UnscheduleFlushCallback);
  }

  // special case -- if high-pri (flush) thread pool is empty, then schedule
  // flushes in low-pri (compaction) thread pool.
  if (is_flush_pool_empty) {
    while (unscheduled_flushes_ > 0 &&
           bg_flush_scheduled_ + bg_compaction_scheduled_ <
               bg_job_limits.max_flushes) {
      bg_flush_scheduled_++;
      FlushThreadArg* fta = new FlushThreadArg;
      fta->db_ = this;
      fta->thread_pri_ = Env::Priority::LOW;
      env_->Schedule(&DBImpl::BGWorkFlush, fta, Env::Priority::LOW, this,
                     &DBImpl::UnscheduleFlushCallback);
    }
  }

  if (bg_compaction_paused_ > 0) {
    // we paused the background compaction
    return;
  } else if (error_handler_.IsBGWorkStopped()) {
    // Compaction is not part of the recovery sequence from a hard error. We
    // might get here because recovery might do a flush and install a new
    // super version, which will try to schedule pending compactions. Bail
    // out here and let the higher level recovery handle compactions
    return;
  }

  if (HasExclusiveManualCompaction()) {
    // only manual compactions are allowed to run. don't schedule automatic
    // compactions
    TEST_SYNC_POINT("DBImpl::MaybeScheduleFlushOrCompaction:Conflict");
    return;
  }

  while (bg_compaction_scheduled_ < bg_job_limits.max_compactions &&
         unscheduled_compactions_ > 0) {
    // Ownership of ca transfers to BGWorkCompaction (or to
    // UnscheduleCompactionCallback if the job is cancelled).
    CompactionArg* ca = new CompactionArg;
    ca->db = this;
    ca->prepicked_compaction = nullptr;
    bg_compaction_scheduled_++;
    unscheduled_compactions_--;
    env_->Schedule(&DBImpl::BGWorkCompaction, ca, Env::Priority::LOW, this,
                   &DBImpl::UnscheduleCompactionCallback);
  }
}
1843 | ||
11fdf7f2 | 1844 | DBImpl::BGJobLimits DBImpl::GetBGJobLimits() const { |
7c673cae | 1845 | mutex_.AssertHeld(); |
11fdf7f2 TL |
1846 | return GetBGJobLimits(immutable_db_options_.max_background_flushes, |
1847 | mutable_db_options_.max_background_compactions, | |
1848 | mutable_db_options_.max_background_jobs, | |
1849 | write_controller_.NeedSpeedupCompaction()); | |
1850 | } | |
1851 | ||
1852 | DBImpl::BGJobLimits DBImpl::GetBGJobLimits(int max_background_flushes, | |
1853 | int max_background_compactions, | |
1854 | int max_background_jobs, | |
1855 | bool parallelize_compactions) { | |
1856 | BGJobLimits res; | |
1857 | if (max_background_flushes == -1 && max_background_compactions == -1) { | |
1858 | // for our first stab implementing max_background_jobs, simply allocate a | |
1859 | // quarter of the threads to flushes. | |
1860 | res.max_flushes = std::max(1, max_background_jobs / 4); | |
1861 | res.max_compactions = std::max(1, max_background_jobs - res.max_flushes); | |
7c673cae | 1862 | } else { |
11fdf7f2 TL |
1863 | // compatibility code in case users haven't migrated to max_background_jobs, |
1864 | // which automatically computes flush/compaction limits | |
1865 | res.max_flushes = std::max(1, max_background_flushes); | |
1866 | res.max_compactions = std::max(1, max_background_compactions); | |
7c673cae | 1867 | } |
11fdf7f2 TL |
1868 | if (!parallelize_compactions) { |
1869 | // throttle background compactions until we deem necessary | |
1870 | res.max_compactions = 1; | |
1871 | } | |
1872 | return res; | |
7c673cae FG |
1873 | } |
1874 | ||
1875 | void DBImpl::AddToCompactionQueue(ColumnFamilyData* cfd) { | |
11fdf7f2 | 1876 | assert(!cfd->queued_for_compaction()); |
7c673cae FG |
1877 | cfd->Ref(); |
1878 | compaction_queue_.push_back(cfd); | |
11fdf7f2 | 1879 | cfd->set_queued_for_compaction(true); |
7c673cae FG |
1880 | } |
1881 | ||
1882 | ColumnFamilyData* DBImpl::PopFirstFromCompactionQueue() { | |
1883 | assert(!compaction_queue_.empty()); | |
1884 | auto cfd = *compaction_queue_.begin(); | |
1885 | compaction_queue_.pop_front(); | |
11fdf7f2 TL |
1886 | assert(cfd->queued_for_compaction()); |
1887 | cfd->set_queued_for_compaction(false); | |
7c673cae FG |
1888 | return cfd; |
1889 | } | |
1890 | ||
11fdf7f2 | 1891 | DBImpl::FlushRequest DBImpl::PopFirstFromFlushQueue() { |
7c673cae | 1892 | assert(!flush_queue_.empty()); |
11fdf7f2 TL |
1893 | FlushRequest flush_req = flush_queue_.front(); |
1894 | assert(unscheduled_flushes_ >= static_cast<int>(flush_req.size())); | |
1895 | unscheduled_flushes_ -= static_cast<int>(flush_req.size()); | |
7c673cae | 1896 | flush_queue_.pop_front(); |
11fdf7f2 TL |
1897 | // TODO: need to unset flush reason? |
1898 | return flush_req; | |
7c673cae FG |
1899 | } |
1900 | ||
// Pop the first column family in compaction_queue_ whose compaction is not
// throttled by its task limiter. On success *token holds the acquired
// limiter token and the CF's queued flag is cleared. Candidates that failed
// to acquire a token are pushed back to the FRONT of the queue in their
// original relative order. Returns nullptr if every queued CF is throttled.
ColumnFamilyData* DBImpl::PickCompactionFromQueue(
    std::unique_ptr<TaskLimiterToken>* token, LogBuffer* log_buffer) {
  assert(!compaction_queue_.empty());
  assert(*token == nullptr);
  autovector<ColumnFamilyData*> throttled_candidates;
  ColumnFamilyData* cfd = nullptr;
  while (!compaction_queue_.empty()) {
    auto first_cfd = *compaction_queue_.begin();
    compaction_queue_.pop_front();
    assert(first_cfd->queued_for_compaction());
    if (!RequestCompactionToken(first_cfd, false, token, log_buffer)) {
      // Throttled: remember the candidate and try the next one.
      throttled_candidates.push_back(first_cfd);
      continue;
    }
    cfd = first_cfd;
    cfd->set_queued_for_compaction(false);
    break;
  }
  // Add throttled compaction candidates back to queue in the original order.
  // Iterating in reverse while pushing to the front restores FIFO order.
  for (auto iter = throttled_candidates.rbegin();
       iter != throttled_candidates.rend(); ++iter) {
    compaction_queue_.push_front(*iter);
  }
  return cfd;
}
1926 | ||
11fdf7f2 TL |
1927 | void DBImpl::SchedulePendingFlush(const FlushRequest& flush_req, |
1928 | FlushReason flush_reason) { | |
1929 | if (flush_req.empty()) { | |
1930 | return; | |
1931 | } | |
1932 | for (auto& iter : flush_req) { | |
1933 | ColumnFamilyData* cfd = iter.first; | |
1934 | cfd->Ref(); | |
1935 | cfd->SetFlushReason(flush_reason); | |
7c673cae | 1936 | } |
11fdf7f2 TL |
1937 | unscheduled_flushes_ += static_cast<int>(flush_req.size()); |
1938 | flush_queue_.push_back(flush_req); | |
7c673cae FG |
1939 | } |
1940 | ||
1941 | void DBImpl::SchedulePendingCompaction(ColumnFamilyData* cfd) { | |
11fdf7f2 | 1942 | if (!cfd->queued_for_compaction() && cfd->NeedsCompaction()) { |
7c673cae FG |
1943 | AddToCompactionQueue(cfd); |
1944 | ++unscheduled_compactions_; | |
1945 | } | |
1946 | } | |
1947 | ||
11fdf7f2 TL |
1948 | void DBImpl::SchedulePendingPurge(std::string fname, std::string dir_to_sync, |
1949 | FileType type, uint64_t number, int job_id) { | |
7c673cae | 1950 | mutex_.AssertHeld(); |
11fdf7f2 | 1951 | PurgeFileInfo file_info(fname, dir_to_sync, type, number, job_id); |
7c673cae FG |
1952 | purge_queue_.push_back(std::move(file_info)); |
1953 | } | |
1954 | ||
494da23a TL |
1955 | void DBImpl::BGWorkFlush(void* arg) { |
1956 | FlushThreadArg fta = *(reinterpret_cast<FlushThreadArg*>(arg)); | |
1957 | delete reinterpret_cast<FlushThreadArg*>(arg); | |
1958 | ||
1959 | IOSTATS_SET_THREAD_POOL_ID(fta.thread_pri_); | |
7c673cae | 1960 | TEST_SYNC_POINT("DBImpl::BGWorkFlush"); |
494da23a | 1961 | reinterpret_cast<DBImpl*>(fta.db_)->BackgroundCallFlush(fta.thread_pri_); |
7c673cae FG |
1962 | TEST_SYNC_POINT("DBImpl::BGWorkFlush:done"); |
1963 | } | |
1964 | ||
1965 | void DBImpl::BGWorkCompaction(void* arg) { | |
1966 | CompactionArg ca = *(reinterpret_cast<CompactionArg*>(arg)); | |
1967 | delete reinterpret_cast<CompactionArg*>(arg); | |
1968 | IOSTATS_SET_THREAD_POOL_ID(Env::Priority::LOW); | |
1969 | TEST_SYNC_POINT("DBImpl::BGWorkCompaction"); | |
11fdf7f2 TL |
1970 | auto prepicked_compaction = |
1971 | static_cast<PrepickedCompaction*>(ca.prepicked_compaction); | |
1972 | reinterpret_cast<DBImpl*>(ca.db)->BackgroundCallCompaction( | |
1973 | prepicked_compaction, Env::Priority::LOW); | |
1974 | delete prepicked_compaction; | |
1975 | } | |
1976 | ||
1977 | void DBImpl::BGWorkBottomCompaction(void* arg) { | |
1978 | CompactionArg ca = *(static_cast<CompactionArg*>(arg)); | |
1979 | delete static_cast<CompactionArg*>(arg); | |
1980 | IOSTATS_SET_THREAD_POOL_ID(Env::Priority::BOTTOM); | |
1981 | TEST_SYNC_POINT("DBImpl::BGWorkBottomCompaction"); | |
1982 | auto* prepicked_compaction = ca.prepicked_compaction; | |
1983 | assert(prepicked_compaction && prepicked_compaction->compaction && | |
1984 | !prepicked_compaction->manual_compaction_state); | |
1985 | ca.db->BackgroundCallCompaction(prepicked_compaction, Env::Priority::BOTTOM); | |
1986 | delete prepicked_compaction; | |
7c673cae FG |
1987 | } |
1988 | ||
1989 | void DBImpl::BGWorkPurge(void* db) { | |
1990 | IOSTATS_SET_THREAD_POOL_ID(Env::Priority::HIGH); | |
1991 | TEST_SYNC_POINT("DBImpl::BGWorkPurge:start"); | |
1992 | reinterpret_cast<DBImpl*>(db)->BackgroundCallPurge(); | |
1993 | TEST_SYNC_POINT("DBImpl::BGWorkPurge:end"); | |
1994 | } | |
1995 | ||
494da23a | 1996 | void DBImpl::UnscheduleCompactionCallback(void* arg) { |
7c673cae FG |
1997 | CompactionArg ca = *(reinterpret_cast<CompactionArg*>(arg)); |
1998 | delete reinterpret_cast<CompactionArg*>(arg); | |
11fdf7f2 TL |
1999 | if (ca.prepicked_compaction != nullptr) { |
2000 | if (ca.prepicked_compaction->compaction != nullptr) { | |
2001 | delete ca.prepicked_compaction->compaction; | |
2002 | } | |
2003 | delete ca.prepicked_compaction; | |
7c673cae | 2004 | } |
494da23a TL |
2005 | TEST_SYNC_POINT("DBImpl::UnscheduleCompactionCallback"); |
2006 | } | |
2007 | ||
2008 | void DBImpl::UnscheduleFlushCallback(void* arg) { | |
2009 | delete reinterpret_cast<FlushThreadArg*>(arg); | |
2010 | TEST_SYNC_POINT("DBImpl::UnscheduleFlushCallback"); | |
7c673cae FG |
2011 | } |
2012 | ||
// Pick the first actionable request off flush_queue_ and run it. Called on a
// background thread with mutex_ held. *made_progress and *reason report what
// the flush job did (*reason stays kOthers when nothing ran). Returns an
// early error when the DB is shutting down or stopped by a background error
// that is not currently being recovered.
Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context,
                               LogBuffer* log_buffer, FlushReason* reason,
                               Env::Priority thread_pri) {
  mutex_.AssertHeld();

  Status status;
  *reason = FlushReason::kOthers;
  // If BG work is stopped due to an error, but a recovery is in progress,
  // that means this flush is part of the recovery. So allow it to go through
  if (!error_handler_.IsBGWorkStopped()) {
    if (shutting_down_.load(std::memory_order_acquire)) {
      status = Status::ShutdownInProgress();
    }
  } else if (!error_handler_.IsRecoveryInProgress()) {
    status = error_handler_.GetBGError();
  }

  if (!status.ok()) {
    return status;
  }

  autovector<BGFlushArg> bg_flush_args;
  std::vector<SuperVersionContext>& superversion_contexts =
      job_context->superversion_contexts;
  // Scan requests until one yields at least one flushable CF; requests whose
  // CFs are all dropped or no longer flush-pending are simply consumed.
  while (!flush_queue_.empty()) {
    // This cfd is already referenced
    const FlushRequest& flush_req = PopFirstFromFlushQueue();
    superversion_contexts.clear();
    superversion_contexts.reserve(flush_req.size());

    for (const auto& iter : flush_req) {
      ColumnFamilyData* cfd = iter.first;
      if (cfd->IsDropped() || !cfd->imm()->IsFlushPending()) {
        // can't flush this CF, try next one; release the reference taken
        // when the request was scheduled
        if (cfd->Unref()) {
          delete cfd;
        }
        continue;
      }
      superversion_contexts.emplace_back(SuperVersionContext(true));
      bg_flush_args.emplace_back(cfd, iter.second,
                                 &(superversion_contexts.back()));
    }
    if (!bg_flush_args.empty()) {
      break;
    }
  }

  if (!bg_flush_args.empty()) {
    auto bg_job_limits = GetBGJobLimits();
    for (const auto& arg : bg_flush_args) {
      ColumnFamilyData* cfd = arg.cfd_;
      ROCKS_LOG_BUFFER(
          log_buffer,
          "Calling FlushMemTableToOutputFile with column "
          "family [%s], flush slots available %d, compaction slots available "
          "%d, "
          "flush slots scheduled %d, compaction slots scheduled %d",
          cfd->GetName().c_str(), bg_job_limits.max_flushes,
          bg_job_limits.max_compactions, bg_flush_scheduled_,
          bg_compaction_scheduled_);
    }
    status = FlushMemTablesToOutputFiles(bg_flush_args, made_progress,
                                         job_context, log_buffer, thread_pri);
    // All the CFDs in the FlushReq must have the same flush reason, so just
    // grab the first one
    *reason = bg_flush_args[0].cfd_->GetFlushReason();
    for (auto& arg : bg_flush_args) {
      ColumnFamilyData* cfd = arg.cfd_;
      if (cfd->Unref()) {
        delete cfd;
        arg.cfd_ = nullptr;
      }
    }
  }
  return status;
}
2090 | ||
// Top-level driver for one background flush pass: runs BackgroundFlush(),
// backs off on error, purges obsolete files outside the mutex, reschedules
// remaining work, and finally signals waiters (including, possibly, the DB
// destructor).
void DBImpl::BackgroundCallFlush(Env::Priority thread_pri) {
  bool made_progress = false;
  JobContext job_context(next_job_id_.fetch_add(1), true);

  TEST_SYNC_POINT("DBImpl::BackgroundCallFlush:start");

  LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL,
                       immutable_db_options_.info_log.get());
  {
    InstrumentedMutexLock l(&mutex_);
    assert(bg_flush_scheduled_);
    num_running_flushes_++;

    auto pending_outputs_inserted_elem =
        CaptureCurrentFileNumberInPendingOutputs();
    FlushReason reason;

    Status s = BackgroundFlush(&made_progress, &job_context, &log_buffer,
                               &reason, thread_pri);
    if (!s.ok() && !s.IsShutdownInProgress() &&
        reason != FlushReason::kErrorRecovery) {
      // Wait a little bit before retrying background flush in
      // case this is an environmental problem and we do not want to
      // chew up resources for failed flushes for the duration of
      // the problem.
      uint64_t error_cnt =
          default_cf_internal_stats_->BumpAndGetBackgroundErrorCount();
      bg_cv_.SignalAll();  // In case a waiter can proceed despite the error
      mutex_.Unlock();
      ROCKS_LOG_ERROR(immutable_db_options_.info_log,
                      "Waiting after background flush error: %s"
                      "Accumulated background error counts: %" PRIu64,
                      s.ToString().c_str(), error_cnt);
      log_buffer.FlushBufferToLog();
      LogFlush(immutable_db_options_.info_log);
      env_->SleepForMicroseconds(1000000);
      mutex_.Lock();
    }

    TEST_SYNC_POINT("DBImpl::BackgroundCallFlush:FlushFinish:0");
    ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem);

    // If flush failed, we want to delete all temporary files that we might have
    // created. Thus, we force full scan in FindObsoleteFiles()
    FindObsoleteFiles(&job_context, !s.ok() && !s.IsShutdownInProgress());
    // delete unnecessary files if any, this is done outside the mutex
    if (job_context.HaveSomethingToClean() ||
        job_context.HaveSomethingToDelete() || !log_buffer.IsEmpty()) {
      mutex_.Unlock();
      TEST_SYNC_POINT("DBImpl::BackgroundCallFlush:FilesFound");
      // Have to flush the info logs before bg_flush_scheduled_--
      // because if bg_flush_scheduled_ becomes 0 and the lock is
      // released, the destructor of DB can kick in and destroy all the
      // states of DB so info_log might not be available after that point.
      // It also applies to access other states that DB owns.
      log_buffer.FlushBufferToLog();
      if (job_context.HaveSomethingToDelete()) {
        PurgeObsoleteFiles(job_context);
      }
      job_context.Clean();
      mutex_.Lock();
    }
    TEST_SYNC_POINT("DBImpl::BackgroundCallFlush:ContextCleanedUp");

    assert(num_running_flushes_ > 0);
    num_running_flushes_--;
    bg_flush_scheduled_--;
    // See if there's more work to be done
    MaybeScheduleFlushOrCompaction();
    atomic_flush_install_cv_.SignalAll();
    bg_cv_.SignalAll();
    // IMPORTANT: there should be no code after calling SignalAll. This call may
    // signal the DB destructor that it's OK to proceed with destruction. In
    // that case, all DB variables will be deallocated and referencing them
    // will cause trouble.
  }
}
2168 | ||
11fdf7f2 TL |
// Entry point executed on a background thread for one compaction attempt.
// Wraps BackgroundCompaction() with the bookkeeping shared by background
// jobs: running/scheduled counters, retry backoff on error, obsolete-file
// cleanup, and waking up threads waiting on bg_cv_. `bg_thread_pri` is the
// pool (LOW or BOTTOM) this thread came from, so the matching scheduled
// counter can be decremented at the end.
void DBImpl::BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction,
                                      Env::Priority bg_thread_pri) {
  bool made_progress = false;
  JobContext job_context(next_job_id_.fetch_add(1), true);
  TEST_SYNC_POINT("BackgroundCallCompaction:0");
  LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL,
                       immutable_db_options_.info_log.get());
  {
    InstrumentedMutexLock l(&mutex_);

    // This call will unlock/lock the mutex to wait for current running
    // IngestExternalFile() calls to finish.
    WaitForIngestFile();

    num_running_compactions_++;

    auto pending_outputs_inserted_elem =
        CaptureCurrentFileNumberInPendingOutputs();

    // The scheduled counter for the pool we were launched from must have
    // been incremented by the scheduler before this job ran.
    assert((bg_thread_pri == Env::Priority::BOTTOM &&
            bg_bottom_compaction_scheduled_) ||
           (bg_thread_pri == Env::Priority::LOW && bg_compaction_scheduled_));
    Status s = BackgroundCompaction(&made_progress, &job_context, &log_buffer,
                                    prepicked_compaction, bg_thread_pri);
    TEST_SYNC_POINT("BackgroundCallCompaction:1");
    if (s.IsBusy()) {
      // Busy: compaction was throttled (e.g. by the thread limiter); back
      // off briefly with the mutex released before letting this thread exit.
      bg_cv_.SignalAll();  // In case a waiter can proceed despite the error
      mutex_.Unlock();
      env_->SleepForMicroseconds(10000);  // prevent hot loop
      mutex_.Lock();
    } else if (!s.ok() && !s.IsShutdownInProgress()) {
      // Wait a little bit before retrying background compaction in
      // case this is an environmental problem and we do not want to
      // chew up resources for failed compactions for the duration of
      // the problem.
      uint64_t error_cnt =
          default_cf_internal_stats_->BumpAndGetBackgroundErrorCount();
      bg_cv_.SignalAll();  // In case a waiter can proceed despite the error
      mutex_.Unlock();
      log_buffer.FlushBufferToLog();
      ROCKS_LOG_ERROR(immutable_db_options_.info_log,
                      "Waiting after background compaction error: %s, "
                      "Accumulated background error counts: %" PRIu64,
                      s.ToString().c_str(), error_cnt);
      LogFlush(immutable_db_options_.info_log);
      env_->SleepForMicroseconds(1000000);
      mutex_.Lock();
    }

    ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem);

    // If compaction failed, we want to delete all temporary files that we
    // might have created (they might not be all recorded in job_context in
    // case of a failure). Thus, we force full scan in FindObsoleteFiles()
    FindObsoleteFiles(&job_context, !s.ok() && !s.IsShutdownInProgress());
    TEST_SYNC_POINT("DBImpl::BackgroundCallCompaction:FoundObsoleteFiles");

    // delete unnecessary files if any, this is done outside the mutex
    if (job_context.HaveSomethingToClean() ||
        job_context.HaveSomethingToDelete() || !log_buffer.IsEmpty()) {
      mutex_.Unlock();
      // Have to flush the info logs before bg_compaction_scheduled_--
      // because if the scheduled counters become 0 and the lock is
      // released, the destructor of DB can kick in and destroy all the
      // states of DB so info_log might not be available after that point.
      // It also applies to access other states that DB owns.
      log_buffer.FlushBufferToLog();
      if (job_context.HaveSomethingToDelete()) {
        PurgeObsoleteFiles(job_context);
        TEST_SYNC_POINT("DBImpl::BackgroundCallCompaction:PurgedObsoleteFiles");
      }
      job_context.Clean();
      mutex_.Lock();
    }

    assert(num_running_compactions_ > 0);
    num_running_compactions_--;
    // Decrement the scheduled counter matching the pool we ran in.
    if (bg_thread_pri == Env::Priority::LOW) {
      bg_compaction_scheduled_--;
    } else {
      assert(bg_thread_pri == Env::Priority::BOTTOM);
      bg_bottom_compaction_scheduled_--;
    }

    versions_->GetColumnFamilySet()->FreeDeadColumnFamilies();

    // See if there's more work to be done
    MaybeScheduleFlushOrCompaction();
    if (made_progress ||
        (bg_compaction_scheduled_ == 0 &&
         bg_bottom_compaction_scheduled_ == 0) ||
        HasPendingManualCompaction() || unscheduled_compactions_ == 0) {
      // signal if
      // * made_progress -- need to wakeup DelayWrite
      // * bg_{bottom,}_compaction_scheduled_ == 0 -- need to wakeup ~DBImpl
      // * HasPendingManualCompaction -- need to wakeup RunManualCompaction
      // If none of this is true, there is no need to signal since nobody is
      // waiting for it
      bg_cv_.SignalAll();
    }
    // IMPORTANT: there should be no code after calling SignalAll. This call
    // may signal the DB destructor that it's OK to proceed with destruction.
    // In that case, all DB variables will be deallocated and referencing them
    // will cause trouble.
  }
}
2275 | ||
// Runs a single compaction step. Called with mutex_ held (asserted below);
// the mutex is released only around the actual CompactionJob::Run() and
// LogBuffer flushes performed by the caller.
//
//  * made_progress        -- out: set to true iff an LSM change was applied.
//  * job_context          -- per-job scratch (superversion contexts,
//                            obsolete-file tracking); cleaned by the caller.
//  * log_buffer           -- buffered info-log output, flushed by the caller.
//  * prepicked_compaction -- non-null when the compaction was already chosen
//                            (manual compaction, or a job forwarded to the
//                            BOTTOM pool); may carry a limiter task token.
//  * thread_pri           -- priority of the calling thread, forwarded to
//                            CompactionJob.
//
// The chosen compaction takes one of five paths: nothing-to-do, FIFO
// deletion compaction, trivial move (metadata-only file relink), hand-off to
// the BOTTOM thread pool, or a full CompactionJob run.
Status DBImpl::BackgroundCompaction(bool* made_progress,
                                    JobContext* job_context,
                                    LogBuffer* log_buffer,
                                    PrepickedCompaction* prepicked_compaction,
                                    Env::Priority thread_pri) {
  ManualCompactionState* manual_compaction =
      prepicked_compaction == nullptr
          ? nullptr
          : prepicked_compaction->manual_compaction_state;
  *made_progress = false;
  mutex_.AssertHeld();
  TEST_SYNC_POINT("DBImpl::BackgroundCompaction:Start");

  bool is_manual = (manual_compaction != nullptr);
  std::unique_ptr<Compaction> c;
  if (prepicked_compaction != nullptr &&
      prepicked_compaction->compaction != nullptr) {
    c.reset(prepicked_compaction->compaction);
  }
  bool is_prepicked = is_manual || c;

  // (manual_compaction->in_progress == false);
  bool trivial_move_disallowed =
      is_manual && manual_compaction->disallow_trivial_move;

  CompactionJobStats compaction_job_stats;
  // Bail out early on shutdown or a previously-recorded background error.
  Status status;
  if (!error_handler_.IsBGWorkStopped()) {
    if (shutting_down_.load(std::memory_order_acquire)) {
      status = Status::ShutdownInProgress();
    }
  } else {
    status = error_handler_.GetBGError();
    // If we get here, it means a hard error happened after this compaction
    // was scheduled by MaybeScheduleFlushOrCompaction(), but before it got
    // a chance to execute. Since we didn't pop a cfd from the compaction
    // queue, increment unscheduled_compactions_
    unscheduled_compactions_++;
  }

  if (!status.ok()) {
    if (is_manual) {
      manual_compaction->status = status;
      manual_compaction->done = true;
      manual_compaction->in_progress = false;
      manual_compaction = nullptr;
    }
    if (c) {
      c->ReleaseCompactionFiles(status);
      c.reset();
    }
    return status;
  }

  if (is_manual) {
    // another thread cannot pick up the same work
    manual_compaction->in_progress = true;
  }

  std::unique_ptr<TaskLimiterToken> task_token;

  // ---- Pick (or validate) the compaction ----
  // InternalKey manual_end_storage;
  // InternalKey* manual_end = &manual_end_storage;
  bool sfm_reserved_compact_space = false;
  if (is_manual) {
    ManualCompactionState* m = manual_compaction;
    assert(m->in_progress);
    if (!c) {
      m->done = true;
      m->manual_end = nullptr;
      ROCKS_LOG_BUFFER(log_buffer,
                       "[%s] Manual compaction from level-%d from %s .. "
                       "%s; nothing to do\n",
                       m->cfd->GetName().c_str(), m->input_level,
                       (m->begin ? m->begin->DebugString().c_str() : "(begin)"),
                       (m->end ? m->end->DebugString().c_str() : "(end)"));
    } else {
      // First check if we have enough room to do the compaction
      bool enough_room = EnoughRoomForCompaction(
          m->cfd, *(c->inputs()), &sfm_reserved_compact_space, log_buffer);

      if (!enough_room) {
        // Then don't do the compaction
        c->ReleaseCompactionFiles(status);
        c.reset();
        // m's vars will get set properly at the end of this function,
        // as long as status == CompactionTooLarge
        status = Status::CompactionTooLarge();
      } else {
        ROCKS_LOG_BUFFER(
            log_buffer,
            "[%s] Manual compaction from level-%d to level-%d from %s .. "
            "%s; will stop at %s\n",
            m->cfd->GetName().c_str(), m->input_level, c->output_level(),
            (m->begin ? m->begin->DebugString().c_str() : "(begin)"),
            (m->end ? m->end->DebugString().c_str() : "(end)"),
            ((m->done || m->manual_end == nullptr)
                 ? "(end)"
                 : m->manual_end->DebugString().c_str()));
      }
    }
  } else if (!is_prepicked && !compaction_queue_.empty()) {
    if (HasExclusiveManualCompaction()) {
      // Can't compact right now, but try again later
      TEST_SYNC_POINT("DBImpl::BackgroundCompaction()::Conflict");

      // Stay in the compaction queue.
      unscheduled_compactions_++;

      return Status::OK();
    }

    auto cfd = PickCompactionFromQueue(&task_token, log_buffer);
    if (cfd == nullptr) {
      // Can't find any executable task from the compaction queue.
      // All tasks have been throttled by compaction thread limiter.
      ++unscheduled_compactions_;
      return Status::Busy();
    }

    // We unreference here because the following code will take a Ref() on
    // this cfd if it is going to use it (Compaction class holds a
    // reference).
    // This will all happen under a mutex so we don't have to be afraid of
    // somebody else deleting it.
    if (cfd->Unref()) {
      // This was the last reference of the column family, so no need to
      // compact.
      delete cfd;
      return Status::OK();
    }

    // Pick up latest mutable CF Options and use it throughout the
    // compaction job
    // Compaction makes a copy of the latest MutableCFOptions. It should be
    // used throughout the compaction procedure to make sure consistency. It
    // will eventually be installed into SuperVersion
    auto* mutable_cf_options = cfd->GetLatestMutableCFOptions();
    if (!mutable_cf_options->disable_auto_compactions && !cfd->IsDropped()) {
      // NOTE: try to avoid unnecessary copy of MutableCFOptions if
      // compaction is not necessary. Need to make sure mutex is held
      // until we make a copy in the following code
      TEST_SYNC_POINT("DBImpl::BackgroundCompaction():BeforePickCompaction");
      c.reset(cfd->PickCompaction(*mutable_cf_options, log_buffer));
      TEST_SYNC_POINT("DBImpl::BackgroundCompaction():AfterPickCompaction");

      if (c != nullptr) {
        bool enough_room = EnoughRoomForCompaction(
            cfd, *(c->inputs()), &sfm_reserved_compact_space, log_buffer);

        if (!enough_room) {
          // Then don't do the compaction
          c->ReleaseCompactionFiles(status);
          c->column_family_data()
              ->current()
              ->storage_info()
              ->ComputeCompactionScore(*(c->immutable_cf_options()),
                                       *(c->mutable_cf_options()));
          AddToCompactionQueue(cfd);
          ++unscheduled_compactions_;

          c.reset();
          // Don't need to sleep here, because BackgroundCallCompaction
          // will sleep if !s.ok()
          status = Status::CompactionTooLarge();
        } else {
          // update statistics
          RecordInHistogram(stats_, NUM_FILES_IN_SINGLE_COMPACTION,
                            c->inputs(0)->size());
          // There are three things that can change compaction score:
          // 1) When flush or compaction finish. This case is covered by
          // InstallSuperVersionAndScheduleWork
          // 2) When MutableCFOptions changes. This case is also covered by
          // InstallSuperVersionAndScheduleWork, because this is when the new
          // options take effect.
          // 3) When we Pick a new compaction, we "remove" those files being
          // compacted from the calculation, which then influences compaction
          // score. Here we check if we need the new compaction even without
          // the files that are currently being compacted. If we need another
          // compaction, we might be able to execute it in parallel, so we
          // add it to the queue and schedule a new thread.
          if (cfd->NeedsCompaction()) {
            // Yes, we need more compactions!
            AddToCompactionQueue(cfd);
            ++unscheduled_compactions_;
            MaybeScheduleFlushOrCompaction();
          }
        }
      }
    }
  }

  // ---- Execute the compaction (one of five paths) ----
  if (!c) {
    // Nothing to do
    ROCKS_LOG_BUFFER(log_buffer, "Compaction nothing to do");
  } else if (c->deletion_compaction()) {
    // FIFO path: just drop the oldest L0 files via a version edit.
    // TODO(icanadi) Do we want to honor snapshots here? i.e. not delete old
    // file if there is alive snapshot pointing to it
    TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:BeforeCompaction",
                             c->column_family_data());
    assert(c->num_input_files(1) == 0);
    assert(c->level() == 0);
    assert(c->column_family_data()->ioptions()->compaction_style ==
           kCompactionStyleFIFO);

    compaction_job_stats.num_input_files = c->num_input_files(0);

    NotifyOnCompactionBegin(c->column_family_data(), c.get(), status,
                            compaction_job_stats, job_context->job_id);

    for (const auto& f : *c->inputs(0)) {
      c->edit()->DeleteFile(c->level(), f->fd.GetNumber());
    }
    status = versions_->LogAndApply(c->column_family_data(),
                                    *c->mutable_cf_options(), c->edit(),
                                    &mutex_, directories_.GetDbDir());
    InstallSuperVersionAndScheduleWork(c->column_family_data(),
                                       &job_context->superversion_contexts[0],
                                       *c->mutable_cf_options());
    ROCKS_LOG_BUFFER(log_buffer, "[%s] Deleted %d files\n",
                     c->column_family_data()->GetName().c_str(),
                     c->num_input_files(0));
    *made_progress = true;
    TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:AfterCompaction",
                             c->column_family_data());
  } else if (!trivial_move_disallowed && c->IsTrivialMove()) {
    // Trivial-move path: re-link input files into the output level via a
    // version edit; no data is rewritten.
    TEST_SYNC_POINT("DBImpl::BackgroundCompaction:TrivialMove");
    TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:BeforeCompaction",
                             c->column_family_data());
    // Instrument for event update
    // TODO(yhchiang): add op details for showing trivial-move.
    ThreadStatusUtil::SetColumnFamily(
        c->column_family_data(), c->column_family_data()->ioptions()->env,
        immutable_db_options_.enable_thread_tracking);
    ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_COMPACTION);

    compaction_job_stats.num_input_files = c->num_input_files(0);

    NotifyOnCompactionBegin(c->column_family_data(), c.get(), status,
                            compaction_job_stats, job_context->job_id);

    // Move files to next level
    int32_t moved_files = 0;
    int64_t moved_bytes = 0;
    for (unsigned int l = 0; l < c->num_input_levels(); l++) {
      if (c->level(l) == c->output_level()) {
        continue;
      }
      for (size_t i = 0; i < c->num_input_files(l); i++) {
        FileMetaData* f = c->input(l, i);
        c->edit()->DeleteFile(c->level(l), f->fd.GetNumber());
        c->edit()->AddFile(c->output_level(), f->fd.GetNumber(),
                           f->fd.GetPathId(), f->fd.GetFileSize(), f->smallest,
                           f->largest, f->fd.smallest_seqno,
                           f->fd.largest_seqno, f->marked_for_compaction);

        ROCKS_LOG_BUFFER(
            log_buffer,
            "[%s] Moving #%" PRIu64 " to level-%d %" PRIu64 " bytes\n",
            c->column_family_data()->GetName().c_str(), f->fd.GetNumber(),
            c->output_level(), f->fd.GetFileSize());
        ++moved_files;
        moved_bytes += f->fd.GetFileSize();
      }
    }

    status = versions_->LogAndApply(c->column_family_data(),
                                    *c->mutable_cf_options(), c->edit(),
                                    &mutex_, directories_.GetDbDir());
    // Use latest MutableCFOptions
    InstallSuperVersionAndScheduleWork(c->column_family_data(),
                                       &job_context->superversion_contexts[0],
                                       *c->mutable_cf_options());

    VersionStorageInfo::LevelSummaryStorage tmp;
    c->column_family_data()->internal_stats()->IncBytesMoved(c->output_level(),
                                                             moved_bytes);
    {
      event_logger_.LogToBuffer(log_buffer)
          << "job" << job_context->job_id << "event"
          << "trivial_move"
          << "destination_level" << c->output_level() << "files" << moved_files
          << "total_files_size" << moved_bytes;
    }
    ROCKS_LOG_BUFFER(
        log_buffer,
        "[%s] Moved #%d files to level-%d %" PRIu64 " bytes %s: %s\n",
        c->column_family_data()->GetName().c_str(), moved_files,
        c->output_level(), moved_bytes, status.ToString().c_str(),
        c->column_family_data()->current()->storage_info()->LevelSummary(&tmp));
    *made_progress = true;

    // Clear Instrument
    ThreadStatusUtil::ResetThreadStatus();
    TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:AfterCompaction",
                             c->column_family_data());
  } else if (!is_prepicked && c->output_level() > 0 &&
             c->output_level() ==
                 c->column_family_data()
                     ->current()
                     ->storage_info()
                     ->MaxOutputLevel(
                         immutable_db_options_.allow_ingest_behind) &&
             env_->GetBackgroundThreads(Env::Priority::BOTTOM) > 0) {
    // Forward compactions involving last level to the bottom pool if it
    // exists, such that compactions unlikely to contribute to write stalls
    // can be delayed or deprioritized.
    TEST_SYNC_POINT("DBImpl::BackgroundCompaction:ForwardToBottomPriPool");
    CompactionArg* ca = new CompactionArg;
    ca->db = this;
    ca->prepicked_compaction = new PrepickedCompaction;
    ca->prepicked_compaction->compaction = c.release();
    ca->prepicked_compaction->manual_compaction_state = nullptr;
    // Transfer requested token, so it doesn't need to do it again.
    ca->prepicked_compaction->task_token = std::move(task_token);
    ++bg_bottom_compaction_scheduled_;
    env_->Schedule(&DBImpl::BGWorkBottomCompaction, ca, Env::Priority::BOTTOM,
                   this, &DBImpl::UnscheduleCompactionCallback);
  } else {
    // Full compaction path: run a CompactionJob with the mutex released.
    TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:BeforeCompaction",
                             c->column_family_data());
    int output_level __attribute__((__unused__));
    output_level = c->output_level();
    TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:NonTrivial",
                             &output_level);
    std::vector<SequenceNumber> snapshot_seqs;
    SequenceNumber earliest_write_conflict_snapshot;
    SnapshotChecker* snapshot_checker;
    GetSnapshotContext(job_context, &snapshot_seqs,
                       &earliest_write_conflict_snapshot, &snapshot_checker);
    assert(is_snapshot_supported_ || snapshots_.empty());
    CompactionJob compaction_job(
        job_context->job_id, c.get(), immutable_db_options_,
        env_options_for_compaction_, versions_.get(), &shutting_down_,
        preserve_deletes_seqnum_.load(), log_buffer, directories_.GetDbDir(),
        GetDataDir(c->column_family_data(), c->output_path_id()), stats_,
        &mutex_, &error_handler_, snapshot_seqs,
        earliest_write_conflict_snapshot, snapshot_checker, table_cache_,
        &event_logger_, c->mutable_cf_options()->paranoid_file_checks,
        c->mutable_cf_options()->report_bg_io_stats, dbname_,
        &compaction_job_stats, thread_pri);
    compaction_job.Prepare();

    NotifyOnCompactionBegin(c->column_family_data(), c.get(), status,
                            compaction_job_stats, job_context->job_id);

    mutex_.Unlock();
    compaction_job.Run();
    TEST_SYNC_POINT("DBImpl::BackgroundCompaction:NonTrivial:AfterRun");
    mutex_.Lock();

    status = compaction_job.Install(*c->mutable_cf_options());
    if (status.ok()) {
      InstallSuperVersionAndScheduleWork(c->column_family_data(),
                                         &job_context->superversion_contexts[0],
                                         *c->mutable_cf_options());
    }
    *made_progress = true;
    TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:AfterCompaction",
                             c->column_family_data());
  }
  // ---- Common post-compaction bookkeeping ----
  if (c != nullptr) {
    c->ReleaseCompactionFiles(status);
    *made_progress = true;

#ifndef ROCKSDB_LITE
    // Need to make sure SstFileManager does its bookkeeping
    auto sfm = static_cast<SstFileManagerImpl*>(
        immutable_db_options_.sst_file_manager.get());
    if (sfm && sfm_reserved_compact_space) {
      sfm->OnCompactionCompletion(c.get());
    }
#endif  // ROCKSDB_LITE

    NotifyOnCompactionCompleted(c->column_family_data(), c.get(), status,
                                compaction_job_stats, job_context->job_id);
  }

  if (status.ok() || status.IsCompactionTooLarge()) {
    // Done
  } else if (status.IsShutdownInProgress()) {
    // Ignore compaction errors found during shutting down
  } else {
    ROCKS_LOG_WARN(immutable_db_options_.info_log, "Compaction error: %s",
                   status.ToString().c_str());
    error_handler_.SetBGError(status, BackgroundErrorReason::kCompaction);
    if (c != nullptr && !is_manual && !error_handler_.IsBGWorkStopped()) {
      // Put this cfd back in the compaction queue so we can retry after some
      // time
      auto cfd = c->column_family_data();
      assert(cfd != nullptr);
      // Since this compaction failed, we need to recompute the score so it
      // takes the original input files into account
      c->column_family_data()
          ->current()
          ->storage_info()
          ->ComputeCompactionScore(*(c->immutable_cf_options()),
                                   *(c->mutable_cf_options()));
      if (!cfd->queued_for_compaction()) {
        AddToCompactionQueue(cfd);
        ++unscheduled_compactions_;
      }
    }
  }
  // this will unref its input_version and column_family_data
  c.reset();

  // ---- Finalize manual-compaction state ----
  if (is_manual) {
    ManualCompactionState* m = manual_compaction;
    if (!status.ok()) {
      m->status = status;
      m->done = true;
    }
    // For universal compaction:
    //   Because universal compaction always happens at level 0, so one
    //   compaction will pick up all overlapped files. No files will be
    //   filtered out due to size limit and left for a successive compaction.
    //   So we can safely conclude the current compaction.
    //
    //   Also note that, if we don't stop here, then the current compaction
    //   writes a new file back to level 0, which will be used in successive
    //   compaction. Hence the manual compaction will never finish.
    //
    // Stop the compaction if manual_end points to nullptr -- this means
    // that we compacted the whole range. manual_end should always point
    // to nullptr in case of universal compaction
    if (m->manual_end == nullptr) {
      m->done = true;
    }
    if (!m->done) {
      // We only compacted part of the requested range. Update *m
      // to the range that is left to be compacted.
      // Universal and FIFO compactions should always compact the whole range
      assert(m->cfd->ioptions()->compaction_style !=
                 kCompactionStyleUniversal ||
             m->cfd->ioptions()->num_levels > 1);
      assert(m->cfd->ioptions()->compaction_style != kCompactionStyleFIFO);
      m->tmp_storage = *m->manual_end;
      m->begin = &m->tmp_storage;
      m->incomplete = true;
    }
    m->in_progress = false;  // not being processed anymore
  }
  TEST_SYNC_POINT("DBImpl::BackgroundCompaction:Finish");
  return status;
}
2722 | ||
2723 | bool DBImpl::HasPendingManualCompaction() { | |
2724 | return (!manual_compaction_dequeue_.empty()); | |
2725 | } | |
2726 | ||
11fdf7f2 | 2727 | void DBImpl::AddManualCompaction(DBImpl::ManualCompactionState* m) { |
7c673cae FG |
2728 | manual_compaction_dequeue_.push_back(m); |
2729 | } | |
2730 | ||
11fdf7f2 | 2731 | void DBImpl::RemoveManualCompaction(DBImpl::ManualCompactionState* m) { |
7c673cae | 2732 | // Remove from queue |
11fdf7f2 | 2733 | std::deque<ManualCompactionState*>::iterator it = |
7c673cae FG |
2734 | manual_compaction_dequeue_.begin(); |
2735 | while (it != manual_compaction_dequeue_.end()) { | |
2736 | if (m == (*it)) { | |
2737 | it = manual_compaction_dequeue_.erase(it); | |
2738 | return; | |
2739 | } | |
2740 | it++; | |
2741 | } | |
2742 | assert(false); | |
2743 | return; | |
2744 | } | |
2745 | ||
11fdf7f2 | 2746 | bool DBImpl::ShouldntRunManualCompaction(ManualCompactionState* m) { |
7c673cae FG |
2747 | if (num_running_ingest_file_ > 0) { |
2748 | // We need to wait for other IngestExternalFile() calls to finish | |
2749 | // before running a manual compaction. | |
2750 | return true; | |
2751 | } | |
2752 | if (m->exclusive) { | |
11fdf7f2 TL |
2753 | return (bg_bottom_compaction_scheduled_ > 0 || |
2754 | bg_compaction_scheduled_ > 0); | |
7c673cae | 2755 | } |
11fdf7f2 | 2756 | std::deque<ManualCompactionState*>::iterator it = |
7c673cae FG |
2757 | manual_compaction_dequeue_.begin(); |
2758 | bool seen = false; | |
2759 | while (it != manual_compaction_dequeue_.end()) { | |
2760 | if (m == (*it)) { | |
2761 | it++; | |
2762 | seen = true; | |
2763 | continue; | |
2764 | } else if (MCOverlap(m, (*it)) && (!seen && !(*it)->in_progress)) { | |
2765 | // Consider the other manual compaction *it, conflicts if: | |
2766 | // overlaps with m | |
2767 | // and (*it) is ahead in the queue and is not yet in progress | |
2768 | return true; | |
2769 | } | |
2770 | it++; | |
2771 | } | |
2772 | return false; | |
2773 | } | |
2774 | ||
2775 | bool DBImpl::HaveManualCompaction(ColumnFamilyData* cfd) { | |
2776 | // Remove from priority queue | |
11fdf7f2 | 2777 | std::deque<ManualCompactionState*>::iterator it = |
7c673cae FG |
2778 | manual_compaction_dequeue_.begin(); |
2779 | while (it != manual_compaction_dequeue_.end()) { | |
2780 | if ((*it)->exclusive) { | |
2781 | return true; | |
2782 | } | |
2783 | if ((cfd == (*it)->cfd) && (!((*it)->in_progress || (*it)->done))) { | |
2784 | // Allow automatic compaction if manual compaction is | |
11fdf7f2 | 2785 | // in progress |
7c673cae FG |
2786 | return true; |
2787 | } | |
2788 | it++; | |
2789 | } | |
2790 | return false; | |
2791 | } | |
2792 | ||
2793 | bool DBImpl::HasExclusiveManualCompaction() { | |
2794 | // Remove from priority queue | |
11fdf7f2 | 2795 | std::deque<ManualCompactionState*>::iterator it = |
7c673cae FG |
2796 | manual_compaction_dequeue_.begin(); |
2797 | while (it != manual_compaction_dequeue_.end()) { | |
2798 | if ((*it)->exclusive) { | |
2799 | return true; | |
2800 | } | |
2801 | it++; | |
2802 | } | |
2803 | return false; | |
2804 | } | |
2805 | ||
11fdf7f2 | 2806 | bool DBImpl::MCOverlap(ManualCompactionState* m, ManualCompactionState* m1) { |
7c673cae FG |
2807 | if ((m->exclusive) || (m1->exclusive)) { |
2808 | return true; | |
2809 | } | |
2810 | if (m->cfd != m1->cfd) { | |
2811 | return false; | |
2812 | } | |
2813 | return true; | |
2814 | } | |
2815 | ||
494da23a TL |
2816 | #ifndef ROCKSDB_LITE |
2817 | void DBImpl::BuildCompactionJobInfo( | |
2818 | const ColumnFamilyData* cfd, Compaction* c, const Status& st, | |
2819 | const CompactionJobStats& compaction_job_stats, const int job_id, | |
2820 | const Version* current, CompactionJobInfo* compaction_job_info) const { | |
2821 | assert(compaction_job_info != nullptr); | |
2822 | compaction_job_info->cf_id = cfd->GetID(); | |
2823 | compaction_job_info->cf_name = cfd->GetName(); | |
2824 | compaction_job_info->status = st; | |
2825 | compaction_job_info->thread_id = env_->GetThreadID(); | |
2826 | compaction_job_info->job_id = job_id; | |
2827 | compaction_job_info->base_input_level = c->start_level(); | |
2828 | compaction_job_info->output_level = c->output_level(); | |
2829 | compaction_job_info->stats = compaction_job_stats; | |
2830 | compaction_job_info->table_properties = c->GetOutputTableProperties(); | |
2831 | compaction_job_info->compaction_reason = c->compaction_reason(); | |
2832 | compaction_job_info->compression = c->output_compression(); | |
2833 | for (size_t i = 0; i < c->num_input_levels(); ++i) { | |
2834 | for (const auto fmd : *c->inputs(i)) { | |
2835 | auto fn = TableFileName(c->immutable_cf_options()->cf_paths, | |
2836 | fmd->fd.GetNumber(), fmd->fd.GetPathId()); | |
2837 | compaction_job_info->input_files.push_back(fn); | |
2838 | if (compaction_job_info->table_properties.count(fn) == 0) { | |
2839 | std::shared_ptr<const TableProperties> tp; | |
2840 | auto s = current->GetTableProperties(&tp, fmd, &fn); | |
2841 | if (s.ok()) { | |
2842 | compaction_job_info->table_properties[fn] = tp; | |
2843 | } | |
2844 | } | |
2845 | } | |
2846 | } | |
2847 | for (const auto& newf : c->edit()->GetNewFiles()) { | |
2848 | compaction_job_info->output_files.push_back( | |
2849 | TableFileName(c->immutable_cf_options()->cf_paths, | |
2850 | newf.second.fd.GetNumber(), newf.second.fd.GetPathId())); | |
2851 | } | |
2852 | } | |
2853 | #endif | |
2854 | ||
11fdf7f2 TL |
// SuperVersionContext gets created and destructed outside of the lock --
// we use this conveniently to:
// * malloc one SuperVersion() outside of the lock -- new_superversion
// * delete SuperVersion()s outside of the lock -- superversions_to_free
//
// However, if InstallSuperVersionAndScheduleWork() gets called twice with
// the same sv_context, we cannot reuse the SuperVersion() that was malloc'd
// earlier, because the first call already consumed it. In that rare case, we
// take a hit and create a new SuperVersion() inside the mutex. We do a
// similar thing for superversion_to_free.
7c673cae | 2866 | |
11fdf7f2 TL |
2867 | void DBImpl::InstallSuperVersionAndScheduleWork( |
2868 | ColumnFamilyData* cfd, SuperVersionContext* sv_context, | |
494da23a | 2869 | const MutableCFOptions& mutable_cf_options) { |
7c673cae FG |
2870 | mutex_.AssertHeld(); |
2871 | ||
2872 | // Update max_total_in_memory_state_ | |
2873 | size_t old_memtable_size = 0; | |
2874 | auto* old_sv = cfd->GetSuperVersion(); | |
2875 | if (old_sv) { | |
2876 | old_memtable_size = old_sv->mutable_cf_options.write_buffer_size * | |
2877 | old_sv->mutable_cf_options.max_write_buffer_number; | |
2878 | } | |
2879 | ||
11fdf7f2 TL |
2880 | // this branch is unlikely to step in |
2881 | if (UNLIKELY(sv_context->new_superversion == nullptr)) { | |
2882 | sv_context->NewSuperVersion(); | |
2883 | } | |
2884 | cfd->InstallSuperVersion(sv_context, &mutex_, mutable_cf_options); | |
7c673cae | 2885 | |
494da23a TL |
2886 | // There may be a small data race here. The snapshot tricking bottommost |
2887 | // compaction may already be released here. But assuming there will always be | |
2888 | // newer snapshot created and released frequently, the compaction will be | |
2889 | // triggered soon anyway. | |
2890 | bottommost_files_mark_threshold_ = kMaxSequenceNumber; | |
2891 | for (auto* my_cfd : *versions_->GetColumnFamilySet()) { | |
2892 | bottommost_files_mark_threshold_ = std::min( | |
2893 | bottommost_files_mark_threshold_, | |
2894 | my_cfd->current()->storage_info()->bottommost_files_mark_threshold()); | |
2895 | } | |
2896 | ||
7c673cae FG |
2897 | // Whenever we install new SuperVersion, we might need to issue new flushes or |
2898 | // compactions. | |
7c673cae FG |
2899 | SchedulePendingCompaction(cfd); |
2900 | MaybeScheduleFlushOrCompaction(); | |
2901 | ||
2902 | // Update max_total_in_memory_state_ | |
11fdf7f2 TL |
2903 | max_total_in_memory_state_ = max_total_in_memory_state_ - old_memtable_size + |
2904 | mutable_cf_options.write_buffer_size * | |
2905 | mutable_cf_options.max_write_buffer_number; | |
2906 | } | |
2907 | ||
2908 | // ShouldPurge is called by FindObsoleteFiles when doing a full scan, | |
2909 | // and db mutex (mutex_) should already be held. This function performs a | |
2910 | // linear scan of an vector (files_grabbed_for_purge_) in search of a | |
2911 | // certain element. We expect FindObsoleteFiles with full scan to occur once | |
2912 | // every 10 hours by default, and the size of the vector is small. | |
2913 | // Therefore, the cost is affordable even if the mutex is held. | |
2914 | // Actually, the current implementation of FindObsoleteFiles with | |
2915 | // full_scan=true can issue I/O requests to obtain list of files in | |
2916 | // directories, e.g. env_->getChildren while holding db mutex. | |
2917 | // In the future, if we want to reduce the cost of search, we may try to keep | |
2918 | // the vector sorted. | |
2919 | bool DBImpl::ShouldPurge(uint64_t file_number) const { | |
2920 | for (auto fn : files_grabbed_for_purge_) { | |
2921 | if (file_number == fn) { | |
2922 | return false; | |
2923 | } | |
2924 | } | |
2925 | for (const auto& purge_file_info : purge_queue_) { | |
2926 | if (purge_file_info.number == file_number) { | |
2927 | return false; | |
2928 | } | |
2929 | } | |
2930 | return true; | |
2931 | } | |
2932 | ||
2933 | // MarkAsGrabbedForPurge is called by FindObsoleteFiles, and db mutex | |
2934 | // (mutex_) should already be held. | |
2935 | void DBImpl::MarkAsGrabbedForPurge(uint64_t file_number) { | |
2936 | files_grabbed_for_purge_.emplace_back(file_number); | |
2937 | } | |
2938 | ||
2939 | void DBImpl::SetSnapshotChecker(SnapshotChecker* snapshot_checker) { | |
2940 | InstrumentedMutexLock l(&mutex_); | |
2941 | // snapshot_checker_ should only set once. If we need to set it multiple | |
2942 | // times, we need to make sure the old one is not deleted while it is still | |
2943 | // using by a compaction job. | |
2944 | assert(!snapshot_checker_); | |
2945 | snapshot_checker_.reset(snapshot_checker); | |
7c673cae | 2946 | } |
494da23a TL |
2947 | |
2948 | void DBImpl::GetSnapshotContext( | |
2949 | JobContext* job_context, std::vector<SequenceNumber>* snapshot_seqs, | |
2950 | SequenceNumber* earliest_write_conflict_snapshot, | |
2951 | SnapshotChecker** snapshot_checker_ptr) { | |
2952 | mutex_.AssertHeld(); | |
2953 | assert(job_context != nullptr); | |
2954 | assert(snapshot_seqs != nullptr); | |
2955 | assert(earliest_write_conflict_snapshot != nullptr); | |
2956 | assert(snapshot_checker_ptr != nullptr); | |
2957 | ||
2958 | *snapshot_checker_ptr = snapshot_checker_.get(); | |
2959 | if (use_custom_gc_ && *snapshot_checker_ptr == nullptr) { | |
2960 | *snapshot_checker_ptr = DisableGCSnapshotChecker::Instance(); | |
2961 | } | |
2962 | if (*snapshot_checker_ptr != nullptr) { | |
2963 | // If snapshot_checker is used, that means the flush/compaction may | |
2964 | // contain values not visible to snapshot taken after | |
2965 | // flush/compaction job starts. Take a snapshot and it will appear | |
2966 | // in snapshot_seqs and force compaction iterator to consider such | |
2967 | // snapshots. | |
2968 | const Snapshot* job_snapshot = | |
2969 | GetSnapshotImpl(false /*write_conflict_boundary*/, false /*lock*/); | |
2970 | job_context->job_snapshot.reset(new ManagedSnapshot(this, job_snapshot)); | |
2971 | } | |
2972 | *snapshot_seqs = snapshots_.GetAll(earliest_write_conflict_snapshot); | |
2973 | } | |
7c673cae | 2974 | } // namespace rocksdb |