1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
11 #include <sys/types.h>
14 #include "rocksdb/db.h"
15 #include "rocksdb/table.h"
16 #include "rocksdb/env.h"
17 #include "rocksdb/slice.h"
18 #include "rocksdb/cache.h"
19 #include "rocksdb/filter_policy.h"
20 #include "rocksdb/utilities/convenience.h"
21 #include "rocksdb/utilities/table_properties_collectors.h"
22 #include "rocksdb/merge_operator.h"
24 #include "common/perf_counters.h"
25 #include "common/PriorityCache.h"
26 #include "include/common_fwd.h"
27 #include "include/scope_guard.h"
28 #include "include/str_list.h"
29 #include "include/stringify.h"
30 #include "include/str_map.h"
31 #include "KeyValueDB.h"
32 #include "RocksDBStore.h"
34 #include "common/debug.h"
36 #define dout_context cct
37 #define dout_subsys ceph_subsys_rocksdb
39 #define dout_prefix *_dout << "rocksdb: "
41 namespace fs
= std::filesystem
;
50 using std::unique_ptr
;
53 using ceph::bufferlist
;
54 using ceph::bufferptr
;
55 using ceph::Formatter
;
57 static const char* sharding_def_dir
= "sharding";
58 static const char* sharding_def_file
= "sharding/def";
59 static const char* sharding_recreate
= "sharding/recreate_columns";
60 static const char* resharding_column_lock
= "reshardingXcommencingXlocked";
62 static bufferlist
to_bufferlist(rocksdb::Slice in
) {
64 bl
.append(bufferptr(in
.data(), in
.size()));
68 static rocksdb::SliceParts
prepare_sliceparts(const bufferlist
&bl
,
69 vector
<rocksdb::Slice
> *slices
)
72 for (auto& buf
: bl
.buffers()) {
73 (*slices
)[n
].data_
= buf
.c_str();
74 (*slices
)[n
].size_
= buf
.length();
77 return rocksdb::SliceParts(slices
->data(), slices
->size());
82 // One of these for the default rocksdb column family, routing each prefix
83 // to the appropriate MergeOperator.
85 class RocksDBStore::MergeOperatorRouter
86 : public rocksdb::AssociativeMergeOperator
90 const char *Name() const override
{
91 // Construct a name that rocksDB will validate against. We want to
92 // do this in a way that doesn't constrain the ordering of calls
93 // to set_merge_operator, so sort the merge operators and then
94 // construct a name from all of those parts.
95 store
.assoc_name
.clear();
96 map
<std::string
,std::string
> names
;
98 for (auto& p
: store
.merge_ops
) {
99 names
[p
.first
] = p
.second
->name();
101 for (auto& p
: names
) {
102 store
.assoc_name
+= '.';
103 store
.assoc_name
+= p
.first
;
104 store
.assoc_name
+= ':';
105 store
.assoc_name
+= p
.second
;
107 return store
.assoc_name
.c_str();
110 explicit MergeOperatorRouter(RocksDBStore
&_store
) : store(_store
) {}
112 bool Merge(const rocksdb::Slice
& key
,
113 const rocksdb::Slice
* existing_value
,
114 const rocksdb::Slice
& value
,
115 std::string
* new_value
,
116 rocksdb::Logger
* logger
) const override
{
117 // for default column family
118 // extract prefix from key and compare against each registered merge op;
119 // even though merge operator for explicit CF is included in merge_ops,
120 // it won't be picked up, since it won't match.
121 for (auto& p
: store
.merge_ops
) {
122 if (p
.first
.compare(0, p
.first
.length(),
123 key
.data(), p
.first
.length()) == 0 &&
124 key
.data()[p
.first
.length()] == 0) {
125 if (existing_value
) {
126 p
.second
->merge(existing_value
->data(), existing_value
->size(),
127 value
.data(), value
.size(),
130 p
.second
->merge_nonexistent(value
.data(), value
.size(), new_value
);
135 return true; // OK :)
140 // One of these per non-default column family, linked directly to the
141 // merge operator for that CF/prefix (if any).
143 class RocksDBStore::MergeOperatorLinker
144 : public rocksdb::AssociativeMergeOperator
147 std::shared_ptr
<KeyValueDB::MergeOperator
> mop
;
149 explicit MergeOperatorLinker(const std::shared_ptr
<KeyValueDB::MergeOperator
> &o
) : mop(o
) {}
151 const char *Name() const override
{
155 bool Merge(const rocksdb::Slice
& key
,
156 const rocksdb::Slice
* existing_value
,
157 const rocksdb::Slice
& value
,
158 std::string
* new_value
,
159 rocksdb::Logger
* logger
) const override
{
160 if (existing_value
) {
161 mop
->merge(existing_value
->data(), existing_value
->size(),
162 value
.data(), value
.size(),
165 mop
->merge_nonexistent(value
.data(), value
.size(), new_value
);
171 int RocksDBStore::set_merge_operator(
172 const string
& prefix
,
173 std::shared_ptr
<KeyValueDB::MergeOperator
> mop
)
175 // If you fail here, it's because you can't do this on an open database
176 ceph_assert(db
== nullptr);
177 merge_ops
.push_back(std::make_pair(prefix
,mop
));
181 class CephRocksdbLogger
: public rocksdb::Logger
{
184 explicit CephRocksdbLogger(CephContext
*c
) : cct(c
) {
187 ~CephRocksdbLogger() override
{
191 // Write an entry to the log file with the specified format.
192 void Logv(const char* format
, va_list ap
) override
{
193 Logv(rocksdb::INFO_LEVEL
, format
, ap
);
196 // Write an entry to the log file with the specified log level
197 // and format. Any log with level under the internal log level
198 // of *this (see @SetInfoLogLevel and @GetInfoLogLevel) will not be
200 void Logv(const rocksdb::InfoLogLevel log_level
, const char* format
,
201 va_list ap
) override
{
202 int v
= rocksdb::NUM_INFO_LOG_LEVELS
- log_level
- 1;
203 dout(ceph::dout::need_dynamic(v
));
205 vsnprintf(buf
, sizeof(buf
), format
, ap
);
206 *_dout
<< buf
<< dendl
;
210 rocksdb::Logger
*create_rocksdb_ceph_logger()
212 return new CephRocksdbLogger(g_ceph_context
);
215 static int string2bool(const string
&val
, bool &b_val
)
217 if (strcasecmp(val
.c_str(), "false") == 0) {
220 } else if (strcasecmp(val
.c_str(), "true") == 0) {
225 int b
= strict_strtol(val
.c_str(), 10, &err
);
234 extern std::string
trim(const std::string
& str
);
237 // this function is a modification of rocksdb's StringToMap:
238 // 1) accepts ' \n ; as separators
239 // 2) leaves compound options with enclosing { and }
240 rocksdb::Status
StringToMap(const std::string
& opts_str
,
241 std::unordered_map
<std::string
, std::string
>* opts_map
)
243 using rocksdb::Status
;
247 // opts_str = "write_buffer_size=1024;max_write_buffer_number=2;"
248 // "nested_opt={opt1=1;opt2=2};max_bytes_for_level_base=100"
250 std::string opts
= trim(opts_str
);
251 while (pos
< opts
.size()) {
252 size_t eq_pos
= opts
.find('=', pos
);
253 if (eq_pos
== std::string::npos
) {
254 return Status::InvalidArgument("Mismatched key value pair, '=' expected");
256 std::string key
= trim(opts
.substr(pos
, eq_pos
- pos
));
258 return Status::InvalidArgument("Empty key found");
261 // skip space after '=' and look for '{' for possible nested options
263 while (pos
< opts
.size() && isspace(opts
[pos
])) {
266 // Empty value at the end
267 if (pos
>= opts
.size()) {
268 (*opts_map
)[key
] = "";
271 if (opts
[pos
] == '{') {
273 size_t brace_pos
= pos
+ 1;
274 while (brace_pos
< opts
.size()) {
275 if (opts
[brace_pos
] == '{') {
277 } else if (opts
[brace_pos
] == '}') {
285 // found the matching closing brace
287 //include both '{' and '}'
288 (*opts_map
)[key
] = trim(opts
.substr(pos
, brace_pos
- pos
+ 1));
289 // skip all whitespace and move to the next ';,'
290 // brace_pos points to the matching '}'
292 while (pos
< opts
.size() && isspace(opts
[pos
])) {
295 if (pos
< opts
.size() && opts
[pos
] != ';' && opts
[pos
] != ',') {
296 return Status::InvalidArgument(
297 "Unexpected chars after nested options");
301 return Status::InvalidArgument(
302 "Mismatched curly braces for nested options");
305 size_t sc_pos
= opts
.find_first_of(",;", pos
);
306 if (sc_pos
== std::string::npos
) {
307 (*opts_map
)[key
] = trim(opts
.substr(pos
));
308 // It either ends with a trailing , ; or the last key-value pair
311 (*opts_map
)[key
] = trim(opts
.substr(pos
, sc_pos
- pos
));
319 int RocksDBStore::tryInterpret(const string
&key
, const string
&val
, rocksdb::Options
&opt
)
321 if (key
== "compaction_threads") {
323 int f
= strict_iecstrtoll(val
, &err
);
326 //Low priority threadpool is used for compaction
327 opt
.env
->SetBackgroundThreads(f
, rocksdb::Env::Priority::LOW
);
328 } else if (key
== "flusher_threads") {
330 int f
= strict_iecstrtoll(val
, &err
);
333 //High priority threadpool is used for flusher
334 opt
.env
->SetBackgroundThreads(f
, rocksdb::Env::Priority::HIGH
);
335 } else if (key
== "compact_on_mount") {
336 int ret
= string2bool(val
, compact_on_mount
);
339 } else if (key
== "disableWAL") {
340 int ret
= string2bool(val
, disableWAL
);
344 // unrecognized config option.
350 int RocksDBStore::ParseOptionsFromString(const string
&opt_str
, rocksdb::Options
&opt
)
352 return ParseOptionsFromStringStatic(cct
, opt_str
, opt
,
353 [&](const string
& k
, const string
& v
, rocksdb::Options
& o
) {
354 return tryInterpret(k
, v
, o
);
359 int RocksDBStore::ParseOptionsFromStringStatic(
361 const string
& opt_str
,
362 rocksdb::Options
& opt
,
363 function
<int(const string
&, const string
&, rocksdb::Options
&)> interp
)
365 // keep aligned with func tryInterpret
366 const set
<string
> need_interp_keys
= {"compaction_threads", "flusher_threads", "compact_on_mount", "disableWAL"};
367 rocksdb::Status status
;
368 std::unordered_map
<std::string
, std::string
> str_map
;
369 status
= StringToMap(opt_str
, &str_map
);
371 dout(5) << __func__
<< " error '" << status
.getState() <<
372 "' while parsing options '" << opt_str
<< "'" << dendl
;
376 for (auto it
= str_map
.begin(); it
!= str_map
.end(); ++it
) {
377 string this_opt
= it
->first
+ "=" + it
->second
;
378 rocksdb::Status status
=
379 rocksdb::GetOptionsFromString(opt
, this_opt
, &opt
);
382 if (interp
!= nullptr) {
383 r
= interp(it
->first
, it
->second
, opt
);
384 } else if (!need_interp_keys
.count(it
->first
)) {
388 derr
<< status
.ToString() << dendl
;
392 lgeneric_dout(cct
, 1) << " set rocksdb option " << it
->first
393 << " = " << it
->second
<< dendl
;
398 int RocksDBStore::init(string _options_str
)
400 options_str
= _options_str
;
401 rocksdb::Options opt
;
403 if (options_str
.length()) {
404 int r
= ParseOptionsFromString(options_str
, opt
);
412 int RocksDBStore::create_db_dir()
415 unique_ptr
<rocksdb::Directory
> dir
;
416 env
->NewDirectory(path
, &dir
);
418 if (!fs::exists(path
)) {
420 if (!fs::create_directory(path
, ec
)) {
421 derr
<< __func__
<< " failed to create " << path
422 << ": " << ec
.message() << dendl
;
425 fs::permissions(path
,
426 fs::perms::owner_all
|
427 fs::perms::group_read
| fs::perms::group_exec
|
428 fs::perms::others_read
| fs::perms::others_exec
);
434 int RocksDBStore::install_cf_mergeop(
435 const string
&key_prefix
,
436 rocksdb::ColumnFamilyOptions
*cf_opt
)
438 ceph_assert(cf_opt
!= nullptr);
439 cf_opt
->merge_operator
.reset();
440 for (auto& i
: merge_ops
) {
441 if (i
.first
== key_prefix
) {
442 cf_opt
->merge_operator
.reset(new MergeOperatorLinker(i
.second
));
448 int RocksDBStore::create_and_open(ostream
&out
,
449 const std::string
& cfs
)
451 int r
= create_db_dir();
454 return do_open(out
, true, false, cfs
);
457 std::shared_ptr
<rocksdb::Cache
> RocksDBStore::create_block_cache(
458 const std::string
& cache_type
, size_t cache_size
, double cache_prio_high
) {
459 std::shared_ptr
<rocksdb::Cache
> cache
;
460 auto shard_bits
= cct
->_conf
->rocksdb_cache_shard_bits
;
461 if (cache_type
== "binned_lru") {
462 cache
= rocksdb_cache::NewBinnedLRUCache(cct
, cache_size
, shard_bits
, false, cache_prio_high
);
463 } else if (cache_type
== "lru") {
464 cache
= rocksdb::NewLRUCache(cache_size
, shard_bits
);
465 } else if (cache_type
== "clock") {
466 cache
= rocksdb::NewClockCache(cache_size
, shard_bits
);
468 derr
<< "rocksdb_cache_type '" << cache
469 << "' chosen, but RocksDB not compiled with LibTBB. "
473 derr
<< "unrecognized rocksdb_cache_type '" << cache_type
<< "'" << dendl
;
478 int RocksDBStore::load_rocksdb_options(bool create_if_missing
, rocksdb::Options
& opt
)
480 rocksdb::Status status
;
482 if (options_str
.length()) {
483 int r
= ParseOptionsFromString(options_str
, opt
);
489 if (cct
->_conf
->rocksdb_perf
) {
490 dbstats
= rocksdb::CreateDBStatistics();
491 opt
.statistics
= dbstats
;
494 opt
.create_if_missing
= create_if_missing
;
495 if (kv_options
.count("separate_wal_dir")) {
496 opt
.wal_dir
= path
+ ".wal";
499 // Since ceph::for_each_substr doesn't return a value and
500 // std::stoull does throw, we may as well just catch everything here.
502 if (kv_options
.count("db_paths")) {
504 get_str_list(kv_options
["db_paths"], "; \t", paths
);
505 for (auto& p
: paths
) {
506 size_t pos
= p
.find(',');
507 if (pos
== std::string::npos
) {
508 derr
<< __func__
<< " invalid db path item " << p
<< " in "
509 << kv_options
["db_paths"] << dendl
;
512 string path
= p
.substr(0, pos
);
513 string size_str
= p
.substr(pos
+ 1);
514 uint64_t size
= atoll(size_str
.c_str());
516 derr
<< __func__
<< " invalid db path item " << p
<< " in "
517 << kv_options
["db_paths"] << dendl
;
520 opt
.db_paths
.push_back(rocksdb::DbPath(path
, size
));
521 dout(10) << __func__
<< " db_path " << path
<< " size " << size
<< dendl
;
524 } catch (const std::system_error
& e
) {
525 return -e
.code().value();
528 if (cct
->_conf
->rocksdb_log_to_ceph_log
) {
529 opt
.info_log
.reset(new CephRocksdbLogger(cct
));
533 dout(10) << __func__
<< " using custom Env " << priv
<< dendl
;
534 opt
.env
= static_cast<rocksdb::Env
*>(priv
);
539 opt
.env
->SetAllowNonOwnerAccess(false);
542 if (!set_cache_flag
) {
543 cache_size
= cct
->_conf
->rocksdb_cache_size
;
545 uint64_t row_cache_size
= cache_size
* cct
->_conf
->rocksdb_cache_row_ratio
;
546 uint64_t block_cache_size
= cache_size
- row_cache_size
;
548 bbt_opts
.block_cache
= create_block_cache(cct
->_conf
->rocksdb_cache_type
, block_cache_size
);
549 if (!bbt_opts
.block_cache
) {
552 bbt_opts
.block_size
= cct
->_conf
->rocksdb_block_size
;
554 if (row_cache_size
> 0)
555 opt
.row_cache
= rocksdb::NewLRUCache(row_cache_size
,
556 cct
->_conf
->rocksdb_cache_shard_bits
);
557 uint64_t bloom_bits
= cct
->_conf
.get_val
<uint64_t>("rocksdb_bloom_bits_per_key");
558 if (bloom_bits
> 0) {
559 dout(10) << __func__
<< " set bloom filter bits per key to "
560 << bloom_bits
<< dendl
;
561 bbt_opts
.filter_policy
.reset(rocksdb::NewBloomFilterPolicy(bloom_bits
));
563 using std::placeholders::_1
;
564 if (cct
->_conf
.with_val
<std::string
>("rocksdb_index_type",
565 std::bind(std::equal_to
<std::string
>(), _1
,
567 bbt_opts
.index_type
= rocksdb::BlockBasedTableOptions::IndexType::kBinarySearch
;
568 if (cct
->_conf
.with_val
<std::string
>("rocksdb_index_type",
569 std::bind(std::equal_to
<std::string
>(), _1
,
571 bbt_opts
.index_type
= rocksdb::BlockBasedTableOptions::IndexType::kHashSearch
;
572 if (cct
->_conf
.with_val
<std::string
>("rocksdb_index_type",
573 std::bind(std::equal_to
<std::string
>(), _1
,
575 bbt_opts
.index_type
= rocksdb::BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch
;
576 if (!bbt_opts
.no_block_cache
) {
577 bbt_opts
.cache_index_and_filter_blocks
=
578 cct
->_conf
.get_val
<bool>("rocksdb_cache_index_and_filter_blocks");
579 bbt_opts
.cache_index_and_filter_blocks_with_high_priority
=
580 cct
->_conf
.get_val
<bool>("rocksdb_cache_index_and_filter_blocks_with_high_priority");
581 bbt_opts
.pin_l0_filter_and_index_blocks_in_cache
=
582 cct
->_conf
.get_val
<bool>("rocksdb_pin_l0_filter_and_index_blocks_in_cache");
584 bbt_opts
.partition_filters
= cct
->_conf
.get_val
<bool>("rocksdb_partition_filters");
585 if (cct
->_conf
.get_val
<Option::size_t>("rocksdb_metadata_block_size") > 0)
586 bbt_opts
.metadata_block_size
= cct
->_conf
.get_val
<Option::size_t>("rocksdb_metadata_block_size");
588 opt
.table_factory
.reset(rocksdb::NewBlockBasedTableFactory(bbt_opts
));
589 dout(10) << __func__
<< " block size " << cct
->_conf
->rocksdb_block_size
590 << ", block_cache size " << byte_u_t(block_cache_size
)
591 << ", row_cache size " << byte_u_t(row_cache_size
)
593 << (1 << cct
->_conf
->rocksdb_cache_shard_bits
)
594 << ", type " << cct
->_conf
->rocksdb_cache_type
597 opt
.merge_operator
.reset(new MergeOperatorRouter(*this));
598 comparator
= opt
.comparator
;
602 void RocksDBStore::add_column_family(const std::string
& cf_name
, uint32_t hash_l
, uint32_t hash_h
,
603 size_t shard_idx
, rocksdb::ColumnFamilyHandle
*handle
) {
604 dout(10) << __func__
<< " column_name=" << cf_name
<< " shard_idx=" << shard_idx
<<
605 " hash_l=" << hash_l
<< " hash_h=" << hash_h
<< " handle=" << (void*) handle
<< dendl
;
606 bool exists
= cf_handles
.count(cf_name
) > 0;
607 auto& column
= cf_handles
[cf_name
];
609 ceph_assert(hash_l
== column
.hash_l
);
610 ceph_assert(hash_h
== column
.hash_h
);
612 ceph_assert(hash_l
< hash_h
);
613 column
.hash_l
= hash_l
;
614 column
.hash_h
= hash_h
;
616 if (column
.handles
.size() <= shard_idx
)
617 column
.handles
.resize(shard_idx
+ 1);
618 column
.handles
[shard_idx
] = handle
;
619 cf_ids_to_prefix
.emplace(handle
->GetID(), cf_name
);
622 bool RocksDBStore::is_column_family(const std::string
& prefix
) {
623 return cf_handles
.count(prefix
);
626 std::string_view
RocksDBStore::get_key_hash_view(const prefix_shards
& shards
, const char* key
, const size_t keylen
) {
627 uint32_t hash_l
= std::min
<uint32_t>(shards
.hash_l
, keylen
);
628 uint32_t hash_h
= std::min
<uint32_t>(shards
.hash_h
, keylen
);
629 return { key
+ hash_l
, hash_h
- hash_l
};
632 rocksdb::ColumnFamilyHandle
*RocksDBStore::get_key_cf(const prefix_shards
& shards
, const char* key
, const size_t keylen
) {
633 auto sv
= get_key_hash_view(shards
, key
, keylen
);
634 uint32_t hash
= ceph_str_hash_rjenkins(sv
.data(), sv
.size());
635 return shards
.handles
[hash
% shards
.handles
.size()];
638 rocksdb::ColumnFamilyHandle
*RocksDBStore::get_cf_handle(const std::string
& prefix
, const std::string
& key
) {
639 auto iter
= cf_handles
.find(prefix
);
640 if (iter
== cf_handles
.end()) {
643 if (iter
->second
.handles
.size() == 1) {
644 return iter
->second
.handles
[0];
646 return get_key_cf(iter
->second
, key
.data(), key
.size());
651 rocksdb::ColumnFamilyHandle
*RocksDBStore::get_cf_handle(const std::string
& prefix
, const char* key
, size_t keylen
) {
652 auto iter
= cf_handles
.find(prefix
);
653 if (iter
== cf_handles
.end()) {
656 if (iter
->second
.handles
.size() == 1) {
657 return iter
->second
.handles
[0];
659 return get_key_cf(iter
->second
, key
, keylen
);
665 * If the specified IteratorBounds arg has both an upper and a lower bound defined, and they have equal placement hash
666 * strings, we can be sure that the entire iteration range exists in a single CF. In that case, we return the relevant
667 * CF handle. In all other cases, we return a nullptr to indicate that the specified bounds cannot necessarily be mapped
670 rocksdb::ColumnFamilyHandle
*RocksDBStore::check_cf_handle_bounds(const cf_handles_iterator
& iter
, const IteratorBounds
& bounds
) {
671 if (!bounds
.lower_bound
|| !bounds
.upper_bound
) {
674 ceph_assert(iter
!= cf_handles
.end());
675 ceph_assert(iter
->second
.handles
.size() != 1);
676 if (iter
->second
.hash_l
!= 0) {
679 auto lower_bound_hash_str
= get_key_hash_view(iter
->second
, bounds
.lower_bound
->data(), bounds
.lower_bound
->size());
680 auto upper_bound_hash_str
= get_key_hash_view(iter
->second
, bounds
.upper_bound
->data(), bounds
.upper_bound
->size());
681 if (lower_bound_hash_str
== upper_bound_hash_str
) {
682 auto key
= *bounds
.lower_bound
;
683 return get_key_cf(iter
->second
, key
.data(), key
.size());
690 * Definition of sharding:
691 * space-separated list of: column_def [ '=' options ]
692 * column_def := column_name '(' shard_count ')'
693 * column_def := column_name '(' shard_count ',' hash_begin '-' ')'
694 * column_def := column_name '(' shard_count ',' hash_begin '-' hash_end ')'
695 * I=write_buffer_size=1048576 O(6) m(7,10-) prefix(4,0-10)=disable_auto_compactions=true,max_bytes_for_level_base=1048576
697 bool RocksDBStore::parse_sharding_def(const std::string_view text_def_in
,
698 std::vector
<ColumnFamily
>& sharding_def
,
699 char const* *error_position
,
700 std::string
*error_msg
)
702 std::string_view text_def
= text_def_in
;
703 char const* error_position_local
= nullptr;
704 std::string error_msg_local
;
705 if (error_position
== nullptr) {
706 error_position
= &error_position_local
;
708 *error_position
= nullptr;
709 if (error_msg
== nullptr) {
710 error_msg
= &error_msg_local
;
714 sharding_def
.clear();
715 while (!text_def
.empty()) {
716 std::string_view options
;
717 std::string_view name
;
718 size_t shard_cnt
= 1;
719 uint32_t l_bound
= 0;
720 uint32_t h_bound
= std::numeric_limits
<uint32_t>::max();
722 std::string_view column_def
;
723 size_t spos
= text_def
.find(' ');
724 if (spos
== std::string_view::npos
) {
725 column_def
= text_def
;
726 text_def
= std::string_view(text_def
.end(), 0);
728 column_def
= text_def
.substr(0, spos
);
729 text_def
= text_def
.substr(spos
+ 1);
731 size_t eqpos
= column_def
.find('=');
732 if (eqpos
!= std::string_view::npos
) {
733 options
= column_def
.substr(eqpos
+ 1);
734 column_def
= column_def
.substr(0, eqpos
);
737 size_t bpos
= column_def
.find('(');
738 if (bpos
!= std::string_view::npos
) {
739 name
= column_def
.substr(0, bpos
);
740 const char* nptr
= &column_def
[bpos
+ 1];
742 shard_cnt
= strtol(nptr
, &endptr
, 10);
743 if (nptr
== endptr
) {
744 *error_position
= nptr
;
745 *error_msg
= "expecting integer";
751 l_bound
= strtol(nptr
, &endptr
, 10);
752 if (nptr
== endptr
) {
753 *error_position
= nptr
;
754 *error_msg
= "expecting integer";
759 *error_position
= nptr
;
760 *error_msg
= "expecting '-'";
764 h_bound
= strtol(nptr
, &endptr
, 10);
765 if (nptr
== endptr
) {
766 h_bound
= std::numeric_limits
<uint32_t>::max();
771 *error_position
= nptr
;
772 *error_msg
= "expecting ')'";
778 sharding_def
.emplace_back(std::string(name
), shard_cnt
, std::string(options
), l_bound
, h_bound
);
780 return *error_position
== nullptr;
783 void RocksDBStore::sharding_def_to_columns(const std::vector
<ColumnFamily
>& sharding_def
,
784 std::vector
<std::string
>& columns
)
787 for (size_t i
= 0; i
< sharding_def
.size(); i
++) {
788 if (sharding_def
[i
].shard_cnt
== 1) {
789 columns
.push_back(sharding_def
[i
].name
);
791 for (size_t j
= 0; j
< sharding_def
[i
].shard_cnt
; j
++) {
792 columns
.push_back(sharding_def
[i
].name
+ "-" + std::to_string(j
));
798 int RocksDBStore::create_shards(const rocksdb::Options
& opt
,
799 const std::vector
<ColumnFamily
>& sharding_def
)
801 for (auto& p
: sharding_def
) {
802 // copy default CF settings, block cache, merge operators as
803 // the base for new CF
804 rocksdb::ColumnFamilyOptions
cf_opt(opt
);
805 rocksdb::Status status
;
806 // apply options to column family
807 int r
= update_column_family_options(p
.name
, p
.options
, &cf_opt
);
811 for (size_t idx
= 0; idx
< p
.shard_cnt
; idx
++) {
813 if (p
.shard_cnt
== 1)
816 cf_name
= p
.name
+ "-" + std::to_string(idx
);
817 rocksdb::ColumnFamilyHandle
*cf
;
818 status
= db
->CreateColumnFamily(cf_opt
, cf_name
, &cf
);
820 derr
<< __func__
<< " Failed to create rocksdb column family: "
824 // store the new CF handle
825 add_column_family(p
.name
, p
.hash_l
, p
.hash_h
, idx
, cf
);
831 int RocksDBStore::apply_sharding(const rocksdb::Options
& opt
,
832 const std::string
& sharding_text
)
834 // create and open column families
835 if (!sharding_text
.empty()) {
838 rocksdb::Status status
;
839 std::vector
<ColumnFamily
> sharding_def
;
840 char const* error_position
;
841 std::string error_msg
;
842 b
= parse_sharding_def(sharding_text
, sharding_def
, &error_position
, &error_msg
);
844 dout(1) << __func__
<< " bad sharding: " << dendl
;
845 dout(1) << __func__
<< sharding_text
<< dendl
;
846 dout(1) << __func__
<< std::string(error_position
- &sharding_text
[0], ' ') << "^" << error_msg
<< dendl
;
849 r
= create_shards(opt
, sharding_def
);
851 derr
<< __func__
<< " create_shards failed error=" << r
<< dendl
;
854 opt
.env
->CreateDir(sharding_def_dir
);
855 status
= rocksdb::WriteStringToFile(opt
.env
, sharding_text
,
856 sharding_def_file
, true);
858 derr
<< __func__
<< " cannot write to " << sharding_def_file
<< dendl
;
862 opt
.env
->DeleteFile(sharding_def_file
);
867 // linking to rocksdb function defined in options_helper.cc
868 // it can parse nested params like "nested_opt={opt1=1;opt2=2}"
869 extern rocksdb::Status
rocksdb::StringToMap(const std::string
& opts_str
,
870 std::unordered_map
<std::string
, std::string
>* opts_map
);
872 // Splits column family options from single string into name->value column_opts_map.
873 // The split is done using RocksDB parser that understands "{" and "}", so it
874 // properly extracts compound options.
875 // If non-RocksDB option "block_cache" is defined it is extracted to block_cache_opt.
876 int RocksDBStore::split_column_family_options(const std::string
& options
,
877 std::unordered_map
<std::string
, std::string
>* opt_map
,
878 std::string
* block_cache_opt
)
880 dout(20) << __func__
<< " options=" << options
<< dendl
;
881 rocksdb::Status status
= rocksdb::StringToMap(options
, opt_map
);
883 dout(5) << __func__
<< " error '" << status
.getState()
884 << "' while parsing options '" << options
<< "'" << dendl
;
887 // if "block_cache" option exists, then move it to separate string
888 if (auto it
= opt_map
->find("block_cache"); it
!= opt_map
->end()) {
889 *block_cache_opt
= it
->second
;
892 block_cache_opt
->clear();
897 // Updates column family options.
898 // Take options from more_options and apply them to cf_opt.
899 // Allowed options are exactly the same as allowed for column families in RocksDB.
900 // Ceph addition is "block_cache" option that is translated to block_cache and
901 // allows to specialize separate block cache for O column family.
903 // base_name - name of column without shard suffix: "-"+number
904 // options - additional options to apply
905 // cf_opt - column family options to update
906 int RocksDBStore::update_column_family_options(const std::string
& base_name
,
907 const std::string
& more_options
,
908 rocksdb::ColumnFamilyOptions
* cf_opt
)
910 std::unordered_map
<std::string
, std::string
> options_map
;
911 std::string block_cache_opt
;
912 rocksdb::Status status
;
913 int r
= split_column_family_options(more_options
, &options_map
, &block_cache_opt
);
915 dout(5) << __func__
<< " failed to parse options; column family=" << base_name
916 << " options=" << more_options
<< dendl
;
919 status
= rocksdb::GetColumnFamilyOptionsFromMap(*cf_opt
, options_map
, cf_opt
);
921 dout(5) << __func__
<< " invalid column family optionsp; column family="
922 << base_name
<< " options=" << more_options
<< dendl
;
923 dout(5) << __func__
<< " RocksDB error='" << status
.getState() << "'" << dendl
;
926 if (base_name
!= rocksdb::kDefaultColumnFamilyName
) {
927 // default cf has its merge operator defined in load_rocksdb_options, should not override it
928 install_cf_mergeop(base_name
, cf_opt
);
930 if (!block_cache_opt
.empty()) {
931 r
= apply_block_cache_options(base_name
, block_cache_opt
, cf_opt
);
933 // apply_block_cache_options already does all necessary douts
938 // Set Compact on Deletion Factory
939 if (cct
->_conf
->rocksdb_cf_compact_on_deletion
) {
940 size_t sliding_window
= cct
->_conf
->rocksdb_cf_compact_on_deletion_sliding_window
;
941 size_t trigger
= cct
->_conf
->rocksdb_cf_compact_on_deletion_trigger
;
942 cf_opt
->table_properties_collector_factories
.emplace_back(
943 rocksdb::NewCompactOnDeletionCollectorFactory(sliding_window
, trigger
));
948 int RocksDBStore::apply_block_cache_options(const std::string
& column_name
,
949 const std::string
& block_cache_opt
,
950 rocksdb::ColumnFamilyOptions
* cf_opt
)
952 rocksdb::Status status
;
953 std::unordered_map
<std::string
, std::string
> cache_options_map
;
954 status
= rocksdb::StringToMap(block_cache_opt
, &cache_options_map
);
956 dout(5) << __func__
<< " invalid block cache options; column=" << column_name
957 << " options=" << block_cache_opt
<< dendl
;
958 dout(5) << __func__
<< " RocksDB error='" << status
.getState() << "'" << dendl
;
961 bool require_new_block_cache
= false;
962 std::string cache_type
= cct
->_conf
->rocksdb_cache_type
;
963 if (const auto it
= cache_options_map
.find("type"); it
!= cache_options_map
.end()) {
964 cache_type
= it
->second
;
965 cache_options_map
.erase(it
);
966 require_new_block_cache
= true;
968 size_t cache_size
= cct
->_conf
->rocksdb_cache_size
;
969 if (auto it
= cache_options_map
.find("size"); it
!= cache_options_map
.end()) {
971 cache_size
= strict_iecstrtoll(it
->second
.c_str(), &error
);
972 if (!error
.empty()) {
973 dout(10) << __func__
<< " invalid size: '" << it
->second
<< "'" << dendl
;
976 cache_options_map
.erase(it
);
977 require_new_block_cache
= true;
979 double high_pri_pool_ratio
= 0.0;
980 if (auto it
= cache_options_map
.find("high_ratio"); it
!= cache_options_map
.end()) {
982 high_pri_pool_ratio
= strict_strtod(it
->second
.c_str(), &error
);
983 if (!error
.empty()) {
984 dout(10) << __func__
<< " invalid high_pri (float): '" << it
->second
<< "'" << dendl
;
987 cache_options_map
.erase(it
);
988 require_new_block_cache
= true;
991 rocksdb::BlockBasedTableOptions column_bbt_opts
;
992 status
= GetBlockBasedTableOptionsFromMap(bbt_opts
, cache_options_map
, &column_bbt_opts
);
994 dout(5) << __func__
<< " invalid block cache options; column=" << column_name
995 << " options=" << block_cache_opt
<< dendl
;
996 dout(5) << __func__
<< " RocksDB error='" << status
.getState() << "'" << dendl
;
999 std::shared_ptr
<rocksdb::Cache
> block_cache
;
1000 if (column_bbt_opts
.no_block_cache
) {
1001 // clear all settings except no_block_cache
1002 // rocksdb does not like them
1003 column_bbt_opts
= rocksdb::BlockBasedTableOptions();
1004 column_bbt_opts
.no_block_cache
= true;
1006 if (require_new_block_cache
) {
1007 block_cache
= create_block_cache(cache_type
, cache_size
, high_pri_pool_ratio
);
1009 dout(5) << __func__
<< " failed to create block cache for params: " << block_cache_opt
<< dendl
;
1013 block_cache
= bbt_opts
.block_cache
;
1016 column_bbt_opts
.block_cache
= block_cache
;
1017 cf_bbt_opts
[column_name
] = column_bbt_opts
;
1018 cf_opt
->table_factory
.reset(NewBlockBasedTableFactory(cf_bbt_opts
[column_name
]));
1022 int RocksDBStore::verify_sharding(const rocksdb::Options
& opt
,
1023 std::vector
<rocksdb::ColumnFamilyDescriptor
>& existing_cfs
,
1024 std::vector
<std::pair
<size_t, RocksDBStore::ColumnFamily
> >& existing_cfs_shard
,
1025 std::vector
<rocksdb::ColumnFamilyDescriptor
>& missing_cfs
,
1026 std::vector
<std::pair
<size_t, RocksDBStore::ColumnFamily
> >& missing_cfs_shard
)
1028 rocksdb::Status status
;
1029 std::string stored_sharding_text
;
1030 status
= opt
.env
->FileExists(sharding_def_file
);
1032 status
= rocksdb::ReadFileToString(opt
.env
,
1034 &stored_sharding_text
);
1036 derr
<< __func__
<< " cannot read from " << sharding_def_file
<< dendl
;
1039 dout(20) << __func__
<< " sharding=" << stored_sharding_text
<< dendl
;
1041 dout(30) << __func__
<< " no sharding" << dendl
;
1042 //no "sharding_def" present
1044 //check if sharding_def matches stored_sharding_def
1045 std::vector
<ColumnFamily
> stored_sharding_def
;
1046 parse_sharding_def(stored_sharding_text
, stored_sharding_def
);
1048 std::sort(stored_sharding_def
.begin(), stored_sharding_def
.end(),
1049 [](ColumnFamily
& a
, ColumnFamily
& b
) { return a
.name
< b
.name
; } );
1051 std::vector
<string
> rocksdb_cfs
;
1052 status
= rocksdb::DB::ListColumnFamilies(rocksdb::DBOptions(opt
),
1053 path
, &rocksdb_cfs
);
1055 derr
<< __func__
<< " unable to list column families: " << status
.ToString() << dendl
;
1058 dout(5) << __func__
<< " column families from rocksdb: " << rocksdb_cfs
<< dendl
;
1060 auto emplace_cf
= [&] (const RocksDBStore::ColumnFamily
& column
,
1062 const std::string
& shard_name
,
1063 const rocksdb::ColumnFamilyOptions
& opt
) {
1064 if (std::find(rocksdb_cfs
.begin(), rocksdb_cfs
.end(), shard_name
) != rocksdb_cfs
.end()) {
1065 existing_cfs
.emplace_back(shard_name
, opt
);
1066 existing_cfs_shard
.emplace_back(shard_id
, column
);
1068 missing_cfs
.emplace_back(shard_name
, opt
);
1069 missing_cfs_shard
.emplace_back(shard_id
, column
);
1073 for (auto& column
: stored_sharding_def
) {
1074 rocksdb::ColumnFamilyOptions
cf_opt(opt
);
1075 int r
= update_column_family_options(column
.name
, column
.options
, &cf_opt
);
1079 if (column
.shard_cnt
== 1) {
1080 emplace_cf(column
, 0, column
.name
, cf_opt
);
1082 for (size_t i
= 0; i
< column
.shard_cnt
; i
++) {
1083 std::string cf_name
= column
.name
+ "-" + std::to_string(i
);
1084 emplace_cf(column
, i
, cf_name
, cf_opt
);
1088 existing_cfs
.emplace_back("default", opt
);
1090 if (existing_cfs
.size() != rocksdb_cfs
.size()) {
1091 std::vector
<std::string
> columns_from_stored
;
1092 sharding_def_to_columns(stored_sharding_def
, columns_from_stored
);
1093 derr
<< __func__
<< " extra columns in rocksdb. rocksdb columns = " << rocksdb_cfs
1094 << " target columns = " << columns_from_stored
<< dendl
;
1100 std::ostream
& operator<<(std::ostream
& out
, const RocksDBStore::ColumnFamily
& cf
)
1105 out
<< cf
.shard_cnt
;
1109 if (cf
.hash_h
!= std::numeric_limits
<uint32_t>::max()) {
1118 int RocksDBStore::do_open(ostream
&out
,
1119 bool create_if_missing
,
1121 const std::string
& sharding_text
)
1123 ceph_assert(!(create_if_missing
&& open_readonly
));
1124 rocksdb::Options opt
;
1125 int r
= load_rocksdb_options(create_if_missing
, opt
);
1127 dout(1) << __func__
<< " load rocksdb options failed" << dendl
;
1130 rocksdb::Status status
;
1131 if (create_if_missing
) {
1132 status
= rocksdb::DB::Open(opt
, path
, &db
);
1134 derr
<< status
.ToString() << dendl
;
1137 r
= apply_sharding(opt
, sharding_text
);
1141 default_cf
= db
->DefaultColumnFamily();
1143 std::vector
<rocksdb::ColumnFamilyDescriptor
> existing_cfs
;
1144 std::vector
<std::pair
<size_t, RocksDBStore::ColumnFamily
> > existing_cfs_shard
;
1145 std::vector
<rocksdb::ColumnFamilyDescriptor
> missing_cfs
;
1146 std::vector
<std::pair
<size_t, RocksDBStore::ColumnFamily
> > missing_cfs_shard
;
1148 r
= verify_sharding(opt
,
1149 existing_cfs
, existing_cfs_shard
,
1150 missing_cfs
, missing_cfs_shard
);
1154 std::string sharding_recreate_text
;
1155 status
= rocksdb::ReadFileToString(opt
.env
,
1157 &sharding_recreate_text
);
1158 bool recreate_mode
= status
.ok() && sharding_recreate_text
== "1";
1160 ceph_assert(!recreate_mode
|| !open_readonly
);
1161 if (recreate_mode
== false && missing_cfs
.size() != 0) {
1162 // We do not accept when there are missing column families, except case that we are during resharding.
1163 // We can get into this case if resharding was interrupted. It gives a chance to continue.
1164 // Opening DB is only allowed in read-only mode.
1165 if (open_readonly
== false &&
1166 std::find_if(missing_cfs
.begin(), missing_cfs
.end(),
1167 [](const rocksdb::ColumnFamilyDescriptor
& c
) { return c
.name
== resharding_column_lock
; }
1168 ) != missing_cfs
.end()) {
1169 derr
<< __func__
<< " missing column families: " << missing_cfs_shard
<< dendl
;
1174 if (existing_cfs
.empty()) {
1175 // no column families
1176 if (open_readonly
) {
1177 status
= rocksdb::DB::OpenForReadOnly(opt
, path
, &db
);
1179 status
= rocksdb::DB::Open(opt
, path
, &db
);
1182 derr
<< status
.ToString() << dendl
;
1185 default_cf
= db
->DefaultColumnFamily();
1187 std::vector
<rocksdb::ColumnFamilyHandle
*> handles
;
1188 if (open_readonly
) {
1189 status
= rocksdb::DB::OpenForReadOnly(rocksdb::DBOptions(opt
),
1193 status
= rocksdb::DB::Open(rocksdb::DBOptions(opt
),
1194 path
, existing_cfs
, &handles
, &db
);
1197 derr
<< status
.ToString() << dendl
;
1200 ceph_assert(existing_cfs
.size() == existing_cfs_shard
.size() + 1);
1201 ceph_assert(handles
.size() == existing_cfs
.size());
1202 dout(10) << __func__
<< " existing_cfs=" << existing_cfs
.size() << dendl
;
1203 for (size_t i
= 0; i
< existing_cfs_shard
.size(); i
++) {
1204 add_column_family(existing_cfs_shard
[i
].second
.name
,
1205 existing_cfs_shard
[i
].second
.hash_l
,
1206 existing_cfs_shard
[i
].second
.hash_h
,
1207 existing_cfs_shard
[i
].first
,
1210 default_cf
= handles
[handles
.size() - 1];
1211 must_close_default_cf
= true;
1213 if (missing_cfs
.size() > 0 &&
1214 std::find_if(missing_cfs
.begin(), missing_cfs
.end(),
1215 [](const rocksdb::ColumnFamilyDescriptor
& c
) { return c
.name
== resharding_column_lock
; }
1216 ) == missing_cfs
.end())
1218 dout(10) << __func__
<< " missing_cfs=" << missing_cfs
.size() << dendl
;
1219 ceph_assert(recreate_mode
);
1220 ceph_assert(missing_cfs
.size() == missing_cfs_shard
.size());
1221 for (size_t i
= 0; i
< missing_cfs
.size(); i
++) {
1222 rocksdb::ColumnFamilyHandle
*cf
;
1223 status
= db
->CreateColumnFamily(missing_cfs
[i
].options
, missing_cfs
[i
].name
, &cf
);
1225 derr
<< __func__
<< " Failed to create rocksdb column family: "
1226 << missing_cfs
[i
].name
<< dendl
;
1229 add_column_family(missing_cfs_shard
[i
].second
.name
,
1230 missing_cfs_shard
[i
].second
.hash_l
,
1231 missing_cfs_shard
[i
].second
.hash_h
,
1232 missing_cfs_shard
[i
].first
,
1235 opt
.env
->DeleteFile(sharding_recreate
);
1239 ceph_assert(default_cf
!= nullptr);
1241 PerfCountersBuilder
plb(cct
, "rocksdb", l_rocksdb_first
, l_rocksdb_last
);
1242 plb
.add_time_avg(l_rocksdb_get_latency
, "get_latency", "Get latency");
1243 plb
.add_time_avg(l_rocksdb_submit_latency
, "submit_latency", "Submit Latency");
1244 plb
.add_time_avg(l_rocksdb_submit_sync_latency
, "submit_sync_latency", "Submit Sync Latency");
1245 plb
.add_u64_counter(l_rocksdb_compact
, "compact", "Compactions");
1246 plb
.add_u64_counter(l_rocksdb_compact_range
, "compact_range", "Compactions by range");
1247 plb
.add_u64_counter(l_rocksdb_compact_queue_merge
, "compact_queue_merge", "Mergings of ranges in compaction queue");
1248 plb
.add_u64(l_rocksdb_compact_queue_len
, "compact_queue_len", "Length of compaction queue");
1249 plb
.add_time_avg(l_rocksdb_write_wal_time
, "rocksdb_write_wal_time", "Rocksdb write wal time");
1250 plb
.add_time_avg(l_rocksdb_write_memtable_time
, "rocksdb_write_memtable_time", "Rocksdb write memtable time");
1251 plb
.add_time_avg(l_rocksdb_write_delay_time
, "rocksdb_write_delay_time", "Rocksdb write delay time");
1252 plb
.add_time_avg(l_rocksdb_write_pre_and_post_process_time
,
1253 "rocksdb_write_pre_and_post_time", "total time spent on writing a record, excluding write process");
1254 logger
= plb
.create_perf_counters();
1255 cct
->get_perfcounters_collection()->add(logger
);
1257 if (compact_on_mount
) {
1258 derr
<< "Compacting rocksdb store..." << dendl
;
1260 derr
<< "Finished compacting rocksdb store" << dendl
;
1265 int RocksDBStore::_test_init(const string
& dir
)
1267 rocksdb::Options options
;
1268 options
.create_if_missing
= true;
1270 rocksdb::Status status
= rocksdb::DB::Open(options
, dir
, &db
);
1273 return status
.ok() ? 0 : -EIO
;
1276 RocksDBStore::~RocksDBStore()
1280 delete static_cast<rocksdb::Env
*>(priv
);
1284 void RocksDBStore::close()
1286 // stop compaction thread
1287 compact_queue_lock
.lock();
1288 if (compact_thread
.is_started()) {
1289 dout(1) << __func__
<< " waiting for compaction thread to stop" << dendl
;
1290 compact_queue_stop
= true;
1291 compact_queue_cond
.notify_all();
1292 compact_queue_lock
.unlock();
1293 compact_thread
.join();
1294 dout(1) << __func__
<< " compaction thread to stopped" << dendl
;
1296 compact_queue_lock
.unlock();
1300 cct
->get_perfcounters_collection()->remove(logger
);
1305 // Ensure db is destroyed before dependent db_cache and filterpolicy
1306 for (auto& p
: cf_handles
) {
1307 for (size_t i
= 0; i
< p
.second
.handles
.size(); i
++) {
1308 db
->DestroyColumnFamilyHandle(p
.second
.handles
[i
]);
1312 if (must_close_default_cf
) {
1313 db
->DestroyColumnFamilyHandle(default_cf
);
1314 must_close_default_cf
= false;
1316 default_cf
= nullptr;
1321 int RocksDBStore::repair(std::ostream
&out
)
1323 rocksdb::Status status
;
1324 rocksdb::Options opt
;
1325 int r
= load_rocksdb_options(false, opt
);
1327 dout(1) << __func__
<< " load rocksdb options failed" << dendl
;
1328 out
<< "load rocksdb options failed" << std::endl
;
1331 //need to save sharding definition, repairDB will delete files it does not know
1332 std::string stored_sharding_text
;
1333 status
= opt
.env
->FileExists(sharding_def_file
);
1335 status
= rocksdb::ReadFileToString(opt
.env
,
1337 &stored_sharding_text
);
1339 stored_sharding_text
.clear();
1342 dout(10) << __func__
<< " stored_sharding: " << stored_sharding_text
<< dendl
;
1343 status
= rocksdb::RepairDB(path
, opt
);
1344 bool repaired
= status
.ok();
1345 if (!stored_sharding_text
.empty()) {
1346 //recreate markers even if repair failed
1347 opt
.env
->CreateDir(sharding_def_dir
);
1348 status
= rocksdb::WriteStringToFile(opt
.env
, stored_sharding_text
,
1349 sharding_def_file
, true);
1351 derr
<< __func__
<< " cannot write to " << sharding_def_file
<< dendl
;
1354 status
= rocksdb::WriteStringToFile(opt
.env
, "1",
1355 sharding_recreate
, true);
1357 derr
<< __func__
<< " cannot write to " << sharding_recreate
<< dendl
;
1360 // fiinalize sharding recreate
1361 if (do_open(out
, false, false)) {
1362 derr
<< __func__
<< " cannot finalize repair" << dendl
;
1368 if (repaired
&& status
.ok()) {
1371 out
<< "repair rocksdb failed : " << status
.ToString() << std::endl
;
1376 void RocksDBStore::split_stats(const std::string
&s
, char delim
, std::vector
<std::string
> &elems
) {
1377 std::stringstream ss
;
1380 while (std::getline(ss
, item
, delim
)) {
1381 elems
.push_back(item
);
1385 bool RocksDBStore::get_property(
1386 const std::string
&property
,
1389 return db
->GetIntProperty(property
, out
);
1392 int64_t RocksDBStore::estimate_prefix_size(const string
& prefix
,
1393 const string
& key_prefix
)
1396 auto p_iter
= cf_handles
.find(prefix
);
1397 if (p_iter
!= cf_handles
.end()) {
1398 for (auto cf
: p_iter
->second
.handles
) {
1400 string start
= key_prefix
+ string(1, '\x00');
1401 string limit
= key_prefix
+ string("\xff\xff\xff\xff");
1402 rocksdb::Range
r(start
, limit
);
1403 db
->GetApproximateSizes(cf
, &r
, 1, &s
);
1407 string start
= combine_strings(prefix
, key_prefix
);
1408 string limit
= combine_strings(prefix
, key_prefix
+ "\xff\xff\xff\xff");
1409 rocksdb::Range
r(start
, limit
);
1410 db
->GetApproximateSizes(default_cf
, &r
, 1, &size
);
1415 void RocksDBStore::get_statistics(Formatter
*f
)
1417 if (!cct
->_conf
->rocksdb_perf
) {
1418 dout(20) << __func__
<< " RocksDB perf is disabled, can't probe for stats"
1423 if (cct
->_conf
->rocksdb_collect_compaction_stats
) {
1424 std::string stat_str
;
1425 bool status
= db
->GetProperty("rocksdb.stats", &stat_str
);
1427 f
->open_object_section("rocksdb_statistics");
1428 f
->dump_string("rocksdb_compaction_statistics", "");
1429 vector
<string
> stats
;
1430 split_stats(stat_str
, '\n', stats
);
1431 for (auto st
:stats
) {
1432 f
->dump_string("", st
);
1437 if (cct
->_conf
->rocksdb_collect_extended_stats
) {
1439 f
->open_object_section("rocksdb_extended_statistics");
1440 string stat_str
= dbstats
->ToString();
1441 vector
<string
> stats
;
1442 split_stats(stat_str
, '\n', stats
);
1443 f
->dump_string("rocksdb_extended_statistics", "");
1444 for (auto st
:stats
) {
1445 f
->dump_string(".", st
);
1449 f
->open_object_section("rocksdbstore_perf_counters");
1450 logger
->dump_formatted(f
, false, false);
1453 if (cct
->_conf
->rocksdb_collect_memory_stats
) {
1454 f
->open_object_section("rocksdb_memtable_statistics");
1456 if (!bbt_opts
.no_block_cache
) {
1457 str
.append(stringify(bbt_opts
.block_cache
->GetUsage()));
1458 f
->dump_string("block_cache_usage", str
.data());
1460 str
.append(stringify(bbt_opts
.block_cache
->GetPinnedUsage()));
1461 f
->dump_string("block_cache_pinned_blocks_usage", str
);
1464 db
->GetProperty("rocksdb.cur-size-all-mem-tables", &str
);
1465 f
->dump_string("rocksdb_memtable_usage", str
);
1467 db
->GetProperty("rocksdb.estimate-table-readers-mem", &str
);
1468 f
->dump_string("rocksdb_index_filter_blocks_usage", str
);
1473 struct RocksDBStore::RocksWBHandler
: public rocksdb::WriteBatch::Handler
{
1474 RocksWBHandler(const RocksDBStore
& db
) : db(db
) {}
1475 const RocksDBStore
& db
;
1476 std::stringstream seen
;
1479 void dump(const char* op_name
,
1480 uint32_t column_family_id
,
1481 const rocksdb::Slice
& key_in
,
1482 const rocksdb::Slice
* value
= nullptr) {
1485 ssize_t size
= value
? value
->size() : -1;
1486 seen
<< std::endl
<< op_name
<< "(";
1488 if (column_family_id
== 0) {
1489 db
.split_key(key_in
, &prefix
, &key
);
1491 auto it
= db
.cf_ids_to_prefix
.find(column_family_id
);
1492 ceph_assert(it
!= db
.cf_ids_to_prefix
.end());
1493 prefix
= it
->second
;
1494 key
= key_in
.ToString();
1496 seen
<< " prefix = " << prefix
;
1497 seen
<< " key = " << pretty_binary_string(key
);
1499 seen
<< " value size = " << std::to_string(size
);
1503 void Put(const rocksdb::Slice
& key
,
1504 const rocksdb::Slice
& value
) override
{
1505 dump("Put", 0, key
, &value
);
1507 rocksdb::Status
PutCF(uint32_t column_family_id
, const rocksdb::Slice
& key
,
1508 const rocksdb::Slice
& value
) override
{
1509 dump("PutCF", column_family_id
, key
, &value
);
1510 return rocksdb::Status::OK();
1512 void SingleDelete(const rocksdb::Slice
& key
) override
{
1513 dump("SingleDelete", 0, key
);
1515 rocksdb::Status
SingleDeleteCF(uint32_t column_family_id
, const rocksdb::Slice
& key
) override
{
1516 dump("SingleDeleteCF", column_family_id
, key
);
1517 return rocksdb::Status::OK();
1519 void Delete(const rocksdb::Slice
& key
) override
{
1520 dump("Delete", 0, key
);
1522 rocksdb::Status
DeleteCF(uint32_t column_family_id
, const rocksdb::Slice
& key
) override
{
1523 dump("DeleteCF", column_family_id
, key
);
1524 return rocksdb::Status::OK();
1526 void Merge(const rocksdb::Slice
& key
,
1527 const rocksdb::Slice
& value
) override
{
1528 dump("Merge", 0, key
, &value
);
1530 rocksdb::Status
MergeCF(uint32_t column_family_id
, const rocksdb::Slice
& key
,
1531 const rocksdb::Slice
& value
) override
{
1532 dump("MergeCF", column_family_id
, key
, &value
);
1533 return rocksdb::Status::OK();
1535 bool Continue() override
{ return num_seen
< 50; }
1538 int RocksDBStore::submit_common(rocksdb::WriteOptions
& woptions
, KeyValueDB::Transaction t
)
1540 // enable rocksdb breakdown
1541 // considering performance overhead, default is disabled
1542 if (cct
->_conf
->rocksdb_perf
) {
1543 rocksdb::SetPerfLevel(rocksdb::PerfLevel::kEnableTimeExceptForMutex
);
1544 rocksdb::get_perf_context()->Reset();
1547 RocksDBTransactionImpl
* _t
=
1548 static_cast<RocksDBTransactionImpl
*>(t
.get());
1549 woptions
.disableWAL
= disableWAL
;
1550 lgeneric_subdout(cct
, rocksdb
, 30) << __func__
;
1551 RocksWBHandler
bat_txc(*this);
1552 _t
->bat
.Iterate(&bat_txc
);
1553 *_dout
<< " Rocksdb transaction: " << bat_txc
.seen
.str() << dendl
;
1555 rocksdb::Status s
= db
->Write(woptions
, &_t
->bat
);
1557 RocksWBHandler
rocks_txc(*this);
1558 _t
->bat
.Iterate(&rocks_txc
);
1559 derr
<< __func__
<< " error: " << s
.ToString() << " code = " << s
.code()
1560 << " Rocksdb transaction: " << rocks_txc
.seen
.str() << dendl
;
1563 if (cct
->_conf
->rocksdb_perf
) {
1564 utime_t write_memtable_time
;
1565 utime_t write_delay_time
;
1566 utime_t write_wal_time
;
1567 utime_t write_pre_and_post_process_time
;
1568 write_wal_time
.set_from_double(
1569 static_cast<double>(rocksdb::get_perf_context()->write_wal_time
)/1000000000);
1570 write_memtable_time
.set_from_double(
1571 static_cast<double>(rocksdb::get_perf_context()->write_memtable_time
)/1000000000);
1572 write_delay_time
.set_from_double(
1573 static_cast<double>(rocksdb::get_perf_context()->write_delay_time
)/1000000000);
1574 write_pre_and_post_process_time
.set_from_double(
1575 static_cast<double>(rocksdb::get_perf_context()->write_pre_and_post_process_time
)/1000000000);
1576 logger
->tinc(l_rocksdb_write_memtable_time
, write_memtable_time
);
1577 logger
->tinc(l_rocksdb_write_delay_time
, write_delay_time
);
1578 logger
->tinc(l_rocksdb_write_wal_time
, write_wal_time
);
1579 logger
->tinc(l_rocksdb_write_pre_and_post_process_time
, write_pre_and_post_process_time
);
1582 return s
.ok() ? 0 : -1;
1585 int RocksDBStore::submit_transaction(KeyValueDB::Transaction t
)
1587 utime_t start
= ceph_clock_now();
1588 rocksdb::WriteOptions woptions
;
1589 woptions
.sync
= false;
1591 int result
= submit_common(woptions
, t
);
1593 utime_t lat
= ceph_clock_now() - start
;
1594 logger
->tinc(l_rocksdb_submit_latency
, lat
);
1599 int RocksDBStore::submit_transaction_sync(KeyValueDB::Transaction t
)
1601 utime_t start
= ceph_clock_now();
1602 rocksdb::WriteOptions woptions
;
1603 // if disableWAL, sync can't set
1604 woptions
.sync
= !disableWAL
;
1606 int result
= submit_common(woptions
, t
);
1608 utime_t lat
= ceph_clock_now() - start
;
1609 logger
->tinc(l_rocksdb_submit_sync_latency
, lat
);
1614 RocksDBStore::RocksDBTransactionImpl::RocksDBTransactionImpl(RocksDBStore
*_db
)
1619 void RocksDBStore::RocksDBTransactionImpl::put_bat(
1620 rocksdb::WriteBatch
& bat
,
1621 rocksdb::ColumnFamilyHandle
*cf
,
1623 const bufferlist
&to_set_bl
)
1625 // bufferlist::c_str() is non-constant, so we can't call c_str()
1626 if (to_set_bl
.is_contiguous() && to_set_bl
.length() > 0) {
1628 rocksdb::Slice(key
),
1629 rocksdb::Slice(to_set_bl
.buffers().front().c_str(),
1630 to_set_bl
.length()));
1632 rocksdb::Slice
key_slice(key
);
1633 vector
<rocksdb::Slice
> value_slices(to_set_bl
.get_num_buffers());
1635 rocksdb::SliceParts(&key_slice
, 1),
1636 prepare_sliceparts(to_set_bl
, &value_slices
));
1640 void RocksDBStore::RocksDBTransactionImpl::set(
1641 const string
&prefix
,
1643 const bufferlist
&to_set_bl
)
1645 auto cf
= db
->get_cf_handle(prefix
, k
);
1647 put_bat(bat
, cf
, k
, to_set_bl
);
1649 string key
= combine_strings(prefix
, k
);
1650 put_bat(bat
, db
->default_cf
, key
, to_set_bl
);
1654 void RocksDBStore::RocksDBTransactionImpl::set(
1655 const string
&prefix
,
1656 const char *k
, size_t keylen
,
1657 const bufferlist
&to_set_bl
)
1659 auto cf
= db
->get_cf_handle(prefix
, k
, keylen
);
1661 string
key(k
, keylen
); // fixme?
1662 put_bat(bat
, cf
, key
, to_set_bl
);
1665 combine_strings(prefix
, k
, keylen
, &key
);
1666 put_bat(bat
, cf
, key
, to_set_bl
);
1670 void RocksDBStore::RocksDBTransactionImpl::rmkey(const string
&prefix
,
1673 auto cf
= db
->get_cf_handle(prefix
, k
);
1675 bat
.Delete(cf
, rocksdb::Slice(k
));
1677 bat
.Delete(db
->default_cf
, combine_strings(prefix
, k
));
1681 void RocksDBStore::RocksDBTransactionImpl::rmkey(const string
&prefix
,
1685 auto cf
= db
->get_cf_handle(prefix
, k
, keylen
);
1687 bat
.Delete(cf
, rocksdb::Slice(k
, keylen
));
1690 combine_strings(prefix
, k
, keylen
, &key
);
1691 bat
.Delete(db
->default_cf
, rocksdb::Slice(key
));
1695 void RocksDBStore::RocksDBTransactionImpl::rm_single_key(const string
&prefix
,
1698 auto cf
= db
->get_cf_handle(prefix
, k
);
1700 bat
.SingleDelete(cf
, k
);
1702 bat
.SingleDelete(db
->default_cf
, combine_strings(prefix
, k
));
1706 void RocksDBStore::RocksDBTransactionImpl::rmkeys_by_prefix(const string
&prefix
)
1708 auto p_iter
= db
->cf_handles
.find(prefix
);
1709 if (p_iter
== db
->cf_handles
.end()) {
1710 uint64_t cnt
= db
->get_delete_range_threshold();
1712 auto it
= db
->get_iterator(prefix
);
1713 for (it
->seek_to_first(); it
->valid() && (--cnt
) != 0; it
->next()) {
1714 bat
.Delete(db
->default_cf
, combine_strings(prefix
, it
->key()));
1717 bat
.RollbackToSavePoint();
1718 string endprefix
= prefix
;
1719 endprefix
.push_back('\x01');
1720 bat
.DeleteRange(db
->default_cf
,
1721 combine_strings(prefix
, string()),
1722 combine_strings(endprefix
, string()));
1727 ceph_assert(p_iter
->second
.handles
.size() >= 1);
1728 for (auto cf
: p_iter
->second
.handles
) {
1729 uint64_t cnt
= db
->get_delete_range_threshold();
1731 auto it
= db
->new_shard_iterator(cf
);
1732 for (it
->seek_to_first(); it
->valid() && (--cnt
) != 0; it
->next()) {
1733 bat
.Delete(cf
, it
->key());
1736 bat
.RollbackToSavePoint();
1737 string endprefix
= "\xff\xff\xff\xff"; // FIXME: this is cheating...
1738 bat
.DeleteRange(cf
, string(), endprefix
);
1746 void RocksDBStore::RocksDBTransactionImpl::rm_range_keys(const string
&prefix
,
1747 const string
&start
,
1750 ldout(db
->cct
, 10) << __func__
1751 << " enter prefix=" << prefix
1752 << " start=" << pretty_binary_string(start
)
1753 << " end=" << pretty_binary_string(end
) << dendl
;
1754 auto p_iter
= db
->cf_handles
.find(prefix
);
1755 uint64_t cnt
= db
->get_delete_range_threshold();
1756 if (p_iter
== db
->cf_handles
.end()) {
1757 uint64_t cnt0
= cnt
;
1759 auto it
= db
->get_iterator(prefix
);
1760 for (it
->lower_bound(start
);
1761 it
->valid() && db
->comparator
->Compare(it
->key(), end
) < 0 && (--cnt
) != 0;
1763 bat
.Delete(db
->default_cf
, combine_strings(prefix
, it
->key()));
1765 ldout(db
->cct
, 15) << __func__
1766 << " count = " << cnt0
- cnt
1769 ldout(db
->cct
, 10) << __func__
<< " p_iter == end(), resorting to DeleteRange"
1771 bat
.RollbackToSavePoint();
1772 bat
.DeleteRange(db
->default_cf
,
1773 rocksdb::Slice(combine_strings(prefix
, start
)),
1774 rocksdb::Slice(combine_strings(prefix
, end
)));
1778 } else if (cnt
== 0) {
1779 ceph_assert(p_iter
->second
.handles
.size() >= 1);
1780 for (auto cf
: p_iter
->second
.handles
) {
1781 ldout(db
->cct
, 10) << __func__
<< " p_iter != end(), resorting to DeleteRange"
1783 bat
.DeleteRange(cf
, rocksdb::Slice(start
), rocksdb::Slice(end
));
1786 auto bounds
= KeyValueDB::IteratorBounds();
1787 bounds
.lower_bound
= start
;
1788 bounds
.upper_bound
= end
;
1789 ceph_assert(p_iter
->second
.handles
.size() >= 1);
1790 for (auto cf
: p_iter
->second
.handles
) {
1791 cnt
= db
->get_delete_range_threshold();
1792 uint64_t cnt0
= cnt
;
1794 auto it
= db
->new_shard_iterator(cf
, prefix
, bounds
);
1795 for (it
->lower_bound(start
);
1796 it
->valid() && (--cnt
) != 0;
1798 bat
.Delete(cf
, it
->key());
1800 ldout(db
->cct
, 10) << __func__
1801 << " count = " << cnt0
- cnt
1804 ldout(db
->cct
, 10) << __func__
<< " p_iter != end(), resorting to DeleteRange"
1806 bat
.RollbackToSavePoint();
1807 bat
.DeleteRange(cf
, rocksdb::Slice(start
), rocksdb::Slice(end
));
1813 ldout(db
->cct
, 10) << __func__
<< " end" << dendl
;
1816 void RocksDBStore::RocksDBTransactionImpl::merge(
1817 const string
&prefix
,
1819 const bufferlist
&to_set_bl
)
1821 auto cf
= db
->get_cf_handle(prefix
, k
);
1823 // bufferlist::c_str() is non-constant, so we can't call c_str()
1824 if (to_set_bl
.is_contiguous() && to_set_bl
.length() > 0) {
1828 rocksdb::Slice(to_set_bl
.buffers().front().c_str(), to_set_bl
.length()));
1831 rocksdb::Slice
key_slice(k
);
1832 vector
<rocksdb::Slice
> value_slices(to_set_bl
.get_num_buffers());
1833 bat
.Merge(cf
, rocksdb::SliceParts(&key_slice
, 1),
1834 prepare_sliceparts(to_set_bl
, &value_slices
));
1837 string key
= combine_strings(prefix
, k
);
1838 // bufferlist::c_str() is non-constant, so we can't call c_str()
1839 if (to_set_bl
.is_contiguous() && to_set_bl
.length() > 0) {
1842 rocksdb::Slice(key
),
1843 rocksdb::Slice(to_set_bl
.buffers().front().c_str(), to_set_bl
.length()));
1846 rocksdb::Slice
key_slice(key
);
1847 vector
<rocksdb::Slice
> value_slices(to_set_bl
.get_num_buffers());
1850 rocksdb::SliceParts(&key_slice
, 1),
1851 prepare_sliceparts(to_set_bl
, &value_slices
));
1856 int RocksDBStore::get(
1857 const string
&prefix
,
1858 const std::set
<string
> &keys
,
1859 std::map
<string
, bufferlist
> *out
)
1861 rocksdb::PinnableSlice value
;
1862 utime_t start
= ceph_clock_now();
1863 if (cf_handles
.count(prefix
) > 0) {
1864 for (auto& key
: keys
) {
1865 auto cf_handle
= get_cf_handle(prefix
, key
);
1866 auto status
= db
->Get(rocksdb::ReadOptions(),
1868 rocksdb::Slice(key
),
1871 (*out
)[key
].append(value
.data(), value
.size());
1872 } else if (status
.IsIOError()) {
1873 ceph_abort_msg(status
.getState());
1878 for (auto& key
: keys
) {
1879 string k
= combine_strings(prefix
, key
);
1880 auto status
= db
->Get(rocksdb::ReadOptions(),
1885 (*out
)[key
].append(value
.data(), value
.size());
1886 } else if (status
.IsIOError()) {
1887 ceph_abort_msg(status
.getState());
1892 utime_t lat
= ceph_clock_now() - start
;
1893 logger
->tinc(l_rocksdb_get_latency
, lat
);
1897 int RocksDBStore::get(
1898 const string
&prefix
,
1902 ceph_assert(out
&& (out
->length() == 0));
1903 utime_t start
= ceph_clock_now();
1905 rocksdb::PinnableSlice value
;
1907 auto cf
= get_cf_handle(prefix
, key
);
1909 s
= db
->Get(rocksdb::ReadOptions(),
1911 rocksdb::Slice(key
),
1914 string k
= combine_strings(prefix
, key
);
1915 s
= db
->Get(rocksdb::ReadOptions(),
1921 out
->append(value
.data(), value
.size());
1922 } else if (s
.IsNotFound()) {
1925 ceph_abort_msg(s
.getState());
1927 utime_t lat
= ceph_clock_now() - start
;
1928 logger
->tinc(l_rocksdb_get_latency
, lat
);
1932 int RocksDBStore::get(
1933 const string
& prefix
,
1938 ceph_assert(out
&& (out
->length() == 0));
1939 utime_t start
= ceph_clock_now();
1941 rocksdb::PinnableSlice value
;
1943 auto cf
= get_cf_handle(prefix
, key
, keylen
);
1945 s
= db
->Get(rocksdb::ReadOptions(),
1947 rocksdb::Slice(key
, keylen
),
1951 combine_strings(prefix
, key
, keylen
, &k
);
1952 s
= db
->Get(rocksdb::ReadOptions(),
1958 out
->append(value
.data(), value
.size());
1959 } else if (s
.IsNotFound()) {
1962 ceph_abort_msg(s
.getState());
1964 utime_t lat
= ceph_clock_now() - start
;
1965 logger
->tinc(l_rocksdb_get_latency
, lat
);
1969 int RocksDBStore::split_key(rocksdb::Slice in
, string
*prefix
, string
*key
)
1971 size_t prefix_len
= 0;
1973 // Find separator inside Slice
1974 char* separator
= (char*) memchr(in
.data(), 0, in
.size());
1975 if (separator
== NULL
)
1977 prefix_len
= size_t(separator
- in
.data());
1978 if (prefix_len
>= in
.size())
1981 // Fetch prefix and/or key directly from Slice
1983 *prefix
= string(in
.data(), prefix_len
);
1985 *key
= string(separator
+1, in
.size()-prefix_len
-1);
1989 void RocksDBStore::compact()
1991 logger
->inc(l_rocksdb_compact
);
1992 rocksdb::CompactRangeOptions options
;
1993 db
->CompactRange(options
, default_cf
, nullptr, nullptr);
1994 for (auto cf
: cf_handles
) {
1995 for (auto shard_cf
: cf
.second
.handles
) {
2004 void RocksDBStore::compact_thread_entry()
2006 std::unique_lock l
{compact_queue_lock
};
2007 dout(10) << __func__
<< " enter" << dendl
;
2008 while (!compact_queue_stop
) {
2009 if (!compact_queue
.empty()) {
2010 auto range
= compact_queue
.front();
2011 compact_queue
.pop_front();
2012 logger
->set(l_rocksdb_compact_queue_len
, compact_queue
.size());
2014 logger
->inc(l_rocksdb_compact_range
);
2015 if (range
.first
.empty() && range
.second
.empty()) {
2018 compact_range(range
.first
, range
.second
);
2023 dout(10) << __func__
<< " waiting" << dendl
;
2024 compact_queue_cond
.wait(l
);
2026 dout(10) << __func__
<< " exit" << dendl
;
2029 void RocksDBStore::compact_range_async(const string
& start
, const string
& end
)
2031 std::lock_guard
l(compact_queue_lock
);
2033 // try to merge adjacent ranges. this is O(n), but the queue should
2034 // be short. note that we do not cover all overlap cases and merge
2035 // opportunities here, but we capture the ones we currently need.
2036 list
< pair
<string
,string
> >::iterator p
= compact_queue
.begin();
2037 while (p
!= compact_queue
.end()) {
2038 if (p
->first
== start
&& p
->second
== end
) {
2042 if (start
<= p
->first
&& p
->first
<= end
) {
2043 // new region crosses start of existing range
2044 // select right bound that is bigger
2045 compact_queue
.push_back(make_pair(start
, end
> p
->second
? end
: p
->second
));
2046 compact_queue
.erase(p
);
2047 logger
->inc(l_rocksdb_compact_queue_merge
);
2050 if (start
<= p
->second
&& p
->second
<= end
) {
2051 // new region crosses end of existing range
2052 //p->first < p->second and p->second <= end, so p->first <= end.
2053 //But we break if previous condition, so start > p->first.
2054 compact_queue
.push_back(make_pair(p
->first
, end
));
2055 compact_queue
.erase(p
);
2056 logger
->inc(l_rocksdb_compact_queue_merge
);
2061 if (p
== compact_queue
.end()) {
2062 // no merge, new entry.
2063 compact_queue
.push_back(make_pair(start
, end
));
2064 logger
->set(l_rocksdb_compact_queue_len
, compact_queue
.size());
2066 compact_queue_cond
.notify_all();
2067 if (!compact_thread
.is_started()) {
2068 compact_thread
.create("rstore_compact");
2071 bool RocksDBStore::check_omap_dir(string
&omap_dir
)
2073 rocksdb::Options options
;
2074 options
.create_if_missing
= true;
2076 rocksdb::Status status
= rocksdb::DB::Open(options
, omap_dir
, &db
);
2082 void RocksDBStore::compact_range(const string
& start
, const string
& end
)
2084 rocksdb::CompactRangeOptions options
;
2085 rocksdb::Slice
cstart(start
);
2086 rocksdb::Slice
cend(end
);
2087 string prefix_start
, key_start
;
2088 string prefix_end
, key_end
;
2089 string key_highest
= "\xff\xff\xff\xff"; //cheating
2090 string key_lowest
= "";
2092 auto compact_range
= [&] (const decltype(cf_handles
)::iterator column_it
,
2093 const std::string
& start
,
2094 const std::string
& end
) {
2095 rocksdb::Slice
cstart(start
);
2096 rocksdb::Slice
cend(end
);
2097 for (const auto& shard_it
: column_it
->second
.handles
) {
2098 db
->CompactRange(options
, shard_it
, &cstart
, &cend
);
2101 db
->CompactRange(options
, default_cf
, &cstart
, &cend
);
2102 split_key(cstart
, &prefix_start
, &key_start
);
2103 split_key(cend
, &prefix_end
, &key_end
);
2104 if (prefix_start
== prefix_end
) {
2105 const auto& column
= cf_handles
.find(prefix_start
);
2106 if (column
!= cf_handles
.end()) {
2107 compact_range(column
, key_start
, key_end
);
2110 auto column
= cf_handles
.find(prefix_start
);
2111 if (column
!= cf_handles
.end()) {
2112 compact_range(column
, key_start
, key_highest
);
2115 const auto& column_end
= cf_handles
.find(prefix_end
);
2116 while (column
!= column_end
) {
2117 compact_range(column
, key_lowest
, key_highest
);
2120 if (column
!= cf_handles
.end()) {
2121 compact_range(column
, key_lowest
, key_end
);
2126 RocksDBStore::RocksDBWholeSpaceIteratorImpl::~RocksDBWholeSpaceIteratorImpl()
2130 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_first()
2132 dbiter
->SeekToFirst();
2133 ceph_assert(!dbiter
->status().IsIOError());
2134 return dbiter
->status().ok() ? 0 : -1;
2136 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_first(const string
&prefix
)
2138 rocksdb::Slice
slice_prefix(prefix
);
2139 dbiter
->Seek(slice_prefix
);
2140 ceph_assert(!dbiter
->status().IsIOError());
2141 return dbiter
->status().ok() ? 0 : -1;
2143 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_last()
2145 dbiter
->SeekToLast();
2146 ceph_assert(!dbiter
->status().IsIOError());
2147 return dbiter
->status().ok() ? 0 : -1;
2149 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_last(const string
&prefix
)
2151 string limit
= past_prefix(prefix
);
2152 rocksdb::Slice
slice_limit(limit
);
2153 dbiter
->Seek(slice_limit
);
2155 if (!dbiter
->Valid()) {
2156 dbiter
->SeekToLast();
2160 return dbiter
->status().ok() ? 0 : -1;
2162 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::upper_bound(const string
&prefix
, const string
&after
)
2164 lower_bound(prefix
, after
);
2166 pair
<string
,string
> key
= raw_key();
2167 if (key
.first
== prefix
&& key
.second
== after
)
2170 return dbiter
->status().ok() ? 0 : -1;
2172 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::lower_bound(const string
&prefix
, const string
&to
)
2174 string bound
= combine_strings(prefix
, to
);
2175 rocksdb::Slice
slice_bound(bound
);
2176 dbiter
->Seek(slice_bound
);
2177 return dbiter
->status().ok() ? 0 : -1;
2179 bool RocksDBStore::RocksDBWholeSpaceIteratorImpl::valid()
2181 return dbiter
->Valid();
// Advances the iterator.
// NOTE(review): the statements that perform the step are not visible in this
// excerpt -- confirm against the full source.
// Asserts that RocksDB reported no I/O error; returns 0 when the iterator
// status is ok, -1 otherwise.
2183 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::next()
2188 ceph_assert(!dbiter
->status().IsIOError());
2189 return dbiter
->status().ok() ? 0 : -1;
// Moves the iterator backwards.
// NOTE(review): the statements that perform the step are not visible in this
// excerpt -- confirm against the full source.
// Asserts that RocksDB reported no I/O error; returns 0 when the iterator
// status is ok, -1 otherwise.
2191 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::prev()
2196 ceph_assert(!dbiter
->status().IsIOError());
2197 return dbiter
->status().ok() ? 0 : -1;
// Extracts the key part of the current raw RocksDB key.
// split_key() is asked only for the key component (the prefix output is
// passed as 0) and writes it to out_key.
// NOTE(review): the declaration and return of out_key are not visible in
// this excerpt -- confirm.
2199 string
RocksDBStore::RocksDBWholeSpaceIteratorImpl::key()
2202 split_key(dbiter
->key(), 0, &out_key
);
// Splits the current raw RocksDB key with split_key() and returns the
// result as a (prefix, key) pair.
// NOTE(review): the declarations of the two local strings are not visible
// in this excerpt.
2205 pair
<string
,string
> RocksDBStore::RocksDBWholeSpaceIteratorImpl::raw_key()
2208 split_key(dbiter
->key(), &prefix
, &key
);
2209 return make_pair(prefix
, key
);
// Checks, without copying the key, whether the current raw key starts with
// `prefix` followed by a '\0' byte.
// The size test runs first so that indexing key[prefix.length()] stays in
// bounds; only then are the leading prefix.length() bytes compared.
// NOTE(review): the result for the non-matching case is not visible in this
// excerpt (presumably false) -- confirm.
2212 bool RocksDBStore::RocksDBWholeSpaceIteratorImpl::raw_key_is_prefixed(const string
&prefix
) {
2213 // Look for "prefix\0" right in rocksdb::Slice
2214 rocksdb::Slice key
= dbiter
->key();
2215 if ((key
.size() > prefix
.length()) && (key
[prefix
.length()] == '\0')) {
2216 return memcmp(key
.data(), prefix
.c_str(), prefix
.length()) == 0;
2222 bufferlist
RocksDBStore::RocksDBWholeSpaceIteratorImpl::value()
2224 return to_bufferlist(dbiter
->value());
2227 size_t RocksDBStore::RocksDBWholeSpaceIteratorImpl::key_size()
2229 return dbiter
->key().size();
2232 size_t RocksDBStore::RocksDBWholeSpaceIteratorImpl::value_size()
2234 return dbiter
->value().size();
2237 bufferptr
RocksDBStore::RocksDBWholeSpaceIteratorImpl::value_as_ptr()
2239 rocksdb::Slice val
= dbiter
->value();
2240 return bufferptr(val
.data(), val
.size());
2243 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::status()
2245 return dbiter
->status().ok() ? 0 : -1;
// Computes a limit key for `prefix`, starting from a copy of the prefix.
// NOTE(review): the statements that modify and return `limit` are not
// visible in this excerpt -- confirm against the full source.
2248 string
RocksDBStore::past_prefix(const string
&prefix
)
2250 string limit
= prefix
;
// Iterator over a single RocksDB column family that holds the data of one
// prefix. Keys in the column family are used as-is (no prefix is encoded in
// them); raw_key() re-attaches the stored prefix.
// NOTE(review): several lines of this class (some member declarations, the
// destructor, parts of upper_bound/next/prev) are not visible in this
// excerpt; comments below describe only what is shown.
2255 class CFIteratorImpl
: public KeyValueDB::IteratorImpl
{
2258 rocksdb::Iterator
*dbiter
;
2259 const KeyValueDB::IteratorBounds bounds
;
// Slice views of the bounds; RocksDB is handed pointers to these members.
2260 const rocksdb::Slice iterate_lower_bound
;
2261 const rocksdb::Slice iterate_upper_bound
;
// Creates a RocksDB iterator over column family `cf` of `db`.
// The bound slices are built from the member `bounds` (already initialized
// from bounds_), not from the moved-from parameter. When the
// osd_rocksdb_iterator_bounds_enabled option is set, each bound that is
// present is passed to RocksDB through ReadOptions.
2263 explicit CFIteratorImpl(const RocksDBStore
* db
,
2264 const std::string
& p
,
2265 rocksdb::ColumnFamilyHandle
* cf
,
2266 KeyValueDB::IteratorBounds bounds_
)
2267 : prefix(p
), bounds(std::move(bounds_
)),
2268 iterate_lower_bound(make_slice(bounds
.lower_bound
)),
2269 iterate_upper_bound(make_slice(bounds
.upper_bound
))
2271 auto options
= rocksdb::ReadOptions();
2272 if (db
->cct
->_conf
->osd_rocksdb_iterator_bounds_enabled
) {
2273 if (bounds
.lower_bound
) {
2274 options
.iterate_lower_bound
= &iterate_lower_bound
;
2276 if (bounds
.upper_bound
) {
2277 options
.iterate_upper_bound
= &iterate_upper_bound
;
2280 dbiter
= db
->db
->NewIterator(options
, cf
);
// Seeks to the first key; returns 0 if the iterator status is ok, else -1.
2286 int seek_to_first() override
{
2287 dbiter
->SeekToFirst();
2288 return dbiter
->status().ok() ? 0 : -1;
// Seeks to the last key; returns 0 if the iterator status is ok, else -1.
2290 int seek_to_last() override
{
2291 dbiter
->SeekToLast();
2292 return dbiter
->status().ok() ? 0 : -1;
// Positions relative to `after`; checks whether the current key equals it.
// NOTE(review): the initial seek and the action taken on an exact match are
// not visible in this excerpt.
2294 int upper_bound(const string
&after
) override
{
2296 if (valid() && (key() == after
)) {
2299 return dbiter
->status().ok() ? 0 : -1;
// Seeks to the first key that is not smaller than `to`.
2301 int lower_bound(const string
&to
) override
{
2302 rocksdb::Slice
slice_bound(to
);
2303 dbiter
->Seek(slice_bound
);
2304 return dbiter
->status().ok() ? 0 : -1;
// NOTE(review): the stepping statements of next()/prev() are not visible in
// this excerpt; both return 0 if the iterator status is ok, else -1.
2306 int next() override
{
2310 return dbiter
->status().ok() ? 0 : -1;
2312 int prev() override
{
2316 return dbiter
->status().ok() ? 0 : -1;
2318 bool valid() override
{
2319 return dbiter
->Valid();
// The column-family key is returned unchanged.
2321 string
key() override
{
2322 return dbiter
->key().ToString();
// Pairs the stored prefix with the column-family key.
2324 std::pair
<std::string
, std::string
> raw_key() override
{
2325 return make_pair(prefix
, key());
2327 bufferlist
value() override
{
2328 return to_bufferlist(dbiter
->value());
2330 bufferptr
value_as_ptr() override
{
2331 rocksdb::Slice val
= dbiter
->value();
2332 return bufferptr(val
.data(), val
.size());
// Maps the RocksDB iterator status to 0 (ok) or -1.
2334 int status() override
{
2335 return dbiter
->status().ok() ? 0 : -1;
2340 //merge column iterators and rest iterator
// Whole-space iterator that presents one ordered view over two sources:
//  - `main`: the iterator returned by db->get_default_cf_iterator();
//  - `shards`: one iterator per entry of db->cf_handles, keyed by prefix.
// `current_shard` designates the shard iterator currently in play and
// `smaller` records which side (main or shard) is exposed by valid(), key(),
// raw_key(), value() and the size accessors.
// NOTE(review): many lines of this class are not visible in this excerpt
// (returns, else-branches, iterator increments). Comments below describe
// only what is shown and flag the rest.
2341 class WholeMergeIteratorImpl
: public KeyValueDB::WholeSpaceIteratorImpl
{
2344 KeyValueDB::WholeSpaceIterator main
;
2345 std::map
<std::string
, KeyValueDB::Iterator
> shards
;
2346 std::map
<std::string
, KeyValueDB::Iterator
>::iterator current_shard
;
2347 enum {on_main
, on_shard
} smaller
;
// Builds the main iterator and one iterator per configured column family.
2350 WholeMergeIteratorImpl(RocksDBStore
* db
)
2352 , main(db
->get_default_cf_iterator())
2354 for (auto& e
: db
->cf_handles
) {
2355 shards
.emplace(e
.first
, db
->get_iterator(e
.first
));
2359 // returns true if value in main is smaller than in shards
2360 // invalid is larger than actual value
// When both sides are valid, prefixes are compared first and keys second.
// The current shard iterator is asserted to be valid whenever
// current_shard is not end().
2361 bool is_main_smaller() {
2362 if (main
->valid()) {
2363 if (current_shard
!= shards
.end()) {
2364 auto main_rk
= main
->raw_key();
2365 ceph_assert(current_shard
->second
->valid());
2366 auto shards_rk
= current_shard
->second
->raw_key();
2367 if (main_rk
.first
< shards_rk
.first
)
2369 if (main_rk
.first
> shards_rk
.first
)
2371 return main_rk
.second
< shards_rk
.second
;
2376 if (current_shard
!= shards
.end()) {
2379 //this means that neither is valid
2380 //we select main to be smaller, so valid() will signal properly
// Seeks main to its first key and scans shards from the beginning until one
// reports an error or yields a valid position, then recomputes `smaller`.
// Returns 0 only if both seeks succeeded.
2386 int seek_to_first() override
{
2387 int r0
= main
->seek_to_first();
2389 // find first shard that has some data
2390 current_shard
= shards
.begin();
2391 while (current_shard
!= shards
.end()) {
2392 r1
= current_shard
->second
->seek_to_first();
2393 if (r1
!= 0 || current_shard
->second
->valid()) {
2394 //this is the first shard that will yield some keys
2399 smaller
= is_main_smaller() ? on_main
: on_shard
;
2400 return r0
== 0 && r1
== 0 ? 0 : -1;
// Same as seek_to_first(), but main seeks to `prefix` and the shard scan
// starts at the first shard whose name is not smaller than `prefix`.
2403 int seek_to_first(const std::string
&prefix
) override
{
2404 int r0
= main
->seek_to_first(prefix
);
2406 // find first shard that has some data
2407 current_shard
= shards
.lower_bound(prefix
);
2408 while (current_shard
!= shards
.end()) {
2409 r1
= current_shard
->second
->seek_to_first();
2410 if (r1
!= 0 || current_shard
->second
->valid()) {
2411 //this is the first shard that will yield some keys
2416 smaller
= is_main_smaller() ? on_main
: on_shard
;
2417 return r0
== 0 && r1
== 0 ? 0 : -1;
// Seeks both sides to their last key and chooses between them.
// NOTE(review): the statements executed in each branch of the selection are
// not visible in this excerpt.
2420 int seek_to_last() override
{
2421 int r0
= main
->seek_to_last();
2423 r1
= shards_seek_to_last();
2424 //if we have 2 candidates, we need to select
2425 if (main
->valid()) {
2426 if (shards_valid()) {
2427 if (is_main_smaller()) {
2438 if (shards_valid()) {
2444 return r0
== 0 && r1
== 0 ? 0 : -1;
// Seeks toward the last key of `prefix` on both sides. The shard search
// walks backwards from shards.lower_bound(prefix).
// NOTE(review): how current_shard is stepped inside the loop and how `found`
// is set are not visible in this excerpt.
2447 int seek_to_last(const std::string
&prefix
) override
{
2448 int r0
= main
->seek_to_last(prefix
);
2450 // find last shard that has some data
2452 current_shard
= shards
.lower_bound(prefix
);
2453 while (current_shard
!= shards
.begin()) {
2454 r1
= current_shard
->second
->seek_to_last();
2457 if (current_shard
->second
->valid()) {
2462 //if we have 2 candidates, we need to select
2463 if (main
->valid() && found
) {
2464 if (is_main_smaller()) {
2471 //set shards state that properly represents eof
2472 current_shard
= shards
.end();
2474 smaller
= is_main_smaller() ? on_main
: on_shard
;
2475 return r0
== 0 && r1
== 0 ? 0 : -1;
// Applies upper_bound on main and on the shard named exactly `prefix` (if
// any); otherwise, or when that shard is exhausted, following shards are
// tried from their first key.
// NOTE(review): error handling, the use of `located`, and the return
// statement are not visible in this excerpt.
2478 int upper_bound(const std::string
&prefix
, const std::string
&after
) override
{
2479 int r0
= main
->upper_bound(prefix
, after
);
2483 current_shard
= shards
.lower_bound(prefix
);
2484 if (current_shard
!= shards
.end()) {
2485 bool located
= false;
2486 if (current_shard
->first
== prefix
) {
2487 r1
= current_shard
->second
->upper_bound(after
);
2490 if (current_shard
->second
->valid()) {
2495 while (current_shard
!= shards
.end()) {
2496 r1
= current_shard
->second
->seek_to_first();
2499 if (current_shard
->second
->valid())
2505 smaller
= is_main_smaller() ? on_main
: on_shard
;
// Same structure as upper_bound(), using lower_bound on both sides.
// NOTE(review): error handling, the use of `located`, and the return
// statement are not visible in this excerpt.
2509 int lower_bound(const std::string
&prefix
, const std::string
&to
) override
{
2510 int r0
= main
->lower_bound(prefix
, to
);
2514 current_shard
= shards
.lower_bound(prefix
);
2515 if (current_shard
!= shards
.end()) {
2516 bool located
= false;
2517 if (current_shard
->first
== prefix
) {
2518 r1
= current_shard
->second
->lower_bound(to
);
2521 if (current_shard
->second
->valid()) {
2526 while (current_shard
!= shards
.end()) {
2527 r1
= current_shard
->second
->seek_to_first();
2530 if (current_shard
->second
->valid())
2536 smaller
= is_main_smaller() ? on_main
: on_shard
;
// Validity of whichever side `smaller` selects.
2540 bool valid() override
{
2541 if (smaller
== on_main
) {
2542 return main
->valid();
2544 if (current_shard
== shards
.end())
2546 return current_shard
->second
->valid();
// Advances the side selected by `smaller`, then recomputes `smaller`.
// NOTE(review): the stepping calls and return are not visible in this
// excerpt.
2550 int next() override
{
2552 if (smaller
== on_main
) {
2559 smaller
= is_main_smaller() ? on_main
: on_shard
;
// Steps the merged view backwards. Remembers whether each side was valid
// before the step, then decides which side holds the previous element and
// restores the other one.
// NOTE(review): most statements of this method are not visible in this
// excerpt; do not rely on this summary for exact ordering.
2563 int prev() override
{
2565 bool main_was_valid
= false;
2566 if (main
->valid()) {
2567 main_was_valid
= true;
2570 r
= main
->seek_to_last();
2575 bool shards_was_valid
= false;
2576 if (shards_valid()) {
2577 shards_was_valid
= true;
2580 r
= shards_seek_to_last();
2585 if (!main
->valid() && !shards_valid()) {
2586 //end, no previous. set marker so valid() can work
2591 //if 1 is valid, select it
2592 //if 2 are valid select larger and advance the other
2593 if (main
->valid()) {
2594 if (shards_valid()) {
2595 if (is_main_smaller()) {
2597 if (main_was_valid
) {
2598 if (main
->valid()) {
2601 r
= main
->seek_to_first();
2604 //if we have resurrected main, kill it
2605 if (main
->valid()) {
2611 if (shards_was_valid
) {
2612 if (shards_valid()) {
2615 r
= shards_seek_to_first();
2618 //if we have resurrected shards, kill it
2619 if (shards_valid()) {
2626 r
= shards_seek_to_first();
2630 r
= main
->seek_to_first();
// Key of the element on the selected side.
2635 std::string
key() override
2637 if (smaller
== on_main
) {
2640 return current_shard
->second
->key();
// (prefix, key) of the selected element; for a shard the prefix is the
// shard's map key.
2644 std::pair
<std::string
,std::string
> raw_key() override
2646 if (smaller
== on_main
) {
2647 return main
->raw_key();
2649 return { current_shard
->first
, current_shard
->second
->key() };
// For a shard, the prefix test is a comparison with the shard's map key.
2653 bool raw_key_is_prefixed(const std::string
&prefix
) override
2655 if (smaller
== on_main
) {
2656 return main
->raw_key_is_prefixed(prefix
);
2658 return current_shard
->first
== prefix
;
2662 ceph::buffer::list
value() override
2664 if (smaller
== on_main
) {
2665 return main
->value();
2667 return current_shard
->second
->value();
// NOTE(review): the returned value is not visible in this excerpt.
2671 int status() override
2673 //because we already had to inspect key, it must be ok
2677 size_t key_size() override
2679 if (smaller
== on_main
) {
2680 return main
->key_size();
2682 return current_shard
->second
->key().size();
2685 size_t value_size() override
2687 if (smaller
== on_main
) {
2688 return main
->value_size();
2690 return current_shard
->second
->value().length();
// True when current_shard designates a shard whose iterator is valid.
2694 int shards_valid() {
2695 if (current_shard
== shards
.end())
2697 return current_shard
->second
->valid();
// Shard-side step forward: advances the current shard and, once it is
// exhausted, looks for the next shard that has a key.
// NOTE(review): the function header for this fragment is not visible in
// this excerpt.
2701 if (current_shard
== shards
.end()) {
2702 //illegal to next() on !valid()
2706 r
= current_shard
->second
->next();
2709 if (current_shard
->second
->valid())
2711 //current shard exhausted, search for key
2713 while (current_shard
!= shards
.end()) {
2714 r
= current_shard
->second
->seek_to_first();
2717 if (current_shard
->second
->valid())
2721 //either we found key or not, but it is success
// Shard-side step backward.
// NOTE(review): the function header for this fragment is not visible in
// this excerpt.
2726 if (current_shard
== shards
.end()) {
2727 //illegal to prev() on !valid()
2730 int r
= current_shard
->second
->prev();
2732 if (current_shard
->second
->valid()) {
2735 if (current_shard
== shards
.begin()) {
2736 //we have reached pre-first element
2737 //this makes it !valid(), but guarantees next() moves to first element
2741 r
= current_shard
->second
->seek_to_last();
// Walks shards from the end towards the beginning, seeking each to its last
// key until one is valid; leaves current_shard at end() when none is.
2746 int shards_seek_to_last() {
2748 current_shard
= shards
.end();
2749 if (current_shard
== shards
.begin()) {
2753 while (current_shard
!= shards
.begin()) {
2755 r
= current_shard
->second
->seek_to_last();
2758 if (current_shard
->second
->valid()) {
2763 current_shard
= shards
.end();
// Walks shards from the beginning, seeking each to its first key until one
// is valid.
2767 int shards_seek_to_first() {
2769 current_shard
= shards
.begin();
2770 while (current_shard
!= shards
.end()) {
2771 r
= current_shard
->second
->seek_to_first();
2774 if (current_shard
->second
->valid()) {
2775 //this is the first shard that will yield some keys
// Iterator over one prefix whose data is spread across several column
// family shards. One rocksdb::Iterator per shard is kept in `iters`; the
// seek operations order `iters` with `keyless`, and valid(), key(), value()
// and status() read iters[0].
// NOTE(review): a number of lines of this class are not visible in this
// excerpt (nested struct header, some members, returns, per-iterator seek
// calls). Comments below describe only what is shown.
2784 class ShardMergeIteratorImpl
: public KeyValueDB::IteratorImpl
{
// Ordering helper for shard iterators, based on the store's comparator.
2788 const rocksdb::Comparator
* comparator
;
2790 KeyLess(const rocksdb::Comparator
* comparator
) : comparator(comparator
) { };
// NOTE(review): lines before the comparison (presumably handling of invalid
// iterators) are not visible in this excerpt.
2792 bool operator()(rocksdb::Iterator
* a
, rocksdb::Iterator
* b
) const
2796 return comparator
->Compare(a
->key(), b
->key()) < 0;
2810 const RocksDBStore
* db
;
2813 const KeyValueDB::IteratorBounds bounds
;
// Slice views of the bounds; RocksDB is handed pointers to these members.
2814 const rocksdb::Slice iterate_lower_bound
;
2815 const rocksdb::Slice iterate_upper_bound
;
2816 std::vector
<rocksdb::Iterator
*> iters
;
// Creates one RocksDB iterator per handle in `shards`, all with the same
// ReadOptions. When osd_rocksdb_iterator_bounds_enabled is set, each bound
// that is present is passed to RocksDB.
2818 explicit ShardMergeIteratorImpl(const RocksDBStore
* db
,
2819 const std::string
& prefix
,
2820 const std::vector
<rocksdb::ColumnFamilyHandle
*>& shards
,
2821 KeyValueDB::IteratorBounds bounds_
)
2822 : db(db
), keyless(db
->comparator
), prefix(prefix
), bounds(std::move(bounds_
)),
2823 iterate_lower_bound(make_slice(bounds
.lower_bound
)),
2824 iterate_upper_bound(make_slice(bounds
.upper_bound
))
2826 iters
.reserve(shards
.size());
2827 auto options
= rocksdb::ReadOptions();
2828 if (db
->cct
->_conf
->osd_rocksdb_iterator_bounds_enabled
) {
2829 if (bounds
.lower_bound
) {
2830 options
.iterate_lower_bound
= &iterate_lower_bound
;
2832 if (bounds
.upper_bound
) {
2833 options
.iterate_upper_bound
= &iterate_upper_bound
;
2836 for (auto& s
: shards
) {
2837 iters
.push_back(db
->db
->NewIterator(options
, s
));
// NOTE(review): the loop body is not visible in this excerpt; presumably it
// releases each iterator -- confirm.
2840 ~ShardMergeIteratorImpl() {
2841 for (auto& it
: iters
) {
// Seeks every shard iterator, checks each status, then sorts `iters` so the
// smallest key is at iters[0].
// NOTE(review): the per-iterator seek call and return values are not
// visible in this excerpt.
2845 int seek_to_first() override
{
2846 for (auto& it
: iters
) {
2848 if (!it
->status().ok()) {
2852 //all iterators seeked, sort
2853 std::sort(iters
.begin(), iters
.end(), keyless
);
// Seeks every shard iterator to its last key and moves the iterator holding
// the largest key to iters[0].
// NOTE(review): the per-iterator seek call and the statements applied to
// the non-selected iterators are not visible in this excerpt.
2856 int seek_to_last() override
{
2857 for (auto& it
: iters
) {
2859 if (!it
->status().ok()) {
2863 for (size_t i
= 1; i
< iters
.size(); i
++) {
2864 if (iters
[0]->Valid()) {
2865 if (iters
[i
]->Valid()) {
2866 if (keyless(iters
[0], iters
[i
])) {
2867 std::swap(iters
[0], iters
[i
]);
2873 if (iters
[i
]->Valid()) {
2874 std::swap(iters
[0], iters
[i
]);
2877 //it might happen that cf was empty
2878 if (iters
[i
]->Valid()) {
2882 //no need to sort, as at most 1 iterator is valid now
// Seeks every shard iterator to `after`, detects an exact match on each,
// then sorts `iters`.
// NOTE(review): the action taken on an exact match is not visible in this
// excerpt.
2885 int upper_bound(const string
&after
) override
{
2886 rocksdb::Slice
slice_bound(after
);
2887 for (auto& it
: iters
) {
2888 it
->Seek(slice_bound
);
2889 if (it
->Valid() && it
->key() == after
) {
2892 if (!it
->status().ok()) {
2896 std::sort(iters
.begin(), iters
.end(), keyless
);
// Seeks every shard iterator to `to`, then sorts `iters`.
2899 int lower_bound(const string
&to
) override
{
2900 rocksdb::Slice
slice_bound(to
);
2901 for (auto& it
: iters
) {
2902 it
->Seek(slice_bound
);
2903 if (!it
->status().ok()) {
2907 std::sort(iters
.begin(), iters
.end(), keyless
);
// Advances iters[0] and moves it towards the back by adjacent swaps until
// the order under `keyless` is restored.
// NOTE(review): the stepping call and the loop exit are not visible in this
// excerpt.
2910 int next() override
{
2912 if (iters
[0]->Valid()) {
2914 if (iters
[0]->status().ok()) {
2917 for (size_t i
= 0; i
< iters
.size() - 1; i
++) {
2918 if (keyless(iters
[i
], iters
[i
+ 1])) {
2922 std::swap(iters
[i
], iters
[i
+ 1]);
2928 // iters are sorted, so
2929 // a[0] < b[0] < c[0] < d[0]
2930 // a[0] > a[-1], a[0] > b[-1], a[0] > c[-1], a[0] > d[-1]
2931 // so, prev() will be one of:
2932 // a[-1], b[-1], c[-1], d[-1]
2933 // prev() will be the one that is *largest* of them
2936 // 1. go prev() on each iterator we can
2937 // 2. select largest key from those iterators
2938 // 3. go next() on all iterators except (2)
// NOTE(review): the statements that step each iterator back and decide
// whether it joins prev_done are not visible in this excerpt.
2940 int prev() override
{
2941 std::vector
<rocksdb::Iterator
*> prev_done
;
2943 for (auto it
: iters
) {
2947 prev_done
.push_back(it
);
2954 prev_done
.push_back(it
);
2958 if (prev_done
.size() == 0) {
2959 /* there is no previous element */
2960 if (iters
[0]->Valid()) {
2962 ceph_assert(!iters
[0]->Valid());
2967 rocksdb::Iterator
* highest
= prev_done
[0];
2968 for (size_t i
= 1; i
< prev_done
.size(); i
++) {
2969 if (keyless(highest
, prev_done
[i
])) {
2971 highest
= prev_done
[i
];
2973 prev_done
[i
]->Next();
2977 //insert highest in the beginning, and shift values until we pick highest
2978 //untouched rest is sorted - we just prev()/next() them
2979 rocksdb::Iterator
* hold
= highest
;
2980 for (size_t i
= 0; i
< iters
.size(); i
++) {
2981 std::swap(hold
, iters
[i
]);
2982 if (hold
== highest
) break;
2984 ceph_assert(hold
== highest
);
// The accessors below all read iters[0].
2987 bool valid() override
{
2988 return iters
[0]->Valid();
2990 string
key() override
{
2991 return iters
[0]->key().ToString();
// Pairs the stored prefix with the shard key.
2993 std::pair
<std::string
, std::string
> raw_key() override
{
2994 return make_pair(prefix
, key());
2996 bufferlist
value() override
{
2997 return to_bufferlist(iters
[0]->value());
2999 bufferptr
value_as_ptr() override
{
3000 rocksdb::Slice val
= iters
[0]->value();
3001 return bufferptr(val
.data(), val
.size());
// Only the status of iters[0] is reported.
3003 int status() override
{
3004 return iters
[0]->status().ok() ? 0 : -1;
// Returns an iterator over the keys of `prefix`.
// If the prefix has its own column families (an entry in cf_handles):
//  - with a single handle, that handle is used;
//  - with several handles and osd_rocksdb_iterator_bounds_enabled set,
//    check_cf_handle_bounds() is asked for a handle matching `bounds`;
//  - a CFIteratorImpl or a ShardMergeIteratorImpl is then created.
// NOTE(review): the condition choosing between those two implementations
// and their constructor arguments are only partly visible in this excerpt.
// Otherwise the prefix is served through a whole-space iterator wrapped by
// KeyValueDB::make_iterator().
3008 KeyValueDB::Iterator
RocksDBStore::get_iterator(const std::string
& prefix
, IteratorOpts opts
, IteratorBounds bounds
)
3010 auto cf_it
= cf_handles
.find(prefix
);
3011 if (cf_it
!= cf_handles
.end()) {
3012 rocksdb::ColumnFamilyHandle
* cf
= nullptr;
3013 if (cf_it
->second
.handles
.size() == 1) {
3014 cf
= cf_it
->second
.handles
[0];
3015 } else if (cct
->_conf
->osd_rocksdb_iterator_bounds_enabled
) {
3016 cf
= check_cf_handle_bounds(cf_it
, bounds
);
3019 return std::make_shared
<CFIteratorImpl
>(
3025 return std::make_shared
<ShardMergeIteratorImpl
>(
3028 cf_it
->second
.handles
,
3032 // use wholespace engine if no cfs are configured
3033 // or use default cf otherwise as there is no
3034 // matching cf for the specified prefix.
3035 auto w_it
= cf_handles
.size() == 0 || prefix
.empty() ?
3036 get_wholespace_iterator(opts
) :
3037 get_default_cf_iterator();
3038 return KeyValueDB::make_iterator(prefix
, w_it
);
// Creates a RocksDBWholeSpaceIteratorImpl for column family `cf`.
// NOTE(review): the constructor arguments are not visible in this excerpt.
3042 RocksDBStore::WholeSpaceIterator
RocksDBStore::new_shard_iterator(rocksdb::ColumnFamilyHandle
* cf
)
3044 return std::make_shared
<RocksDBWholeSpaceIteratorImpl
>(
// Creates a CFIteratorImpl for column family `cf`, taking `prefix` and
// `bounds` as parameters.
// NOTE(review): the constructor arguments are not visible in this excerpt.
3050 KeyValueDB::Iterator
RocksDBStore::new_shard_iterator(rocksdb::ColumnFamilyHandle
* cf
,
3051 const std::string
& prefix
,
3052 IteratorBounds bounds
)
3054 return std::make_shared
<CFIteratorImpl
>(
// Returns an iterator over the whole key space.
// With no extra column families configured (cf_handles empty) a plain
// RocksDBWholeSpaceIteratorImpl over default_cf is enough; the other return
// builds a WholeMergeIteratorImpl, which takes only `this` (so `opts` is
// not forwarded on that path).
3061 RocksDBStore::WholeSpaceIterator
RocksDBStore::get_wholespace_iterator(IteratorOpts opts
)
3063 if (cf_handles
.size() == 0) {
3064 return std::make_shared
<RocksDBWholeSpaceIteratorImpl
>(
3065 this, default_cf
, opts
);
3067 return std::make_shared
<WholeMergeIteratorImpl
>(this);
3071 RocksDBStore::WholeSpaceIterator
RocksDBStore::get_default_cf_iterator()
3073 return std::make_shared
<RocksDBWholeSpaceIteratorImpl
>(this, default_cf
, 0);
// Prepares the database for resharding to the layout described by
// `new_sharding`, and fills `to_process_columns` with the column families
// whose keys must be examined (name -> handle with a suitable deleter).
// The numbered steps below are the function's own outline.
// NOTE(review): error checks and return statements are largely not visible
// in this excerpt; only the visible actions are described.
3076 int RocksDBStore::prepare_for_reshard(const std::string
& new_sharding
,
3077 RocksDBStore::columns_t
& to_process_columns
)
3079 //0. lock db from opening
3080 //1. list existing columns
3081 //2. apply merge operator to (main + columns) opts
3082 //3. prepare std::vector<rocksdb::ColumnFamilyDescriptor> existing_cfs
3083 //4. open db, acquire existing column handles
3084 //5. calculate missing columns
3085 //6. create missing columns
3086 //7. construct cf_handles according to new sharding
3087 //8. check if all cf_handles are filled
// Parse the requested sharding; on failure the log shows the text with a
// caret under the offending position.
3090 std::vector
<ColumnFamily
> new_sharding_def
;
3091 char const* error_position
;
3092 std::string error_msg
;
3093 b
= parse_sharding_def(new_sharding
, new_sharding_def
, &error_position
, &error_msg
);
3095 dout(1) << __func__
<< " bad sharding: " << dendl
;
3096 dout(1) << __func__
<< new_sharding
<< dendl
;
3097 dout(1) << __func__
<< std::string(error_position
- &new_sharding
[0], ' ') << "^" << error_msg
<< dendl
;
3101 //0. lock db from opening
// The lock is a marker string appended to the stored sharding definition
// file, unless the marker is already present.
3102 std::string stored_sharding_text
;
3103 rocksdb::ReadFileToString(env
,
3105 &stored_sharding_text
);
3106 if (stored_sharding_text
.find(resharding_column_lock
) == string::npos
) {
3107 rocksdb::Status status
;
3108 if (stored_sharding_text
.size() != 0)
3109 stored_sharding_text
+= " ";
3110 stored_sharding_text
+= resharding_column_lock
;
3111 env
->CreateDir(sharding_def_dir
);
3112 status
= rocksdb::WriteStringToFile(env
, stored_sharding_text
,
3113 sharding_def_file
, true);
3115 derr
<< __func__
<< " cannot write to " << sharding_def_file
<< dendl
;
3120 //1. list existing columns
3122 rocksdb::Status status
;
3123 std::vector
<std::string
> existing_columns
;
3124 rocksdb::Options opt
;
3125 int r
= load_rocksdb_options(false, opt
);
3127 dout(1) << __func__
<< " load rocksdb options failed" << dendl
;
3130 status
= rocksdb::DB::ListColumnFamilies(rocksdb::DBOptions(opt
), path
, &existing_columns
);
3132 derr
<< "Unable to list column families: " << status
.ToString() << dendl
;
3135 dout(5) << "existing columns = " << existing_columns
<< dendl
;
3137 //2. apply merge operator to (main + columns) opts
3138 //3. prepare std::vector<rocksdb::ColumnFamilyDescriptor> cfs_to_open
// Column names have the form <prefix> or <prefix>-<number>; options are
// looked up by the <prefix> part in the new sharding definition.
3140 std::vector
<rocksdb::ColumnFamilyDescriptor
> cfs_to_open
;
3141 for (const auto& full_name
: existing_columns
) {
3142 //split col_name to <prefix>-<number>
3143 std::string base_name
;
3144 size_t pos
= full_name
.find('-');
3145 if (std::string::npos
== pos
)
3146 base_name
= full_name
;
3148 base_name
= full_name
.substr(0,pos
);
3150 rocksdb::ColumnFamilyOptions
cf_opt(opt
);
3151 // search if we have options for this column
3152 std::string options
;
3153 for (const auto& nsd
: new_sharding_def
) {
3154 if (nsd
.name
== base_name
) {
3155 options
= nsd
.options
;
3159 int r
= update_column_family_options(base_name
, options
, &cf_opt
);
3163 cfs_to_open
.emplace_back(full_name
, cf_opt
);
3166 //4. open db, acquire existing column handles
// handles[i] corresponds to cfs_to_open[i], and therefore to
// existing_columns[i].
3167 std::vector
<rocksdb::ColumnFamilyHandle
*> handles
;
3168 status
= rocksdb::DB::Open(rocksdb::DBOptions(opt
),
3169 path
, cfs_to_open
, &handles
, &db
);
3171 derr
<< status
.ToString() << dendl
;
3174 for (size_t i
= 0; i
< cfs_to_open
.size(); i
++) {
3175 dout(10) << "column " << cfs_to_open
[i
].name
<< " handle " << (void*)handles
[i
] << dendl
;
3178 //5. calculate missing columns
3179 std::vector
<std::string
> new_sharding_columns
;
3180 std::vector
<std::string
> missing_columns
;
3181 sharding_def_to_columns(new_sharding_def
,
3182 new_sharding_columns
);
3183 dout(5) << "target columns = " << new_sharding_columns
<< dendl
;
3184 for (const auto& n
: new_sharding_columns
) {
3186 for (const auto& e
: existing_columns
) {
3193 missing_columns
.push_back(n
);
3196 dout(5) << "missing columns = " << missing_columns
<< dendl
;
3198 //6. create missing columns
// Each created column is appended to both existing_columns and handles, so
// the two vectors stay index-aligned for step 7.
3199 for (const auto& full_name
: missing_columns
) {
3200 std::string base_name
;
3201 size_t pos
= full_name
.find('-');
3202 if (std::string::npos
== pos
)
3203 base_name
= full_name
;
3205 base_name
= full_name
.substr(0,pos
);
3207 rocksdb::ColumnFamilyOptions
cf_opt(opt
);
3208 // search if we have options for this column
3209 std::string options
;
3210 for (const auto& nsd
: new_sharding_def
) {
3211 if (nsd
.name
== base_name
) {
3212 options
= nsd
.options
;
3216 int r
= update_column_family_options(base_name
, options
, &cf_opt
);
3220 rocksdb::ColumnFamilyHandle
*cf
;
3221 status
= db
->CreateColumnFamily(cf_opt
, full_name
, &cf
);
3223 derr
<< __func__
<< " Failed to create rocksdb column family: "
3224 << full_name
<< dendl
;
3227 dout(10) << "created column " << full_name
<< " handle = " << (void*)cf
<< dendl
;
3228 existing_columns
.push_back(full_name
);
3229 handles
.push_back(cf
);
3232 //7. construct cf_handles according to new sharding
// The default column family is recorded with a no-op deleter; other columns
// get a deleter that destroys the handle through db. Columns that belong to
// the new sharding are also registered with add_column_family().
3233 for (size_t i
= 0; i
< existing_columns
.size(); i
++) {
3234 std::string full_name
= existing_columns
[i
];
3235 rocksdb::ColumnFamilyHandle
*cf
= handles
[i
];
3236 std::string base_name
;
3237 size_t shard_idx
= 0;
3238 size_t pos
= full_name
.find('-');
3239 dout(10) << "processing column " << full_name
<< dendl
;
3240 if (std::string::npos
== pos
) {
3241 base_name
= full_name
;
3243 base_name
= full_name
.substr(0,pos
);
3244 shard_idx
= atoi(full_name
.substr(pos
+1).c_str());
3246 if (rocksdb::kDefaultColumnFamilyName
== base_name
) {
3247 default_cf
= handles
[i
];
3248 must_close_default_cf
= true;
3249 std::unique_ptr
<rocksdb::ColumnFamilyHandle
, cf_deleter_t
> ptr
{
3250 cf
, [](rocksdb::ColumnFamilyHandle
*) {}};
3251 to_process_columns
.emplace(full_name
, std::move(ptr
));
3253 for (const auto& nsd
: new_sharding_def
) {
3254 if (nsd
.name
== base_name
) {
3255 if (shard_idx
< nsd
.shard_cnt
) {
3256 add_column_family(base_name
, nsd
.hash_l
, nsd
.hash_h
, shard_idx
, cf
);
3258 //ignore columns with index larger than shard count
3263 std::unique_ptr
<rocksdb::ColumnFamilyHandle
, cf_deleter_t
> ptr
{
3264 cf
, [this](rocksdb::ColumnFamilyHandle
* handle
) {
3265 db
->DestroyColumnFamilyHandle(handle
);
3267 to_process_columns
.emplace(full_name
, std::move(ptr
));
3271 //8. check if all cf_handles are filled
3272 for (const auto& col
: cf_handles
) {
3273 for (size_t i
= 0; i
< col
.second
.handles
.size(); i
++) {
3274 if (col
.second
.handles
[i
] == nullptr) {
3275 derr
<< "missing handle for column " << col
.first
<< " shard " << i
<< dendl
;
// Drops column families that are not part of the new sharding.
// First rebuilds the list of column names implied by cf_handles (a bare
// name for single-handle entries, "<name>-<i>" otherwise). Then, for every
// column in `current_columns` that is neither in that list nor the default
// column family, verifies the column is empty and drops it.
// NOTE(review): the seek preceding the emptiness assert and the return
// statements are not visible in this excerpt.
3283 int RocksDBStore::reshard_cleanup(const RocksDBStore::columns_t
& current_columns
)
3285 std::vector
<std::string
> new_sharding_columns
;
3286 for (const auto& [name
, handle
] : cf_handles
) {
3287 if (handle
.handles
.size() == 1) {
3288 new_sharding_columns
.push_back(name
);
3290 for (size_t i
= 0; i
< handle
.handles
.size(); i
++) {
3291 new_sharding_columns
.push_back(name
+ "-" + std::to_string(i
));
3296 for (auto& [name
, handle
] : current_columns
) {
3297 auto found
= std::find(new_sharding_columns
.begin(),
3298 new_sharding_columns
.end(),
3299 name
) != new_sharding_columns
.end();
3300 if (found
|| name
== rocksdb::kDefaultColumnFamilyName
) {
3301 dout(5) << "Column " << name
<< " is part of new sharding." << dendl
;
3304 dout(5) << "Column " << name
<< " not part of new sharding. Deleting." << dendl
;
3306 // verify that column is empty
3307 std::unique_ptr
<rocksdb::Iterator
> it
{
3308 db
->NewIterator(rocksdb::ReadOptions(), handle
.get())};
3311 ceph_assert(!it
->Valid());
3313 if (rocksdb::Status status
= db
->DropColumnFamily(handle
.get()); !status
.ok()) {
3314 derr
<< __func__
<< " Failed to drop column: " << name
<< dendl
;
// Moves every key to the column family it belongs to under `new_sharding`.
// `ctrl_in` optionally overrides batching/iterator-refresh limits and
// carries unit-test failure-injection flags; defaults are used when null.
// Sequence visible here: prepare_for_reshard(), process every column in
// to_process_columns, reshard_cleanup(), then write `new_sharding` to the
// sharding definition file.
// NOTE(review): return statements, counter updates and several branches
// are not visible in this excerpt; only the visible actions are described.
3321 int RocksDBStore::reshard(const std::string
& new_sharding
, const RocksDBStore::resharding_ctrl
* ctrl_in
)
3324 resharding_ctrl ctrl
= ctrl_in
? *ctrl_in
: resharding_ctrl();
3325 size_t bytes_in_batch
= 0;
3326 size_t keys_in_batch
= 0;
3327 size_t bytes_per_iterator
= 0;
3328 size_t keys_per_iterator
= 0;
3329 size_t keys_processed
= 0;
3330 size_t keys_moved
= 0;
// Writes a batch synchronously; a failed write aborts via ceph_assert.
3332 auto flush_batch
= [&](rocksdb::WriteBatch
* batch
) {
3333 dout(10) << "flushing batch, " << keys_in_batch
<< " keys, for "
3334 << bytes_in_batch
<< " bytes" << dendl
;
3335 rocksdb::WriteOptions woptions
;
3336 woptions
.sync
= true;
3337 rocksdb::Status s
= db
->Write(woptions
, batch
);
3338 ceph_assert(s
.ok());
// Walks all keys of one column family. An empty fixed_prefix means the raw
// key carries its own prefix and is split with split_key(); otherwise the
// raw key is the key and fixed_prefix is its prefix.
3344 auto process_column
= [&](rocksdb::ColumnFamilyHandle
* handle
,
3345 const std::string
& fixed_prefix
)
3347 dout(5) << " column=" << (void*)handle
<< " prefix=" << fixed_prefix
<< dendl
;
3348 std::unique_ptr
<rocksdb::Iterator
> it
{
3349 db
->NewIterator(rocksdb::ReadOptions(), handle
)};
3352 rocksdb::WriteBatch bat
;
3353 for (it
->SeekToFirst(); it
->Valid(); it
->Next()) {
3354 rocksdb::Slice raw_key
= it
->key();
3355 dout(30) << "key=" << pretty_binary_string(raw_key
.ToString()) << dendl
;
3356 //check if need to refresh iterator
// The current key is copied to a string before the iterator is replaced,
// because raw_key points into the old iterator; the new iterator is then
// positioned on the same key.
3357 if (bytes_per_iterator
>= ctrl
.bytes_per_iterator
||
3358 keys_per_iterator
>= ctrl
.keys_per_iterator
) {
3359 dout(8) << "refreshing iterator" << dendl
;
3360 bytes_per_iterator
= 0;
3361 keys_per_iterator
= 0;
3362 std::string raw_key_str
= raw_key
.ToString();
3363 it
.reset(db
->NewIterator(rocksdb::ReadOptions(), handle
));
3365 it
->Seek(raw_key_str
);
3366 ceph_assert(it
->Valid());
3367 raw_key
= it
->key();
3369 rocksdb::Slice value
= it
->value();
3370 std::string prefix
, key
;
3371 if (fixed_prefix
.size() == 0) {
3372 split_key(raw_key
, &prefix
, &key
);
3374 prefix
= fixed_prefix
;
3375 key
= raw_key
.ToString();
3378 if ((keys_processed
% 10000) == 0) {
3379 dout(10) << "processed " << keys_processed
<< " keys, moved " << keys_moved
<< dendl
;
// Keys with no dedicated column family go to the default one.
3381 rocksdb::ColumnFamilyHandle
* new_handle
= get_cf_handle(prefix
, key
);
3382 if (new_handle
== nullptr) {
3383 new_handle
= default_cf
;
3385 if (handle
== new_handle
) {
// In the default column family the stored key is prefix and key combined.
3388 std::string new_raw_key
;
3389 if (new_handle
== default_cf
) {
3390 new_raw_key
= combine_strings(prefix
, key
);
// The move is a delete from the old column plus a put into the new one, in
// the same write batch.
3394 bat
.Delete(handle
, raw_key
);
3395 bat
.Put(new_handle
, new_raw_key
, value
);
3396 dout(25) << "moving " << (void*)handle
<< "/" << pretty_binary_string(raw_key
.ToString()) <<
3397 " to " << (void*)new_handle
<< "/" << pretty_binary_string(new_raw_key
) <<
3398 " size " << value
.size() << dendl
;
3400 bytes_in_batch
+= new_raw_key
.size() * 2 + value
.size();
3402 bytes_per_iterator
+= new_raw_key
.size() * 2 + value
.size();
3403 keys_per_iterator
++;
3405 //check if need to write batch
3406 if (bytes_in_batch
>= ctrl
.bytes_per_batch
||
3407 keys_in_batch
>= ctrl
.keys_per_batch
) {
3409 if (ctrl
.unittest_fail_after_first_batch
) {
3414 if (bat
.Count() > 0) {
// NOTE(review): the guard body is not visible in this excerpt.
3420 auto close_column_handles
= make_scope_guard([this] {
3424 columns_t to_process_columns
;
3425 int r
= prepare_for_reshard(new_sharding
, to_process_columns
);
3427 dout(1) << "failed to prepare db for reshard" << dendl
;
// The default column family is processed with an empty prefix; every other
// column uses the part of its name before '-' as the fixed prefix.
3431 for (auto& [name
, handle
] : to_process_columns
) {
3432 dout(5) << "Processing column=" << name
3433 << " handle=" << handle
.get() << dendl
;
3434 if (name
== rocksdb::kDefaultColumnFamilyName
) {
3435 ceph_assert(handle
.get() == default_cf
);
3436 r
= process_column(default_cf
, std::string());
3438 std::string fixed_prefix
= name
.substr(0, name
.find('-'));
3439 dout(10) << "Prefix: " << fixed_prefix
<< dendl
;
3440 r
= process_column(handle
.get(), fixed_prefix
);
3443 derr
<< "Error processing column " << name
<< dendl
;
3446 if (ctrl
.unittest_fail_after_processing_column
) {
3451 r
= reshard_cleanup(to_process_columns
);
3453 dout(5) << "failed to cleanup after reshard" << dendl
;
3457 if (ctrl
.unittest_fail_after_successful_processing
) {
// Persist the new sharding definition, overwriting the stored text.
3460 env
->CreateDir(sharding_def_dir
);
3461 if (auto status
= rocksdb::WriteStringToFile(env
, new_sharding
,
3462 sharding_def_file
, true);
3464 derr
<< __func__
<< " cannot write to " << sharding_def_file
<< dendl
;
3471 bool RocksDBStore::get_sharding(std::string
& sharding
) {
3472 rocksdb::Status status
;
3473 std::string stored_sharding_text
;
3474 bool result
= false;
3477 status
= env
->FileExists(sharding_def_file
);
3479 status
= rocksdb::ReadFileToString(env
,
3481 &stored_sharding_text
);
3484 sharding
= stored_sharding_text
;