1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
11 #include <sys/types.h>
14 #include "rocksdb/db.h"
15 #include "rocksdb/table.h"
16 #include "rocksdb/env.h"
17 #include "rocksdb/slice.h"
18 #include "rocksdb/cache.h"
19 #include "rocksdb/filter_policy.h"
20 #include "rocksdb/utilities/convenience.h"
21 #include "rocksdb/merge_operator.h"
23 #include "common/perf_counters.h"
24 #include "common/PriorityCache.h"
25 #include "include/common_fwd.h"
26 #include "include/scope_guard.h"
27 #include "include/str_list.h"
28 #include "include/stringify.h"
29 #include "include/str_map.h"
30 #include "KeyValueDB.h"
31 #include "RocksDBStore.h"
33 #include "common/debug.h"
35 #define dout_context cct
36 #define dout_subsys ceph_subsys_rocksdb
38 #define dout_prefix *_dout << "rocksdb: "
40 namespace fs
= std::filesystem
;
49 using std::unique_ptr
;
52 using ceph::bufferlist
;
53 using ceph::bufferptr
;
54 using ceph::Formatter
;
// Filesystem locations (relative to the DB directory) where the
// column-family sharding definition is persisted, plus marker keys/files
// used while columns are being recreated or resharded.
static const char* sharding_def_dir = "sharding";                   // directory holding sharding metadata
static const char* sharding_def_file = "sharding/def";              // persisted sharding definition text
static const char* sharding_recreate = "sharding/recreate_columns"; // marker: columns need recreation
// Key written while a resharding operation is in progress.
static const char* resharding_column_lock = "reshardingXcommencingXlocked";
61 static bufferlist
to_bufferlist(rocksdb::Slice in
) {
63 bl
.append(bufferptr(in
.data(), in
.size()));
67 static rocksdb::SliceParts
prepare_sliceparts(const bufferlist
&bl
,
68 vector
<rocksdb::Slice
> *slices
)
71 for (auto& buf
: bl
.buffers()) {
72 (*slices
)[n
].data_
= buf
.c_str();
73 (*slices
)[n
].size_
= buf
.length();
76 return rocksdb::SliceParts(slices
->data(), slices
->size());
81 // One of these for the default rocksdb column family, routing each prefix
82 // to the appropriate MergeOperator.
84 class RocksDBStore::MergeOperatorRouter
85 : public rocksdb::AssociativeMergeOperator
89 const char *Name() const override
{
90 // Construct a name that rocksDB will validate against. We want to
91 // do this in a way that doesn't constrain the ordering of calls
92 // to set_merge_operator, so sort the merge operators and then
93 // construct a name from all of those parts.
94 store
.assoc_name
.clear();
95 map
<std::string
,std::string
> names
;
97 for (auto& p
: store
.merge_ops
) {
98 names
[p
.first
] = p
.second
->name();
100 for (auto& p
: names
) {
101 store
.assoc_name
+= '.';
102 store
.assoc_name
+= p
.first
;
103 store
.assoc_name
+= ':';
104 store
.assoc_name
+= p
.second
;
106 return store
.assoc_name
.c_str();
109 explicit MergeOperatorRouter(RocksDBStore
&_store
) : store(_store
) {}
111 bool Merge(const rocksdb::Slice
& key
,
112 const rocksdb::Slice
* existing_value
,
113 const rocksdb::Slice
& value
,
114 std::string
* new_value
,
115 rocksdb::Logger
* logger
) const override
{
116 // for default column family
117 // extract prefix from key and compare against each registered merge op;
118 // even though merge operator for explicit CF is included in merge_ops,
119 // it won't be picked up, since it won't match.
120 for (auto& p
: store
.merge_ops
) {
121 if (p
.first
.compare(0, p
.first
.length(),
122 key
.data(), p
.first
.length()) == 0 &&
123 key
.data()[p
.first
.length()] == 0) {
124 if (existing_value
) {
125 p
.second
->merge(existing_value
->data(), existing_value
->size(),
126 value
.data(), value
.size(),
129 p
.second
->merge_nonexistent(value
.data(), value
.size(), new_value
);
134 return true; // OK :)
139 // One of these per non-default column family, linked directly to the
140 // merge operator for that CF/prefix (if any).
142 class RocksDBStore::MergeOperatorLinker
143 : public rocksdb::AssociativeMergeOperator
146 std::shared_ptr
<KeyValueDB::MergeOperator
> mop
;
148 explicit MergeOperatorLinker(const std::shared_ptr
<KeyValueDB::MergeOperator
> &o
) : mop(o
) {}
150 const char *Name() const override
{
154 bool Merge(const rocksdb::Slice
& key
,
155 const rocksdb::Slice
* existing_value
,
156 const rocksdb::Slice
& value
,
157 std::string
* new_value
,
158 rocksdb::Logger
* logger
) const override
{
159 if (existing_value
) {
160 mop
->merge(existing_value
->data(), existing_value
->size(),
161 value
.data(), value
.size(),
164 mop
->merge_nonexistent(value
.data(), value
.size(), new_value
);
170 int RocksDBStore::set_merge_operator(
171 const string
& prefix
,
172 std::shared_ptr
<KeyValueDB::MergeOperator
> mop
)
174 // If you fail here, it's because you can't do this on an open database
175 ceph_assert(db
== nullptr);
176 merge_ops
.push_back(std::make_pair(prefix
,mop
));
// Adapter that routes rocksdb's internal logging into the ceph debug log.
// NOTE(review): the `cct` member declaration, the ctor/dtor bodies, the
// `buf` scratch-buffer declaration and the closing braces are elided in
// this chunk of the file.
class CephRocksdbLogger : public rocksdb::Logger {
  explicit CephRocksdbLogger(CephContext *c) : cct(c) {
  ~CephRocksdbLogger() override {
  // Write an entry to the log file with the specified format.
  void Logv(const char* format, va_list ap) override {
    // plain Logv defaults to rocksdb INFO level
    Logv(rocksdb::INFO_LEVEL, format, ap);
  // Write an entry to the log file with the specified log level
  // and format. Any log with level under the internal log level
  // of *this (see @SetInfoLogLevel and @GetInfoLogLevel) will not be
  void Logv(const rocksdb::InfoLogLevel log_level, const char* format,
            va_list ap) override {
    // map rocksdb's severity onto a ceph debug level: more severe
    // rocksdb messages get a lower (more likely to print) ceph level
    int v = rocksdb::NUM_INFO_LOG_LEVELS - log_level - 1;
    dout(ceph::dout::need_dynamic(v));
    vsnprintf(buf, sizeof(buf), format, ap);
    *_dout << buf << dendl;
209 rocksdb::Logger
*create_rocksdb_ceph_logger()
211 return new CephRocksdbLogger(g_ceph_context
);
214 static int string2bool(const string
&val
, bool &b_val
)
216 if (strcasecmp(val
.c_str(), "false") == 0) {
219 } else if (strcasecmp(val
.c_str(), "true") == 0) {
224 int b
= strict_strtol(val
.c_str(), 10, &err
);
233 extern std::string
trim(const std::string
& str
);
// this function is a modification of rocksdb's StringToMap:
// 1) accepts ' \n ; as separators
// 2) leaves compound options with enclosing { and }
//
// Splits "k1=v1;k2={a=1;b=2};k3=v3" into key->value pairs, keeping the
// braces on compound values so they can be re-parsed by rocksdb later.
// NOTE(review): several statements (the `pos` declaration, the increments
// past separators, empty-key check, and some closing braces) are elided
// in this chunk of the file.
rocksdb::Status StringToMap(const std::string& opts_str,
                            std::unordered_map<std::string, std::string>* opts_map)
  using rocksdb::Status;
  // Example:
  //   opts_str = "write_buffer_size=1024;max_write_buffer_number=2;"
  //              "nested_opt={opt1=1;opt2=2};max_bytes_for_level_base=100"
  std::string opts = trim(opts_str);
  while (pos < opts.size()) {
    // each pair must contain an '='
    size_t eq_pos = opts.find('=', pos);
    if (eq_pos == std::string::npos) {
      return Status::InvalidArgument("Mismatched key value pair, '=' expected");
    std::string key = trim(opts.substr(pos, eq_pos - pos));
      return Status::InvalidArgument("Empty key found");
    // skip space after '=' and look for '{' for possible nested options
    while (pos < opts.size() && isspace(opts[pos])) {
    // Empty value at the end
    if (pos >= opts.size()) {
      (*opts_map)[key] = "";
    if (opts[pos] == '{') {
      // nested option: scan forward tracking brace depth
      size_t brace_pos = pos + 1;
      while (brace_pos < opts.size()) {
        if (opts[brace_pos] == '{') {
        } else if (opts[brace_pos] == '}') {
          // found the matching closing brace
          //include both '{' and '}'
          (*opts_map)[key] = trim(opts.substr(pos, brace_pos - pos + 1));
          // skip all whitespace and move to the next ';,'
          // brace_pos points to the matching '}'
          while (pos < opts.size() && isspace(opts[pos])) {
          if (pos < opts.size() && opts[pos] != ';' && opts[pos] != ',') {
            return Status::InvalidArgument(
                "Unexpected chars after nested options");
      // ran off the end without finding the matching '}'
      return Status::InvalidArgument(
          "Mismatched curly braces for nested options");
      // plain (non-braced) value: runs until the next ',' or ';'
      size_t sc_pos = opts.find_first_of(",;", pos);
      if (sc_pos == std::string::npos) {
        (*opts_map)[key] = trim(opts.substr(pos));
        // It either ends with a trailing , ; or the last key-value pair
        (*opts_map)[key] = trim(opts.substr(pos, sc_pos - pos));
// Interpret ceph-specific option keys that rocksdb's own option parser
// does not understand (thread-pool sizing and mount-time behavior flags).
// NOTE(review): the `err` declarations, the error checks after each
// strict_* conversion, and the return statements are elided in this chunk.
int RocksDBStore::tryInterpret(const string &key, const string &val, rocksdb::Options &opt)
  if (key == "compaction_threads") {
    int f = strict_iecstrtoll(val, &err);
    //Low priority threadpool is used for compaction
    opt.env->SetBackgroundThreads(f, rocksdb::Env::Priority::LOW);
  } else if (key == "flusher_threads") {
    int f = strict_iecstrtoll(val, &err);
    //High priority threadpool is used for flusher
    opt.env->SetBackgroundThreads(f, rocksdb::Env::Priority::HIGH);
  } else if (key == "compact_on_mount") {
    // store-level flag consumed at mount time, not passed to rocksdb
    int ret = string2bool(val, compact_on_mount);
  } else if (key == "disableWAL") {
    int ret = string2bool(val, disableWAL);
  //unrecognize config options.
349 int RocksDBStore::ParseOptionsFromString(const string
&opt_str
, rocksdb::Options
&opt
)
351 return ParseOptionsFromStringStatic(cct
, opt_str
, opt
,
352 [&](const string
& k
, const string
& v
, rocksdb::Options
& o
) {
353 return tryInterpret(k
, v
, o
);
// Static worker shared by ParseOptionsFromString: splits opt_str into
// key/value pairs, applies each one via rocksdb's GetOptionsFromString,
// and on failure gives the optional `interp` callback a chance to consume
// ceph-specific keys.
// NOTE(review): the leading CephContext* parameter line, the `r`
// declaration and several error-path statements are elided in this chunk.
int RocksDBStore::ParseOptionsFromStringStatic(
  const string& opt_str,
  rocksdb::Options& opt,
  function<int(const string&, const string&, rocksdb::Options&)> interp)
  // keep aligned with func tryInterpret
  const set<string> need_interp_keys = {"compaction_threads", "flusher_threads", "compact_on_mount", "disableWAL"};
  rocksdb::Status status;
  std::unordered_map<std::string, std::string> str_map;
  status = StringToMap(opt_str, &str_map);
    dout(5) << __func__ << " error '" << status.getState() <<
      "' while parsing options '" << opt_str << "'" << dendl;
  for (auto it = str_map.begin(); it != str_map.end(); ++it) {
    string this_opt = it->first + "=" + it->second;
    // let rocksdb parse the option first; opt is updated in place
    rocksdb::Status status =
      rocksdb::GetOptionsFromString(opt, this_opt, &opt);
    if (interp != nullptr) {
      // rocksdb rejected the option; try the ceph-specific interpreter
      r = interp(it->first, it->second, opt);
    } else if (!need_interp_keys.count(it->first)) {
    derr << status.ToString() << dendl;
    lgeneric_dout(cct, 1) << " set rocksdb option " << it->first
                          << " = " << it->second << dendl;
397 int RocksDBStore::init(string _options_str
)
399 options_str
= _options_str
;
400 rocksdb::Options opt
;
402 if (options_str
.length()) {
403 int r
= ParseOptionsFromString(options_str
, opt
);
// Ensure the directory backing this store exists, creating it with
// rwxr-xr-x permissions when missing. With a custom rocksdb Env the
// directory is created through the Env instead of std::filesystem.
// NOTE(review): the env branch structure, the error-return statements,
// the `ec` declaration and the final return are elided in this chunk.
int RocksDBStore::create_db_dir()
  unique_ptr<rocksdb::Directory> dir;
  env->NewDirectory(path, &dir);
  if (!fs::exists(path)) {
    if (!fs::create_directory(path, ec)) {
      derr << __func__ << " failed to create " << path
           << ": " << ec.message() << dendl;
    // owner: rwx, group/other: r-x
    fs::permissions(path,
                    fs::perms::owner_all |
                    fs::perms::group_read | fs::perms::group_exec |
                    fs::perms::others_read | fs::perms::others_exec);
433 int RocksDBStore::install_cf_mergeop(
434 const string
&key_prefix
,
435 rocksdb::ColumnFamilyOptions
*cf_opt
)
437 ceph_assert(cf_opt
!= nullptr);
438 cf_opt
->merge_operator
.reset();
439 for (auto& i
: merge_ops
) {
440 if (i
.first
== key_prefix
) {
441 cf_opt
->merge_operator
.reset(new MergeOperatorLinker(i
.second
));
447 int RocksDBStore::create_and_open(ostream
&out
,
448 const std::string
& cfs
)
450 int r
= create_db_dir();
453 return do_open(out
, true, false, cfs
);
456 std::shared_ptr
<rocksdb::Cache
> RocksDBStore::create_block_cache(
457 const std::string
& cache_type
, size_t cache_size
, double cache_prio_high
) {
458 std::shared_ptr
<rocksdb::Cache
> cache
;
459 auto shard_bits
= cct
->_conf
->rocksdb_cache_shard_bits
;
460 if (cache_type
== "binned_lru") {
461 cache
= rocksdb_cache::NewBinnedLRUCache(cct
, cache_size
, shard_bits
, false, cache_prio_high
);
462 } else if (cache_type
== "lru") {
463 cache
= rocksdb::NewLRUCache(cache_size
, shard_bits
);
464 } else if (cache_type
== "clock") {
465 cache
= rocksdb::NewClockCache(cache_size
, shard_bits
);
467 derr
<< "rocksdb_cache_type '" << cache
468 << "' chosen, but RocksDB not compiled with LibTBB. "
472 derr
<< "unrecognized rocksdb_cache_type '" << cache_type
<< "'" << dendl
;
// Assemble the full rocksdb::Options for open: parse the configured option
// string, wire up statistics/logging, WAL and db_paths placement, the
// block/row caches, bloom filters, index type, and finally the default-CF
// merge operator router.
// NOTE(review): many statements (error returns, the try block opener, the
// `priv` check, several closing braces) are elided in this chunk.
int RocksDBStore::load_rocksdb_options(bool create_if_missing, rocksdb::Options& opt)
  rocksdb::Status status;
  if (options_str.length()) {
    int r = ParseOptionsFromString(options_str, opt);
  if (cct->_conf->rocksdb_perf) {
    // collect rocksdb-internal statistics into dbstats
    dbstats = rocksdb::CreateDBStatistics();
    opt.statistics = dbstats;
  opt.create_if_missing = create_if_missing;
  if (kv_options.count("separate_wal_dir")) {
    // keep the write-ahead log beside, not inside, the data dir
    opt.wal_dir = path + ".wal";
  // Since ceph::for_each_substr doesn't return a value and
  // std::stoull does throw, we may as well just catch everything here.
  if (kv_options.count("db_paths")) {
    get_str_list(kv_options["db_paths"], "; \t", paths);
    for (auto& p : paths) {
      // each item is "<path>,<size>"
      size_t pos = p.find(',');
      if (pos == std::string::npos) {
        derr << __func__ << " invalid db path item " << p << " in "
             << kv_options["db_paths"] << dendl;
      string path = p.substr(0, pos);
      string size_str = p.substr(pos + 1);
      uint64_t size = atoll(size_str.c_str());
        derr << __func__ << " invalid db path item " << p << " in "
             << kv_options["db_paths"] << dendl;
      opt.db_paths.push_back(rocksdb::DbPath(path, size));
      dout(10) << __func__ << " db_path " << path << " size " << size << dendl;
  } catch (const std::system_error& e) {
    return -e.code().value();
  if (cct->_conf->rocksdb_log_to_ceph_log) {
    opt.info_log.reset(new CephRocksdbLogger(cct));
    // caller supplied a custom Env via priv (e.g. BlueRocksEnv)
    dout(10) << __func__ << " using custom Env " << priv << dendl;
    opt.env = static_cast<rocksdb::Env*>(priv);
  opt.env->SetAllowNonOwnerAccess(false);
  if (!set_cache_flag) {
    cache_size = cct->_conf->rocksdb_cache_size;
  // split the configured cache between a row cache and a block cache
  uint64_t row_cache_size = cache_size * cct->_conf->rocksdb_cache_row_ratio;
  uint64_t block_cache_size = cache_size - row_cache_size;
  bbt_opts.block_cache = create_block_cache(cct->_conf->rocksdb_cache_type, block_cache_size);
  if (!bbt_opts.block_cache) {
  bbt_opts.block_size = cct->_conf->rocksdb_block_size;
  if (row_cache_size > 0)
    opt.row_cache = rocksdb::NewLRUCache(row_cache_size,
                                         cct->_conf->rocksdb_cache_shard_bits);
  uint64_t bloom_bits = cct->_conf.get_val<uint64_t>("rocksdb_bloom_bits_per_key");
  if (bloom_bits > 0) {
    dout(10) << __func__ << " set bloom filter bits per key to "
             << bloom_bits << dendl;
    bbt_opts.filter_policy.reset(rocksdb::NewBloomFilterPolicy(bloom_bits));
  using std::placeholders::_1;
  // select index type; the compared literal for each branch is elided here
  if (cct->_conf.with_val<std::string>("rocksdb_index_type",
                                       std::bind(std::equal_to<std::string>(), _1,
    bbt_opts.index_type = rocksdb::BlockBasedTableOptions::IndexType::kBinarySearch;
  if (cct->_conf.with_val<std::string>("rocksdb_index_type",
                                       std::bind(std::equal_to<std::string>(), _1,
    bbt_opts.index_type = rocksdb::BlockBasedTableOptions::IndexType::kHashSearch;
  if (cct->_conf.with_val<std::string>("rocksdb_index_type",
                                       std::bind(std::equal_to<std::string>(), _1,
    bbt_opts.index_type = rocksdb::BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
  if (!bbt_opts.no_block_cache) {
    bbt_opts.cache_index_and_filter_blocks =
        cct->_conf.get_val<bool>("rocksdb_cache_index_and_filter_blocks");
    bbt_opts.cache_index_and_filter_blocks_with_high_priority =
        cct->_conf.get_val<bool>("rocksdb_cache_index_and_filter_blocks_with_high_priority");
    bbt_opts.pin_l0_filter_and_index_blocks_in_cache =
        cct->_conf.get_val<bool>("rocksdb_pin_l0_filter_and_index_blocks_in_cache");
  bbt_opts.partition_filters = cct->_conf.get_val<bool>("rocksdb_partition_filters");
  if (cct->_conf.get_val<Option::size_t>("rocksdb_metadata_block_size") > 0)
    bbt_opts.metadata_block_size = cct->_conf.get_val<Option::size_t>("rocksdb_metadata_block_size");
  opt.table_factory.reset(rocksdb::NewBlockBasedTableFactory(bbt_opts));
  dout(10) << __func__ << " block size " << cct->_conf->rocksdb_block_size
           << ", block_cache size " << byte_u_t(block_cache_size)
           << ", row_cache size " << byte_u_t(row_cache_size)
           << (1 << cct->_conf->rocksdb_cache_shard_bits)
           << ", type " << cct->_conf->rocksdb_cache_type
  // route default-CF merges through the per-prefix operator registry
  opt.merge_operator.reset(new MergeOperatorRouter(*this));
  comparator = opt.comparator;
// Record a freshly created/opened CF handle for (cf_name, shard_idx) and
// remember its key-hash bounds; also maps the rocksdb CF id back to its
// prefix for reverse lookups.
// NOTE(review): the if/else around `exists` (asserting bounds match for a
// known column vs. storing them for a new one) is partially elided here.
void RocksDBStore::add_column_family(const std::string& cf_name, uint32_t hash_l, uint32_t hash_h,
                                     size_t shard_idx, rocksdb::ColumnFamilyHandle *handle) {
  dout(10) << __func__ << " column_name=" << cf_name << " shard_idx=" << shard_idx <<
    " hash_l=" << hash_l << " hash_h=" << hash_h << " handle=" << (void*) handle << dendl;
  bool exists = cf_handles.count(cf_name) > 0;
  auto& column = cf_handles[cf_name];
    // already registered: bounds must agree with what was stored before
    ceph_assert(hash_l == column.hash_l);
    ceph_assert(hash_h == column.hash_h);
    ceph_assert(hash_l < hash_h);
    column.hash_l = hash_l;
    column.hash_h = hash_h;
  if (column.handles.size() <= shard_idx)
    column.handles.resize(shard_idx + 1);
  column.handles[shard_idx] = handle;
  cf_ids_to_prefix.emplace(handle->GetID(), cf_name);
621 bool RocksDBStore::is_column_family(const std::string
& prefix
) {
622 return cf_handles
.count(prefix
);
625 std::string_view
RocksDBStore::get_key_hash_view(const prefix_shards
& shards
, const char* key
, const size_t keylen
) {
626 uint32_t hash_l
= std::min
<uint32_t>(shards
.hash_l
, keylen
);
627 uint32_t hash_h
= std::min
<uint32_t>(shards
.hash_h
, keylen
);
628 return { key
+ hash_l
, hash_h
- hash_l
};
631 rocksdb::ColumnFamilyHandle
*RocksDBStore::get_key_cf(const prefix_shards
& shards
, const char* key
, const size_t keylen
) {
632 auto sv
= get_key_hash_view(shards
, key
, keylen
);
633 uint32_t hash
= ceph_str_hash_rjenkins(sv
.data(), sv
.size());
634 return shards
.handles
[hash
% shards
.handles
.size()];
637 rocksdb::ColumnFamilyHandle
*RocksDBStore::get_cf_handle(const std::string
& prefix
, const std::string
& key
) {
638 auto iter
= cf_handles
.find(prefix
);
639 if (iter
== cf_handles
.end()) {
642 if (iter
->second
.handles
.size() == 1) {
643 return iter
->second
.handles
[0];
645 return get_key_cf(iter
->second
, key
.data(), key
.size());
650 rocksdb::ColumnFamilyHandle
*RocksDBStore::get_cf_handle(const std::string
& prefix
, const char* key
, size_t keylen
) {
651 auto iter
= cf_handles
.find(prefix
);
652 if (iter
== cf_handles
.end()) {
655 if (iter
->second
.handles
.size() == 1) {
656 return iter
->second
.handles
[0];
658 return get_key_cf(iter
->second
, key
, keylen
);
664 * If the specified IteratorBounds arg has both an upper and a lower bound defined, and they have equal placement hash
665 * strings, we can be sure that the entire iteration range exists in a single CF. In that case, we return the relevant
666 * CF handle. In all other cases, we return a nullptr to indicate that the specified bounds cannot necessarily be mapped
669 rocksdb::ColumnFamilyHandle
*RocksDBStore::get_cf_handle(const std::string
& prefix
, const IteratorBounds
& bounds
) {
670 if (!bounds
.lower_bound
|| !bounds
.upper_bound
) {
673 auto iter
= cf_handles
.find(prefix
);
674 ceph_assert(iter
!= cf_handles
.end());
675 ceph_assert(iter
->second
.handles
.size() != 1);
676 if (iter
->second
.hash_l
!= 0) {
679 auto lower_bound_hash_str
= get_key_hash_view(iter
->second
, bounds
.lower_bound
->data(), bounds
.lower_bound
->size());
680 auto upper_bound_hash_str
= get_key_hash_view(iter
->second
, bounds
.upper_bound
->data(), bounds
.upper_bound
->size());
681 if (lower_bound_hash_str
== upper_bound_hash_str
) {
682 auto key
= *bounds
.lower_bound
;
683 return get_key_cf(iter
->second
, key
.data(), key
.size());
690 * Definition of sharding:
691 * space-separated list of: column_def [ '=' options ]
692 * column_def := column_name '(' shard_count ')'
693 * column_def := column_name '(' shard_count ',' hash_begin '-' ')'
694 * column_def := column_name '(' shard_count ',' hash_begin '-' hash_end ')'
695 * I=write_buffer_size=1048576 O(6) m(7,10-) prefix(4,0-10)=disable_auto_compactions=true,max_bytes_for_level_base=1048576
// Parse a space-separated sharding definition (see grammar comment above)
// into ColumnFamily entries. On a parse error, *error_position points at
// the offending character and *error_msg describes it; both out-params
// are optional. Returns true on success.
// NOTE(review): the `endptr` declaration, the advancing of `nptr` between
// fields, the '-'/')' delimiter checks and several closing braces are
// elided in this chunk.
bool RocksDBStore::parse_sharding_def(const std::string_view text_def_in,
                                      std::vector<ColumnFamily>& sharding_def,
                                      char const* *error_position,
                                      std::string *error_msg)
  std::string_view text_def = text_def_in;
  // redirect optional out-params to locals so the code below can assume
  // they are always valid
  char const* error_position_local = nullptr;
  std::string error_msg_local;
  if (error_position == nullptr) {
    error_position = &error_position_local;
  *error_position = nullptr;
  if (error_msg == nullptr) {
    error_msg = &error_msg_local;
  sharding_def.clear();
  while (!text_def.empty()) {
    std::string_view options;
    std::string_view name;
    size_t shard_cnt = 1;
    uint32_t l_bound = 0;
    uint32_t h_bound = std::numeric_limits<uint32_t>::max();
    std::string_view column_def;
    // take the next space-separated column definition
    size_t spos = text_def.find(' ');
    if (spos == std::string_view::npos) {
      column_def = text_def;
      text_def = std::string_view(text_def.end(), 0);
      column_def = text_def.substr(0, spos);
      text_def = text_def.substr(spos + 1);
    // split off "=options" suffix, if any
    size_t eqpos = column_def.find('=');
    if (eqpos != std::string_view::npos) {
      options = column_def.substr(eqpos + 1);
      column_def = column_def.substr(0, eqpos);
    // "(shard_cnt[,hash_begin-[hash_end]])" suffix
    size_t bpos = column_def.find('(');
    if (bpos != std::string_view::npos) {
      name = column_def.substr(0, bpos);
      const char* nptr = &column_def[bpos + 1];
      shard_cnt = strtol(nptr, &endptr, 10);
      if (nptr == endptr) {
        *error_position = nptr;
        *error_msg = "expecting integer";
        l_bound = strtol(nptr, &endptr, 10);
        if (nptr == endptr) {
          *error_position = nptr;
          *error_msg = "expecting integer";
          *error_position = nptr;
          *error_msg = "expecting '-'";
          h_bound = strtol(nptr, &endptr, 10);
          if (nptr == endptr) {
            // "begin-" with no end means "to the maximum"
            h_bound = std::numeric_limits<uint32_t>::max();
        *error_position = nptr;
        *error_msg = "expecting ')'";
    sharding_def.emplace_back(std::string(name), shard_cnt, std::string(options), l_bound, h_bound);
  return *error_position == nullptr;
783 void RocksDBStore::sharding_def_to_columns(const std::vector
<ColumnFamily
>& sharding_def
,
784 std::vector
<std::string
>& columns
)
787 for (size_t i
= 0; i
< sharding_def
.size(); i
++) {
788 if (sharding_def
[i
].shard_cnt
== 1) {
789 columns
.push_back(sharding_def
[i
].name
);
791 for (size_t j
= 0; j
< sharding_def
[i
].shard_cnt
; j
++) {
792 columns
.push_back(sharding_def
[i
].name
+ "-" + std::to_string(j
));
// Create one rocksdb column family per shard of every column in
// sharding_def and register the handles via add_column_family().
// NOTE(review): the error returns, the single-shard name assignment
// (`cf_name = p.name`) and the else branch are elided in this chunk.
int RocksDBStore::create_shards(const rocksdb::Options& opt,
                                const std::vector<ColumnFamily>& sharding_def)
  for (auto& p : sharding_def) {
    // copy default CF settings, block cache, merge operators as
    // the base for new CF
    rocksdb::ColumnFamilyOptions cf_opt(opt);
    rocksdb::Status status;
    // apply options to column family
    int r = update_column_family_options(p.name, p.options, &cf_opt);
    for (size_t idx = 0; idx < p.shard_cnt; idx++) {
      if (p.shard_cnt == 1)
        cf_name = p.name + "-" + std::to_string(idx);
      rocksdb::ColumnFamilyHandle *cf;
      status = db->CreateColumnFamily(cf_opt, cf_name, &cf);
        derr << __func__ << " Failed to create rocksdb column family: "
      // store the new CF handle
      add_column_family(p.name, p.hash_l, p.hash_h, idx, cf);
// Apply a sharding definition to a freshly created DB: parse it, create
// the shard column families, and persist the definition text under
// sharding/def so later opens can verify against it. An empty definition
// deletes any stale definition file instead.
// NOTE(review): the `b`/`r` declarations, the failure checks after
// parse/create/write and several closing braces are elided in this chunk.
int RocksDBStore::apply_sharding(const rocksdb::Options& opt,
                                 const std::string& sharding_text)
  // create and open column families
  if (!sharding_text.empty()) {
    rocksdb::Status status;
    std::vector<ColumnFamily> sharding_def;
    char const* error_position;
    std::string error_msg;
    b = parse_sharding_def(sharding_text, sharding_def, &error_position, &error_msg);
      // point a caret at the offending character in the definition
      dout(1) << __func__ << " bad sharding: " << dendl;
      dout(1) << __func__ << sharding_text << dendl;
      dout(1) << __func__ << std::string(error_position - &sharding_text[0], ' ') << "^" << error_msg << dendl;
    r = create_shards(opt, sharding_def);
      derr << __func__ << " create_shards failed error=" << r << dendl;
    // persist the definition so verify_sharding() can check it on reopen
    opt.env->CreateDir(sharding_def_dir);
    status = rocksdb::WriteStringToFile(opt.env, sharding_text,
                                        sharding_def_file, true);
      derr << __func__ << " cannot write to " << sharding_def_file << dendl;
    // empty definition: remove any stale persisted one
    opt.env->DeleteFile(sharding_def_file);
867 // linking to rocksdb function defined in options_helper.cc
868 // it can parse nested params like "nested_opt={opt1=1;opt2=2}"
869 extern rocksdb::Status
rocksdb::StringToMap(const std::string
& opts_str
,
870 std::unordered_map
<std::string
, std::string
>* opts_map
);
872 // Splits column family options from single string into name->value column_opts_map.
873 // The split is done using RocksDB parser that understands "{" and "}", so it
874 // properly extracts compound options.
875 // If non-RocksDB option "block_cache" is defined it is extracted to block_cache_opt.
876 int RocksDBStore::split_column_family_options(const std::string
& options
,
877 std::unordered_map
<std::string
, std::string
>* opt_map
,
878 std::string
* block_cache_opt
)
880 dout(20) << __func__
<< " options=" << options
<< dendl
;
881 rocksdb::Status status
= rocksdb::StringToMap(options
, opt_map
);
883 dout(5) << __func__
<< " error '" << status
.getState()
884 << "' while parsing options '" << options
<< "'" << dendl
;
887 // if "block_cache" option exists, then move it to separate string
888 if (auto it
= opt_map
->find("block_cache"); it
!= opt_map
->end()) {
889 *block_cache_opt
= it
->second
;
892 block_cache_opt
->clear();
// Updates column family options.
// Take options from more_options and apply them to cf_opt.
// Allowed options are exactly the same as allowed for column families in RocksDB.
// Ceph addition is "block_cache" option that is translated to block_cache and
// allows to specialize separate block cache for O column family.
//
// base_name - name of column without shard suffix: "-"+number
// options - additional options to apply
// cf_opt - column family options to update
// NOTE(review): the failure checks/returns after each step and the final
// return are elided in this chunk. "optionsp" in the log text below is a
// pre-existing typo in a runtime string, intentionally left untouched.
int RocksDBStore::update_column_family_options(const std::string& base_name,
                                               const std::string& more_options,
                                               rocksdb::ColumnFamilyOptions* cf_opt)
  std::unordered_map<std::string, std::string> options_map;
  std::string block_cache_opt;
  rocksdb::Status status;
  int r = split_column_family_options(more_options, &options_map, &block_cache_opt);
    dout(5) << __func__ << " failed to parse options; column family=" << base_name
            << " options=" << more_options << dendl;
  status = rocksdb::GetColumnFamilyOptionsFromMap(*cf_opt, options_map, cf_opt);
    dout(5) << __func__ << " invalid column family optionsp; column family="
            << base_name << " options=" << more_options << dendl;
    dout(5) << __func__ << " RocksDB error='" << status.getState() << "'" << dendl;
  if (base_name != rocksdb::kDefaultColumnFamilyName) {
    // default cf has its merge operator defined in load_rocksdb_options, should not override it
    install_cf_mergeop(base_name, cf_opt);
  if (!block_cache_opt.empty()) {
    r = apply_block_cache_options(base_name, block_cache_opt, cf_opt);
      // apply_block_cache_options already does all necessary douts
// Apply a per-column "block_cache={...}" specification: the ceph-specific
// keys type/size/high_ratio select (or force creation of) a dedicated
// block cache, and the remaining keys are fed to rocksdb's
// BlockBasedTableOptions parser. The resulting table factory is installed
// into cf_opt and remembered in cf_bbt_opts.
// NOTE(review): the failure returns, the `error` declarations and several
// closing braces are elided in this chunk.
int RocksDBStore::apply_block_cache_options(const std::string& column_name,
                                            const std::string& block_cache_opt,
                                            rocksdb::ColumnFamilyOptions* cf_opt)
  rocksdb::Status status;
  std::unordered_map<std::string, std::string> cache_options_map;
  status = rocksdb::StringToMap(block_cache_opt, &cache_options_map);
    dout(5) << __func__ << " invalid block cache options; column=" << column_name
            << " options=" << block_cache_opt << dendl;
    dout(5) << __func__ << " RocksDB error='" << status.getState() << "'" << dendl;
  // any of the three ceph-specific keys forces a dedicated cache instead
  // of sharing the store-wide one
  bool require_new_block_cache = false;
  std::string cache_type = cct->_conf->rocksdb_cache_type;
  if (const auto it = cache_options_map.find("type"); it != cache_options_map.end()) {
    cache_type = it->second;
    cache_options_map.erase(it);
    require_new_block_cache = true;
  size_t cache_size = cct->_conf->rocksdb_cache_size;
  if (auto it = cache_options_map.find("size"); it != cache_options_map.end()) {
    cache_size = strict_iecstrtoll(it->second.c_str(), &error);
    if (!error.empty()) {
      dout(10) << __func__ << " invalid size: '" << it->second << "'" << dendl;
    cache_options_map.erase(it);
    require_new_block_cache = true;
  double high_pri_pool_ratio = 0.0;
  if (auto it = cache_options_map.find("high_ratio"); it != cache_options_map.end()) {
    high_pri_pool_ratio = strict_strtod(it->second.c_str(), &error);
    if (!error.empty()) {
      dout(10) << __func__ << " invalid high_pri (float): '" << it->second << "'" << dendl;
    cache_options_map.erase(it);
    require_new_block_cache = true;
  // remaining keys are regular BlockBasedTableOptions, layered on top of
  // the store-wide bbt_opts defaults
  rocksdb::BlockBasedTableOptions column_bbt_opts;
  status = GetBlockBasedTableOptionsFromMap(bbt_opts, cache_options_map, &column_bbt_opts);
    dout(5) << __func__ << " invalid block cache options; column=" << column_name
            << " options=" << block_cache_opt << dendl;
    dout(5) << __func__ << " RocksDB error='" << status.getState() << "'" << dendl;
  std::shared_ptr<rocksdb::Cache> block_cache;
  if (column_bbt_opts.no_block_cache) {
    // clear all settings except no_block_cache
    // rocksdb does not like then
    column_bbt_opts = rocksdb::BlockBasedTableOptions();
    column_bbt_opts.no_block_cache = true;
  if (require_new_block_cache) {
    block_cache = create_block_cache(cache_type, cache_size, high_pri_pool_ratio);
      dout(5) << __func__ << " failed to create block cache for params: " << block_cache_opt << dendl;
    // share the store-wide block cache
    block_cache = bbt_opts.block_cache;
  column_bbt_opts.block_cache = block_cache;
  cf_bbt_opts[column_name] = column_bbt_opts;
  cf_opt->table_factory.reset(NewBlockBasedTableFactory(cf_bbt_opts[column_name]));
// Compare the sharding definition persisted in sharding/def against the
// column families actually present in the rocksdb directory, and sort the
// expected CFs into "existing" and "missing" descriptor/shard lists for
// the caller to open or create.
// NOTE(review): the status checks/returns, the lambda's `size_t shard_id`
// parameter line, and several if/else braces are elided in this chunk.
int RocksDBStore::verify_sharding(const rocksdb::Options& opt,
                                  std::vector<rocksdb::ColumnFamilyDescriptor>& existing_cfs,
                                  std::vector<std::pair<size_t, RocksDBStore::ColumnFamily> >& existing_cfs_shard,
                                  std::vector<rocksdb::ColumnFamilyDescriptor>& missing_cfs,
                                  std::vector<std::pair<size_t, RocksDBStore::ColumnFamily> >& missing_cfs_shard)
  rocksdb::Status status;
  std::string stored_sharding_text;
  status = opt.env->FileExists(sharding_def_file);
    status = rocksdb::ReadFileToString(opt.env,
                                       &stored_sharding_text);
      derr << __func__ << " cannot read from " << sharding_def_file << dendl;
    dout(20) << __func__ << " sharding=" << stored_sharding_text << dendl;
    dout(30) << __func__ << " no sharding" << dendl;
    //no "sharding_def" present
  //check if sharding_def matches stored_sharding_def
  std::vector<ColumnFamily> stored_sharding_def;
  parse_sharding_def(stored_sharding_text, stored_sharding_def);
  // order-independent comparison: sort by column name
  std::sort(stored_sharding_def.begin(), stored_sharding_def.end(),
            [](ColumnFamily& a, ColumnFamily& b) { return a.name < b.name; } );
  std::vector<string> rocksdb_cfs;
  status = rocksdb::DB::ListColumnFamilies(rocksdb::DBOptions(opt),
                                           path, &rocksdb_cfs);
    derr << __func__ << " unable to list column families: " << status.ToString() << dendl;
  dout(5) << __func__ << " column families from rocksdb: " << rocksdb_cfs << dendl;
  // classify one expected shard as existing-on-disk or missing
  auto emplace_cf = [&] (const RocksDBStore::ColumnFamily& column,
                         const std::string& shard_name,
                         const rocksdb::ColumnFamilyOptions& opt) {
    if (std::find(rocksdb_cfs.begin(), rocksdb_cfs.end(), shard_name) != rocksdb_cfs.end()) {
      existing_cfs.emplace_back(shard_name, opt);
      existing_cfs_shard.emplace_back(shard_id, column);
      missing_cfs.emplace_back(shard_name, opt);
      missing_cfs_shard.emplace_back(shard_id, column);
  for (auto& column : stored_sharding_def) {
    rocksdb::ColumnFamilyOptions cf_opt(opt);
    int r = update_column_family_options(column.name, column.options, &cf_opt);
    if (column.shard_cnt == 1) {
      emplace_cf(column, 0, column.name, cf_opt);
      for (size_t i = 0; i < column.shard_cnt; i++) {
        std::string cf_name = column.name + "-" + std::to_string(i);
        emplace_cf(column, i, cf_name, cf_opt);
  // the default CF always exists and is always opened
  existing_cfs.emplace_back("default", opt);
  if (existing_cfs.size() != rocksdb_cfs.size()) {
    std::vector<std::string> columns_from_stored;
    sharding_def_to_columns(stored_sharding_def, columns_from_stored);
    derr << __func__ << " extra columns in rocksdb. rocksdb columns = " << rocksdb_cfs
         << " target columns = " << columns_from_stored << dendl;
1092 std::ostream
& operator<<(std::ostream
& out
, const RocksDBStore::ColumnFamily
& cf
)
1097 out
<< cf
.shard_cnt
;
1101 if (cf
.hash_h
!= std::numeric_limits
<uint32_t>::max()) {
1110 int RocksDBStore::do_open(ostream
&out
,
1111 bool create_if_missing
,
1113 const std::string
& sharding_text
)
1115 ceph_assert(!(create_if_missing
&& open_readonly
));
1116 rocksdb::Options opt
;
1117 int r
= load_rocksdb_options(create_if_missing
, opt
);
1119 dout(1) << __func__
<< " load rocksdb options failed" << dendl
;
1122 rocksdb::Status status
;
1123 if (create_if_missing
) {
1124 status
= rocksdb::DB::Open(opt
, path
, &db
);
1126 derr
<< status
.ToString() << dendl
;
1129 r
= apply_sharding(opt
, sharding_text
);
1133 default_cf
= db
->DefaultColumnFamily();
1135 std::vector
<rocksdb::ColumnFamilyDescriptor
> existing_cfs
;
1136 std::vector
<std::pair
<size_t, RocksDBStore::ColumnFamily
> > existing_cfs_shard
;
1137 std::vector
<rocksdb::ColumnFamilyDescriptor
> missing_cfs
;
1138 std::vector
<std::pair
<size_t, RocksDBStore::ColumnFamily
> > missing_cfs_shard
;
1140 r
= verify_sharding(opt
,
1141 existing_cfs
, existing_cfs_shard
,
1142 missing_cfs
, missing_cfs_shard
);
1146 std::string sharding_recreate_text
;
1147 status
= rocksdb::ReadFileToString(opt
.env
,
1149 &sharding_recreate_text
);
1150 bool recreate_mode
= status
.ok() && sharding_recreate_text
== "1";
1152 ceph_assert(!recreate_mode
|| !open_readonly
);
1153 if (recreate_mode
== false && missing_cfs
.size() != 0) {
1154 // We do not accept when there are missing column families, except case that we are during resharding.
1155 // We can get into this case if resharding was interrupted. It gives a chance to continue.
1156 // Opening DB is only allowed in read-only mode.
1157 if (open_readonly
== false &&
1158 std::find_if(missing_cfs
.begin(), missing_cfs
.end(),
1159 [](const rocksdb::ColumnFamilyDescriptor
& c
) { return c
.name
== resharding_column_lock
; }
1160 ) != missing_cfs
.end()) {
1161 derr
<< __func__
<< " missing column families: " << missing_cfs_shard
<< dendl
;
1166 if (existing_cfs
.empty()) {
1167 // no column families
1168 if (open_readonly
) {
1169 status
= rocksdb::DB::OpenForReadOnly(opt
, path
, &db
);
1171 status
= rocksdb::DB::Open(opt
, path
, &db
);
1174 derr
<< status
.ToString() << dendl
;
1177 default_cf
= db
->DefaultColumnFamily();
1179 std::vector
<rocksdb::ColumnFamilyHandle
*> handles
;
1180 if (open_readonly
) {
1181 status
= rocksdb::DB::OpenForReadOnly(rocksdb::DBOptions(opt
),
1185 status
= rocksdb::DB::Open(rocksdb::DBOptions(opt
),
1186 path
, existing_cfs
, &handles
, &db
);
1189 derr
<< status
.ToString() << dendl
;
1192 ceph_assert(existing_cfs
.size() == existing_cfs_shard
.size() + 1);
1193 ceph_assert(handles
.size() == existing_cfs
.size());
1194 dout(10) << __func__
<< " existing_cfs=" << existing_cfs
.size() << dendl
;
1195 for (size_t i
= 0; i
< existing_cfs_shard
.size(); i
++) {
1196 add_column_family(existing_cfs_shard
[i
].second
.name
,
1197 existing_cfs_shard
[i
].second
.hash_l
,
1198 existing_cfs_shard
[i
].second
.hash_h
,
1199 existing_cfs_shard
[i
].first
,
1202 default_cf
= handles
[handles
.size() - 1];
1203 must_close_default_cf
= true;
1205 if (missing_cfs
.size() > 0 &&
1206 std::find_if(missing_cfs
.begin(), missing_cfs
.end(),
1207 [](const rocksdb::ColumnFamilyDescriptor
& c
) { return c
.name
== resharding_column_lock
; }
1208 ) == missing_cfs
.end())
1210 dout(10) << __func__
<< " missing_cfs=" << missing_cfs
.size() << dendl
;
1211 ceph_assert(recreate_mode
);
1212 ceph_assert(missing_cfs
.size() == missing_cfs_shard
.size());
1213 for (size_t i
= 0; i
< missing_cfs
.size(); i
++) {
1214 rocksdb::ColumnFamilyHandle
*cf
;
1215 status
= db
->CreateColumnFamily(missing_cfs
[i
].options
, missing_cfs
[i
].name
, &cf
);
1217 derr
<< __func__
<< " Failed to create rocksdb column family: "
1218 << missing_cfs
[i
].name
<< dendl
;
1221 add_column_family(missing_cfs_shard
[i
].second
.name
,
1222 missing_cfs_shard
[i
].second
.hash_l
,
1223 missing_cfs_shard
[i
].second
.hash_h
,
1224 missing_cfs_shard
[i
].first
,
1227 opt
.env
->DeleteFile(sharding_recreate
);
1231 ceph_assert(default_cf
!= nullptr);
1233 PerfCountersBuilder
plb(cct
, "rocksdb", l_rocksdb_first
, l_rocksdb_last
);
1234 plb
.add_time_avg(l_rocksdb_get_latency
, "get_latency", "Get latency");
1235 plb
.add_time_avg(l_rocksdb_submit_latency
, "submit_latency", "Submit Latency");
1236 plb
.add_time_avg(l_rocksdb_submit_sync_latency
, "submit_sync_latency", "Submit Sync Latency");
1237 plb
.add_u64_counter(l_rocksdb_compact
, "compact", "Compactions");
1238 plb
.add_u64_counter(l_rocksdb_compact_range
, "compact_range", "Compactions by range");
1239 plb
.add_u64_counter(l_rocksdb_compact_queue_merge
, "compact_queue_merge", "Mergings of ranges in compaction queue");
1240 plb
.add_u64(l_rocksdb_compact_queue_len
, "compact_queue_len", "Length of compaction queue");
1241 plb
.add_time_avg(l_rocksdb_write_wal_time
, "rocksdb_write_wal_time", "Rocksdb write wal time");
1242 plb
.add_time_avg(l_rocksdb_write_memtable_time
, "rocksdb_write_memtable_time", "Rocksdb write memtable time");
1243 plb
.add_time_avg(l_rocksdb_write_delay_time
, "rocksdb_write_delay_time", "Rocksdb write delay time");
1244 plb
.add_time_avg(l_rocksdb_write_pre_and_post_process_time
,
1245 "rocksdb_write_pre_and_post_time", "total time spent on writing a record, excluding write process");
1246 logger
= plb
.create_perf_counters();
1247 cct
->get_perfcounters_collection()->add(logger
);
1249 if (compact_on_mount
) {
1250 derr
<< "Compacting rocksdb store..." << dendl
;
1252 derr
<< "Finished compacting rocksdb store" << dendl
;
1257 int RocksDBStore::_test_init(const string
& dir
)
1259 rocksdb::Options options
;
1260 options
.create_if_missing
= true;
1262 rocksdb::Status status
= rocksdb::DB::Open(options
, dir
, &db
);
1265 return status
.ok() ? 0 : -EIO
;
1268 RocksDBStore::~RocksDBStore()
1272 delete static_cast<rocksdb::Env
*>(priv
);
1276 void RocksDBStore::close()
1278 // stop compaction thread
1279 compact_queue_lock
.lock();
1280 if (compact_thread
.is_started()) {
1281 dout(1) << __func__
<< " waiting for compaction thread to stop" << dendl
;
1282 compact_queue_stop
= true;
1283 compact_queue_cond
.notify_all();
1284 compact_queue_lock
.unlock();
1285 compact_thread
.join();
1286 dout(1) << __func__
<< " compaction thread to stopped" << dendl
;
1288 compact_queue_lock
.unlock();
1292 cct
->get_perfcounters_collection()->remove(logger
);
1297 // Ensure db is destroyed before dependent db_cache and filterpolicy
1298 for (auto& p
: cf_handles
) {
1299 for (size_t i
= 0; i
< p
.second
.handles
.size(); i
++) {
1300 db
->DestroyColumnFamilyHandle(p
.second
.handles
[i
]);
1304 if (must_close_default_cf
) {
1305 db
->DestroyColumnFamilyHandle(default_cf
);
1306 must_close_default_cf
= false;
1308 default_cf
= nullptr;
1313 int RocksDBStore::repair(std::ostream
&out
)
1315 rocksdb::Status status
;
1316 rocksdb::Options opt
;
1317 int r
= load_rocksdb_options(false, opt
);
1319 dout(1) << __func__
<< " load rocksdb options failed" << dendl
;
1320 out
<< "load rocksdb options failed" << std::endl
;
1323 //need to save sharding definition, repairDB will delete files it does not know
1324 std::string stored_sharding_text
;
1325 status
= opt
.env
->FileExists(sharding_def_file
);
1327 status
= rocksdb::ReadFileToString(opt
.env
,
1329 &stored_sharding_text
);
1331 stored_sharding_text
.clear();
1334 dout(10) << __func__
<< " stored_sharding: " << stored_sharding_text
<< dendl
;
1335 status
= rocksdb::RepairDB(path
, opt
);
1336 bool repaired
= status
.ok();
1337 if (!stored_sharding_text
.empty()) {
1338 //recreate markers even if repair failed
1339 opt
.env
->CreateDir(sharding_def_dir
);
1340 status
= rocksdb::WriteStringToFile(opt
.env
, stored_sharding_text
,
1341 sharding_def_file
, true);
1343 derr
<< __func__
<< " cannot write to " << sharding_def_file
<< dendl
;
1346 status
= rocksdb::WriteStringToFile(opt
.env
, "1",
1347 sharding_recreate
, true);
1349 derr
<< __func__
<< " cannot write to " << sharding_recreate
<< dendl
;
1352 // fiinalize sharding recreate
1353 if (do_open(out
, false, false)) {
1354 derr
<< __func__
<< " cannot finalize repair" << dendl
;
1360 if (repaired
&& status
.ok()) {
1363 out
<< "repair rocksdb failed : " << status
.ToString() << std::endl
;
1368 void RocksDBStore::split_stats(const std::string
&s
, char delim
, std::vector
<std::string
> &elems
) {
1369 std::stringstream ss
;
1372 while (std::getline(ss
, item
, delim
)) {
1373 elems
.push_back(item
);
1377 bool RocksDBStore::get_property(
1378 const std::string
&property
,
1381 return db
->GetIntProperty(property
, out
);
1384 int64_t RocksDBStore::estimate_prefix_size(const string
& prefix
,
1385 const string
& key_prefix
)
1388 auto p_iter
= cf_handles
.find(prefix
);
1389 if (p_iter
!= cf_handles
.end()) {
1390 for (auto cf
: p_iter
->second
.handles
) {
1392 string start
= key_prefix
+ string(1, '\x00');
1393 string limit
= key_prefix
+ string("\xff\xff\xff\xff");
1394 rocksdb::Range
r(start
, limit
);
1395 db
->GetApproximateSizes(cf
, &r
, 1, &s
);
1399 string start
= combine_strings(prefix
, key_prefix
);
1400 string limit
= combine_strings(prefix
, key_prefix
+ "\xff\xff\xff\xff");
1401 rocksdb::Range
r(start
, limit
);
1402 db
->GetApproximateSizes(default_cf
, &r
, 1, &size
);
1407 void RocksDBStore::get_statistics(Formatter
*f
)
1409 if (!cct
->_conf
->rocksdb_perf
) {
1410 dout(20) << __func__
<< " RocksDB perf is disabled, can't probe for stats"
1415 if (cct
->_conf
->rocksdb_collect_compaction_stats
) {
1416 std::string stat_str
;
1417 bool status
= db
->GetProperty("rocksdb.stats", &stat_str
);
1419 f
->open_object_section("rocksdb_statistics");
1420 f
->dump_string("rocksdb_compaction_statistics", "");
1421 vector
<string
> stats
;
1422 split_stats(stat_str
, '\n', stats
);
1423 for (auto st
:stats
) {
1424 f
->dump_string("", st
);
1429 if (cct
->_conf
->rocksdb_collect_extended_stats
) {
1431 f
->open_object_section("rocksdb_extended_statistics");
1432 string stat_str
= dbstats
->ToString();
1433 vector
<string
> stats
;
1434 split_stats(stat_str
, '\n', stats
);
1435 f
->dump_string("rocksdb_extended_statistics", "");
1436 for (auto st
:stats
) {
1437 f
->dump_string(".", st
);
1441 f
->open_object_section("rocksdbstore_perf_counters");
1442 logger
->dump_formatted(f
,0);
1445 if (cct
->_conf
->rocksdb_collect_memory_stats
) {
1446 f
->open_object_section("rocksdb_memtable_statistics");
1448 if (!bbt_opts
.no_block_cache
) {
1449 str
.append(stringify(bbt_opts
.block_cache
->GetUsage()));
1450 f
->dump_string("block_cache_usage", str
.data());
1452 str
.append(stringify(bbt_opts
.block_cache
->GetPinnedUsage()));
1453 f
->dump_string("block_cache_pinned_blocks_usage", str
);
1456 db
->GetProperty("rocksdb.cur-size-all-mem-tables", &str
);
1457 f
->dump_string("rocksdb_memtable_usage", str
);
1459 db
->GetProperty("rocksdb.estimate-table-readers-mem", &str
);
1460 f
->dump_string("rocksdb_index_filter_blocks_usage", str
);
1465 struct RocksDBStore::RocksWBHandler
: public rocksdb::WriteBatch::Handler
{
1466 RocksWBHandler(const RocksDBStore
& db
) : db(db
) {}
1467 const RocksDBStore
& db
;
1468 std::stringstream seen
;
1471 void dump(const char* op_name
,
1472 uint32_t column_family_id
,
1473 const rocksdb::Slice
& key_in
,
1474 const rocksdb::Slice
* value
= nullptr) {
1477 ssize_t size
= value
? value
->size() : -1;
1478 seen
<< std::endl
<< op_name
<< "(";
1480 if (column_family_id
== 0) {
1481 db
.split_key(key_in
, &prefix
, &key
);
1483 auto it
= db
.cf_ids_to_prefix
.find(column_family_id
);
1484 ceph_assert(it
!= db
.cf_ids_to_prefix
.end());
1485 prefix
= it
->second
;
1486 key
= key_in
.ToString();
1488 seen
<< " prefix = " << prefix
;
1489 seen
<< " key = " << pretty_binary_string(key
);
1491 seen
<< " value size = " << std::to_string(size
);
1495 void Put(const rocksdb::Slice
& key
,
1496 const rocksdb::Slice
& value
) override
{
1497 dump("Put", 0, key
, &value
);
1499 rocksdb::Status
PutCF(uint32_t column_family_id
, const rocksdb::Slice
& key
,
1500 const rocksdb::Slice
& value
) override
{
1501 dump("PutCF", column_family_id
, key
, &value
);
1502 return rocksdb::Status::OK();
1504 void SingleDelete(const rocksdb::Slice
& key
) override
{
1505 dump("SingleDelete", 0, key
);
1507 rocksdb::Status
SingleDeleteCF(uint32_t column_family_id
, const rocksdb::Slice
& key
) override
{
1508 dump("SingleDeleteCF", column_family_id
, key
);
1509 return rocksdb::Status::OK();
1511 void Delete(const rocksdb::Slice
& key
) override
{
1512 dump("Delete", 0, key
);
1514 rocksdb::Status
DeleteCF(uint32_t column_family_id
, const rocksdb::Slice
& key
) override
{
1515 dump("DeleteCF", column_family_id
, key
);
1516 return rocksdb::Status::OK();
1518 void Merge(const rocksdb::Slice
& key
,
1519 const rocksdb::Slice
& value
) override
{
1520 dump("Merge", 0, key
, &value
);
1522 rocksdb::Status
MergeCF(uint32_t column_family_id
, const rocksdb::Slice
& key
,
1523 const rocksdb::Slice
& value
) override
{
1524 dump("MergeCF", column_family_id
, key
, &value
);
1525 return rocksdb::Status::OK();
1527 bool Continue() override
{ return num_seen
< 50; }
1530 int RocksDBStore::submit_common(rocksdb::WriteOptions
& woptions
, KeyValueDB::Transaction t
)
1532 // enable rocksdb breakdown
1533 // considering performance overhead, default is disabled
1534 if (cct
->_conf
->rocksdb_perf
) {
1535 rocksdb::SetPerfLevel(rocksdb::PerfLevel::kEnableTimeExceptForMutex
);
1536 rocksdb::get_perf_context()->Reset();
1539 RocksDBTransactionImpl
* _t
=
1540 static_cast<RocksDBTransactionImpl
*>(t
.get());
1541 woptions
.disableWAL
= disableWAL
;
1542 lgeneric_subdout(cct
, rocksdb
, 30) << __func__
;
1543 RocksWBHandler
bat_txc(*this);
1544 _t
->bat
.Iterate(&bat_txc
);
1545 *_dout
<< " Rocksdb transaction: " << bat_txc
.seen
.str() << dendl
;
1547 rocksdb::Status s
= db
->Write(woptions
, &_t
->bat
);
1549 RocksWBHandler
rocks_txc(*this);
1550 _t
->bat
.Iterate(&rocks_txc
);
1551 derr
<< __func__
<< " error: " << s
.ToString() << " code = " << s
.code()
1552 << " Rocksdb transaction: " << rocks_txc
.seen
.str() << dendl
;
1555 if (cct
->_conf
->rocksdb_perf
) {
1556 utime_t write_memtable_time
;
1557 utime_t write_delay_time
;
1558 utime_t write_wal_time
;
1559 utime_t write_pre_and_post_process_time
;
1560 write_wal_time
.set_from_double(
1561 static_cast<double>(rocksdb::get_perf_context()->write_wal_time
)/1000000000);
1562 write_memtable_time
.set_from_double(
1563 static_cast<double>(rocksdb::get_perf_context()->write_memtable_time
)/1000000000);
1564 write_delay_time
.set_from_double(
1565 static_cast<double>(rocksdb::get_perf_context()->write_delay_time
)/1000000000);
1566 write_pre_and_post_process_time
.set_from_double(
1567 static_cast<double>(rocksdb::get_perf_context()->write_pre_and_post_process_time
)/1000000000);
1568 logger
->tinc(l_rocksdb_write_memtable_time
, write_memtable_time
);
1569 logger
->tinc(l_rocksdb_write_delay_time
, write_delay_time
);
1570 logger
->tinc(l_rocksdb_write_wal_time
, write_wal_time
);
1571 logger
->tinc(l_rocksdb_write_pre_and_post_process_time
, write_pre_and_post_process_time
);
1574 return s
.ok() ? 0 : -1;
1577 int RocksDBStore::submit_transaction(KeyValueDB::Transaction t
)
1579 utime_t start
= ceph_clock_now();
1580 rocksdb::WriteOptions woptions
;
1581 woptions
.sync
= false;
1583 int result
= submit_common(woptions
, t
);
1585 utime_t lat
= ceph_clock_now() - start
;
1586 logger
->tinc(l_rocksdb_submit_latency
, lat
);
1591 int RocksDBStore::submit_transaction_sync(KeyValueDB::Transaction t
)
1593 utime_t start
= ceph_clock_now();
1594 rocksdb::WriteOptions woptions
;
1595 // if disableWAL, sync can't set
1596 woptions
.sync
= !disableWAL
;
1598 int result
= submit_common(woptions
, t
);
1600 utime_t lat
= ceph_clock_now() - start
;
1601 logger
->tinc(l_rocksdb_submit_sync_latency
, lat
);
1606 RocksDBStore::RocksDBTransactionImpl::RocksDBTransactionImpl(RocksDBStore
*_db
)
1611 void RocksDBStore::RocksDBTransactionImpl::put_bat(
1612 rocksdb::WriteBatch
& bat
,
1613 rocksdb::ColumnFamilyHandle
*cf
,
1615 const bufferlist
&to_set_bl
)
1617 // bufferlist::c_str() is non-constant, so we can't call c_str()
1618 if (to_set_bl
.is_contiguous() && to_set_bl
.length() > 0) {
1620 rocksdb::Slice(key
),
1621 rocksdb::Slice(to_set_bl
.buffers().front().c_str(),
1622 to_set_bl
.length()));
1624 rocksdb::Slice
key_slice(key
);
1625 vector
<rocksdb::Slice
> value_slices(to_set_bl
.get_num_buffers());
1627 rocksdb::SliceParts(&key_slice
, 1),
1628 prepare_sliceparts(to_set_bl
, &value_slices
));
1632 void RocksDBStore::RocksDBTransactionImpl::set(
1633 const string
&prefix
,
1635 const bufferlist
&to_set_bl
)
1637 auto cf
= db
->get_cf_handle(prefix
, k
);
1639 put_bat(bat
, cf
, k
, to_set_bl
);
1641 string key
= combine_strings(prefix
, k
);
1642 put_bat(bat
, db
->default_cf
, key
, to_set_bl
);
1646 void RocksDBStore::RocksDBTransactionImpl::set(
1647 const string
&prefix
,
1648 const char *k
, size_t keylen
,
1649 const bufferlist
&to_set_bl
)
1651 auto cf
= db
->get_cf_handle(prefix
, k
, keylen
);
1653 string
key(k
, keylen
); // fixme?
1654 put_bat(bat
, cf
, key
, to_set_bl
);
1657 combine_strings(prefix
, k
, keylen
, &key
);
1658 put_bat(bat
, cf
, key
, to_set_bl
);
1662 void RocksDBStore::RocksDBTransactionImpl::rmkey(const string
&prefix
,
1665 auto cf
= db
->get_cf_handle(prefix
, k
);
1667 bat
.Delete(cf
, rocksdb::Slice(k
));
1669 bat
.Delete(db
->default_cf
, combine_strings(prefix
, k
));
1673 void RocksDBStore::RocksDBTransactionImpl::rmkey(const string
&prefix
,
1677 auto cf
= db
->get_cf_handle(prefix
, k
, keylen
);
1679 bat
.Delete(cf
, rocksdb::Slice(k
, keylen
));
1682 combine_strings(prefix
, k
, keylen
, &key
);
1683 bat
.Delete(db
->default_cf
, rocksdb::Slice(key
));
1687 void RocksDBStore::RocksDBTransactionImpl::rm_single_key(const string
&prefix
,
1690 auto cf
= db
->get_cf_handle(prefix
, k
);
1692 bat
.SingleDelete(cf
, k
);
1694 bat
.SingleDelete(db
->default_cf
, combine_strings(prefix
, k
));
1698 void RocksDBStore::RocksDBTransactionImpl::rmkeys_by_prefix(const string
&prefix
)
1700 auto p_iter
= db
->cf_handles
.find(prefix
);
1701 if (p_iter
== db
->cf_handles
.end()) {
1702 uint64_t cnt
= db
->delete_range_threshold
;
1704 auto it
= db
->get_iterator(prefix
);
1705 for (it
->seek_to_first(); it
->valid() && (--cnt
) != 0; it
->next()) {
1706 bat
.Delete(db
->default_cf
, combine_strings(prefix
, it
->key()));
1709 bat
.RollbackToSavePoint();
1710 string endprefix
= prefix
;
1711 endprefix
.push_back('\x01');
1712 bat
.DeleteRange(db
->default_cf
,
1713 combine_strings(prefix
, string()),
1714 combine_strings(endprefix
, string()));
1719 ceph_assert(p_iter
->second
.handles
.size() >= 1);
1720 for (auto cf
: p_iter
->second
.handles
) {
1721 uint64_t cnt
= db
->delete_range_threshold
;
1723 auto it
= db
->new_shard_iterator(cf
);
1724 for (it
->SeekToFirst(); it
->Valid() && (--cnt
) != 0; it
->Next()) {
1725 bat
.Delete(cf
, it
->key());
1728 bat
.RollbackToSavePoint();
1729 string endprefix
= "\xff\xff\xff\xff"; // FIXME: this is cheating...
1730 bat
.DeleteRange(cf
, string(), endprefix
);
1738 void RocksDBStore::RocksDBTransactionImpl::rm_range_keys(const string
&prefix
,
1739 const string
&start
,
1742 ldout(db
->cct
, 10) << __func__
<< " enter start=" << start
1743 << " end=" << end
<< dendl
;
1744 auto p_iter
= db
->cf_handles
.find(prefix
);
1745 if (p_iter
== db
->cf_handles
.end()) {
1746 uint64_t cnt
= db
->delete_range_threshold
;
1748 auto it
= db
->get_iterator(prefix
);
1749 for (it
->lower_bound(start
);
1750 it
->valid() && db
->comparator
->Compare(it
->key(), end
) < 0 && (--cnt
) != 0;
1752 bat
.Delete(db
->default_cf
, combine_strings(prefix
, it
->key()));
1755 ldout(db
->cct
, 10) << __func__
<< " p_iter == end(), resorting to DeleteRange"
1757 bat
.RollbackToSavePoint();
1758 bat
.DeleteRange(db
->default_cf
,
1759 rocksdb::Slice(combine_strings(prefix
, start
)),
1760 rocksdb::Slice(combine_strings(prefix
, end
)));
1765 ceph_assert(p_iter
->second
.handles
.size() >= 1);
1766 for (auto cf
: p_iter
->second
.handles
) {
1767 uint64_t cnt
= db
->delete_range_threshold
;
1769 rocksdb::Iterator
* it
= db
->new_shard_iterator(cf
);
1770 ceph_assert(it
!= nullptr);
1771 for (it
->Seek(start
);
1772 it
->Valid() && db
->comparator
->Compare(it
->key(), end
) < 0 && (--cnt
) != 0;
1774 bat
.Delete(cf
, it
->key());
1777 ldout(db
->cct
, 10) << __func__
<< " p_iter != end(), resorting to DeleteRange"
1779 bat
.RollbackToSavePoint();
1780 bat
.DeleteRange(cf
, rocksdb::Slice(start
), rocksdb::Slice(end
));
1787 ldout(db
->cct
, 10) << __func__
<< " end" << dendl
;
1790 void RocksDBStore::RocksDBTransactionImpl::merge(
1791 const string
&prefix
,
1793 const bufferlist
&to_set_bl
)
1795 auto cf
= db
->get_cf_handle(prefix
, k
);
1797 // bufferlist::c_str() is non-constant, so we can't call c_str()
1798 if (to_set_bl
.is_contiguous() && to_set_bl
.length() > 0) {
1802 rocksdb::Slice(to_set_bl
.buffers().front().c_str(), to_set_bl
.length()));
1805 rocksdb::Slice
key_slice(k
);
1806 vector
<rocksdb::Slice
> value_slices(to_set_bl
.get_num_buffers());
1807 bat
.Merge(cf
, rocksdb::SliceParts(&key_slice
, 1),
1808 prepare_sliceparts(to_set_bl
, &value_slices
));
1811 string key
= combine_strings(prefix
, k
);
1812 // bufferlist::c_str() is non-constant, so we can't call c_str()
1813 if (to_set_bl
.is_contiguous() && to_set_bl
.length() > 0) {
1816 rocksdb::Slice(key
),
1817 rocksdb::Slice(to_set_bl
.buffers().front().c_str(), to_set_bl
.length()));
1820 rocksdb::Slice
key_slice(key
);
1821 vector
<rocksdb::Slice
> value_slices(to_set_bl
.get_num_buffers());
1824 rocksdb::SliceParts(&key_slice
, 1),
1825 prepare_sliceparts(to_set_bl
, &value_slices
));
1830 int RocksDBStore::get(
1831 const string
&prefix
,
1832 const std::set
<string
> &keys
,
1833 std::map
<string
, bufferlist
> *out
)
1835 rocksdb::PinnableSlice value
;
1836 utime_t start
= ceph_clock_now();
1837 if (cf_handles
.count(prefix
) > 0) {
1838 for (auto& key
: keys
) {
1839 auto cf_handle
= get_cf_handle(prefix
, key
);
1840 auto status
= db
->Get(rocksdb::ReadOptions(),
1842 rocksdb::Slice(key
),
1845 (*out
)[key
].append(value
.data(), value
.size());
1846 } else if (status
.IsIOError()) {
1847 ceph_abort_msg(status
.getState());
1852 for (auto& key
: keys
) {
1853 string k
= combine_strings(prefix
, key
);
1854 auto status
= db
->Get(rocksdb::ReadOptions(),
1859 (*out
)[key
].append(value
.data(), value
.size());
1860 } else if (status
.IsIOError()) {
1861 ceph_abort_msg(status
.getState());
1866 utime_t lat
= ceph_clock_now() - start
;
1867 logger
->tinc(l_rocksdb_get_latency
, lat
);
1871 int RocksDBStore::get(
1872 const string
&prefix
,
1876 ceph_assert(out
&& (out
->length() == 0));
1877 utime_t start
= ceph_clock_now();
1879 rocksdb::PinnableSlice value
;
1881 auto cf
= get_cf_handle(prefix
, key
);
1883 s
= db
->Get(rocksdb::ReadOptions(),
1885 rocksdb::Slice(key
),
1888 string k
= combine_strings(prefix
, key
);
1889 s
= db
->Get(rocksdb::ReadOptions(),
1895 out
->append(value
.data(), value
.size());
1896 } else if (s
.IsNotFound()) {
1899 ceph_abort_msg(s
.getState());
1901 utime_t lat
= ceph_clock_now() - start
;
1902 logger
->tinc(l_rocksdb_get_latency
, lat
);
1906 int RocksDBStore::get(
1907 const string
& prefix
,
1912 ceph_assert(out
&& (out
->length() == 0));
1913 utime_t start
= ceph_clock_now();
1915 rocksdb::PinnableSlice value
;
1917 auto cf
= get_cf_handle(prefix
, key
, keylen
);
1919 s
= db
->Get(rocksdb::ReadOptions(),
1921 rocksdb::Slice(key
, keylen
),
1925 combine_strings(prefix
, key
, keylen
, &k
);
1926 s
= db
->Get(rocksdb::ReadOptions(),
1932 out
->append(value
.data(), value
.size());
1933 } else if (s
.IsNotFound()) {
1936 ceph_abort_msg(s
.getState());
1938 utime_t lat
= ceph_clock_now() - start
;
1939 logger
->tinc(l_rocksdb_get_latency
, lat
);
1943 int RocksDBStore::split_key(rocksdb::Slice in
, string
*prefix
, string
*key
)
1945 size_t prefix_len
= 0;
1947 // Find separator inside Slice
1948 char* separator
= (char*) memchr(in
.data(), 0, in
.size());
1949 if (separator
== NULL
)
1951 prefix_len
= size_t(separator
- in
.data());
1952 if (prefix_len
>= in
.size())
1955 // Fetch prefix and/or key directly from Slice
1957 *prefix
= string(in
.data(), prefix_len
);
1959 *key
= string(separator
+1, in
.size()-prefix_len
-1);
1963 void RocksDBStore::compact()
1965 logger
->inc(l_rocksdb_compact
);
1966 rocksdb::CompactRangeOptions options
;
1967 db
->CompactRange(options
, default_cf
, nullptr, nullptr);
1968 for (auto cf
: cf_handles
) {
1969 for (auto shard_cf
: cf
.second
.handles
) {
1978 void RocksDBStore::compact_thread_entry()
1980 std::unique_lock l
{compact_queue_lock
};
1981 dout(10) << __func__
<< " enter" << dendl
;
1982 while (!compact_queue_stop
) {
1983 if (!compact_queue
.empty()) {
1984 auto range
= compact_queue
.front();
1985 compact_queue
.pop_front();
1986 logger
->set(l_rocksdb_compact_queue_len
, compact_queue
.size());
1988 logger
->inc(l_rocksdb_compact_range
);
1989 if (range
.first
.empty() && range
.second
.empty()) {
1992 compact_range(range
.first
, range
.second
);
1997 dout(10) << __func__
<< " waiting" << dendl
;
1998 compact_queue_cond
.wait(l
);
2000 dout(10) << __func__
<< " exit" << dendl
;
2003 void RocksDBStore::compact_range_async(const string
& start
, const string
& end
)
2005 std::lock_guard
l(compact_queue_lock
);
2007 // try to merge adjacent ranges. this is O(n), but the queue should
2008 // be short. note that we do not cover all overlap cases and merge
2009 // opportunities here, but we capture the ones we currently need.
2010 list
< pair
<string
,string
> >::iterator p
= compact_queue
.begin();
2011 while (p
!= compact_queue
.end()) {
2012 if (p
->first
== start
&& p
->second
== end
) {
2016 if (start
<= p
->first
&& p
->first
<= end
) {
2017 // new region crosses start of existing range
2018 // select right bound that is bigger
2019 compact_queue
.push_back(make_pair(start
, end
> p
->second
? end
: p
->second
));
2020 compact_queue
.erase(p
);
2021 logger
->inc(l_rocksdb_compact_queue_merge
);
2024 if (start
<= p
->second
&& p
->second
<= end
) {
2025 // new region crosses end of existing range
2026 //p->first < p->second and p->second <= end, so p->first <= end.
2027 //But we break if previous condition, so start > p->first.
2028 compact_queue
.push_back(make_pair(p
->first
, end
));
2029 compact_queue
.erase(p
);
2030 logger
->inc(l_rocksdb_compact_queue_merge
);
2035 if (p
== compact_queue
.end()) {
2036 // no merge, new entry.
2037 compact_queue
.push_back(make_pair(start
, end
));
2038 logger
->set(l_rocksdb_compact_queue_len
, compact_queue
.size());
2040 compact_queue_cond
.notify_all();
2041 if (!compact_thread
.is_started()) {
2042 compact_thread
.create("rstore_compact");
2045 bool RocksDBStore::check_omap_dir(string
&omap_dir
)
2047 rocksdb::Options options
;
2048 options
.create_if_missing
= true;
2050 rocksdb::Status status
= rocksdb::DB::Open(options
, omap_dir
, &db
);
2056 void RocksDBStore::compact_range(const string
& start
, const string
& end
)
2058 rocksdb::CompactRangeOptions options
;
2059 rocksdb::Slice
cstart(start
);
2060 rocksdb::Slice
cend(end
);
2061 string prefix_start
, key_start
;
2062 string prefix_end
, key_end
;
2063 string key_highest
= "\xff\xff\xff\xff"; //cheating
2064 string key_lowest
= "";
2066 auto compact_range
= [&] (const decltype(cf_handles
)::iterator column_it
,
2067 const std::string
& start
,
2068 const std::string
& end
) {
2069 rocksdb::Slice
cstart(start
);
2070 rocksdb::Slice
cend(end
);
2071 for (const auto& shard_it
: column_it
->second
.handles
) {
2072 db
->CompactRange(options
, shard_it
, &cstart
, &cend
);
2075 db
->CompactRange(options
, default_cf
, &cstart
, &cend
);
2076 split_key(cstart
, &prefix_start
, &key_start
);
2077 split_key(cend
, &prefix_end
, &key_end
);
2078 if (prefix_start
== prefix_end
) {
2079 const auto& column
= cf_handles
.find(prefix_start
);
2080 if (column
!= cf_handles
.end()) {
2081 compact_range(column
, key_start
, key_end
);
2084 auto column
= cf_handles
.find(prefix_start
);
2085 if (column
!= cf_handles
.end()) {
2086 compact_range(column
, key_start
, key_highest
);
2089 const auto& column_end
= cf_handles
.find(prefix_end
);
2090 while (column
!= column_end
) {
2091 compact_range(column
, key_lowest
, key_highest
);
2094 if (column
!= cf_handles
.end()) {
2095 compact_range(column
, key_lowest
, key_end
);
2100 RocksDBStore::RocksDBWholeSpaceIteratorImpl::~RocksDBWholeSpaceIteratorImpl()
2104 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_first()
2106 dbiter
->SeekToFirst();
2107 ceph_assert(!dbiter
->status().IsIOError());
2108 return dbiter
->status().ok() ? 0 : -1;
2110 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_first(const string
&prefix
)
2112 rocksdb::Slice
slice_prefix(prefix
);
2113 dbiter
->Seek(slice_prefix
);
2114 ceph_assert(!dbiter
->status().IsIOError());
2115 return dbiter
->status().ok() ? 0 : -1;
2117 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_last()
2119 dbiter
->SeekToLast();
2120 ceph_assert(!dbiter
->status().IsIOError());
2121 return dbiter
->status().ok() ? 0 : -1;
2123 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_last(const string
&prefix
)
2125 string limit
= past_prefix(prefix
);
2126 rocksdb::Slice
slice_limit(limit
);
2127 dbiter
->Seek(slice_limit
);
2129 if (!dbiter
->Valid()) {
2130 dbiter
->SeekToLast();
2134 return dbiter
->status().ok() ? 0 : -1;
2136 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::upper_bound(const string
&prefix
, const string
&after
)
2138 lower_bound(prefix
, after
);
2140 pair
<string
,string
> key
= raw_key();
2141 if (key
.first
== prefix
&& key
.second
== after
)
2144 return dbiter
->status().ok() ? 0 : -1;
2146 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::lower_bound(const string
&prefix
, const string
&to
)
2148 string bound
= combine_strings(prefix
, to
);
2149 rocksdb::Slice
slice_bound(bound
);
2150 dbiter
->Seek(slice_bound
);
2151 return dbiter
->status().ok() ? 0 : -1;
2153 bool RocksDBStore::RocksDBWholeSpaceIteratorImpl::valid()
2155 return dbiter
->Valid();
2157 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::next()
2162 ceph_assert(!dbiter
->status().IsIOError());
2163 return dbiter
->status().ok() ? 0 : -1;
2165 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::prev()
2170 ceph_assert(!dbiter
->status().IsIOError());
2171 return dbiter
->status().ok() ? 0 : -1;
2173 string
RocksDBStore::RocksDBWholeSpaceIteratorImpl::key()
2176 split_key(dbiter
->key(), 0, &out_key
);
2179 pair
<string
,string
> RocksDBStore::RocksDBWholeSpaceIteratorImpl::raw_key()
2182 split_key(dbiter
->key(), &prefix
, &key
);
2183 return make_pair(prefix
, key
);
2186 bool RocksDBStore::RocksDBWholeSpaceIteratorImpl::raw_key_is_prefixed(const string
&prefix
) {
2187 // Look for "prefix\0" right in rocksb::Slice
2188 rocksdb::Slice key
= dbiter
->key();
2189 if ((key
.size() > prefix
.length()) && (key
[prefix
.length()] == '\0')) {
2190 return memcmp(key
.data(), prefix
.c_str(), prefix
.length()) == 0;
2196 bufferlist
RocksDBStore::RocksDBWholeSpaceIteratorImpl::value()
2198 return to_bufferlist(dbiter
->value());
2201 size_t RocksDBStore::RocksDBWholeSpaceIteratorImpl::key_size()
2203 return dbiter
->key().size();
2206 size_t RocksDBStore::RocksDBWholeSpaceIteratorImpl::value_size()
2208 return dbiter
->value().size();
2211 bufferptr
RocksDBStore::RocksDBWholeSpaceIteratorImpl::value_as_ptr()
2213 rocksdb::Slice val
= dbiter
->value();
2214 return bufferptr(val
.data(), val
.size());
2217 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::status()
2219 return dbiter
->status().ok() ? 0 : -1;
2222 string
RocksDBStore::past_prefix(const string
&prefix
)
2224 string limit
= prefix
;
2229 class CFIteratorImpl
: public KeyValueDB::IteratorImpl
{
2232 rocksdb::Iterator
*dbiter
;
2233 const KeyValueDB::IteratorBounds bounds
;
2234 const rocksdb::Slice iterate_lower_bound
;
2235 const rocksdb::Slice iterate_upper_bound
;
2237 explicit CFIteratorImpl(const RocksDBStore
* db
,
2238 const std::string
& p
,
2239 rocksdb::ColumnFamilyHandle
* cf
,
2240 KeyValueDB::IteratorBounds bounds_
)
2241 : prefix(p
), bounds(std::move(bounds_
)),
2242 iterate_lower_bound(make_slice(bounds
.lower_bound
)),
2243 iterate_upper_bound(make_slice(bounds
.upper_bound
))
2245 auto options
= rocksdb::ReadOptions();
2246 if (db
->cct
->_conf
->osd_rocksdb_iterator_bounds_enabled
) {
2247 if (bounds
.lower_bound
) {
2248 options
.iterate_lower_bound
= &iterate_lower_bound
;
2250 if (bounds
.upper_bound
) {
2251 options
.iterate_upper_bound
= &iterate_upper_bound
;
2254 dbiter
= db
->db
->NewIterator(options
, cf
);
2260 int seek_to_first() override
{
2261 dbiter
->SeekToFirst();
2262 return dbiter
->status().ok() ? 0 : -1;
2264 int seek_to_last() override
{
2265 dbiter
->SeekToLast();
2266 return dbiter
->status().ok() ? 0 : -1;
2268 int upper_bound(const string
&after
) override
{
2270 if (valid() && (key() == after
)) {
2273 return dbiter
->status().ok() ? 0 : -1;
2275 int lower_bound(const string
&to
) override
{
2276 rocksdb::Slice
slice_bound(to
);
2277 dbiter
->Seek(slice_bound
);
2278 return dbiter
->status().ok() ? 0 : -1;
2280 int next() override
{
2284 return dbiter
->status().ok() ? 0 : -1;
2286 int prev() override
{
2290 return dbiter
->status().ok() ? 0 : -1;
2292 bool valid() override
{
2293 return dbiter
->Valid();
2295 string
key() override
{
2296 return dbiter
->key().ToString();
2298 std::pair
<std::string
, std::string
> raw_key() override
{
2299 return make_pair(prefix
, key());
2301 bufferlist
value() override
{
2302 return to_bufferlist(dbiter
->value());
2304 bufferptr
value_as_ptr() override
{
2305 rocksdb::Slice val
= dbiter
->value();
2306 return bufferptr(val
.data(), val
.size());
2308 int status() override
{
2309 return dbiter
->status().ok() ? 0 : -1;
2314 //merge column iterators and rest iterator
2315 class WholeMergeIteratorImpl
: public KeyValueDB::WholeSpaceIteratorImpl
{
2318 KeyValueDB::WholeSpaceIterator main
;
2319 std::map
<std::string
, KeyValueDB::Iterator
> shards
;
2320 std::map
<std::string
, KeyValueDB::Iterator
>::iterator current_shard
;
2321 enum {on_main
, on_shard
} smaller
;
2324 WholeMergeIteratorImpl(RocksDBStore
* db
)
2326 , main(db
->get_default_cf_iterator())
2328 for (auto& e
: db
->cf_handles
) {
2329 shards
.emplace(e
.first
, db
->get_iterator(e
.first
));
2333 // returns true if value in main is smaller then in shards
2334 // invalid is larger then actual value
2335 bool is_main_smaller() {
2336 if (main
->valid()) {
2337 if (current_shard
!= shards
.end()) {
2338 auto main_rk
= main
->raw_key();
2339 ceph_assert(current_shard
->second
->valid());
2340 auto shards_rk
= current_shard
->second
->raw_key();
2341 if (main_rk
.first
< shards_rk
.first
)
2343 if (main_rk
.first
> shards_rk
.first
)
2345 return main_rk
.second
< shards_rk
.second
;
2350 if (current_shard
!= shards
.end()) {
2353 //this means that neither is valid
2354 //we select main to be smaller, so valid() will signal properly
2360 int seek_to_first() override
{
2361 int r0
= main
->seek_to_first();
2363 // find first shard that has some data
2364 current_shard
= shards
.begin();
2365 while (current_shard
!= shards
.end()) {
2366 r1
= current_shard
->second
->seek_to_first();
2367 if (r1
!= 0 || current_shard
->second
->valid()) {
2368 //this is the first shard that will yield some keys
2373 smaller
= is_main_smaller() ? on_main
: on_shard
;
2374 return r0
== 0 && r1
== 0 ? 0 : -1;
2377 int seek_to_first(const std::string
&prefix
) override
{
2378 int r0
= main
->seek_to_first(prefix
);
2380 // find first shard that has some data
2381 current_shard
= shards
.lower_bound(prefix
);
2382 while (current_shard
!= shards
.end()) {
2383 r1
= current_shard
->second
->seek_to_first();
2384 if (r1
!= 0 || current_shard
->second
->valid()) {
2385 //this is the first shard that will yield some keys
2390 smaller
= is_main_smaller() ? on_main
: on_shard
;
2391 return r0
== 0 && r1
== 0 ? 0 : -1;
2394 int seek_to_last() override
{
2395 int r0
= main
->seek_to_last();
2397 r1
= shards_seek_to_last();
2398 //if we have 2 candidates, we need to select
2399 if (main
->valid()) {
2400 if (shards_valid()) {
2401 if (is_main_smaller()) {
2412 if (shards_valid()) {
2418 return r0
== 0 && r1
== 0 ? 0 : -1;
2421 int seek_to_last(const std::string
&prefix
) override
{
2422 int r0
= main
->seek_to_last(prefix
);
2424 // find last shard that has some data
2426 current_shard
= shards
.lower_bound(prefix
);
2427 while (current_shard
!= shards
.begin()) {
2428 r1
= current_shard
->second
->seek_to_last();
2431 if (current_shard
->second
->valid()) {
2436 //if we have 2 candidates, we need to select
2437 if (main
->valid() && found
) {
2438 if (is_main_smaller()) {
2445 //set shards state that properly represents eof
2446 current_shard
= shards
.end();
2448 smaller
= is_main_smaller() ? on_main
: on_shard
;
2449 return r0
== 0 && r1
== 0 ? 0 : -1;
2452 int upper_bound(const std::string
&prefix
, const std::string
&after
) override
{
2453 int r0
= main
->upper_bound(prefix
, after
);
2457 current_shard
= shards
.lower_bound(prefix
);
2458 if (current_shard
!= shards
.end()) {
2459 bool located
= false;
2460 if (current_shard
->first
== prefix
) {
2461 r1
= current_shard
->second
->upper_bound(after
);
2464 if (current_shard
->second
->valid()) {
2469 while (current_shard
!= shards
.end()) {
2470 r1
= current_shard
->second
->seek_to_first();
2473 if (current_shard
->second
->valid())
2479 smaller
= is_main_smaller() ? on_main
: on_shard
;
2483 int lower_bound(const std::string
&prefix
, const std::string
&to
) override
{
2484 int r0
= main
->lower_bound(prefix
, to
);
2488 current_shard
= shards
.lower_bound(prefix
);
2489 if (current_shard
!= shards
.end()) {
2490 bool located
= false;
2491 if (current_shard
->first
== prefix
) {
2492 r1
= current_shard
->second
->lower_bound(to
);
2495 if (current_shard
->second
->valid()) {
2500 while (current_shard
!= shards
.end()) {
2501 r1
= current_shard
->second
->seek_to_first();
2504 if (current_shard
->second
->valid())
2510 smaller
= is_main_smaller() ? on_main
: on_shard
;
2514 bool valid() override
{
2515 if (smaller
== on_main
) {
2516 return main
->valid();
2518 if (current_shard
== shards
.end())
2520 return current_shard
->second
->valid();
2524 int next() override
{
2526 if (smaller
== on_main
) {
2533 smaller
= is_main_smaller() ? on_main
: on_shard
;
2537 int prev() override
{
2539 bool main_was_valid
= false;
2540 if (main
->valid()) {
2541 main_was_valid
= true;
2544 r
= main
->seek_to_last();
2549 bool shards_was_valid
= false;
2550 if (shards_valid()) {
2551 shards_was_valid
= true;
2554 r
= shards_seek_to_last();
2559 if (!main
->valid() && !shards_valid()) {
2560 //end, no previous. set marker so valid() can work
2565 //if 1 is valid, select it
2566 //if 2 are valid select larger and advance the other
2567 if (main
->valid()) {
2568 if (shards_valid()) {
2569 if (is_main_smaller()) {
2571 if (main_was_valid
) {
2572 if (main
->valid()) {
2575 r
= main
->seek_to_first();
2578 //if we have resurrected main, kill it
2579 if (main
->valid()) {
2585 if (shards_was_valid
) {
2586 if (shards_valid()) {
2589 r
= shards_seek_to_first();
2592 //if we have resurected shards, kill it
2593 if (shards_valid()) {
2600 r
= shards_seek_to_first();
2604 r
= main
->seek_to_first();
2609 std::string
key() override
2611 if (smaller
== on_main
) {
2614 return current_shard
->second
->key();
2618 std::pair
<std::string
,std::string
> raw_key() override
2620 if (smaller
== on_main
) {
2621 return main
->raw_key();
2623 return { current_shard
->first
, current_shard
->second
->key() };
2627 bool raw_key_is_prefixed(const std::string
&prefix
) override
2629 if (smaller
== on_main
) {
2630 return main
->raw_key_is_prefixed(prefix
);
2632 return current_shard
->first
== prefix
;
2636 ceph::buffer::list
value() override
2638 if (smaller
== on_main
) {
2639 return main
->value();
2641 return current_shard
->second
->value();
2645 int status() override
2647 //because we already had to inspect key, it must be ok
2651 size_t key_size() override
2653 if (smaller
== on_main
) {
2654 return main
->key_size();
2656 return current_shard
->second
->key().size();
2659 size_t value_size() override
2661 if (smaller
== on_main
) {
2662 return main
->value_size();
2664 return current_shard
->second
->value().length();
2668 int shards_valid() {
2669 if (current_shard
== shards
.end())
2671 return current_shard
->second
->valid();
2675 if (current_shard
== shards
.end()) {
2676 //illegal to next() on !valid()
2680 r
= current_shard
->second
->next();
2683 if (current_shard
->second
->valid())
2685 //current shard exhaused, search for key
2687 while (current_shard
!= shards
.end()) {
2688 r
= current_shard
->second
->seek_to_first();
2691 if (current_shard
->second
->valid())
2695 //either we found key or not, but it is success
2700 if (current_shard
== shards
.end()) {
2701 //illegal to prev() on !valid()
2704 int r
= current_shard
->second
->prev();
2706 if (current_shard
->second
->valid()) {
2709 if (current_shard
== shards
.begin()) {
2710 //we have reached pre-first element
2711 //this makes it !valid(), but guarantees next() moves to first element
2715 r
= current_shard
->second
->seek_to_last();
2720 int shards_seek_to_last() {
2722 current_shard
= shards
.end();
2723 if (current_shard
== shards
.begin()) {
2727 while (current_shard
!= shards
.begin()) {
2729 r
= current_shard
->second
->seek_to_last();
2732 if (current_shard
->second
->valid()) {
2737 current_shard
= shards
.end();
2741 int shards_seek_to_first() {
2743 current_shard
= shards
.begin();
2744 while (current_shard
!= shards
.end()) {
2745 r
= current_shard
->second
->seek_to_first();
2748 if (current_shard
->second
->valid()) {
2749 //this is the first shard that will yield some keys
2758 class ShardMergeIteratorImpl
: public KeyValueDB::IteratorImpl
{
2762 const rocksdb::Comparator
* comparator
;
2764 KeyLess(const rocksdb::Comparator
* comparator
) : comparator(comparator
) { };
2766 bool operator()(rocksdb::Iterator
* a
, rocksdb::Iterator
* b
) const
2770 return comparator
->Compare(a
->key(), b
->key()) < 0;
2784 const RocksDBStore
* db
;
2787 const KeyValueDB::IteratorBounds bounds
;
2788 const rocksdb::Slice iterate_lower_bound
;
2789 const rocksdb::Slice iterate_upper_bound
;
2790 std::vector
<rocksdb::Iterator
*> iters
;
2792 explicit ShardMergeIteratorImpl(const RocksDBStore
* db
,
2793 const std::string
& prefix
,
2794 const std::vector
<rocksdb::ColumnFamilyHandle
*>& shards
,
2795 KeyValueDB::IteratorBounds bounds_
)
2796 : db(db
), keyless(db
->comparator
), prefix(prefix
), bounds(std::move(bounds_
)),
2797 iterate_lower_bound(make_slice(bounds
.lower_bound
)),
2798 iterate_upper_bound(make_slice(bounds
.upper_bound
))
2800 iters
.reserve(shards
.size());
2801 auto options
= rocksdb::ReadOptions();
2802 if (db
->cct
->_conf
->osd_rocksdb_iterator_bounds_enabled
) {
2803 if (bounds
.lower_bound
) {
2804 options
.iterate_lower_bound
= &iterate_lower_bound
;
2806 if (bounds
.upper_bound
) {
2807 options
.iterate_upper_bound
= &iterate_upper_bound
;
2810 for (auto& s
: shards
) {
2811 iters
.push_back(db
->db
->NewIterator(options
, s
));
2814 ~ShardMergeIteratorImpl() {
2815 for (auto& it
: iters
) {
2819 int seek_to_first() override
{
2820 for (auto& it
: iters
) {
2822 if (!it
->status().ok()) {
2826 //all iterators seeked, sort
2827 std::sort(iters
.begin(), iters
.end(), keyless
);
2830 int seek_to_last() override
{
2831 for (auto& it
: iters
) {
2833 if (!it
->status().ok()) {
2837 for (size_t i
= 1; i
< iters
.size(); i
++) {
2838 if (iters
[0]->Valid()) {
2839 if (iters
[i
]->Valid()) {
2840 if (keyless(iters
[0], iters
[i
])) {
2841 std::swap(iters
[0], iters
[i
]);
2847 if (iters
[i
]->Valid()) {
2848 std::swap(iters
[0], iters
[i
]);
2851 //it might happen that cf was empty
2852 if (iters
[i
]->Valid()) {
2856 //no need to sort, as at most 1 iterator is valid now
2859 int upper_bound(const string
&after
) override
{
2860 rocksdb::Slice
slice_bound(after
);
2861 for (auto& it
: iters
) {
2862 it
->Seek(slice_bound
);
2863 if (it
->Valid() && it
->key() == after
) {
2866 if (!it
->status().ok()) {
2870 std::sort(iters
.begin(), iters
.end(), keyless
);
2873 int lower_bound(const string
&to
) override
{
2874 rocksdb::Slice
slice_bound(to
);
2875 for (auto& it
: iters
) {
2876 it
->Seek(slice_bound
);
2877 if (!it
->status().ok()) {
2881 std::sort(iters
.begin(), iters
.end(), keyless
);
2884 int next() override
{
2886 if (iters
[0]->Valid()) {
2888 if (iters
[0]->status().ok()) {
2891 for (size_t i
= 0; i
< iters
.size() - 1; i
++) {
2892 if (keyless(iters
[i
], iters
[i
+ 1])) {
2896 std::swap(iters
[i
], iters
[i
+ 1]);
2902 // iters are sorted, so
2903 // a[0] < b[0] < c[0] < d[0]
2904 // a[0] > a[-1], a[0] > b[-1], a[0] > c[-1], a[0] > d[-1]
2905 // so, prev() will be one of:
2906 // a[-1], b[-1], c[-1], d[-1]
2907 // prev() will be the one that is *largest* of them
2910 // 1. go prev() on each iterator we can
2911 // 2. select largest key from those iterators
2912 // 3. go next() on all iterators except (2)
2914 int prev() override
{
2915 std::vector
<rocksdb::Iterator
*> prev_done
;
2917 for (auto it
: iters
) {
2921 prev_done
.push_back(it
);
2928 prev_done
.push_back(it
);
2932 if (prev_done
.size() == 0) {
2933 /* there is no previous element */
2934 if (iters
[0]->Valid()) {
2936 ceph_assert(!iters
[0]->Valid());
2941 rocksdb::Iterator
* highest
= prev_done
[0];
2942 for (size_t i
= 1; i
< prev_done
.size(); i
++) {
2943 if (keyless(highest
, prev_done
[i
])) {
2945 highest
= prev_done
[i
];
2947 prev_done
[i
]->Next();
2951 //insert highest in the beginning, and shift values until we pick highest
2952 //untouched rest is sorted - we just prev()/next() them
2953 rocksdb::Iterator
* hold
= highest
;
2954 for (size_t i
= 0; i
< iters
.size(); i
++) {
2955 std::swap(hold
, iters
[i
]);
2956 if (hold
== highest
) break;
2958 ceph_assert(hold
== highest
);
2961 bool valid() override
{
2962 return iters
[0]->Valid();
2964 string
key() override
{
2965 return iters
[0]->key().ToString();
2967 std::pair
<std::string
, std::string
> raw_key() override
{
2968 return make_pair(prefix
, key());
2970 bufferlist
value() override
{
2971 return to_bufferlist(iters
[0]->value());
2973 bufferptr
value_as_ptr() override
{
2974 rocksdb::Slice val
= iters
[0]->value();
2975 return bufferptr(val
.data(), val
.size());
2977 int status() override
{
2978 return iters
[0]->status().ok() ? 0 : -1;
2982 KeyValueDB::Iterator
RocksDBStore::get_iterator(const std::string
& prefix
, IteratorOpts opts
, IteratorBounds bounds
)
2984 auto cf_it
= cf_handles
.find(prefix
);
2985 if (cf_it
!= cf_handles
.end()) {
2986 rocksdb::ColumnFamilyHandle
* cf
= nullptr;
2987 if (cf_it
->second
.handles
.size() == 1) {
2988 cf
= cf_it
->second
.handles
[0];
2989 } else if (cct
->_conf
->osd_rocksdb_iterator_bounds_enabled
) {
2990 cf
= get_cf_handle(prefix
, bounds
);
2993 return std::make_shared
<CFIteratorImpl
>(
2999 return std::make_shared
<ShardMergeIteratorImpl
>(
3002 cf_it
->second
.handles
,
3006 return KeyValueDB::get_iterator(prefix
, opts
);
3010 rocksdb::Iterator
* RocksDBStore::new_shard_iterator(rocksdb::ColumnFamilyHandle
* cf
)
3012 return db
->NewIterator(rocksdb::ReadOptions(), cf
);
3015 RocksDBStore::WholeSpaceIterator
RocksDBStore::get_wholespace_iterator(IteratorOpts opts
)
3017 if (cf_handles
.size() == 0) {
3018 return std::make_shared
<RocksDBWholeSpaceIteratorImpl
>(
3019 this, default_cf
, opts
);
3021 return std::make_shared
<WholeMergeIteratorImpl
>(this);
3025 RocksDBStore::WholeSpaceIterator
RocksDBStore::get_default_cf_iterator()
3027 return std::make_shared
<RocksDBWholeSpaceIteratorImpl
>(this, default_cf
, 0);
3030 int RocksDBStore::prepare_for_reshard(const std::string
& new_sharding
,
3031 RocksDBStore::columns_t
& to_process_columns
)
3033 //0. lock db from opening
3034 //1. list existing columns
3035 //2. apply merge operator to (main + columns) opts
3036 //3. prepare std::vector<rocksdb::ColumnFamilyDescriptor> existing_cfs
3037 //4. open db, acquire existing column handles
3038 //5. calculate missing columns
3039 //6. create missing columns
3040 //7. construct cf_handles according to new sharding
3041 //8. check is all cf_handles are filled
3044 std::vector
<ColumnFamily
> new_sharding_def
;
3045 char const* error_position
;
3046 std::string error_msg
;
3047 b
= parse_sharding_def(new_sharding
, new_sharding_def
, &error_position
, &error_msg
);
3049 dout(1) << __func__
<< " bad sharding: " << dendl
;
3050 dout(1) << __func__
<< new_sharding
<< dendl
;
3051 dout(1) << __func__
<< std::string(error_position
- &new_sharding
[0], ' ') << "^" << error_msg
<< dendl
;
3055 //0. lock db from opening
3056 std::string stored_sharding_text
;
3057 rocksdb::ReadFileToString(env
,
3059 &stored_sharding_text
);
3060 if (stored_sharding_text
.find(resharding_column_lock
) == string::npos
) {
3061 rocksdb::Status status
;
3062 if (stored_sharding_text
.size() != 0)
3063 stored_sharding_text
+= " ";
3064 stored_sharding_text
+= resharding_column_lock
;
3065 env
->CreateDir(sharding_def_dir
);
3066 status
= rocksdb::WriteStringToFile(env
, stored_sharding_text
,
3067 sharding_def_file
, true);
3069 derr
<< __func__
<< " cannot write to " << sharding_def_file
<< dendl
;
3074 //1. list existing columns
3076 rocksdb::Status status
;
3077 std::vector
<std::string
> existing_columns
;
3078 rocksdb::Options opt
;
3079 int r
= load_rocksdb_options(false, opt
);
3081 dout(1) << __func__
<< " load rocksdb options failed" << dendl
;
3084 status
= rocksdb::DB::ListColumnFamilies(rocksdb::DBOptions(opt
), path
, &existing_columns
);
3086 derr
<< "Unable to list column families: " << status
.ToString() << dendl
;
3089 dout(5) << "existing columns = " << existing_columns
<< dendl
;
3091 //2. apply merge operator to (main + columns) opts
3092 //3. prepare std::vector<rocksdb::ColumnFamilyDescriptor> cfs_to_open
3094 std::vector
<rocksdb::ColumnFamilyDescriptor
> cfs_to_open
;
3095 for (const auto& full_name
: existing_columns
) {
3096 //split col_name to <prefix>-<number>
3097 std::string base_name
;
3098 size_t pos
= full_name
.find('-');
3099 if (std::string::npos
== pos
)
3100 base_name
= full_name
;
3102 base_name
= full_name
.substr(0,pos
);
3104 rocksdb::ColumnFamilyOptions
cf_opt(opt
);
3105 // search if we have options for this column
3106 std::string options
;
3107 for (const auto& nsd
: new_sharding_def
) {
3108 if (nsd
.name
== base_name
) {
3109 options
= nsd
.options
;
3113 int r
= update_column_family_options(base_name
, options
, &cf_opt
);
3117 cfs_to_open
.emplace_back(full_name
, cf_opt
);
3120 //4. open db, acquire existing column handles
3121 std::vector
<rocksdb::ColumnFamilyHandle
*> handles
;
3122 status
= rocksdb::DB::Open(rocksdb::DBOptions(opt
),
3123 path
, cfs_to_open
, &handles
, &db
);
3125 derr
<< status
.ToString() << dendl
;
3128 for (size_t i
= 0; i
< cfs_to_open
.size(); i
++) {
3129 dout(10) << "column " << cfs_to_open
[i
].name
<< " handle " << (void*)handles
[i
] << dendl
;
3132 //5. calculate missing columns
3133 std::vector
<std::string
> new_sharding_columns
;
3134 std::vector
<std::string
> missing_columns
;
3135 sharding_def_to_columns(new_sharding_def
,
3136 new_sharding_columns
);
3137 dout(5) << "target columns = " << new_sharding_columns
<< dendl
;
3138 for (const auto& n
: new_sharding_columns
) {
3140 for (const auto& e
: existing_columns
) {
3147 missing_columns
.push_back(n
);
3150 dout(5) << "missing columns = " << missing_columns
<< dendl
;
3152 //6. create missing columns
3153 for (const auto& full_name
: missing_columns
) {
3154 std::string base_name
;
3155 size_t pos
= full_name
.find('-');
3156 if (std::string::npos
== pos
)
3157 base_name
= full_name
;
3159 base_name
= full_name
.substr(0,pos
);
3161 rocksdb::ColumnFamilyOptions
cf_opt(opt
);
3162 // search if we have options for this column
3163 std::string options
;
3164 for (const auto& nsd
: new_sharding_def
) {
3165 if (nsd
.name
== base_name
) {
3166 options
= nsd
.options
;
3170 int r
= update_column_family_options(base_name
, options
, &cf_opt
);
3174 rocksdb::ColumnFamilyHandle
*cf
;
3175 status
= db
->CreateColumnFamily(cf_opt
, full_name
, &cf
);
3177 derr
<< __func__
<< " Failed to create rocksdb column family: "
3178 << full_name
<< dendl
;
3181 dout(10) << "created column " << full_name
<< " handle = " << (void*)cf
<< dendl
;
3182 existing_columns
.push_back(full_name
);
3183 handles
.push_back(cf
);
3186 //7. construct cf_handles according to new sharding
3187 for (size_t i
= 0; i
< existing_columns
.size(); i
++) {
3188 std::string full_name
= existing_columns
[i
];
3189 rocksdb::ColumnFamilyHandle
*cf
= handles
[i
];
3190 std::string base_name
;
3191 size_t shard_idx
= 0;
3192 size_t pos
= full_name
.find('-');
3193 dout(10) << "processing column " << full_name
<< dendl
;
3194 if (std::string::npos
== pos
) {
3195 base_name
= full_name
;
3197 base_name
= full_name
.substr(0,pos
);
3198 shard_idx
= atoi(full_name
.substr(pos
+1).c_str());
3200 if (rocksdb::kDefaultColumnFamilyName
== base_name
) {
3201 default_cf
= handles
[i
];
3202 must_close_default_cf
= true;
3203 std::unique_ptr
<rocksdb::ColumnFamilyHandle
, cf_deleter_t
> ptr
{
3204 cf
, [](rocksdb::ColumnFamilyHandle
*) {}};
3205 to_process_columns
.emplace(full_name
, std::move(ptr
));
3207 for (const auto& nsd
: new_sharding_def
) {
3208 if (nsd
.name
== base_name
) {
3209 if (shard_idx
< nsd
.shard_cnt
) {
3210 add_column_family(base_name
, nsd
.hash_l
, nsd
.hash_h
, shard_idx
, cf
);
3212 //ignore columns with index larger then shard count
3217 std::unique_ptr
<rocksdb::ColumnFamilyHandle
, cf_deleter_t
> ptr
{
3218 cf
, [this](rocksdb::ColumnFamilyHandle
* handle
) {
3219 db
->DestroyColumnFamilyHandle(handle
);
3221 to_process_columns
.emplace(full_name
, std::move(ptr
));
3225 //8. check if all cf_handles are filled
3226 for (const auto& col
: cf_handles
) {
3227 for (size_t i
= 0; i
< col
.second
.handles
.size(); i
++) {
3228 if (col
.second
.handles
[i
] == nullptr) {
3229 derr
<< "missing handle for column " << col
.first
<< " shard " << i
<< dendl
;
3237 int RocksDBStore::reshard_cleanup(const RocksDBStore::columns_t
& current_columns
)
3239 std::vector
<std::string
> new_sharding_columns
;
3240 for (const auto& [name
, handle
] : cf_handles
) {
3241 if (handle
.handles
.size() == 1) {
3242 new_sharding_columns
.push_back(name
);
3244 for (size_t i
= 0; i
< handle
.handles
.size(); i
++) {
3245 new_sharding_columns
.push_back(name
+ "-" + std::to_string(i
));
3250 for (auto& [name
, handle
] : current_columns
) {
3251 auto found
= std::find(new_sharding_columns
.begin(),
3252 new_sharding_columns
.end(),
3253 name
) != new_sharding_columns
.end();
3254 if (found
|| name
== rocksdb::kDefaultColumnFamilyName
) {
3255 dout(5) << "Column " << name
<< " is part of new sharding." << dendl
;
3258 dout(5) << "Column " << name
<< " not part of new sharding. Deleting." << dendl
;
3260 // verify that column is empty
3261 std::unique_ptr
<rocksdb::Iterator
> it
{
3262 db
->NewIterator(rocksdb::ReadOptions(), handle
.get())};
3265 ceph_assert(!it
->Valid());
3267 if (rocksdb::Status status
= db
->DropColumnFamily(handle
.get()); !status
.ok()) {
3268 derr
<< __func__
<< " Failed to drop column: " << name
<< dendl
;
3275 int RocksDBStore::reshard(const std::string
& new_sharding
, const RocksDBStore::resharding_ctrl
* ctrl_in
)
3278 resharding_ctrl ctrl
= ctrl_in
? *ctrl_in
: resharding_ctrl();
3279 size_t bytes_in_batch
= 0;
3280 size_t keys_in_batch
= 0;
3281 size_t bytes_per_iterator
= 0;
3282 size_t keys_per_iterator
= 0;
3283 size_t keys_processed
= 0;
3284 size_t keys_moved
= 0;
3286 auto flush_batch
= [&](rocksdb::WriteBatch
* batch
) {
3287 dout(10) << "flushing batch, " << keys_in_batch
<< " keys, for "
3288 << bytes_in_batch
<< " bytes" << dendl
;
3289 rocksdb::WriteOptions woptions
;
3290 woptions
.sync
= true;
3291 rocksdb::Status s
= db
->Write(woptions
, batch
);
3292 ceph_assert(s
.ok());
3298 auto process_column
= [&](rocksdb::ColumnFamilyHandle
* handle
,
3299 const std::string
& fixed_prefix
)
3301 dout(5) << " column=" << (void*)handle
<< " prefix=" << fixed_prefix
<< dendl
;
3302 std::unique_ptr
<rocksdb::Iterator
> it
{
3303 db
->NewIterator(rocksdb::ReadOptions(), handle
)};
3306 rocksdb::WriteBatch bat
;
3307 for (it
->SeekToFirst(); it
->Valid(); it
->Next()) {
3308 rocksdb::Slice raw_key
= it
->key();
3309 dout(30) << "key=" << pretty_binary_string(raw_key
.ToString()) << dendl
;
3310 //check if need to refresh iterator
3311 if (bytes_per_iterator
>= ctrl
.bytes_per_iterator
||
3312 keys_per_iterator
>= ctrl
.keys_per_iterator
) {
3313 dout(8) << "refreshing iterator" << dendl
;
3314 bytes_per_iterator
= 0;
3315 keys_per_iterator
= 0;
3316 std::string raw_key_str
= raw_key
.ToString();
3317 it
.reset(db
->NewIterator(rocksdb::ReadOptions(), handle
));
3319 it
->Seek(raw_key_str
);
3320 ceph_assert(it
->Valid());
3321 raw_key
= it
->key();
3323 rocksdb::Slice value
= it
->value();
3324 std::string prefix
, key
;
3325 if (fixed_prefix
.size() == 0) {
3326 split_key(raw_key
, &prefix
, &key
);
3328 prefix
= fixed_prefix
;
3329 key
= raw_key
.ToString();
3332 if ((keys_processed
% 10000) == 0) {
3333 dout(10) << "processed " << keys_processed
<< " keys, moved " << keys_moved
<< dendl
;
3335 rocksdb::ColumnFamilyHandle
* new_handle
= get_cf_handle(prefix
, key
);
3336 if (new_handle
== nullptr) {
3337 new_handle
= default_cf
;
3339 if (handle
== new_handle
) {
3342 std::string new_raw_key
;
3343 if (new_handle
== default_cf
) {
3344 new_raw_key
= combine_strings(prefix
, key
);
3348 bat
.Delete(handle
, raw_key
);
3349 bat
.Put(new_handle
, new_raw_key
, value
);
3350 dout(25) << "moving " << (void*)handle
<< "/" << pretty_binary_string(raw_key
.ToString()) <<
3351 " to " << (void*)new_handle
<< "/" << pretty_binary_string(new_raw_key
) <<
3352 " size " << value
.size() << dendl
;
3354 bytes_in_batch
+= new_raw_key
.size() * 2 + value
.size();
3356 bytes_per_iterator
+= new_raw_key
.size() * 2 + value
.size();
3357 keys_per_iterator
++;
3359 //check if need to write batch
3360 if (bytes_in_batch
>= ctrl
.bytes_per_batch
||
3361 keys_in_batch
>= ctrl
.keys_per_batch
) {
3363 if (ctrl
.unittest_fail_after_first_batch
) {
3368 if (bat
.Count() > 0) {
3374 auto close_column_handles
= make_scope_guard([this] {
3378 columns_t to_process_columns
;
3379 int r
= prepare_for_reshard(new_sharding
, to_process_columns
);
3381 dout(1) << "failed to prepare db for reshard" << dendl
;
3385 for (auto& [name
, handle
] : to_process_columns
) {
3386 dout(5) << "Processing column=" << name
3387 << " handle=" << handle
.get() << dendl
;
3388 if (name
== rocksdb::kDefaultColumnFamilyName
) {
3389 ceph_assert(handle
.get() == default_cf
);
3390 r
= process_column(default_cf
, std::string());
3392 std::string fixed_prefix
= name
.substr(0, name
.find('-'));
3393 dout(10) << "Prefix: " << fixed_prefix
<< dendl
;
3394 r
= process_column(handle
.get(), fixed_prefix
);
3397 derr
<< "Error processing column " << name
<< dendl
;
3400 if (ctrl
.unittest_fail_after_processing_column
) {
3405 r
= reshard_cleanup(to_process_columns
);
3407 dout(5) << "failed to cleanup after reshard" << dendl
;
3411 if (ctrl
.unittest_fail_after_successful_processing
) {
3414 env
->CreateDir(sharding_def_dir
);
3415 if (auto status
= rocksdb::WriteStringToFile(env
, new_sharding
,
3416 sharding_def_file
, true);
3418 derr
<< __func__
<< " cannot write to " << sharding_def_file
<< dendl
;
3425 bool RocksDBStore::get_sharding(std::string
& sharding
) {
3426 rocksdb::Status status
;
3427 std::string stored_sharding_text
;
3428 bool result
= false;
3431 status
= env
->FileExists(sharding_def_file
);
3433 status
= rocksdb::ReadFileToString(env
,
3435 &stored_sharding_text
);
3438 sharding
= stored_sharding_text
;