// Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "db/column_family.h"

#include <algorithm>
#include <cinttypes>
#include <limits>
#include <sstream>
#include <string>
#include <vector>

#include "db/blob/blob_file_cache.h"
#include "db/blob/blob_source.h"
#include "db/compaction/compaction_picker.h"
#include "db/compaction/compaction_picker_fifo.h"
#include "db/compaction/compaction_picker_level.h"
#include "db/compaction/compaction_picker_universal.h"
#include "db/db_impl/db_impl.h"
#include "db/internal_stats.h"
#include "db/job_context.h"
#include "db/range_del_aggregator.h"
#include "db/table_properties_collector.h"
#include "db/version_set.h"
#include "db/write_controller.h"
#include "file/sst_file_manager_impl.h"
#include "logging/logging.h"
#include "monitoring/thread_status_util.h"
#include "options/options_helper.h"
#include "port/port.h"
#include "rocksdb/convenience.h"
#include "rocksdb/table.h"
#include "table/merging_iterator.h"
#include "util/autovector.h"
#include "util/cast_util.h"
#include "util/compression.h"

namespace ROCKSDB_NAMESPACE {

ColumnFamilyHandleImpl::ColumnFamilyHandleImpl(
    ColumnFamilyData* column_family_data, DBImpl* db, InstrumentedMutex* mutex)
    : cfd_(column_family_data), db_(db), mutex_(mutex) {
  if (cfd_ != nullptr) {
    cfd_->Ref();
  }
}

ColumnFamilyHandleImpl::~ColumnFamilyHandleImpl() {
  if (cfd_ != nullptr) {
#ifndef ROCKSDB_LITE
    for (auto& listener : cfd_->ioptions()->listeners) {
      listener->OnColumnFamilyHandleDeletionStarted(this);
    }
#endif  // ROCKSDB_LITE
    // Job id == 0 means that this is not our background process, but rather
    // a user thread
    // Need to hold some shared pointers owned by the initial_cf_options
    // before final cleaning up finishes.
    ColumnFamilyOptions initial_cf_options_copy = cfd_->initial_cf_options();
    JobContext job_context(0);
    mutex_->Lock();
    bool dropped = cfd_->IsDropped();
    if (cfd_->UnrefAndTryDelete()) {
      if (dropped) {
        db_->FindObsoleteFiles(&job_context, false, true);
      }
    }
    mutex_->Unlock();
    if (job_context.HaveSomethingToDelete()) {
      bool defer_purge =
          db_->immutable_db_options().avoid_unnecessary_blocking_io;
      db_->PurgeObsoleteFiles(job_context, defer_purge);
    }
    job_context.Clean();
  }
}

uint32_t ColumnFamilyHandleImpl::GetID() const { return cfd()->GetID(); }

const std::string& ColumnFamilyHandleImpl::GetName() const {
  return cfd()->GetName();
}

Status ColumnFamilyHandleImpl::GetDescriptor(ColumnFamilyDescriptor* desc) {
#ifndef ROCKSDB_LITE
  // accessing mutable cf-options requires db mutex.
  InstrumentedMutexLock l(mutex_);
  *desc = ColumnFamilyDescriptor(cfd()->GetName(), cfd()->GetLatestCFOptions());
  return Status::OK();
#else
  (void)desc;
  return Status::NotSupported();
#endif  // !ROCKSDB_LITE
}

const Comparator* ColumnFamilyHandleImpl::GetComparator() const {
  return cfd()->user_comparator();
}

void GetIntTblPropCollectorFactory(
    const ImmutableCFOptions& ioptions,
    IntTblPropCollectorFactories* int_tbl_prop_collector_factories) {
  assert(int_tbl_prop_collector_factories);

  auto& collector_factories = ioptions.table_properties_collector_factories;
  for (size_t i = 0; i < ioptions.table_properties_collector_factories.size();
       ++i) {
    assert(collector_factories[i]);
    int_tbl_prop_collector_factories->emplace_back(
        new UserKeyTablePropertiesCollectorFactory(collector_factories[i]));
  }
}

Status CheckCompressionSupported(const ColumnFamilyOptions& cf_options) {
  if (!cf_options.compression_per_level.empty()) {
    for (size_t level = 0; level < cf_options.compression_per_level.size();
         ++level) {
      if (!CompressionTypeSupported(cf_options.compression_per_level[level])) {
        return Status::InvalidArgument(
            "Compression type " +
            CompressionTypeToString(cf_options.compression_per_level[level]) +
            " is not linked with the binary.");
      }
    }
  } else {
    if (!CompressionTypeSupported(cf_options.compression)) {
      return Status::InvalidArgument(
          "Compression type " +
          CompressionTypeToString(cf_options.compression) +
          " is not linked with the binary.");
    }
  }
  if (cf_options.compression_opts.zstd_max_train_bytes > 0) {
    if (cf_options.compression_opts.use_zstd_dict_trainer) {
      if (!ZSTD_TrainDictionarySupported()) {
        return Status::InvalidArgument(
            "zstd dictionary trainer cannot be used because ZSTD 1.1.3+ "
            "is not linked with the binary.");
      }
    } else if (!ZSTD_FinalizeDictionarySupported()) {
      return Status::InvalidArgument(
          "zstd finalizeDictionary cannot be used because ZSTD 1.4.5+ "
          "is not linked with the binary.");
    }
    if (cf_options.compression_opts.max_dict_bytes == 0) {
      return Status::InvalidArgument(
          "The dictionary size limit (`CompressionOptions::max_dict_bytes`) "
          "should be nonzero if we're using zstd's dictionary generator.");
    }
  }

  if (!CompressionTypeSupported(cf_options.blob_compression_type)) {
    std::ostringstream oss;
    oss << "The specified blob compression type "
        << CompressionTypeToString(cf_options.blob_compression_type)
        << " is not available.";

    return Status::InvalidArgument(oss.str());
  }

  return Status::OK();
}

Status CheckConcurrentWritesSupported(const ColumnFamilyOptions& cf_options) {
  if (cf_options.inplace_update_support) {
    return Status::InvalidArgument(
        "In-place memtable updates (inplace_update_support) is not compatible "
        "with concurrent writes (allow_concurrent_memtable_write)");
  }
  if (!cf_options.memtable_factory->IsInsertConcurrentlySupported()) {
    return Status::InvalidArgument(
        "Memtable doesn't support concurrent writes "
        "(allow_concurrent_memtable_write)");
  }
  return Status::OK();
}

Status CheckCFPathsSupported(const DBOptions& db_options,
                             const ColumnFamilyOptions& cf_options) {
  // More than one cf_paths entry is supported only in universal
  // and level compaction styles. This function also checks the case
  // in which cf_paths is not specified, which results in db_paths
  // being used.
  if ((cf_options.compaction_style != kCompactionStyleUniversal) &&
      (cf_options.compaction_style != kCompactionStyleLevel)) {
    if (cf_options.cf_paths.size() > 1) {
      return Status::NotSupported(
          "More than one CF path is only supported in "
          "universal and level compaction styles. ");
    } else if (cf_options.cf_paths.empty() && db_options.db_paths.size() > 1) {
      return Status::NotSupported(
          "More than one DB path is only supported in "
          "universal and level compaction styles. ");
    }
  }
  return Status::OK();
}

namespace {
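// Note (added for clarity): these sentinels are max-uint64-minus-one values
// meaning "not explicitly set by the user"; SanitizeOptions() below replaces
// them with real defaults.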
const uint64_t kDefaultTtl = 0xfffffffffffffffe;
const uint64_t kDefaultPeriodicCompSecs = 0xfffffffffffffffe;
}  // anonymous namespace

ColumnFamilyOptions SanitizeOptions(const ImmutableDBOptions& db_options,
                                    const ColumnFamilyOptions& src) {
  ColumnFamilyOptions result = src;
  size_t clamp_max = std::conditional<
      sizeof(size_t) == 4, std::integral_constant<size_t, 0xffffffff>,
      std::integral_constant<uint64_t, 64ull << 30>>::type::value;
  ClipToRange(&result.write_buffer_size, (static_cast<size_t>(64)) << 10,
              clamp_max);
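  // For illustration (numbers assumed, not from the source): on a 64-bit
  // build this clamps write_buffer_size to [64 KB, 64 GB], so a configured
  // value of 4 KB would be raised to 64 KB; on a 32-bit build the upper
  // bound is 0xffffffff bytes instead.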
  // if user sets arena_block_size, we trust user to use this value. Otherwise,
  // calculate a proper value from write_buffer_size;
  if (result.arena_block_size <= 0) {
    result.arena_block_size =
        std::min(size_t{1024 * 1024}, result.write_buffer_size / 8);

    // Align up to 4k
    const size_t align = 4 * 1024;
    result.arena_block_size =
        ((result.arena_block_size + align - 1) / align) * align;
  }
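  // Worked example (values assumed, not from the source): with
  // write_buffer_size = 6 MB, arena_block_size starts at
  // min(1 MB, 6 MB / 8) = 768 KB, which is already 4 KB-aligned; an
  // unaligned value such as 10000 bytes would round up to
  // ((10000 + 4095) / 4096) * 4096 = 12288.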
  result.min_write_buffer_number_to_merge =
      std::min(result.min_write_buffer_number_to_merge,
               result.max_write_buffer_number - 1);
  if (result.min_write_buffer_number_to_merge < 1) {
    result.min_write_buffer_number_to_merge = 1;
  }

  if (db_options.atomic_flush && result.min_write_buffer_number_to_merge > 1) {
    ROCKS_LOG_WARN(
        db_options.logger,
        "Currently, if atomic_flush is true, then triggering flush for any "
        "column family internally (non-manual flush) will trigger flushing "
        "all column families even if the number of memtables is smaller "
        "than min_write_buffer_number_to_merge. Therefore, configuring "
        "min_write_buffer_number_to_merge > 1 is not compatible and should "
        "be sanitized to 1. Not doing so will lead to data loss and "
        "inconsistent state across multiple column families when WAL is "
        "disabled, which is a common setting for atomic flush");

    result.min_write_buffer_number_to_merge = 1;
  }

  if (result.num_levels < 1) {
    result.num_levels = 1;
  }
  if (result.compaction_style == kCompactionStyleLevel &&
      result.num_levels < 2) {
    result.num_levels = 2;
  }

  if (result.compaction_style == kCompactionStyleUniversal &&
      db_options.allow_ingest_behind && result.num_levels < 3) {
    result.num_levels = 3;
  }

  if (result.max_write_buffer_number < 2) {
    result.max_write_buffer_number = 2;
  }
  // fall back to max_write_buffer_number_to_maintain if
  // max_write_buffer_size_to_maintain is not set
  if (result.max_write_buffer_size_to_maintain < 0) {
    result.max_write_buffer_size_to_maintain =
        result.max_write_buffer_number *
        static_cast<int64_t>(result.write_buffer_size);
  } else if (result.max_write_buffer_size_to_maintain == 0 &&
             result.max_write_buffer_number_to_maintain < 0) {
    result.max_write_buffer_number_to_maintain = result.max_write_buffer_number;
  }
  // bloom filter size shouldn't exceed 1/4 of memtable size.
  if (result.memtable_prefix_bloom_size_ratio > 0.25) {
    result.memtable_prefix_bloom_size_ratio = 0.25;
  } else if (result.memtable_prefix_bloom_size_ratio < 0) {
    result.memtable_prefix_bloom_size_ratio = 0;
  }

  if (!result.prefix_extractor) {
    assert(result.memtable_factory);
    Slice name = result.memtable_factory->Name();
    if (name.compare("HashSkipListRepFactory") == 0 ||
        name.compare("HashLinkListRepFactory") == 0) {
      result.memtable_factory = std::make_shared<SkipListFactory>();
    }
  }
291
292 if (result.compaction_style == kCompactionStyleFIFO) {
7c673cae
FG
293 // since we delete level0 files in FIFO compaction when there are too many
294 // of them, these options don't really mean anything
7c673cae
FG
295 result.level0_slowdown_writes_trigger = std::numeric_limits<int>::max();
296 result.level0_stop_writes_trigger = std::numeric_limits<int>::max();
297 }
298
299 if (result.max_bytes_for_level_multiplier <= 0) {
300 result.max_bytes_for_level_multiplier = 1;
301 }
302
303 if (result.level0_file_num_compaction_trigger == 0) {
1e59de90 304 ROCKS_LOG_WARN(db_options.logger,
7c673cae
FG
305 "level0_file_num_compaction_trigger cannot be 0");
306 result.level0_file_num_compaction_trigger = 1;
307 }
308
  if (result.level0_stop_writes_trigger <
          result.level0_slowdown_writes_trigger ||
      result.level0_slowdown_writes_trigger <
          result.level0_file_num_compaction_trigger) {
    ROCKS_LOG_WARN(db_options.logger,
                   "This condition must be satisfied: "
                   "level0_stop_writes_trigger(%d) >= "
                   "level0_slowdown_writes_trigger(%d) >= "
                   "level0_file_num_compaction_trigger(%d)",
                   result.level0_stop_writes_trigger,
                   result.level0_slowdown_writes_trigger,
                   result.level0_file_num_compaction_trigger);
    if (result.level0_slowdown_writes_trigger <
        result.level0_file_num_compaction_trigger) {
      result.level0_slowdown_writes_trigger =
          result.level0_file_num_compaction_trigger;
    }
    if (result.level0_stop_writes_trigger <
        result.level0_slowdown_writes_trigger) {
      result.level0_stop_writes_trigger = result.level0_slowdown_writes_trigger;
    }
    ROCKS_LOG_WARN(db_options.logger,
                   "Adjust the value to "
                   "level0_stop_writes_trigger(%d) "
                   "level0_slowdown_writes_trigger(%d) "
                   "level0_file_num_compaction_trigger(%d)",
                   result.level0_stop_writes_trigger,
                   result.level0_slowdown_writes_trigger,
                   result.level0_file_num_compaction_trigger);
  }

  if (result.soft_pending_compaction_bytes_limit == 0) {
    result.soft_pending_compaction_bytes_limit =
        result.hard_pending_compaction_bytes_limit;
  } else if (result.hard_pending_compaction_bytes_limit > 0 &&
             result.soft_pending_compaction_bytes_limit >
                 result.hard_pending_compaction_bytes_limit) {
    result.soft_pending_compaction_bytes_limit =
        result.hard_pending_compaction_bytes_limit;
  }

#ifndef ROCKSDB_LITE
  // When the DB is stopped, it's possible that there are some .trash files
  // that were not deleted yet; when we open the DB we will find these .trash
  // files and schedule them for deletion (or delete them immediately if
  // SstFileManager was not used)
  auto sfm =
      static_cast<SstFileManagerImpl*>(db_options.sst_file_manager.get());
  for (size_t i = 0; i < result.cf_paths.size(); i++) {
    DeleteScheduler::CleanupDirectory(db_options.env, sfm,
                                      result.cf_paths[i].path)
        .PermitUncheckedError();
  }
#endif

  if (result.cf_paths.empty()) {
    result.cf_paths = db_options.db_paths;
  }

  if (result.level_compaction_dynamic_level_bytes) {
    if (result.compaction_style != kCompactionStyleLevel) {
      ROCKS_LOG_WARN(db_options.info_log.get(),
                     "level_compaction_dynamic_level_bytes only makes sense "
                     "for level-based compaction");
      result.level_compaction_dynamic_level_bytes = false;
    } else if (result.cf_paths.size() > 1U) {
      // we don't yet know how to make this feature and multiple
      // DB paths work together.
      ROCKS_LOG_WARN(db_options.info_log.get(),
                     "multiple cf_paths/db_paths and "
                     "level_compaction_dynamic_level_bytes "
                     "can't be used together");
      result.level_compaction_dynamic_level_bytes = false;
    }
  }

  if (result.max_compaction_bytes == 0) {
    result.max_compaction_bytes = result.target_file_size_base * 25;
  }

  bool is_block_based_table = (result.table_factory->IsInstanceOf(
      TableFactory::kBlockBasedTableName()));

  const uint64_t kAdjustedTtl = 30 * 24 * 60 * 60;
  if (result.ttl == kDefaultTtl) {
    if (is_block_based_table &&
        result.compaction_style != kCompactionStyleFIFO) {
      result.ttl = kAdjustedTtl;
    } else {
      result.ttl = 0;
    }
  }

  const uint64_t kAdjustedPeriodicCompSecs = 30 * 24 * 60 * 60;

  // Turn on periodic compactions and set them to occur once every 30 days if
  // compaction filters are used and periodic_compaction_seconds is set to the
  // default value.
  if (result.compaction_style != kCompactionStyleFIFO) {
    if ((result.compaction_filter != nullptr ||
         result.compaction_filter_factory != nullptr) &&
        result.periodic_compaction_seconds == kDefaultPeriodicCompSecs &&
        is_block_based_table) {
      result.periodic_compaction_seconds = kAdjustedPeriodicCompSecs;
    }
  } else {
    // result.compaction_style == kCompactionStyleFIFO
    if (result.ttl == 0) {
      if (is_block_based_table) {
        if (result.periodic_compaction_seconds == kDefaultPeriodicCompSecs) {
          result.periodic_compaction_seconds = kAdjustedPeriodicCompSecs;
        }
        result.ttl = result.periodic_compaction_seconds;
      }
    } else if (result.periodic_compaction_seconds != 0) {
      result.ttl = std::min(result.ttl, result.periodic_compaction_seconds);
    }
  }

  // TTL compactions work similarly to periodic compactions in Universal
  // compaction in most cases. So, if ttl is set, execute the periodic
  // compaction codepath.
  if (result.compaction_style == kCompactionStyleUniversal && result.ttl != 0) {
    if (result.periodic_compaction_seconds != 0) {
      result.periodic_compaction_seconds =
          std::min(result.ttl, result.periodic_compaction_seconds);
    } else {
      result.periodic_compaction_seconds = result.ttl;
    }
  }

  if (result.periodic_compaction_seconds == kDefaultPeriodicCompSecs) {
    result.periodic_compaction_seconds = 0;
  }

  return result;
}

int SuperVersion::dummy = 0;
void* const SuperVersion::kSVInUse = &SuperVersion::dummy;
void* const SuperVersion::kSVObsolete = nullptr;

SuperVersion::~SuperVersion() {
  for (auto td : to_delete) {
    delete td;
  }
}

SuperVersion* SuperVersion::Ref() {
  refs.fetch_add(1, std::memory_order_relaxed);
  return this;
}

bool SuperVersion::Unref() {
  // fetch_sub returns the previous value of ref
  uint32_t previous_refs = refs.fetch_sub(1);
  assert(previous_refs > 0);
  return previous_refs == 1;
}

void SuperVersion::Cleanup() {
  assert(refs.load(std::memory_order_relaxed) == 0);
  // Since this SuperVersion object is being deleted,
  // decrement reference to the immutable MemtableList
  // this SV object was pointing to.
  imm->Unref(&to_delete);
  MemTable* m = mem->Unref();
  if (m != nullptr) {
    auto* memory_usage = current->cfd()->imm()->current_memory_usage();
    assert(*memory_usage >= m->ApproximateMemoryUsage());
    *memory_usage -= m->ApproximateMemoryUsage();
    to_delete.push_back(m);
  }
  current->Unref();
  cfd->UnrefAndTryDelete();
}

void SuperVersion::Init(ColumnFamilyData* new_cfd, MemTable* new_mem,
                        MemTableListVersion* new_imm, Version* new_current) {
  cfd = new_cfd;
  mem = new_mem;
  imm = new_imm;
  current = new_current;
  cfd->Ref();
  mem->Ref();
  imm->Ref();
  current->Ref();
  refs.store(1, std::memory_order_relaxed);
}

namespace {
void SuperVersionUnrefHandle(void* ptr) {
  // UnrefHandle is called when a thread exits or a ThreadLocalPtr gets
  // destroyed. When the former happens, the thread shouldn't see kSVInUse.
  // When the latter happens, only super_version_ holds a reference
  // to ColumnFamilyData, so no further queries are possible.
  SuperVersion* sv = static_cast<SuperVersion*>(ptr);
  bool was_last_ref __attribute__((__unused__));
  was_last_ref = sv->Unref();
  // Thread-local SuperVersions can't outlive ColumnFamilyData::super_version_.
  // This is important because we can't do SuperVersion cleanup here.
  // That would require locking DB mutex, which would deadlock because
  // SuperVersionUnrefHandle is called with locked ThreadLocalPtr mutex.
  assert(!was_last_ref);
}
}  // anonymous namespace

std::vector<std::string> ColumnFamilyData::GetDbPaths() const {
  std::vector<std::string> paths;
  paths.reserve(ioptions_.cf_paths.size());
  for (const DbPath& db_path : ioptions_.cf_paths) {
    paths.emplace_back(db_path.path);
  }
  return paths;
}

const uint32_t ColumnFamilyData::kDummyColumnFamilyDataId =
    std::numeric_limits<uint32_t>::max();

ColumnFamilyData::ColumnFamilyData(
    uint32_t id, const std::string& name, Version* _dummy_versions,
    Cache* _table_cache, WriteBufferManager* write_buffer_manager,
    const ColumnFamilyOptions& cf_options, const ImmutableDBOptions& db_options,
    const FileOptions* file_options, ColumnFamilySet* column_family_set,
    BlockCacheTracer* const block_cache_tracer,
    const std::shared_ptr<IOTracer>& io_tracer, const std::string& db_id,
    const std::string& db_session_id)
    : id_(id),
      name_(name),
      dummy_versions_(_dummy_versions),
      current_(nullptr),
      refs_(0),
      initialized_(false),
      dropped_(false),
      internal_comparator_(cf_options.comparator),
      initial_cf_options_(SanitizeOptions(db_options, cf_options)),
      ioptions_(db_options, initial_cf_options_),
      mutable_cf_options_(initial_cf_options_),
      is_delete_range_supported_(
          cf_options.table_factory->IsDeleteRangeSupported()),
      write_buffer_manager_(write_buffer_manager),
      mem_(nullptr),
      imm_(ioptions_.min_write_buffer_number_to_merge,
           ioptions_.max_write_buffer_number_to_maintain,
           ioptions_.max_write_buffer_size_to_maintain),
      super_version_(nullptr),
      super_version_number_(0),
      local_sv_(new ThreadLocalPtr(&SuperVersionUnrefHandle)),
      next_(nullptr),
      prev_(nullptr),
      log_number_(0),
      flush_reason_(FlushReason::kOthers),
      column_family_set_(column_family_set),
      queued_for_flush_(false),
      queued_for_compaction_(false),
      prev_compaction_needed_bytes_(0),
      allow_2pc_(db_options.allow_2pc),
      last_memtable_id_(0),
      db_paths_registered_(false),
      mempurge_used_(false) {
  if (id_ != kDummyColumnFamilyDataId) {
    // TODO(cc): RegisterDbPaths can be expensive, considering moving it
    // outside of this constructor which might be called with db mutex held.
    // TODO(cc): considering using ioptions_.fs, currently some tests rely on
    // EnvWrapper, that's the main reason why we use env here.
    Status s = ioptions_.env->RegisterDbPaths(GetDbPaths());
    if (s.ok()) {
      db_paths_registered_ = true;
    } else {
      ROCKS_LOG_ERROR(
          ioptions_.logger,
          "Failed to register data paths of column family (id: %d, name: %s)",
          id_, name_.c_str());
    }
  }
  Ref();

  // Convert user defined table properties collector factories to internal ones.
  GetIntTblPropCollectorFactory(ioptions_, &int_tbl_prop_collector_factories_);

  // if _dummy_versions is nullptr, then this is a dummy column family.
  if (_dummy_versions != nullptr) {
    internal_stats_.reset(
        new InternalStats(ioptions_.num_levels, ioptions_.clock, this));
    table_cache_.reset(new TableCache(ioptions_, file_options, _table_cache,
                                      block_cache_tracer, io_tracer,
                                      db_session_id));
    blob_file_cache_.reset(
        new BlobFileCache(_table_cache, ioptions(), soptions(), id_,
                          internal_stats_->GetBlobFileReadHist(), io_tracer));
    blob_source_.reset(new BlobSource(ioptions(), db_id, db_session_id,
                                      blob_file_cache_.get()));

    if (ioptions_.compaction_style == kCompactionStyleLevel) {
      compaction_picker_.reset(
          new LevelCompactionPicker(ioptions_, &internal_comparator_));
#ifndef ROCKSDB_LITE
    } else if (ioptions_.compaction_style == kCompactionStyleUniversal) {
      compaction_picker_.reset(
          new UniversalCompactionPicker(ioptions_, &internal_comparator_));
    } else if (ioptions_.compaction_style == kCompactionStyleFIFO) {
      compaction_picker_.reset(
          new FIFOCompactionPicker(ioptions_, &internal_comparator_));
    } else if (ioptions_.compaction_style == kCompactionStyleNone) {
      compaction_picker_.reset(
          new NullCompactionPicker(ioptions_, &internal_comparator_));
      ROCKS_LOG_WARN(ioptions_.logger,
                     "Column family %s does not use any background compaction. "
                     "Compactions can only be done via CompactFiles\n",
                     GetName().c_str());
#endif  // !ROCKSDB_LITE
    } else {
      ROCKS_LOG_ERROR(ioptions_.logger,
                      "Unable to recognize the specified compaction style %d. "
                      "Column family %s will use kCompactionStyleLevel.\n",
                      ioptions_.compaction_style, GetName().c_str());
      compaction_picker_.reset(
          new LevelCompactionPicker(ioptions_, &internal_comparator_));
    }

    if (column_family_set_->NumberOfColumnFamilies() < 10) {
      ROCKS_LOG_INFO(ioptions_.logger,
                     "--------------- Options for column family [%s]:\n",
                     name.c_str());
      initial_cf_options_.Dump(ioptions_.logger);
    } else {
      ROCKS_LOG_INFO(ioptions_.logger, "\t(skipping printing options)\n");
    }
  }

  RecalculateWriteStallConditions(mutable_cf_options_);

  if (cf_options.table_factory->IsInstanceOf(
          TableFactory::kBlockBasedTableName()) &&
      cf_options.table_factory->GetOptions<BlockBasedTableOptions>()) {
    const BlockBasedTableOptions* bbto =
        cf_options.table_factory->GetOptions<BlockBasedTableOptions>();
    const auto& options_overrides = bbto->cache_usage_options.options_overrides;
    const auto file_metadata_charged =
        options_overrides.at(CacheEntryRole::kFileMetadata).charged;
    if (bbto->block_cache &&
        file_metadata_charged == CacheEntryRoleOptions::Decision::kEnabled) {
      // TODO(hx235): Add a `ConcurrentCacheReservationManager` at DB scope
      // responsible for reservation of `ObsoleteFileInfo` so that we can keep
      // this `file_metadata_cache_res_mgr_` nonconcurrent
      file_metadata_cache_res_mgr_.reset(new ConcurrentCacheReservationManager(
          std::make_shared<
              CacheReservationManagerImpl<CacheEntryRole::kFileMetadata>>(
              bbto->block_cache)));
    }
  }
}

// DB mutex held
ColumnFamilyData::~ColumnFamilyData() {
  assert(refs_.load(std::memory_order_relaxed) == 0);
  // remove from linked list
  auto prev = prev_;
  auto next = next_;
  prev->next_ = next;
  next->prev_ = prev;

  if (!dropped_ && column_family_set_ != nullptr) {
    // If it's dropped, it's already removed from column family set
    // If column_family_set_ == nullptr, this is dummy CFD and not in
    // ColumnFamilySet
    column_family_set_->RemoveColumnFamily(this);
  }

  if (current_ != nullptr) {
    current_->Unref();
  }

  // It would be wrong if this ColumnFamilyData is in flush_queue_ or
  // compaction_queue_ and we destroyed it
  assert(!queued_for_flush_);
  assert(!queued_for_compaction_);
  assert(super_version_ == nullptr);

  if (dummy_versions_ != nullptr) {
    // List must be empty
    assert(dummy_versions_->Next() == dummy_versions_);
    bool deleted __attribute__((__unused__));
    deleted = dummy_versions_->Unref();
    assert(deleted);
  }

  if (mem_ != nullptr) {
    delete mem_->Unref();
  }
  autovector<MemTable*> to_delete;
  imm_.current()->Unref(&to_delete);
  for (MemTable* m : to_delete) {
    delete m;
  }

  if (db_paths_registered_) {
    // TODO(cc): considering using ioptions_.fs, currently some tests rely on
    // EnvWrapper, that's the main reason why we use env here.
    Status s = ioptions_.env->UnregisterDbPaths(GetDbPaths());
    if (!s.ok()) {
      ROCKS_LOG_ERROR(
          ioptions_.logger,
          "Failed to unregister data paths of column family (id: %d, name: %s)",
          id_, name_.c_str());
    }
  }
}

bool ColumnFamilyData::UnrefAndTryDelete() {
  int old_refs = refs_.fetch_sub(1);
  assert(old_refs > 0);

  if (old_refs == 1) {
    assert(super_version_ == nullptr);
    delete this;
    return true;
  }

  if (old_refs == 2 && super_version_ != nullptr) {
    // Only the super_version_ holds me
    SuperVersion* sv = super_version_;
    super_version_ = nullptr;

    // Release SuperVersion references kept in ThreadLocalPtr.
    local_sv_.reset();

    if (sv->Unref()) {
      // Note: sv will delete this ColumnFamilyData during Cleanup()
      assert(sv->cfd == this);
      sv->Cleanup();
      delete sv;
      return true;
    }
  }
  return false;
}

void ColumnFamilyData::SetDropped() {
  // can't drop default CF
  assert(id_ != 0);
  dropped_ = true;
  write_controller_token_.reset();

  // remove from column_family_set
  column_family_set_->RemoveColumnFamily(this);
}

ColumnFamilyOptions ColumnFamilyData::GetLatestCFOptions() const {
  return BuildColumnFamilyOptions(initial_cf_options_, mutable_cf_options_);
}

uint64_t ColumnFamilyData::OldestLogToKeep() {
  auto current_log = GetLogNumber();

  if (allow_2pc_) {
    auto imm_prep_log = imm()->PrecomputeMinLogContainingPrepSection();
    auto mem_prep_log = mem()->GetMinLogContainingPrepSection();

    if (imm_prep_log > 0 && imm_prep_log < current_log) {
      current_log = imm_prep_log;
    }

    if (mem_prep_log > 0 && mem_prep_log < current_log) {
      current_log = mem_prep_log;
    }
  }

  return current_log;
}

const double kIncSlowdownRatio = 0.8;
const double kDecSlowdownRatio = 1 / kIncSlowdownRatio;
const double kNearStopSlowdownRatio = 0.6;
const double kDelayRecoverSlowdownRatio = 1.4;
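
// For intuition (illustrative rates, not from the source): starting from a
// delayed write rate of 16 MB/s, one kIncSlowdownRatio step lowers it to
// 12.8 MB/s and one kNearStopSlowdownRatio step to 9.6 MB/s, while one
// kDecSlowdownRatio step raises 12.8 MB/s back to 16 MB/s; on recovery from a
// delay condition the rate is multiplied by kDelayRecoverSlowdownRatio (1.4).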

namespace {
// If penalize_stop is true, we further reduce slowdown rate.
std::unique_ptr<WriteControllerToken> SetupDelay(
    WriteController* write_controller, uint64_t compaction_needed_bytes,
    uint64_t prev_compaction_need_bytes, bool penalize_stop,
    bool auto_compactions_disabled) {
  const uint64_t kMinWriteRate = 16 * 1024u;  // Minimum write rate 16KB/s.

  uint64_t max_write_rate = write_controller->max_delayed_write_rate();
  uint64_t write_rate = write_controller->delayed_write_rate();

  if (auto_compactions_disabled) {
    // When auto compaction is disabled, always use the value user gave.
    write_rate = max_write_rate;
  } else if (write_controller->NeedsDelay() && max_write_rate > kMinWriteRate) {
    // If user gives rate less than kMinWriteRate, don't adjust it.
    //
    // If already delayed, need to adjust based on previous compaction debt.
    // When two or more column families require delay, we always
    // increase or reduce write rate based on information for one single
    // column family. It is likely to be OK but we can improve if there is a
    // problem.
    // Ignore compaction_needed_bytes = 0 case because compaction_needed_bytes
    // is only available in level-based compaction
    //
    // If the compaction debt stays the same as previously, we also further
    // slow down. It usually means a mem table is full. It's mainly for the
    // case where both flush and compaction are much slower than the speed we
    // insert to mem tables, so we need to actively slow down before we get
    // feedback signal from compaction and flushes to avoid the full stop
    // because of hitting the max write buffer number.
    //
    // If the DB just fell into the stop condition, we need to further reduce
    // the write rate to avoid the stop condition.
    if (penalize_stop) {
      // Penalize the near stop or stop condition by more aggressive slowdown.
      // This is to provide the long term slowdown increase signal.
      // The penalty is more than the reward of recovering to the normal
      // condition.
      write_rate = static_cast<uint64_t>(static_cast<double>(write_rate) *
                                         kNearStopSlowdownRatio);
      if (write_rate < kMinWriteRate) {
        write_rate = kMinWriteRate;
      }
    } else if (prev_compaction_need_bytes > 0 &&
               prev_compaction_need_bytes <= compaction_needed_bytes) {
      write_rate = static_cast<uint64_t>(static_cast<double>(write_rate) *
                                         kIncSlowdownRatio);
      if (write_rate < kMinWriteRate) {
        write_rate = kMinWriteRate;
      }
    } else if (prev_compaction_need_bytes > compaction_needed_bytes) {
      // We are speeding up by ratio of kDecSlowdownRatio when we have paid
      // compaction debt. But we'll never speed up to faster than the write
      // rate given by users.
      write_rate = static_cast<uint64_t>(static_cast<double>(write_rate) *
                                         kDecSlowdownRatio);
      if (write_rate > max_write_rate) {
        write_rate = max_write_rate;
      }
    }
  }
  return write_controller->GetDelayToken(write_rate);
}

int GetL0ThresholdSpeedupCompaction(int level0_file_num_compaction_trigger,
                                    int level0_slowdown_writes_trigger) {
  // SanitizeOptions() ensures it.
  assert(level0_file_num_compaction_trigger <= level0_slowdown_writes_trigger);

  if (level0_file_num_compaction_trigger < 0) {
    return std::numeric_limits<int>::max();
  }

  const int64_t twice_level0_trigger =
      static_cast<int64_t>(level0_file_num_compaction_trigger) * 2;

  const int64_t one_fourth_trigger_slowdown =
      static_cast<int64_t>(level0_file_num_compaction_trigger) +
      ((level0_slowdown_writes_trigger - level0_file_num_compaction_trigger) /
       4);

  assert(twice_level0_trigger >= 0);
  assert(one_fourth_trigger_slowdown >= 0);

  // 1/4 of the way between L0 compaction trigger threshold and slowdown
  // condition.
  // Or twice as compaction trigger, if it is smaller.
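  // Worked example (values assumed, not from the source): with
  // level0_file_num_compaction_trigger = 4 and
  // level0_slowdown_writes_trigger = 20, twice_level0_trigger = 8 and
  // one_fourth_trigger_slowdown = 4 + (20 - 4) / 4 = 8, so compaction is
  // sped up once there are min(8, 8) = 8 L0 files.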
  int64_t res = std::min(twice_level0_trigger, one_fourth_trigger_slowdown);
  if (res >= std::numeric_limits<int32_t>::max()) {
    return std::numeric_limits<int32_t>::max();
  } else {
    // res fits in int
    return static_cast<int>(res);
  }
}
}  // anonymous namespace

std::pair<WriteStallCondition, ColumnFamilyData::WriteStallCause>
ColumnFamilyData::GetWriteStallConditionAndCause(
    int num_unflushed_memtables, int num_l0_files,
    uint64_t num_compaction_needed_bytes,
    const MutableCFOptions& mutable_cf_options,
    const ImmutableCFOptions& immutable_cf_options) {
  if (num_unflushed_memtables >= mutable_cf_options.max_write_buffer_number) {
    return {WriteStallCondition::kStopped, WriteStallCause::kMemtableLimit};
  } else if (!mutable_cf_options.disable_auto_compactions &&
             num_l0_files >= mutable_cf_options.level0_stop_writes_trigger) {
    return {WriteStallCondition::kStopped, WriteStallCause::kL0FileCountLimit};
  } else if (!mutable_cf_options.disable_auto_compactions &&
             mutable_cf_options.hard_pending_compaction_bytes_limit > 0 &&
             num_compaction_needed_bytes >=
                 mutable_cf_options.hard_pending_compaction_bytes_limit) {
    return {WriteStallCondition::kStopped,
            WriteStallCause::kPendingCompactionBytes};
  } else if (mutable_cf_options.max_write_buffer_number > 3 &&
             num_unflushed_memtables >=
                 mutable_cf_options.max_write_buffer_number - 1 &&
             num_unflushed_memtables - 1 >=
                 immutable_cf_options.min_write_buffer_number_to_merge) {
    return {WriteStallCondition::kDelayed, WriteStallCause::kMemtableLimit};
  } else if (!mutable_cf_options.disable_auto_compactions &&
             mutable_cf_options.level0_slowdown_writes_trigger >= 0 &&
             num_l0_files >=
                 mutable_cf_options.level0_slowdown_writes_trigger) {
    return {WriteStallCondition::kDelayed, WriteStallCause::kL0FileCountLimit};
  } else if (!mutable_cf_options.disable_auto_compactions &&
             mutable_cf_options.soft_pending_compaction_bytes_limit > 0 &&
             num_compaction_needed_bytes >=
                 mutable_cf_options.soft_pending_compaction_bytes_limit) {
    return {WriteStallCondition::kDelayed,
            WriteStallCause::kPendingCompactionBytes};
  }
  return {WriteStallCondition::kNormal, WriteStallCause::kNone};
}
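
// Note on the priority order above (summary added for readability): stop
// conditions are checked before delay conditions, each in the order memtable
// count, L0 file count, then estimated pending compaction bytes. For example
// (illustrative numbers, not from the source, assuming
// min_write_buffer_number_to_merge = 1): with max_write_buffer_number = 4,
// three unflushed memtables put the CF in the kDelayed/kMemtableLimit state,
// and a fourth would move it to kStopped/kMemtableLimit.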

WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions(
    const MutableCFOptions& mutable_cf_options) {
  auto write_stall_condition = WriteStallCondition::kNormal;
  if (current_ != nullptr) {
    auto* vstorage = current_->storage_info();
    auto write_controller = column_family_set_->write_controller_;
    uint64_t compaction_needed_bytes =
        vstorage->estimated_compaction_needed_bytes();

    auto write_stall_condition_and_cause = GetWriteStallConditionAndCause(
        imm()->NumNotFlushed(), vstorage->l0_delay_trigger_count(),
        vstorage->estimated_compaction_needed_bytes(), mutable_cf_options,
        *ioptions());
    write_stall_condition = write_stall_condition_and_cause.first;
    auto write_stall_cause = write_stall_condition_and_cause.second;

    bool was_stopped = write_controller->IsStopped();
    bool needed_delay = write_controller->NeedsDelay();

    if (write_stall_condition == WriteStallCondition::kStopped &&
        write_stall_cause == WriteStallCause::kMemtableLimit) {
      write_controller_token_ = write_controller->GetStopToken();
      internal_stats_->AddCFStats(InternalStats::MEMTABLE_LIMIT_STOPS, 1);
      ROCKS_LOG_WARN(
          ioptions_.logger,
          "[%s] Stopping writes because we have %d immutable memtables "
          "(waiting for flush), max_write_buffer_number is set to %d",
          name_.c_str(), imm()->NumNotFlushed(),
          mutable_cf_options.max_write_buffer_number);
    } else if (write_stall_condition == WriteStallCondition::kStopped &&
               write_stall_cause == WriteStallCause::kL0FileCountLimit) {
      write_controller_token_ = write_controller->GetStopToken();
      internal_stats_->AddCFStats(InternalStats::L0_FILE_COUNT_LIMIT_STOPS, 1);
      if (compaction_picker_->IsLevel0CompactionInProgress()) {
        internal_stats_->AddCFStats(
            InternalStats::LOCKED_L0_FILE_COUNT_LIMIT_STOPS, 1);
      }
      ROCKS_LOG_WARN(ioptions_.logger,
                     "[%s] Stopping writes because we have %d level-0 files",
                     name_.c_str(), vstorage->l0_delay_trigger_count());
    } else if (write_stall_condition == WriteStallCondition::kStopped &&
               write_stall_cause == WriteStallCause::kPendingCompactionBytes) {
      write_controller_token_ = write_controller->GetStopToken();
      internal_stats_->AddCFStats(
          InternalStats::PENDING_COMPACTION_BYTES_LIMIT_STOPS, 1);
      ROCKS_LOG_WARN(
          ioptions_.logger,
          "[%s] Stopping writes because of estimated pending compaction "
          "bytes %" PRIu64,
          name_.c_str(), compaction_needed_bytes);
    } else if (write_stall_condition == WriteStallCondition::kDelayed &&
               write_stall_cause == WriteStallCause::kMemtableLimit) {
      write_controller_token_ =
          SetupDelay(write_controller, compaction_needed_bytes,
                     prev_compaction_needed_bytes_, was_stopped,
                     mutable_cf_options.disable_auto_compactions);
      internal_stats_->AddCFStats(InternalStats::MEMTABLE_LIMIT_SLOWDOWNS, 1);
      ROCKS_LOG_WARN(
          ioptions_.logger,
          "[%s] Stalling writes because we have %d immutable memtables "
          "(waiting for flush), max_write_buffer_number is set to %d "
          "rate %" PRIu64,
          name_.c_str(), imm()->NumNotFlushed(),
          mutable_cf_options.max_write_buffer_number,
          write_controller->delayed_write_rate());
    } else if (write_stall_condition == WriteStallCondition::kDelayed &&
               write_stall_cause == WriteStallCause::kL0FileCountLimit) {
      // L0 is the last two files from stopping.
      bool near_stop = vstorage->l0_delay_trigger_count() >=
                       mutable_cf_options.level0_stop_writes_trigger - 2;
      write_controller_token_ =
          SetupDelay(write_controller, compaction_needed_bytes,
                     prev_compaction_needed_bytes_, was_stopped || near_stop,
                     mutable_cf_options.disable_auto_compactions);
      internal_stats_->AddCFStats(InternalStats::L0_FILE_COUNT_LIMIT_SLOWDOWNS,
                                  1);
      if (compaction_picker_->IsLevel0CompactionInProgress()) {
        internal_stats_->AddCFStats(
            InternalStats::LOCKED_L0_FILE_COUNT_LIMIT_SLOWDOWNS, 1);
      }
      ROCKS_LOG_WARN(ioptions_.logger,
                     "[%s] Stalling writes because we have %d level-0 files "
                     "rate %" PRIu64,
                     name_.c_str(), vstorage->l0_delay_trigger_count(),
                     write_controller->delayed_write_rate());
    } else if (write_stall_condition == WriteStallCondition::kDelayed &&
               write_stall_cause == WriteStallCause::kPendingCompactionBytes) {
      // If the distance to the hard limit is less than 1/4 of the gap between
      // the soft and hard bytes limits, we think it is near stop and speed up
      // the slowdown.
      bool near_stop =
          mutable_cf_options.hard_pending_compaction_bytes_limit > 0 &&
          (compaction_needed_bytes -
           mutable_cf_options.soft_pending_compaction_bytes_limit) >
              3 *
                  (mutable_cf_options.hard_pending_compaction_bytes_limit -
                   mutable_cf_options.soft_pending_compaction_bytes_limit) /
                  4;

      write_controller_token_ =
          SetupDelay(write_controller, compaction_needed_bytes,
                     prev_compaction_needed_bytes_, was_stopped || near_stop,
                     mutable_cf_options.disable_auto_compactions);
      internal_stats_->AddCFStats(
          InternalStats::PENDING_COMPACTION_BYTES_LIMIT_SLOWDOWNS, 1);
      ROCKS_LOG_WARN(
          ioptions_.logger,
          "[%s] Stalling writes because of estimated pending compaction "
          "bytes %" PRIu64 " rate %" PRIu64,
          name_.c_str(), vstorage->estimated_compaction_needed_bytes(),
          write_controller->delayed_write_rate());
    } else {
      assert(write_stall_condition == WriteStallCondition::kNormal);
      if (vstorage->l0_delay_trigger_count() >=
          GetL0ThresholdSpeedupCompaction(
              mutable_cf_options.level0_file_num_compaction_trigger,
              mutable_cf_options.level0_slowdown_writes_trigger)) {
        write_controller_token_ =
            write_controller->GetCompactionPressureToken();
        ROCKS_LOG_INFO(
            ioptions_.logger,
            "[%s] Increasing compaction threads because we have %d level-0 "
            "files ",
            name_.c_str(), vstorage->l0_delay_trigger_count());
      } else if (vstorage->estimated_compaction_needed_bytes() >=
                 mutable_cf_options.soft_pending_compaction_bytes_limit / 4) {
        // Increase compaction threads if bytes needed for compaction exceeds
        // 1/4 of threshold for slowing down.
        // If soft pending compaction byte limit is not set, always speed up
        // compaction.
        write_controller_token_ =
            write_controller->GetCompactionPressureToken();
        if (mutable_cf_options.soft_pending_compaction_bytes_limit > 0) {
          ROCKS_LOG_INFO(
              ioptions_.logger,
              "[%s] Increasing compaction threads because of estimated pending "
              "compaction "
              "bytes %" PRIu64,
              name_.c_str(), vstorage->estimated_compaction_needed_bytes());
        }
      } else {
        write_controller_token_.reset();
      }
      // If the DB recovers from delay conditions, we reward with reducing
      // double the slowdown ratio. This is to balance the long term slowdown
      // increase signal.
      if (needed_delay) {
        uint64_t write_rate = write_controller->delayed_write_rate();
        write_controller->set_delayed_write_rate(static_cast<uint64_t>(
            static_cast<double>(write_rate) * kDelayRecoverSlowdownRatio));
        // Set the low pri limit to be 1/4 the delayed write rate.
        // Note we don't reset this value even after the delay condition is
        // released. The low-pri rate will continue to apply if there is
        // compaction pressure.
        write_controller->low_pri_rate_limiter()->SetBytesPerSecond(write_rate /
                                                                    4);
      }
    }
    prev_compaction_needed_bytes_ = compaction_needed_bytes;
  }
  return write_stall_condition;
}

const FileOptions* ColumnFamilyData::soptions() const {
  return &(column_family_set_->file_options_);
}

void ColumnFamilyData::SetCurrent(Version* current_version) {
  current_ = current_version;
}

uint64_t ColumnFamilyData::GetNumLiveVersions() const {
  return VersionSet::GetNumLiveVersions(dummy_versions_);
}

uint64_t ColumnFamilyData::GetTotalSstFilesSize() const {
  return VersionSet::GetTotalSstFilesSize(dummy_versions_);
}

uint64_t ColumnFamilyData::GetTotalBlobFileSize() const {
  return VersionSet::GetTotalBlobFileSize(dummy_versions_);
}

uint64_t ColumnFamilyData::GetLiveSstFilesSize() const {
  return current_->GetSstFilesSize();
}

MemTable* ColumnFamilyData::ConstructNewMemtable(
    const MutableCFOptions& mutable_cf_options, SequenceNumber earliest_seq) {
  return new MemTable(internal_comparator_, ioptions_, mutable_cf_options,
                      write_buffer_manager_, earliest_seq, id_);
}

void ColumnFamilyData::CreateNewMemtable(
    const MutableCFOptions& mutable_cf_options, SequenceNumber earliest_seq) {
  if (mem_ != nullptr) {
    delete mem_->Unref();
  }
  SetMemtable(ConstructNewMemtable(mutable_cf_options, earliest_seq));
  mem_->Ref();
}

bool ColumnFamilyData::NeedsCompaction() const {
  return !mutable_cf_options_.disable_auto_compactions &&
         compaction_picker_->NeedsCompaction(current_->storage_info());
}

Compaction* ColumnFamilyData::PickCompaction(
    const MutableCFOptions& mutable_options,
    const MutableDBOptions& mutable_db_options, LogBuffer* log_buffer) {
  SequenceNumber earliest_mem_seqno =
      std::min(mem_->GetEarliestSequenceNumber(),
               imm_.current()->GetEarliestSequenceNumber(false));
  auto* result = compaction_picker_->PickCompaction(
      GetName(), mutable_options, mutable_db_options, current_->storage_info(),
      log_buffer, earliest_mem_seqno);
  if (result != nullptr) {
    result->SetInputVersion(current_);
  }
  return result;
}

bool ColumnFamilyData::RangeOverlapWithCompaction(
    const Slice& smallest_user_key, const Slice& largest_user_key,
    int level) const {
  return compaction_picker_->RangeOverlapWithCompaction(
      smallest_user_key, largest_user_key, level);
}

Status ColumnFamilyData::RangesOverlapWithMemtables(
    const autovector<Range>& ranges, SuperVersion* super_version,
    bool allow_data_in_errors, bool* overlap) {
  assert(overlap != nullptr);
  *overlap = false;
  // Create an InternalIterator over all unflushed memtables
  Arena arena;
  ReadOptions read_opts;
  read_opts.total_order_seek = true;
  MergeIteratorBuilder merge_iter_builder(&internal_comparator_, &arena);
  merge_iter_builder.AddIterator(
      super_version->mem->NewIterator(read_opts, &arena));
  super_version->imm->AddIterators(read_opts, &merge_iter_builder,
                                   false /* add_range_tombstone_iter */);
  ScopedArenaIterator memtable_iter(merge_iter_builder.Finish());

  auto read_seq = super_version->current->version_set()->LastSequence();
  ReadRangeDelAggregator range_del_agg(&internal_comparator_, read_seq);
  auto* active_range_del_iter = super_version->mem->NewRangeTombstoneIterator(
      read_opts, read_seq, false /* immutable_memtable */);
  range_del_agg.AddTombstones(
      std::unique_ptr<FragmentedRangeTombstoneIterator>(active_range_del_iter));
  Status status;
  status = super_version->imm->AddRangeTombstoneIterators(
      read_opts, nullptr /* arena */, &range_del_agg);
  // AddRangeTombstoneIterators always returns Status::OK.
  assert(status.ok());

  for (size_t i = 0; i < ranges.size() && status.ok() && !*overlap; ++i) {
    auto* vstorage = super_version->current->storage_info();
    auto* ucmp = vstorage->InternalComparator()->user_comparator();
    InternalKey range_start(ranges[i].start, kMaxSequenceNumber,
                            kValueTypeForSeek);
    memtable_iter->Seek(range_start.Encode());
    status = memtable_iter->status();
    ParsedInternalKey seek_result;

    if (status.ok() && memtable_iter->Valid()) {
      status = ParseInternalKey(memtable_iter->key(), &seek_result,
                                allow_data_in_errors);
    }

    if (status.ok()) {
      if (memtable_iter->Valid() &&
          ucmp->Compare(seek_result.user_key, ranges[i].limit) <= 0) {
        *overlap = true;
      } else if (range_del_agg.IsRangeOverlapped(ranges[i].start,
                                                 ranges[i].limit)) {
        *overlap = true;
      }
    }
  }
  return status;
}

const int ColumnFamilyData::kCompactAllLevels = -1;
const int ColumnFamilyData::kCompactToBaseLevel = -2;

Compaction* ColumnFamilyData::CompactRange(
    const MutableCFOptions& mutable_cf_options,
    const MutableDBOptions& mutable_db_options, int input_level,
    int output_level, const CompactRangeOptions& compact_range_options,
    const InternalKey* begin, const InternalKey* end,
    InternalKey** compaction_end, bool* conflict,
    uint64_t max_file_num_to_ignore, const std::string& trim_ts) {
  auto* result = compaction_picker_->CompactRange(
      GetName(), mutable_cf_options, mutable_db_options,
      current_->storage_info(), input_level, output_level,
      compact_range_options, begin, end, compaction_end, conflict,
      max_file_num_to_ignore, trim_ts);
  if (result != nullptr) {
    result->SetInputVersion(current_);
  }
  return result;
}

SuperVersion* ColumnFamilyData::GetReferencedSuperVersion(DBImpl* db) {
  SuperVersion* sv = GetThreadLocalSuperVersion(db);
  sv->Ref();
  if (!ReturnThreadLocalSuperVersion(sv)) {
    // This Unref() corresponds to the Ref() in GetThreadLocalSuperVersion()
    // when the thread-local pointer was populated. So, the Ref() earlier in
    // this function still prevents the returned SuperVersion* from being
    // deleted out from under the caller.
    sv->Unref();
  }
  return sv;
}

SuperVersion* ColumnFamilyData::GetThreadLocalSuperVersion(DBImpl* db) {
  // The SuperVersion is cached in thread local storage to avoid acquiring
  // mutex when SuperVersion does not change since the last use. When a new
  // SuperVersion is installed, the compaction or flush thread cleans up
  // cached SuperVersion in all existing thread local storage. To avoid
  // acquiring mutex for this operation, we use atomic Swap() on the thread
  // local pointer to guarantee exclusive access. If the thread local pointer
  // is being used while a new SuperVersion is installed, the cached
  // SuperVersion can become stale. In that case, the background thread would
  // have swapped in kSVObsolete. We re-check the value when returning
  // SuperVersion back to thread local, with an atomic compare and swap.
  // The superversion will need to be released if detected to be stale.
  void* ptr = local_sv_->Swap(SuperVersion::kSVInUse);
  // Invariant:
  // (1) Scrape (always) installs kSVObsolete in ThreadLocal storage
  // (2) the Swap above (always) installs kSVInUse, ThreadLocal storage
  // should only keep kSVInUse before ReturnThreadLocalSuperVersion call
  // (if no Scrape happens).
  assert(ptr != SuperVersion::kSVInUse);
  SuperVersion* sv = static_cast<SuperVersion*>(ptr);
  if (sv == SuperVersion::kSVObsolete ||
      sv->version_number != super_version_number_.load()) {
    RecordTick(ioptions_.stats, NUMBER_SUPERVERSION_ACQUIRES);
    SuperVersion* sv_to_delete = nullptr;

    if (sv && sv->Unref()) {
      RecordTick(ioptions_.stats, NUMBER_SUPERVERSION_CLEANUPS);
      db->mutex()->Lock();
      // NOTE: underlying resources held by superversion (sst files) might
      // not be released until the next background job.
      sv->Cleanup();
      if (db->immutable_db_options().avoid_unnecessary_blocking_io) {
        db->AddSuperVersionsToFreeQueue(sv);
        db->SchedulePurge();
      } else {
        sv_to_delete = sv;
      }
    } else {
      db->mutex()->Lock();
    }
    sv = super_version_->Ref();
    db->mutex()->Unlock();

    delete sv_to_delete;
  }
  assert(sv != nullptr);
  return sv;
}

bool ColumnFamilyData::ReturnThreadLocalSuperVersion(SuperVersion* sv) {
  assert(sv != nullptr);
  // Put the SuperVersion back
  void* expected = SuperVersion::kSVInUse;
  if (local_sv_->CompareAndSwap(static_cast<void*>(sv), expected)) {
    // When we see kSVInUse in the ThreadLocal, we are sure ThreadLocal
    // storage has not been altered and no Scrape has happened. The
    // SuperVersion is still current.
    return true;
  } else {
    // ThreadLocal scrape happened in the process of this GetImpl call (after
    // thread local Swap() at the beginning and before CompareAndSwap()).
    // This means the SuperVersion it holds is obsolete.
    assert(expected == SuperVersion::kSVObsolete);
  }
  return false;
}
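
// Typical pairing, sketched for illustration only (simplified; real callers
// such as DBImpl also handle last-reference cleanup under the DB mutex):
//
//   SuperVersion* sv = cfd->GetThreadLocalSuperVersion(db);
//   // ... read through sv->mem, sv->imm, sv->current ...
//   if (!cfd->ReturnThreadLocalSuperVersion(sv)) {
//     sv->Unref();  // the cached entry went stale while in use
//   }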
1305
1e59de90
TL
1306void ColumnFamilyData::InstallSuperVersion(SuperVersionContext* sv_context,
1307 InstrumentedMutex* db_mutex) {
7c673cae 1308 db_mutex->AssertHeld();
1e59de90 1309 return InstallSuperVersion(sv_context, mutable_cf_options_);
7c673cae
FG
1310}
1311
11fdf7f2 1312void ColumnFamilyData::InstallSuperVersion(
1e59de90 1313 SuperVersionContext* sv_context,
7c673cae 1314 const MutableCFOptions& mutable_cf_options) {
11fdf7f2 1315 SuperVersion* new_superversion = sv_context->new_superversion.release();
7c673cae 1316 new_superversion->mutable_cf_options = mutable_cf_options;
f67539c2 1317 new_superversion->Init(this, mem_, imm_.current(), current_);
7c673cae
FG
1318 SuperVersion* old_superversion = super_version_;
1319 super_version_ = new_superversion;
1320 ++super_version_number_;
1321 super_version_->version_number = super_version_number_;
1e59de90
TL
1322 if (old_superversion == nullptr || old_superversion->current != current() ||
1323 old_superversion->mem != mem_ ||
1324 old_superversion->imm != imm_.current()) {
1325 // Should not recalculate slow down condition if nothing has changed, since
1326 // currently RecalculateWriteStallConditions() treats it as further slowing
1327 // down is needed.
1328 super_version_->write_stall_condition =
1329 RecalculateWriteStallConditions(mutable_cf_options);
1330 } else {
1331 super_version_->write_stall_condition =
1332 old_superversion->write_stall_condition;
1333 }
11fdf7f2
TL
1334 if (old_superversion != nullptr) {
1335 // Reset SuperVersions cached in thread local storage.
1336 // This should be done before old_superversion->Unref(). That's to ensure
1337 // that local_sv_ never holds the last reference to SuperVersion, since
1338 // it has no means to safely do SuperVersion cleanup.
1339 ResetThreadLocalSuperVersions();
1340
1341 if (old_superversion->mutable_cf_options.write_buffer_size !=
1342 mutable_cf_options.write_buffer_size) {
1343 mem_->UpdateWriteBufferSize(mutable_cf_options.write_buffer_size);
1344 }
1345 if (old_superversion->write_stall_condition !=
1346 new_superversion->write_stall_condition) {
1347 sv_context->PushWriteStallNotification(
1348 old_superversion->write_stall_condition,
1349 new_superversion->write_stall_condition, GetName(), ioptions());
1350 }
1351 if (old_superversion->Unref()) {
1352 old_superversion->Cleanup();
1353 sv_context->superversions_to_free.push_back(old_superversion);
1354 }
7c673cae 1355 }
7c673cae 1356}
1357
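// Scrape() below replaces every thread's cached slot with kSVObsolete and
// hands back the previous contents: real SuperVersion pointers get unref'd
// here, while slots a reader has pinned come back as kSVInUse and are
// skipped; that reader will then see kSVObsolete on its CompareAndSwap in
// ReturnThreadLocalSuperVersion() and drop its own reference.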
1358void ColumnFamilyData::ResetThreadLocalSuperVersions() {
1359 autovector<void*> sv_ptrs;
1360 local_sv_->Scrape(&sv_ptrs, SuperVersion::kSVObsolete);
1361 for (auto ptr : sv_ptrs) {
1362 assert(ptr);
1363 if (ptr == SuperVersion::kSVInUse) {
1364 continue;
1365 }
1366 auto sv = static_cast<SuperVersion*>(ptr);
11fdf7f2 1367 bool was_last_ref __attribute__((__unused__));
1368 was_last_ref = sv->Unref();
1369 // sv couldn't have been the last reference because
1370 // ResetThreadLocalSuperVersions() is called before
1371 // unref'ing super_version_.
1372 assert(!was_last_ref);
7c673cae 1373 }
1374}
1375
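// Illustrative rejection (hypothetical option values): unordered_write cannot
// be combined with a nonzero max_successive_merges, so a combination like the
// following fails with InvalidArgument:
//
//   DBOptions db_opts;
//   db_opts.unordered_write = true;
//   ColumnFamilyOptions cf_opts;
//   cf_opts.max_successive_merges = 8;
//   Status s = ColumnFamilyData::ValidateOptions(db_opts, cf_opts);
//   assert(s.IsInvalidArgument());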
f67539c2 1376Status ColumnFamilyData::ValidateOptions(
1377 const DBOptions& db_options, const ColumnFamilyOptions& cf_options) {
1378 Status s;
1379 s = CheckCompressionSupported(cf_options);
1380 if (s.ok() && db_options.allow_concurrent_memtable_write) {
1381 s = CheckConcurrentWritesSupported(cf_options);
1382 }
1383 if (s.ok() && db_options.unordered_write &&
1384 cf_options.max_successive_merges != 0) {
1385 s = Status::InvalidArgument(
1386 "max_successive_merges > 0 is incompatible with unordered_write");
1387 }
1388 if (s.ok()) {
1389 s = CheckCFPathsSupported(db_options, cf_options);
1390 }
1391 if (!s.ok()) {
1392 return s;
1393 }
1394
1395 if (cf_options.ttl > 0 && cf_options.ttl != kDefaultTtl) {
20effc67 1396 if (!cf_options.table_factory->IsInstanceOf(
1397 TableFactory::kBlockBasedTableName())) {
f67539c2 1398 return Status::NotSupported(
1399 "TTL is only supported in Block-Based Table format.");
1400 }
1401 }
1402
1403 if (cf_options.periodic_compaction_seconds > 0 &&
1404 cf_options.periodic_compaction_seconds != kDefaultPeriodicCompSecs) {
20effc67 1405 if (!cf_options.table_factory->IsInstanceOf(
1406 TableFactory::kBlockBasedTableName())) {
f67539c2 1407 return Status::NotSupported(
1408 "Periodic Compaction is only supported in "
1409 "Block-Based Table format.");
1410 }
1411 }
1e59de90 1412
1413 if (cf_options.enable_blob_garbage_collection) {
1414 if (cf_options.blob_garbage_collection_age_cutoff < 0.0 ||
1415 cf_options.blob_garbage_collection_age_cutoff > 1.0) {
1416 return Status::InvalidArgument(
1417 "The age cutoff for blob garbage collection should be in the range "
1418 "[0.0, 1.0].");
1419 }
1420 if (cf_options.blob_garbage_collection_force_threshold < 0.0 ||
1421 cf_options.blob_garbage_collection_force_threshold > 1.0) {
1422 return Status::InvalidArgument(
1423 "The garbage ratio threshold for forcing blob garbage collection "
1424 "should be in the range [0.0, 1.0].");
1425 }
1426 }
1427
1428 if (cf_options.compaction_style == kCompactionStyleFIFO &&
1429 db_options.max_open_files != -1 && cf_options.ttl > 0) {
1430 return Status::NotSupported(
1431 "FIFO compaction only supported with max_open_files = -1.");
1432 }
1433
1434 std::vector<uint32_t> supported{0, 1, 2, 4, 8};
1435 if (std::find(supported.begin(), supported.end(),
1436 cf_options.memtable_protection_bytes_per_key) ==
1437 supported.end()) {
1438 return Status::NotSupported(
1439 "Memtable per key-value checksum protection only supports 0, 1, 2, 4 "
1440 "or 8 bytes per key.");
1441 }
f67539c2 1442 return s;
1443}
1444
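// Usage sketch (hypothetical map contents): DBImpl::SetOptions() funnels a
// string map of mutable options here, e.g.
//
//   s = cfd->SetOptions(db_opts, {{"write_buffer_size", "131072"},
//                                 {"disable_auto_compactions", "true"}});
//
// config_opts.mutable_options_only below ensures that immutable options in
// the map are rejected rather than silently ignored.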
7c673cae 1445#ifndef ROCKSDB_LITE
1446Status ColumnFamilyData::SetOptions(
1e59de90 1447 const DBOptions& db_opts,
f67539c2 1448 const std::unordered_map<std::string, std::string>& options_map) {
1e59de90 1449 ColumnFamilyOptions cf_opts =
1450 BuildColumnFamilyOptions(initial_cf_options_, mutable_cf_options_);
1451 ConfigOptions config_opts;
1452 config_opts.mutable_options_only = true;
1453 Status s = GetColumnFamilyOptionsFromMap(config_opts, cf_opts, options_map,
1454 &cf_opts);
f67539c2 1455 if (s.ok()) {
1e59de90 1456 s = ValidateOptions(db_opts, cf_opts);
f67539c2 1457 }
7c673cae 1458 if (s.ok()) {
1e59de90 1459 mutable_cf_options_ = MutableCFOptions(cf_opts);
7c673cae 1460 mutable_cf_options_.RefreshDerivedOptions(ioptions_);
1461 }
1462 return s;
1463}
1464#endif // ROCKSDB_LITE
1465
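// Worked example: with level compaction and base_level == 1, the mapping is
// L0 -> WLTH_MEDIUM, L1 -> WLTH_MEDIUM, L2 -> WLTH_LONG, and L3 or deeper ->
// WLTH_EXTREME (Env::WriteLifeTimeHint orders MEDIUM < LONG < EXTREME), on
// the theory that data in deeper levels stays on disk longer.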
11fdf7f2 1466// REQUIRES: DB mutex held
1467Env::WriteLifeTimeHint ColumnFamilyData::CalculateSSTWriteHint(int level) {
1468 if (initial_cf_options_.compaction_style != kCompactionStyleLevel) {
1469 return Env::WLTH_NOT_SET;
1470 }
1471 if (level == 0) {
1472 return Env::WLTH_MEDIUM;
1473 }
1474 int base_level = current_->storage_info()->base_level();
1475
1476 // L1: medium, L2: long, ...
1477 if (level - base_level >= 2) {
1478 return Env::WLTH_EXTREME;
f67539c2 1479 } else if (level < base_level) {
1480 // There is no restriction preventing the level passed in from being
1481 // smaller than base_level.
1482 return Env::WLTH_MEDIUM;
11fdf7f2 1483 }
1e59de90 1484 return static_cast<Env::WriteLifeTimeHint>(
1485 level - base_level + static_cast<int>(Env::WLTH_MEDIUM));
11fdf7f2 1486}
1487
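// Directory objects are shared across column families: created_dirs memoizes
// path -> FSDirectory, so two CFs configured with the same cf_path reuse one
// handle instead of opening the directory twice.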
f67539c2 1488Status ColumnFamilyData::AddDirectories(
20effc67 1489 std::map<std::string, std::shared_ptr<FSDirectory>>* created_dirs) {
11fdf7f2 1490 Status s;
f67539c2 1491 assert(created_dirs != nullptr);
11fdf7f2 1492 assert(data_dirs_.empty());
1493 for (auto& p : ioptions_.cf_paths) {
f67539c2 1494 auto existing_dir = created_dirs->find(p.path);
1495
1496 if (existing_dir == created_dirs->end()) {
20effc67 1497 std::unique_ptr<FSDirectory> path_directory;
1e59de90 1498 s = DBImpl::CreateAndNewDirectory(ioptions_.fs.get(), p.path,
1499 &path_directory);
f67539c2 1500 if (!s.ok()) {
1501 return s;
1502 }
1503 assert(path_directory != nullptr);
1504 data_dirs_.emplace_back(path_directory.release());
1505 (*created_dirs)[p.path] = data_dirs_.back();
1506 } else {
1507 data_dirs_.emplace_back(existing_dir->second);
11fdf7f2 1508 }
11fdf7f2 1509 }
1510 assert(data_dirs_.size() == ioptions_.cf_paths.size());
1511 return s;
1512}
1513
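// path_id indexes ioptions_.cf_paths (and therefore data_dirs_); a CF with
// no cf_paths of its own returns nullptr, and the caller falls back to the
// DB-level data directory.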
20effc67 1514FSDirectory* ColumnFamilyData::GetDataDir(size_t path_id) const {
11fdf7f2 1515 if (data_dirs_.empty()) {
1516 return nullptr;
1517 }
1518
1519 assert(path_id < data_dirs_.size());
1520 return data_dirs_[path_id].get();
1521}
1522
7c673cae 1523ColumnFamilySet::ColumnFamilySet(const std::string& dbname,
1524 const ImmutableDBOptions* db_options,
f67539c2 1525 const FileOptions& file_options,
7c673cae 1526 Cache* table_cache,
20effc67 1527 WriteBufferManager* _write_buffer_manager,
1528 WriteController* _write_controller,
1529 BlockCacheTracer* const block_cache_tracer,
1e59de90 1530 const std::shared_ptr<IOTracer>& io_tracer,
1531 const std::string& db_id,
1532 const std::string& db_session_id)
7c673cae 1533 : max_column_family_(0),
1e59de90 1534 file_options_(file_options),
f67539c2 1535 dummy_cfd_(new ColumnFamilyData(
20effc67 1536 ColumnFamilyData::kDummyColumnFamilyDataId, "", nullptr, nullptr,
1e59de90 1537 nullptr, ColumnFamilyOptions(), *db_options, &file_options_, nullptr,
1538 block_cache_tracer, io_tracer, db_id, db_session_id)),
7c673cae 1539 default_cfd_cache_(nullptr),
1540 db_name_(dbname),
1541 db_options_(db_options),
7c673cae 1542 table_cache_(table_cache),
20effc67 1543 write_buffer_manager_(_write_buffer_manager),
1544 write_controller_(_write_controller),
1545 block_cache_tracer_(block_cache_tracer),
1e59de90 1546 io_tracer_(io_tracer),
1547 db_id_(db_id),
1548 db_session_id_(db_session_id) {
7c673cae 1549 // initialize linked list
1550 dummy_cfd_->prev_ = dummy_cfd_;
1551 dummy_cfd_->next_ = dummy_cfd_;
1552}
1553
1554ColumnFamilySet::~ColumnFamilySet() {
1555 while (column_family_data_.size() > 0) {
1556 // the cfd destructor removes the cfd from column_family_data_
1557 auto cfd = column_family_data_.begin()->second;
11fdf7f2 1558 bool last_ref __attribute__((__unused__));
f67539c2 1559 last_ref = cfd->UnrefAndTryDelete();
11fdf7f2 1560 assert(last_ref);
7c673cae 1561 }
11fdf7f2 1562 bool dummy_last_ref __attribute__((__unused__));
f67539c2 1563 dummy_last_ref = dummy_cfd_->UnrefAndTryDelete();
11fdf7f2 1564 assert(dummy_last_ref);
7c673cae 1565}
1566
1567ColumnFamilyData* ColumnFamilySet::GetDefault() const {
1568 assert(default_cfd_cache_ != nullptr);
1569 return default_cfd_cache_;
1570}
1571
1572ColumnFamilyData* ColumnFamilySet::GetColumnFamily(uint32_t id) const {
1573 auto cfd_iter = column_family_data_.find(id);
1574 if (cfd_iter != column_family_data_.end()) {
1575 return cfd_iter->second;
1576 } else {
1577 return nullptr;
1578 }
1579}
1580
1e59de90 1581ColumnFamilyData* ColumnFamilySet::GetColumnFamily(
1582 const std::string& name) const {
7c673cae 1583 auto cfd_iter = column_families_.find(name);
1584 if (cfd_iter != column_families_.end()) {
1585 auto cfd = GetColumnFamily(cfd_iter->second);
1586 assert(cfd != nullptr);
1587 return cfd;
1588 } else {
1589 return nullptr;
1590 }
1591}
1592
1593uint32_t ColumnFamilySet::GetNextColumnFamilyID() {
1594 return ++max_column_family_;
1595}
1596
1597uint32_t ColumnFamilySet::GetMaxColumnFamily() { return max_column_family_; }
1598
1599void ColumnFamilySet::UpdateMaxColumnFamily(uint32_t new_max_column_family) {
1600 max_column_family_ = std::max(new_max_column_family, max_column_family_);
1601}
1602
1603size_t ColumnFamilySet::NumberOfColumnFamilies() const {
1604 return column_families_.size();
1605}
1606
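// Live column families form a circular doubly-linked list anchored at
// dummy_cfd_; CreateColumnFamily() splices the new cfd in just before the
// dummy node, i.e. at the tail:
//
//   dummy_cfd_ <-> cf_1 <-> ... <-> cf_n <-> (back to dummy_cfd_)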
1607 // under a DB mutex AND from a write thread
1608ColumnFamilyData* ColumnFamilySet::CreateColumnFamily(
1609 const std::string& name, uint32_t id, Version* dummy_versions,
1610 const ColumnFamilyOptions& options) {
1611 assert(column_families_.find(name) == column_families_.end());
1612 ColumnFamilyData* new_cfd = new ColumnFamilyData(
1613 id, name, dummy_versions, table_cache_, write_buffer_manager_, options,
1e59de90 1614 *db_options_, &file_options_, this, block_cache_tracer_, io_tracer_,
1615 db_id_, db_session_id_);
7c673cae 1616 column_families_.insert({name, id});
1617 column_family_data_.insert({id, new_cfd});
1618 max_column_family_ = std::max(max_column_family_, id);
1619 // add to linked list
1620 new_cfd->next_ = dummy_cfd_;
1621 auto prev = dummy_cfd_->prev_;
1622 new_cfd->prev_ = prev;
1623 prev->next_ = new_cfd;
1624 dummy_cfd_->prev_ = new_cfd;
1625 if (id == 0) {
1626 default_cfd_cache_ = new_cfd;
1627 }
1628 return new_cfd;
1629}
1630
7c673cae 1631// under a DB mutex AND from a write thread
1632void ColumnFamilySet::RemoveColumnFamily(ColumnFamilyData* cfd) {
1633 auto cfd_iter = column_family_data_.find(cfd->GetID());
1634 assert(cfd_iter != column_family_data_.end());
1635 column_family_data_.erase(cfd_iter);
1636 column_families_.erase(cfd->GetName());
1637}
1638
1639// under a DB mutex OR from a write thread
1640bool ColumnFamilyMemTablesImpl::Seek(uint32_t column_family_id) {
1641 if (column_family_id == 0) {
1642 // optimization for common case
1643 current_ = column_family_set_->GetDefault();
1644 } else {
1645 current_ = column_family_set_->GetColumnFamily(column_family_id);
1646 }
1647 handle_.SetCFD(current_);
1648 return current_ != nullptr;
1649}
1650
1651uint64_t ColumnFamilyMemTablesImpl::GetLogNumber() const {
1652 assert(current_ != nullptr);
1653 return current_->GetLogNumber();
1654}
1655
1656MemTable* ColumnFamilyMemTablesImpl::GetMemTable() const {
1657 assert(current_ != nullptr);
1658 return current_->mem();
1659}
1660
1661ColumnFamilyHandle* ColumnFamilyMemTablesImpl::GetColumnFamilyHandle() {
1662 assert(current_ != nullptr);
1663 return &handle_;
1664}
1665
1666uint32_t GetColumnFamilyID(ColumnFamilyHandle* column_family) {
1667 uint32_t column_family_id = 0;
1668 if (column_family != nullptr) {
20effc67 1669 auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
7c673cae 1670 column_family_id = cfh->GetID();
1671 }
1672 return column_family_id;
1673}
1674
1675const Comparator* GetColumnFamilyUserComparator(
1676 ColumnFamilyHandle* column_family) {
1677 if (column_family != nullptr) {
1678 return column_family->GetComparator();
1679 }
1680 return nullptr;
1681}
1682
f67539c2 1683} // namespace ROCKSDB_NAMESPACE