]> git.proxmox.com Git - ceph.git/blob - ceph/src/kv/RocksDBStore.cc
import ceph quincy 17.2.1
[ceph.git] / ceph / src / kv / RocksDBStore.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include <filesystem>
5 #include <map>
6 #include <memory>
7 #include <set>
8 #include <string>
9 #include <errno.h>
10 #include <unistd.h>
11 #include <sys/types.h>
12 #include <sys/stat.h>
13
14 #include "rocksdb/db.h"
15 #include "rocksdb/table.h"
16 #include "rocksdb/env.h"
17 #include "rocksdb/slice.h"
18 #include "rocksdb/cache.h"
19 #include "rocksdb/filter_policy.h"
20 #include "rocksdb/utilities/convenience.h"
21 #include "rocksdb/merge_operator.h"
22
23 #include "common/perf_counters.h"
24 #include "common/PriorityCache.h"
25 #include "include/common_fwd.h"
26 #include "include/scope_guard.h"
27 #include "include/str_list.h"
28 #include "include/stringify.h"
29 #include "include/str_map.h"
30 #include "KeyValueDB.h"
31 #include "RocksDBStore.h"
32
33 #include "common/debug.h"
34
35 #define dout_context cct
36 #define dout_subsys ceph_subsys_rocksdb
37 #undef dout_prefix
38 #define dout_prefix *_dout << "rocksdb: "
39
40 namespace fs = std::filesystem;
41
42 using std::function;
43 using std::list;
44 using std::map;
45 using std::ostream;
46 using std::pair;
47 using std::set;
48 using std::string;
49 using std::unique_ptr;
50 using std::vector;
51
52 using ceph::bufferlist;
53 using ceph::bufferptr;
54 using ceph::Formatter;
55
56 static const char* sharding_def_dir = "sharding";
57 static const char* sharding_def_file = "sharding/def";
58 static const char* sharding_recreate = "sharding/recreate_columns";
59 static const char* resharding_column_lock = "reshardingXcommencingXlocked";
60
61 static bufferlist to_bufferlist(rocksdb::Slice in) {
62 bufferlist bl;
63 bl.append(bufferptr(in.data(), in.size()));
64 return bl;
65 }
66
67 static rocksdb::SliceParts prepare_sliceparts(const bufferlist &bl,
68 vector<rocksdb::Slice> *slices)
69 {
70 unsigned n = 0;
71 for (auto& buf : bl.buffers()) {
72 (*slices)[n].data_ = buf.c_str();
73 (*slices)[n].size_ = buf.length();
74 n++;
75 }
76 return rocksdb::SliceParts(slices->data(), slices->size());
77 }
78
79
80 //
81 // One of these for the default rocksdb column family, routing each prefix
82 // to the appropriate MergeOperator.
83 //
84 class RocksDBStore::MergeOperatorRouter
85 : public rocksdb::AssociativeMergeOperator
86 {
87 RocksDBStore& store;
88 public:
89 const char *Name() const override {
90 // Construct a name that rocksDB will validate against. We want to
91 // do this in a way that doesn't constrain the ordering of calls
92 // to set_merge_operator, so sort the merge operators and then
93 // construct a name from all of those parts.
94 store.assoc_name.clear();
95 map<std::string,std::string> names;
96
97 for (auto& p : store.merge_ops) {
98 names[p.first] = p.second->name();
99 }
100 for (auto& p : names) {
101 store.assoc_name += '.';
102 store.assoc_name += p.first;
103 store.assoc_name += ':';
104 store.assoc_name += p.second;
105 }
106 return store.assoc_name.c_str();
107 }
108
109 explicit MergeOperatorRouter(RocksDBStore &_store) : store(_store) {}
110
111 bool Merge(const rocksdb::Slice& key,
112 const rocksdb::Slice* existing_value,
113 const rocksdb::Slice& value,
114 std::string* new_value,
115 rocksdb::Logger* logger) const override {
116 // for default column family
117 // extract prefix from key and compare against each registered merge op;
118 // even though merge operator for explicit CF is included in merge_ops,
119 // it won't be picked up, since it won't match.
120 for (auto& p : store.merge_ops) {
121 if (p.first.compare(0, p.first.length(),
122 key.data(), p.first.length()) == 0 &&
123 key.data()[p.first.length()] == 0) {
124 if (existing_value) {
125 p.second->merge(existing_value->data(), existing_value->size(),
126 value.data(), value.size(),
127 new_value);
128 } else {
129 p.second->merge_nonexistent(value.data(), value.size(), new_value);
130 }
131 break;
132 }
133 }
134 return true; // OK :)
135 }
136 };
137
138 //
139 // One of these per non-default column family, linked directly to the
140 // merge operator for that CF/prefix (if any).
141 //
142 class RocksDBStore::MergeOperatorLinker
143 : public rocksdb::AssociativeMergeOperator
144 {
145 private:
146 std::shared_ptr<KeyValueDB::MergeOperator> mop;
147 public:
148 explicit MergeOperatorLinker(const std::shared_ptr<KeyValueDB::MergeOperator> &o) : mop(o) {}
149
150 const char *Name() const override {
151 return mop->name();
152 }
153
154 bool Merge(const rocksdb::Slice& key,
155 const rocksdb::Slice* existing_value,
156 const rocksdb::Slice& value,
157 std::string* new_value,
158 rocksdb::Logger* logger) const override {
159 if (existing_value) {
160 mop->merge(existing_value->data(), existing_value->size(),
161 value.data(), value.size(),
162 new_value);
163 } else {
164 mop->merge_nonexistent(value.data(), value.size(), new_value);
165 }
166 return true;
167 }
168 };
169
170 int RocksDBStore::set_merge_operator(
171 const string& prefix,
172 std::shared_ptr<KeyValueDB::MergeOperator> mop)
173 {
174 // If you fail here, it's because you can't do this on an open database
175 ceph_assert(db == nullptr);
176 merge_ops.push_back(std::make_pair(prefix,mop));
177 return 0;
178 }
179
// Adapter that routes RocksDB's internal log messages into Ceph's
// dout logging framework (subsystem: rocksdb).
class CephRocksdbLogger : public rocksdb::Logger {
  CephContext *cct;
public:
  // Takes a reference on the CephContext for the logger's lifetime.
  explicit CephRocksdbLogger(CephContext *c) : cct(c) {
    cct->get();
  }
  ~CephRocksdbLogger() override {
    cct->put();
  }

  // Write an entry to the log file with the specified format.
  void Logv(const char* format, va_list ap) override {
    Logv(rocksdb::INFO_LEVEL, format, ap);
  }

  // Write an entry to the log file with the specified log level
  // and format. Any log with level under the internal log level
  // of *this (see @SetInfoLogLevel and @GetInfoLogLevel) will not be
  // printed.
  void Logv(const rocksdb::InfoLogLevel log_level, const char* format,
	    va_list ap) override {
    // Map the rocksdb level onto a ceph debug level: a more severe
    // rocksdb level yields a smaller value of v.
    int v = rocksdb::NUM_INFO_LOG_LEVELS - log_level - 1;
    dout(ceph::dout::need_dynamic(v));
    // Format into a fixed stack buffer; output longer than 64 KiB is
    // truncated by vsnprintf.
    char buf[65536];
    vsnprintf(buf, sizeof(buf), format, ap);
    *_dout << buf << dendl;
  }
};
208
// Factory for external callers: a RocksDB logger bound to the global
// Ceph context. Caller owns the returned pointer.
rocksdb::Logger *create_rocksdb_ceph_logger()
{
  return new CephRocksdbLogger(g_ceph_context);
}
213
214 static int string2bool(const string &val, bool &b_val)
215 {
216 if (strcasecmp(val.c_str(), "false") == 0) {
217 b_val = false;
218 return 0;
219 } else if (strcasecmp(val.c_str(), "true") == 0) {
220 b_val = true;
221 return 0;
222 } else {
223 std::string err;
224 int b = strict_strtol(val.c_str(), 10, &err);
225 if (!err.empty())
226 return -EINVAL;
227 b_val = !!b;
228 return 0;
229 }
230 }
231
232 namespace rocksdb {
233 extern std::string trim(const std::string& str);
234 }
235
236 // this function is a modification of rocksdb's StringToMap:
237 // 1) accepts ' \n ; as separators
238 // 2) leaves compound options with enclosing { and }
rocksdb::Status StringToMap(const std::string& opts_str,
			    std::unordered_map<std::string, std::string>* opts_map)
{
  using rocksdb::Status;
  using rocksdb::trim;
  assert(opts_map);
  // Example:
  //   opts_str = "write_buffer_size=1024;max_write_buffer_number=2;"
  //              "nested_opt={opt1=1;opt2=2};max_bytes_for_level_base=100"
  size_t pos = 0;
  std::string opts = trim(opts_str);
  while (pos < opts.size()) {
    // each iteration consumes one "key=value" pair
    size_t eq_pos = opts.find('=', pos);
    if (eq_pos == std::string::npos) {
      return Status::InvalidArgument("Mismatched key value pair, '=' expected");
    }
    std::string key = trim(opts.substr(pos, eq_pos - pos));
    if (key.empty()) {
      return Status::InvalidArgument("Empty key found");
    }

    // skip space after '=' and look for '{' for possible nested options
    pos = eq_pos + 1;
    while (pos < opts.size() && isspace(opts[pos])) {
      ++pos;
    }
    // Empty value at the end
    if (pos >= opts.size()) {
      (*opts_map)[key] = "";
      break;
    }
    if (opts[pos] == '{') {
      // Compound value: scan for the matching '}' while tracking nesting
      // depth. Unlike rocksdb's own StringToMap, the stored value keeps
      // its enclosing braces (see file-level comment above).
      int count = 1;
      size_t brace_pos = pos + 1;
      while (brace_pos < opts.size()) {
	if (opts[brace_pos] == '{') {
	  ++count;
	} else if (opts[brace_pos] == '}') {
	  --count;
	  if (count == 0) {
	    break;
	  }
	}
	++brace_pos;
      }
      // found the matching closing brace
      if (count == 0) {
	//include both '{' and '}'
	(*opts_map)[key] = trim(opts.substr(pos, brace_pos - pos + 1));
	// skip all whitespace and move to the next ';,'
	// brace_pos points to the matching '}'
	pos = brace_pos + 1;
	while (pos < opts.size() && isspace(opts[pos])) {
	  ++pos;
	}
	// after a compound value only a separator (or end) is legal
	if (pos < opts.size() && opts[pos] != ';' && opts[pos] != ',') {
	  return Status::InvalidArgument(
	      "Unexpected chars after nested options");
	}
	++pos;
      } else {
	return Status::InvalidArgument(
	    "Mismatched curly braces for nested options");
      }
    } else {
      // Plain value: runs up to the next ',' or ';' separator (or the end
      // of the string).
      size_t sc_pos = opts.find_first_of(",;", pos);
      if (sc_pos == std::string::npos) {
	(*opts_map)[key] = trim(opts.substr(pos));
	// It either ends with a trailing , ; or the last key-value pair
	break;
      } else {
	(*opts_map)[key] = trim(opts.substr(pos, sc_pos - pos));
      }
      pos = sc_pos + 1;
    }
  }
  return Status::OK();
}
317
318 int RocksDBStore::tryInterpret(const string &key, const string &val, rocksdb::Options &opt)
319 {
320 if (key == "compaction_threads") {
321 std::string err;
322 int f = strict_iecstrtoll(val, &err);
323 if (!err.empty())
324 return -EINVAL;
325 //Low priority threadpool is used for compaction
326 opt.env->SetBackgroundThreads(f, rocksdb::Env::Priority::LOW);
327 } else if (key == "flusher_threads") {
328 std::string err;
329 int f = strict_iecstrtoll(val, &err);
330 if (!err.empty())
331 return -EINVAL;
332 //High priority threadpool is used for flusher
333 opt.env->SetBackgroundThreads(f, rocksdb::Env::Priority::HIGH);
334 } else if (key == "compact_on_mount") {
335 int ret = string2bool(val, compact_on_mount);
336 if (ret != 0)
337 return ret;
338 } else if (key == "disableWAL") {
339 int ret = string2bool(val, disableWAL);
340 if (ret != 0)
341 return ret;
342 } else {
343 //unrecognize config options.
344 return -EINVAL;
345 }
346 return 0;
347 }
348
349 int RocksDBStore::ParseOptionsFromString(const string &opt_str, rocksdb::Options &opt)
350 {
351 return ParseOptionsFromStringStatic(cct, opt_str, opt,
352 [&](const string& k, const string& v, rocksdb::Options& o) {
353 return tryInterpret(k, v, o);
354 }
355 );
356 }
357
// Parse opt_str (using the extended StringToMap above) and apply each
// option to 'opt'. Options RocksDB itself rejects are handed to
// 'interp'; when no interpreter is supplied, the known Ceph
// pseudo-option keys are tolerated silently and anything else fails.
// Returns 0 on success, -EINVAL on parse or apply failure.
int RocksDBStore::ParseOptionsFromStringStatic(
  CephContext *cct,
  const string& opt_str,
  rocksdb::Options& opt,
  function<int(const string&, const string&, rocksdb::Options&)> interp)
{
  // keep aligned with func tryInterpret
  const set<string> need_interp_keys = {"compaction_threads", "flusher_threads", "compact_on_mount", "disableWAL"};
  rocksdb::Status status;
  std::unordered_map<std::string, std::string> str_map;
  status = StringToMap(opt_str, &str_map);
  if (!status.ok()) {
    dout(5) << __func__ << " error '" << status.getState() <<
      "' while parsing options '" << opt_str << "'" << dendl;
    return -EINVAL;
  }

  for (auto it = str_map.begin(); it != str_map.end(); ++it) {
    // feed RocksDB one option at a time so a failure identifies the key
    string this_opt = it->first + "=" + it->second;
    rocksdb::Status status =
      rocksdb::GetOptionsFromString(opt, this_opt, &opt);
    int r = 0;
    if (!status.ok()) {
      if (interp != nullptr) {
	r = interp(it->first, it->second, opt);
      } else if (!need_interp_keys.count(it->first)) {
	// unknown to RocksDB and not a known pseudo-option => error
	r = -1;
      }
      if (r < 0) {
	derr << status.ToString() << dendl;
	return -EINVAL;
      }
    }
    lgeneric_dout(cct, 1) << " set rocksdb option " << it->first
			  << " = " << it->second << dendl;
  }
  return 0;
}
396
397 int RocksDBStore::init(string _options_str)
398 {
399 options_str = _options_str;
400 rocksdb::Options opt;
401 //try parse options
402 if (options_str.length()) {
403 int r = ParseOptionsFromString(options_str, opt);
404 if (r != 0) {
405 return -EINVAL;
406 }
407 }
408 return 0;
409 }
410
// Ensure the database directory exists.
// With a custom Env, the Env is asked to create the directory handle
// (its status is not checked); otherwise the directory is created on
// the local filesystem with rwxr-xr-x permissions.
// Returns 0 on success, negative errno on local-filesystem failure.
int RocksDBStore::create_db_dir()
{
  if (env) {
    unique_ptr<rocksdb::Directory> dir;
    env->NewDirectory(path, &dir);
  } else {
    if (!fs::exists(path)) {
      std::error_code ec;
      if (!fs::create_directory(path, ec)) {
	derr << __func__ << " failed to create " << path
	     << ": " << ec.message() << dendl;
	return -ec.value();
      }
      // rwxr-xr-x
      // NOTE(review): this fs::permissions overload throws on failure
      // rather than reporting through an error_code — confirm that an
      // exception escaping here is acceptable.
      fs::permissions(path,
		      fs::perms::owner_all |
		      fs::perms::group_read | fs::perms::group_exec |
		      fs::perms::others_read | fs::perms::others_exec);
    }
  }
  return 0;
}
432
433 int RocksDBStore::install_cf_mergeop(
434 const string &key_prefix,
435 rocksdb::ColumnFamilyOptions *cf_opt)
436 {
437 ceph_assert(cf_opt != nullptr);
438 cf_opt->merge_operator.reset();
439 for (auto& i : merge_ops) {
440 if (i.first == key_prefix) {
441 cf_opt->merge_operator.reset(new MergeOperatorLinker(i.second));
442 }
443 }
444 return 0;
445 }
446
447 int RocksDBStore::create_and_open(ostream &out,
448 const std::string& cfs)
449 {
450 int r = create_db_dir();
451 if (r < 0)
452 return r;
453 return do_open(out, true, false, cfs);
454 }
455
456 std::shared_ptr<rocksdb::Cache> RocksDBStore::create_block_cache(
457 const std::string& cache_type, size_t cache_size, double cache_prio_high) {
458 std::shared_ptr<rocksdb::Cache> cache;
459 auto shard_bits = cct->_conf->rocksdb_cache_shard_bits;
460 if (cache_type == "binned_lru") {
461 cache = rocksdb_cache::NewBinnedLRUCache(cct, cache_size, shard_bits, false, cache_prio_high);
462 } else if (cache_type == "lru") {
463 cache = rocksdb::NewLRUCache(cache_size, shard_bits);
464 } else if (cache_type == "clock") {
465 cache = rocksdb::NewClockCache(cache_size, shard_bits);
466 if (!cache) {
467 derr << "rocksdb_cache_type '" << cache
468 << "' chosen, but RocksDB not compiled with LibTBB. "
469 << dendl;
470 }
471 } else {
472 derr << "unrecognized rocksdb_cache_type '" << cache_type << "'" << dendl;
473 }
474 return cache;
475 }
476
// Assemble the rocksdb::Options used to open the DB:
//  - apply options_str and optional perf statistics
//  - honor kv_options "separate_wal_dir" and "db_paths"
//  - wire up logging, the Env, block/row caches, bloom filter and
//    index type from ceph configuration
// Returns 0 on success, negative errno on invalid configuration.
int RocksDBStore::load_rocksdb_options(bool create_if_missing, rocksdb::Options& opt)
{
  rocksdb::Status status;

  if (options_str.length()) {
    int r = ParseOptionsFromString(options_str, opt);
    if (r != 0) {
      return -EINVAL;
    }
  }

  if (cct->_conf->rocksdb_perf) {
    dbstats = rocksdb::CreateDBStatistics();
    opt.statistics = dbstats;
  }

  opt.create_if_missing = create_if_missing;
  if (kv_options.count("separate_wal_dir")) {
    // keep the write-ahead log beside, not inside, the data dir
    opt.wal_dir = path + ".wal";
  }

  // Since ceph::for_each_substr doesn't return a value and
  // std::stoull does throw, we may as well just catch everything here.
  try {
    if (kv_options.count("db_paths")) {
      list<string> paths;
      get_str_list(kv_options["db_paths"], "; \t", paths);
      for (auto& p : paths) {
	// each item is "<path>,<size>"
	size_t pos = p.find(',');
	if (pos == std::string::npos) {
	  derr << __func__ << " invalid db path item " << p << " in "
	       << kv_options["db_paths"] << dendl;
	  return -EINVAL;
	}
	// note: this local 'path' intentionally shadows the member
	string path = p.substr(0, pos);
	string size_str = p.substr(pos + 1);
	uint64_t size = atoll(size_str.c_str());
	if (!size) {
	  derr << __func__ << " invalid db path item " << p << " in "
	       << kv_options["db_paths"] << dendl;
	  return -EINVAL;
	}
	opt.db_paths.push_back(rocksdb::DbPath(path, size));
	dout(10) << __func__ << " db_path " << path << " size " << size << dendl;
      }
    }
  } catch (const std::system_error& e) {
    return -e.code().value();
  }

  if (cct->_conf->rocksdb_log_to_ceph_log) {
    opt.info_log.reset(new CephRocksdbLogger(cct));
  }

  if (priv) {
    // priv carries a caller-supplied rocksdb::Env
    dout(10) << __func__ << " using custom Env " << priv << dendl;
    opt.env = static_cast<rocksdb::Env*>(priv);
  } else {
    env = opt.env;
  }

  opt.env->SetAllowNonOwnerAccess(false);

  // caches
  if (!set_cache_flag) {
    cache_size = cct->_conf->rocksdb_cache_size;
  }
  // split the total cache between the row cache and the block cache
  uint64_t row_cache_size = cache_size * cct->_conf->rocksdb_cache_row_ratio;
  uint64_t block_cache_size = cache_size - row_cache_size;

  bbt_opts.block_cache = create_block_cache(cct->_conf->rocksdb_cache_type, block_cache_size);
  if (!bbt_opts.block_cache) {
    return -EINVAL;
  }
  bbt_opts.block_size = cct->_conf->rocksdb_block_size;

  if (row_cache_size > 0)
    opt.row_cache = rocksdb::NewLRUCache(row_cache_size,
					 cct->_conf->rocksdb_cache_shard_bits);
  uint64_t bloom_bits = cct->_conf.get_val<uint64_t>("rocksdb_bloom_bits_per_key");
  if (bloom_bits > 0) {
    dout(10) << __func__ << " set bloom filter bits per key to "
	     << bloom_bits << dendl;
    bbt_opts.filter_policy.reset(rocksdb::NewBloomFilterPolicy(bloom_bits));
  }
  // select the block-based table index type from configuration
  using std::placeholders::_1;
  if (cct->_conf.with_val<std::string>("rocksdb_index_type",
				       std::bind(std::equal_to<std::string>(), _1,
						 "binary_search")))
    bbt_opts.index_type = rocksdb::BlockBasedTableOptions::IndexType::kBinarySearch;
  if (cct->_conf.with_val<std::string>("rocksdb_index_type",
				       std::bind(std::equal_to<std::string>(), _1,
						 "hash_search")))
    bbt_opts.index_type = rocksdb::BlockBasedTableOptions::IndexType::kHashSearch;
  if (cct->_conf.with_val<std::string>("rocksdb_index_type",
				       std::bind(std::equal_to<std::string>(), _1,
						 "two_level")))
    bbt_opts.index_type = rocksdb::BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
  if (!bbt_opts.no_block_cache) {
    bbt_opts.cache_index_and_filter_blocks =
      cct->_conf.get_val<bool>("rocksdb_cache_index_and_filter_blocks");
    bbt_opts.cache_index_and_filter_blocks_with_high_priority =
      cct->_conf.get_val<bool>("rocksdb_cache_index_and_filter_blocks_with_high_priority");
    bbt_opts.pin_l0_filter_and_index_blocks_in_cache =
      cct->_conf.get_val<bool>("rocksdb_pin_l0_filter_and_index_blocks_in_cache");
  }
  bbt_opts.partition_filters = cct->_conf.get_val<bool>("rocksdb_partition_filters");
  if (cct->_conf.get_val<Option::size_t>("rocksdb_metadata_block_size") > 0)
    bbt_opts.metadata_block_size = cct->_conf.get_val<Option::size_t>("rocksdb_metadata_block_size");

  opt.table_factory.reset(rocksdb::NewBlockBasedTableFactory(bbt_opts));
  dout(10) << __func__ << " block size " << cct->_conf->rocksdb_block_size
	   << ", block_cache size " << byte_u_t(block_cache_size)
	   << ", row_cache size " << byte_u_t(row_cache_size)
	   << "; shards "
	   << (1 << cct->_conf->rocksdb_cache_shard_bits)
	   << ", type " << cct->_conf->rocksdb_cache_type
	   << dendl;

  // the default column family routes merges by key prefix
  opt.merge_operator.reset(new MergeOperatorRouter(*this));
  comparator = opt.comparator;
  return 0;
}
600
601 void RocksDBStore::add_column_family(const std::string& cf_name, uint32_t hash_l, uint32_t hash_h,
602 size_t shard_idx, rocksdb::ColumnFamilyHandle *handle) {
603 dout(10) << __func__ << " column_name=" << cf_name << " shard_idx=" << shard_idx <<
604 " hash_l=" << hash_l << " hash_h=" << hash_h << " handle=" << (void*) handle << dendl;
605 bool exists = cf_handles.count(cf_name) > 0;
606 auto& column = cf_handles[cf_name];
607 if (exists) {
608 ceph_assert(hash_l == column.hash_l);
609 ceph_assert(hash_h == column.hash_h);
610 } else {
611 ceph_assert(hash_l < hash_h);
612 column.hash_l = hash_l;
613 column.hash_h = hash_h;
614 }
615 if (column.handles.size() <= shard_idx)
616 column.handles.resize(shard_idx + 1);
617 column.handles[shard_idx] = handle;
618 cf_ids_to_prefix.emplace(handle->GetID(), cf_name);
619 }
620
621 bool RocksDBStore::is_column_family(const std::string& prefix) {
622 return cf_handles.count(prefix);
623 }
624
625 std::string_view RocksDBStore::get_key_hash_view(const prefix_shards& shards, const char* key, const size_t keylen) {
626 uint32_t hash_l = std::min<uint32_t>(shards.hash_l, keylen);
627 uint32_t hash_h = std::min<uint32_t>(shards.hash_h, keylen);
628 return { key + hash_l, hash_h - hash_l };
629 }
630
631 rocksdb::ColumnFamilyHandle *RocksDBStore::get_key_cf(const prefix_shards& shards, const char* key, const size_t keylen) {
632 auto sv = get_key_hash_view(shards, key, keylen);
633 uint32_t hash = ceph_str_hash_rjenkins(sv.data(), sv.size());
634 return shards.handles[hash % shards.handles.size()];
635 }
636
637 rocksdb::ColumnFamilyHandle *RocksDBStore::get_cf_handle(const std::string& prefix, const std::string& key) {
638 auto iter = cf_handles.find(prefix);
639 if (iter == cf_handles.end()) {
640 return nullptr;
641 } else {
642 if (iter->second.handles.size() == 1) {
643 return iter->second.handles[0];
644 } else {
645 return get_key_cf(iter->second, key.data(), key.size());
646 }
647 }
648 }
649
650 rocksdb::ColumnFamilyHandle *RocksDBStore::get_cf_handle(const std::string& prefix, const char* key, size_t keylen) {
651 auto iter = cf_handles.find(prefix);
652 if (iter == cf_handles.end()) {
653 return nullptr;
654 } else {
655 if (iter->second.handles.size() == 1) {
656 return iter->second.handles[0];
657 } else {
658 return get_key_cf(iter->second, key, keylen);
659 }
660 }
661 }
662
663 /**
664 * If the specified IteratorBounds arg has both an upper and a lower bound defined, and they have equal placement hash
665 * strings, we can be sure that the entire iteration range exists in a single CF. In that case, we return the relevant
666 * CF handle. In all other cases, we return a nullptr to indicate that the specified bounds cannot necessarily be mapped
667 * to a single CF.
668 */
669 rocksdb::ColumnFamilyHandle *RocksDBStore::get_cf_handle(const std::string& prefix, const IteratorBounds& bounds) {
670 if (!bounds.lower_bound || !bounds.upper_bound) {
671 return nullptr;
672 }
673 auto iter = cf_handles.find(prefix);
674 ceph_assert(iter != cf_handles.end());
675 ceph_assert(iter->second.handles.size() != 1);
676 if (iter->second.hash_l != 0) {
677 return nullptr;
678 }
679 auto lower_bound_hash_str = get_key_hash_view(iter->second, bounds.lower_bound->data(), bounds.lower_bound->size());
680 auto upper_bound_hash_str = get_key_hash_view(iter->second, bounds.upper_bound->data(), bounds.upper_bound->size());
681 if (lower_bound_hash_str == upper_bound_hash_str) {
682 auto key = *bounds.lower_bound;
683 return get_key_cf(iter->second, key.data(), key.size());
684 } else {
685 return nullptr;
686 }
687 }
688
689 /**
690 * Definition of sharding:
691 * space-separated list of: column_def [ '=' options ]
692 * column_def := column_name '(' shard_count ')'
693 * column_def := column_name '(' shard_count ',' hash_begin '-' ')'
694 * column_def := column_name '(' shard_count ',' hash_begin '-' hash_end ')'
695 * I=write_buffer_size=1048576 O(6) m(7,10-) prefix(4,0-10)=disable_auto_compactions=true,max_bytes_for_level_base=1048576
696 */
// Parse a sharding definition (grammar in the comment above) into
// sharding_def. On failure, *error_position points into text_def_in at
// the offending character and *error_msg describes the problem.
// Returns true on success.
bool RocksDBStore::parse_sharding_def(const std::string_view text_def_in,
				      std::vector<ColumnFamily>& sharding_def,
				      char const* *error_position,
				      std::string *error_msg)
{
  std::string_view text_def = text_def_in;
  char const* error_position_local = nullptr;
  std::string error_msg_local;
  // redirect null out-params to locals so the loop can always write them
  if (error_position == nullptr) {
    error_position = &error_position_local;
  }
  *error_position = nullptr;
  if (error_msg == nullptr) {
    error_msg = &error_msg_local;
    error_msg->clear();
  }

  sharding_def.clear();
  while (!text_def.empty()) {
    // defaults for a column with no explicit shard/hash spec
    std::string_view options;
    std::string_view name;
    size_t shard_cnt = 1;
    uint32_t l_bound = 0;
    uint32_t h_bound = std::numeric_limits<uint32_t>::max();

    // carve one space-separated column_def off the front
    std::string_view column_def;
    size_t spos = text_def.find(' ');
    if (spos == std::string_view::npos) {
      column_def = text_def;
      text_def = std::string_view(text_def.end(), 0);
    } else {
      column_def = text_def.substr(0, spos);
      text_def = text_def.substr(spos + 1);
    }
    // split off "=options" if present
    size_t eqpos = column_def.find('=');
    if (eqpos != std::string_view::npos) {
      options = column_def.substr(eqpos + 1);
      column_def = column_def.substr(0, eqpos);
    }

    // "name(shard_cnt[,lo-[hi]])"
    size_t bpos = column_def.find('(');
    if (bpos != std::string_view::npos) {
      name = column_def.substr(0, bpos);
      // NOTE(review): strtol scans the view's underlying buffer past the
      // view's end; this relies on the backing data containing the ')'
      // and terminator — confirm callers always pass such data.
      const char* nptr = &column_def[bpos + 1];
      char* endptr;
      shard_cnt = strtol(nptr, &endptr, 10);
      if (nptr == endptr) {
	*error_position = nptr;
	*error_msg = "expecting integer";
	break;
      }
      nptr = endptr;
      if (*nptr == ',') {
	nptr++;
	l_bound = strtol(nptr, &endptr, 10);
	if (nptr == endptr) {
	  *error_position = nptr;
	  *error_msg = "expecting integer";
	  break;
	}
	nptr = endptr;
	if (*nptr != '-') {
	  *error_position = nptr;
	  *error_msg = "expecting '-'";
	  break;
	}
	nptr++;
	h_bound = strtol(nptr, &endptr, 10);
	// an omitted upper bound means "to the end of the key"
	if (nptr == endptr) {
	  h_bound = std::numeric_limits<uint32_t>::max();
	}
	nptr = endptr;
      }
      if (*nptr != ')') {
	*error_position = nptr;
	*error_msg = "expecting ')'";
	break;
      }
    } else {
      name = column_def;
    }
    sharding_def.emplace_back(std::string(name), shard_cnt, std::string(options), l_bound, h_bound);
  }
  // success iff the loop never recorded an error
  return *error_position == nullptr;
}
782
783 void RocksDBStore::sharding_def_to_columns(const std::vector<ColumnFamily>& sharding_def,
784 std::vector<std::string>& columns)
785 {
786 columns.clear();
787 for (size_t i = 0; i < sharding_def.size(); i++) {
788 if (sharding_def[i].shard_cnt == 1) {
789 columns.push_back(sharding_def[i].name);
790 } else {
791 for (size_t j = 0; j < sharding_def[i].shard_cnt; j++) {
792 columns.push_back(sharding_def[i].name + "-" + std::to_string(j));
793 }
794 }
795 }
796 }
797
798 int RocksDBStore::create_shards(const rocksdb::Options& opt,
799 const std::vector<ColumnFamily>& sharding_def)
800 {
801 for (auto& p : sharding_def) {
802 // copy default CF settings, block cache, merge operators as
803 // the base for new CF
804 rocksdb::ColumnFamilyOptions cf_opt(opt);
805 rocksdb::Status status;
806 // apply options to column family
807 int r = update_column_family_options(p.name, p.options, &cf_opt);
808 if (r != 0) {
809 return r;
810 }
811 for (size_t idx = 0; idx < p.shard_cnt; idx++) {
812 std::string cf_name;
813 if (p.shard_cnt == 1)
814 cf_name = p.name;
815 else
816 cf_name = p.name + "-" + std::to_string(idx);
817 rocksdb::ColumnFamilyHandle *cf;
818 status = db->CreateColumnFamily(cf_opt, cf_name, &cf);
819 if (!status.ok()) {
820 derr << __func__ << " Failed to create rocksdb column family: "
821 << cf_name << dendl;
822 return -EINVAL;
823 }
824 // store the new CF handle
825 add_column_family(p.name, p.hash_l, p.hash_h, idx, cf);
826 }
827 }
828 return 0;
829 }
830
// Create column families per sharding_text and persist the definition
// inside the DB directory so later opens see the same sharding; an
// empty definition deletes any previously stored one.
// Returns 0 on success, -EINVAL on a bad definition, -EIO on write
// failure.
int RocksDBStore::apply_sharding(const rocksdb::Options& opt,
				 const std::string& sharding_text)
{
  // create and open column families
  if (!sharding_text.empty()) {
    bool b;
    int r;
    rocksdb::Status status;
    std::vector<ColumnFamily> sharding_def;
    char const* error_position;
    std::string error_msg;
    b = parse_sharding_def(sharding_text, sharding_def, &error_position, &error_msg);
    if (!b) {
      // print the definition and a caret pointing at the bad character
      // (error_position points into sharding_text's buffer)
      dout(1) << __func__ << " bad sharding: " << dendl;
      dout(1) << __func__ << sharding_text << dendl;
      dout(1) << __func__ << std::string(error_position - &sharding_text[0], ' ') << "^" << error_msg << dendl;
      return -EINVAL;
    }
    r = create_shards(opt, sharding_def);
    if (r != 0 ) {
      derr << __func__ << " create_shards failed error=" << r << dendl;
      return r;
    }
    // persist the definition for future opens
    opt.env->CreateDir(sharding_def_dir);
    status = rocksdb::WriteStringToFile(opt.env, sharding_text,
					sharding_def_file, true);
    if (!status.ok()) {
      derr << __func__ << " cannot write to " << sharding_def_file << dendl;
      return -EIO;
    }
  } else {
    // no sharding requested: remove any stale stored definition
    opt.env->DeleteFile(sharding_def_file);
  }
  return 0;
}
866
867 // linking to rocksdb function defined in options_helper.cc
868 // it can parse nested params like "nested_opt={opt1=1;opt2=2}"
869 extern rocksdb::Status rocksdb::StringToMap(const std::string& opts_str,
870 std::unordered_map<std::string, std::string>* opts_map);
871
872 // Splits column family options from single string into name->value column_opts_map.
873 // The split is done using RocksDB parser that understands "{" and "}", so it
874 // properly extracts compound options.
875 // If non-RocksDB option "block_cache" is defined it is extracted to block_cache_opt.
876 int RocksDBStore::split_column_family_options(const std::string& options,
877 std::unordered_map<std::string, std::string>* opt_map,
878 std::string* block_cache_opt)
879 {
880 dout(20) << __func__ << " options=" << options << dendl;
881 rocksdb::Status status = rocksdb::StringToMap(options, opt_map);
882 if (!status.ok()) {
883 dout(5) << __func__ << " error '" << status.getState()
884 << "' while parsing options '" << options << "'" << dendl;
885 return -EINVAL;
886 }
887 // if "block_cache" option exists, then move it to separate string
888 if (auto it = opt_map->find("block_cache"); it != opt_map->end()) {
889 *block_cache_opt = it->second;
890 opt_map->erase(it);
891 } else {
892 block_cache_opt->clear();
893 }
894 return 0;
895 }
896
897 // Updates column family options.
898 // Take options from more_options and apply them to cf_opt.
899 // Allowed options are exactly the same as allowed for column families in RocksDB.
900 // Ceph addition is "block_cache" option that is translated to block_cache and
901 // allows to specialize separate block cache for O column family.
902 //
903 // base_name - name of column without shard suffix: "-"+number
904 // options - additional options to apply
905 // cf_opt - column family options to update
906 int RocksDBStore::update_column_family_options(const std::string& base_name,
907 const std::string& more_options,
908 rocksdb::ColumnFamilyOptions* cf_opt)
909 {
910 std::unordered_map<std::string, std::string> options_map;
911 std::string block_cache_opt;
912 rocksdb::Status status;
913 int r = split_column_family_options(more_options, &options_map, &block_cache_opt);
914 if (r != 0) {
915 dout(5) << __func__ << " failed to parse options; column family=" << base_name
916 << " options=" << more_options << dendl;
917 return r;
918 }
919 status = rocksdb::GetColumnFamilyOptionsFromMap(*cf_opt, options_map, cf_opt);
920 if (!status.ok()) {
921 dout(5) << __func__ << " invalid column family optionsp; column family="
922 << base_name << " options=" << more_options << dendl;
923 dout(5) << __func__ << " RocksDB error='" << status.getState() << "'" << dendl;
924 return -EINVAL;
925 }
926 if (base_name != rocksdb::kDefaultColumnFamilyName) {
927 // default cf has its merge operator defined in load_rocksdb_options, should not override it
928 install_cf_mergeop(base_name, cf_opt);
929 }
930 if (!block_cache_opt.empty()) {
931 r = apply_block_cache_options(base_name, block_cache_opt, cf_opt);
932 if (r != 0) {
933 // apply_block_cache_options already does all necessary douts
934 return r;
935 }
936 }
937 return 0;
938 }
939
/**
 * Apply per-column-family block cache configuration.
 *
 * @p block_cache_opt is an option string in RocksDB's "key=value" map
 * syntax.  Three ceph-specific keys are consumed here before the rest is
 * handed to GetBlockBasedTableOptionsFromMap():
 *   - "type":       cache implementation (default: rocksdb_cache_type)
 *   - "size":       cache size, IEC suffixes allowed (default: rocksdb_cache_size)
 *   - "high_ratio": high-priority pool fraction for the new cache
 * Presence of any of these forces creation of a dedicated block cache for
 * this column; otherwise the store-wide bbt_opts.block_cache is shared.
 *
 * On success the resulting BlockBasedTableOptions are remembered in
 * cf_bbt_opts[column_name] and installed as cf_opt's table factory.
 *
 * @return 0 on success, -EINVAL on any parse or cache-creation failure.
 */
int RocksDBStore::apply_block_cache_options(const std::string& column_name,
					    const std::string& block_cache_opt,
					    rocksdb::ColumnFamilyOptions* cf_opt)
{
  rocksdb::Status status;
  std::unordered_map<std::string, std::string> cache_options_map;
  status = rocksdb::StringToMap(block_cache_opt, &cache_options_map);
  if (!status.ok()) {
    dout(5) << __func__ << " invalid block cache options; column=" << column_name
	    << " options=" << block_cache_opt << dendl;
    dout(5) << __func__ << " RocksDB error='" << status.getState() << "'" << dendl;
    return -EINVAL;
  }
  bool require_new_block_cache = false;
  std::string cache_type = cct->_conf->rocksdb_cache_type;
  if (const auto it = cache_options_map.find("type"); it != cache_options_map.end()) {
    cache_type = it->second;
    cache_options_map.erase(it);
    require_new_block_cache = true;
  }
  size_t cache_size = cct->_conf->rocksdb_cache_size;
  if (auto it = cache_options_map.find("size"); it != cache_options_map.end()) {
    std::string error;
    cache_size = strict_iecstrtoll(it->second.c_str(), &error);
    if (!error.empty()) {
      dout(10) << __func__ << " invalid size: '" << it->second << "'" << dendl;
      return -EINVAL;
    }
    cache_options_map.erase(it);
    require_new_block_cache = true;
  }
  double high_pri_pool_ratio = 0.0;
  if (auto it = cache_options_map.find("high_ratio"); it != cache_options_map.end()) {
    std::string error;
    high_pri_pool_ratio = strict_strtod(it->second.c_str(), &error);
    if (!error.empty()) {
      dout(10) << __func__ << " invalid high_pri (float): '" << it->second << "'" << dendl;
      return -EINVAL;
    }
    cache_options_map.erase(it);
    require_new_block_cache = true;
  }

  // whatever remains in the map is a regular BlockBasedTableOptions field;
  // start from the global bbt_opts and overlay the per-column settings
  rocksdb::BlockBasedTableOptions column_bbt_opts;
  status = GetBlockBasedTableOptionsFromMap(bbt_opts, cache_options_map, &column_bbt_opts);
  if (!status.ok()) {
    dout(5) << __func__ << " invalid block cache options; column=" << column_name
	    << " options=" << block_cache_opt << dendl;
    dout(5) << __func__ << " RocksDB error='" << status.getState() << "'" << dendl;
    return -EINVAL;
  }
  std::shared_ptr<rocksdb::Cache> block_cache;
  if (column_bbt_opts.no_block_cache) {
    // clear all settings except no_block_cache;
    // rocksdb does not like them combined with no_block_cache
    column_bbt_opts = rocksdb::BlockBasedTableOptions();
    column_bbt_opts.no_block_cache = true;
  } else {
    if (require_new_block_cache) {
      block_cache = create_block_cache(cache_type, cache_size, high_pri_pool_ratio);
      if (!block_cache) {
	dout(5) << __func__ << " failed to create block cache for params: " << block_cache_opt << dendl;
	return -EINVAL;
      }
    } else {
      // no cache-related keys given: share the store-wide block cache
      block_cache = bbt_opts.block_cache;
    }
  }
  column_bbt_opts.block_cache = block_cache;
  cf_bbt_opts[column_name] = column_bbt_opts;
  cf_opt->table_factory.reset(NewBlockBasedTableFactory(cf_bbt_opts[column_name]));
  return 0;
}
1013
/**
 * Compare the persisted sharding definition with the column families that
 * actually exist in the database.
 *
 * Reads the definition from "sharding/def" (an absent file means no
 * sharding), expands each entry into its shard column names and sorts
 * them into:
 *   - existing_cfs / existing_cfs_shard: shards present in the DB
 *   - missing_cfs  / missing_cfs_shard:  shards the definition expects
 *     but the DB does not have (e.g. after an interrupted reshard)
 * The "default" column family is always appended to existing_cfs last.
 *
 * @return 0 on success; -EIO on read/list failure or when the DB contains
 *         columns the definition does not describe; or an error from
 *         update_column_family_options().
 */
int RocksDBStore::verify_sharding(const rocksdb::Options& opt,
				  std::vector<rocksdb::ColumnFamilyDescriptor>& existing_cfs,
				  std::vector<std::pair<size_t, RocksDBStore::ColumnFamily> >& existing_cfs_shard,
				  std::vector<rocksdb::ColumnFamilyDescriptor>& missing_cfs,
				  std::vector<std::pair<size_t, RocksDBStore::ColumnFamily> >& missing_cfs_shard)
{
  rocksdb::Status status;
  std::string stored_sharding_text;
  status = opt.env->FileExists(sharding_def_file);
  if (status.ok()) {
    status = rocksdb::ReadFileToString(opt.env,
				       sharding_def_file,
				       &stored_sharding_text);
    if(!status.ok()) {
      derr << __func__ << " cannot read from " << sharding_def_file << dendl;
      return -EIO;
    }
    dout(20) << __func__ << " sharding=" << stored_sharding_text << dendl;
  } else {
    dout(30) << __func__ << " no sharding" << dendl;
    //no "sharding_def" present
  }
  //check if sharding_def matches stored_sharding_def
  std::vector<ColumnFamily> stored_sharding_def;
  parse_sharding_def(stored_sharding_text, stored_sharding_def);

  // deterministic order, independent of the order in the stored definition
  std::sort(stored_sharding_def.begin(), stored_sharding_def.end(),
	    [](ColumnFamily& a, ColumnFamily& b) { return a.name < b.name; } );

  std::vector<string> rocksdb_cfs;
  status = rocksdb::DB::ListColumnFamilies(rocksdb::DBOptions(opt),
					   path, &rocksdb_cfs);
  if (!status.ok()) {
    derr << __func__ << " unable to list column families: " << status.ToString() << dendl;
    return -EIO;
  }
  dout(5) << __func__ << " column families from rocksdb: " << rocksdb_cfs << dendl;

  // classify one shard column as existing or missing
  // (note: the lambda's `opt` parameter shadows the function's Options arg
  // with the per-column-family options)
  auto emplace_cf = [&] (const RocksDBStore::ColumnFamily& column,
			 int32_t shard_id,
			 const std::string& shard_name,
			 const rocksdb::ColumnFamilyOptions& opt) {
    if (std::find(rocksdb_cfs.begin(), rocksdb_cfs.end(), shard_name) != rocksdb_cfs.end()) {
      existing_cfs.emplace_back(shard_name, opt);
      existing_cfs_shard.emplace_back(shard_id, column);
    } else {
      missing_cfs.emplace_back(shard_name, opt);
      missing_cfs_shard.emplace_back(shard_id, column);
    }
  };

  for (auto& column : stored_sharding_def) {
    rocksdb::ColumnFamilyOptions cf_opt(opt);
    int r = update_column_family_options(column.name, column.options, &cf_opt);
    if (r != 0) {
      return r;
    }
    if (column.shard_cnt == 1) {
      // unsharded column keeps its plain name
      emplace_cf(column, 0, column.name, cf_opt);
    } else {
      // sharded column expands to "<name>-0" ... "<name>-<shard_cnt-1>"
      for (size_t i = 0; i < column.shard_cnt; i++) {
	std::string cf_name = column.name + "-" + std::to_string(i);
	emplace_cf(column, i, cf_name, cf_opt);
      }
    }
  }
  existing_cfs.emplace_back("default", opt);

  if (existing_cfs.size() != rocksdb_cfs.size()) {
    // the DB holds columns the sharding definition knows nothing about
    std::vector<std::string> columns_from_stored;
    sharding_def_to_columns(stored_sharding_def, columns_from_stored);
    derr << __func__ << " extra columns in rocksdb. rocksdb columns = " << rocksdb_cfs
	 << " target columns = " << columns_from_stored << dendl;
    return -EIO;
  }
  return 0;
}
1091
1092 std::ostream& operator<<(std::ostream& out, const RocksDBStore::ColumnFamily& cf)
1093 {
1094 out << "(";
1095 out << cf.name;
1096 out << ",";
1097 out << cf.shard_cnt;
1098 out << ",";
1099 out << cf.hash_l;
1100 out << "-";
1101 if (cf.hash_h != std::numeric_limits<uint32_t>::max()) {
1102 out << cf.hash_h;
1103 }
1104 out << ",";
1105 out << cf.options;
1106 out << ")";
1107 return out;
1108 }
1109
/**
 * Open (or create) the RocksDB database backing this store.
 *
 * @param out               stream for human-readable error output
 * @param create_if_missing create a fresh DB (mutually exclusive with
 *                          open_readonly)
 * @param open_readonly     open without write access
 * @param sharding_text     sharding definition applied on creation
 * @return 0 on success, negative errno-style code on failure.
 */
int RocksDBStore::do_open(ostream &out,
			  bool create_if_missing,
			  bool open_readonly,
			  const std::string& sharding_text)
{
  ceph_assert(!(create_if_missing && open_readonly));
  rocksdb::Options opt;
  int r = load_rocksdb_options(create_if_missing, opt);
  if (r) {
    dout(1) << __func__ << " load rocksdb options failed" << dendl;
    return r;
  }
  rocksdb::Status status;
  if (create_if_missing) {
    // fresh database: open it, then create the column families described
    // by sharding_text
    status = rocksdb::DB::Open(opt, path, &db);
    if (!status.ok()) {
      derr << status.ToString() << dendl;
      return -EINVAL;
    }
    r = apply_sharding(opt, sharding_text);
    if (r < 0) {
      return r;
    }
    default_cf = db->DefaultColumnFamily();
  } else {
    // existing database: reconcile the stored sharding definition with the
    // column families actually present
    std::vector<rocksdb::ColumnFamilyDescriptor> existing_cfs;
    std::vector<std::pair<size_t, RocksDBStore::ColumnFamily> > existing_cfs_shard;
    std::vector<rocksdb::ColumnFamilyDescriptor> missing_cfs;
    std::vector<std::pair<size_t, RocksDBStore::ColumnFamily> > missing_cfs_shard;

    r = verify_sharding(opt,
			existing_cfs, existing_cfs_shard,
			missing_cfs, missing_cfs_shard);
    if (r < 0) {
      return r;
    }
    // "sharding/recreate_columns" == "1" marks an interrupted repair;
    // in that mode missing shard columns are recreated below
    std::string sharding_recreate_text;
    status = rocksdb::ReadFileToString(opt.env,
				       sharding_recreate,
				       &sharding_recreate_text);
    bool recreate_mode = status.ok() && sharding_recreate_text == "1";

    ceph_assert(!recreate_mode || !open_readonly);
    if (recreate_mode == false && missing_cfs.size() != 0) {
      // We do not accept when there are missing column families, except case that we are during resharding.
      // We can get into this case if resharding was interrupted. It gives a chance to continue.
      // Opening DB is only allowed in read-only mode.
      if (open_readonly == false &&
	  std::find_if(missing_cfs.begin(), missing_cfs.end(),
		       [](const rocksdb::ColumnFamilyDescriptor& c) { return c.name == resharding_column_lock; }
		       ) != missing_cfs.end()) {
	derr << __func__ << " missing column families: " << missing_cfs_shard << dendl;
	return -EIO;
      }
    }

    if (existing_cfs.empty()) {
      // no column families
      if (open_readonly) {
	status = rocksdb::DB::OpenForReadOnly(opt, path, &db);
      } else {
	status = rocksdb::DB::Open(opt, path, &db);
      }
      if (!status.ok()) {
	derr << status.ToString() << dendl;
	return -EINVAL;
      }
      default_cf = db->DefaultColumnFamily();
    } else {
      std::vector<rocksdb::ColumnFamilyHandle*> handles;
      if (open_readonly) {
	status = rocksdb::DB::OpenForReadOnly(rocksdb::DBOptions(opt),
					      path, existing_cfs,
					      &handles, &db);
      } else {
	status = rocksdb::DB::Open(rocksdb::DBOptions(opt),
				   path, existing_cfs, &handles, &db);
      }
      if (!status.ok()) {
	derr << status.ToString() << dendl;
	return -EINVAL;
      }
      // existing_cfs carries one extra entry: "default", appended last by
      // verify_sharding; handles[] is parallel to existing_cfs
      ceph_assert(existing_cfs.size() == existing_cfs_shard.size() + 1);
      ceph_assert(handles.size() == existing_cfs.size());
      dout(10) << __func__ << " existing_cfs=" << existing_cfs.size() << dendl;
      for (size_t i = 0; i < existing_cfs_shard.size(); i++) {
	add_column_family(existing_cfs_shard[i].second.name,
			  existing_cfs_shard[i].second.hash_l,
			  existing_cfs_shard[i].second.hash_h,
			  existing_cfs_shard[i].first,
			  handles[i]);
      }
      // last handle corresponds to the "default" descriptor appended above
      default_cf = handles[handles.size() - 1];
      must_close_default_cf = true;

      // recreate missing shard columns, skipped when the resharding lock
      // column is among the missing ones
      if (missing_cfs.size() > 0 &&
	  std::find_if(missing_cfs.begin(), missing_cfs.end(),
		       [](const rocksdb::ColumnFamilyDescriptor& c) { return c.name == resharding_column_lock; }
		       ) == missing_cfs.end())
      {
	dout(10) << __func__ << " missing_cfs=" << missing_cfs.size() << dendl;
	ceph_assert(recreate_mode);
	ceph_assert(missing_cfs.size() == missing_cfs_shard.size());
	for (size_t i = 0; i < missing_cfs.size(); i++) {
	  rocksdb::ColumnFamilyHandle *cf;
	  status = db->CreateColumnFamily(missing_cfs[i].options, missing_cfs[i].name, &cf);
	  if (!status.ok()) {
	    derr << __func__ << " Failed to create rocksdb column family: "
		 << missing_cfs[i].name << dendl;
	    return -EINVAL;
	  }
	  add_column_family(missing_cfs_shard[i].second.name,
			    missing_cfs_shard[i].second.hash_l,
			    missing_cfs_shard[i].second.hash_h,
			    missing_cfs_shard[i].first,
			    cf);
	}
	// all columns recreated; drop the marker file
	opt.env->DeleteFile(sharding_recreate);
      }
    }
  }
  ceph_assert(default_cf != nullptr);

  // register perf counters for this store instance
  PerfCountersBuilder plb(cct, "rocksdb", l_rocksdb_first, l_rocksdb_last);
  plb.add_time_avg(l_rocksdb_get_latency, "get_latency", "Get latency");
  plb.add_time_avg(l_rocksdb_submit_latency, "submit_latency", "Submit Latency");
  plb.add_time_avg(l_rocksdb_submit_sync_latency, "submit_sync_latency", "Submit Sync Latency");
  plb.add_u64_counter(l_rocksdb_compact, "compact", "Compactions");
  plb.add_u64_counter(l_rocksdb_compact_range, "compact_range", "Compactions by range");
  plb.add_u64_counter(l_rocksdb_compact_queue_merge, "compact_queue_merge", "Mergings of ranges in compaction queue");
  plb.add_u64(l_rocksdb_compact_queue_len, "compact_queue_len", "Length of compaction queue");
  plb.add_time_avg(l_rocksdb_write_wal_time, "rocksdb_write_wal_time", "Rocksdb write wal time");
  plb.add_time_avg(l_rocksdb_write_memtable_time, "rocksdb_write_memtable_time", "Rocksdb write memtable time");
  plb.add_time_avg(l_rocksdb_write_delay_time, "rocksdb_write_delay_time", "Rocksdb write delay time");
  plb.add_time_avg(l_rocksdb_write_pre_and_post_process_time,
    "rocksdb_write_pre_and_post_time", "total time spent on writing a record, excluding write process");
  logger = plb.create_perf_counters();
  cct->get_perfcounters_collection()->add(logger);

  if (compact_on_mount) {
    derr << "Compacting rocksdb store..." << dendl;
    compact();
    derr << "Finished compacting rocksdb store" << dendl;
  }
  return 0;
}
1256
1257 int RocksDBStore::_test_init(const string& dir)
1258 {
1259 rocksdb::Options options;
1260 options.create_if_missing = true;
1261 rocksdb::DB *db;
1262 rocksdb::Status status = rocksdb::DB::Open(options, dir, &db);
1263 delete db;
1264 db = nullptr;
1265 return status.ok() ? 0 : -EIO;
1266 }
1267
1268 RocksDBStore::~RocksDBStore()
1269 {
1270 close();
1271 if (priv) {
1272 delete static_cast<rocksdb::Env*>(priv);
1273 }
1274 }
1275
1276 void RocksDBStore::close()
1277 {
1278 // stop compaction thread
1279 compact_queue_lock.lock();
1280 if (compact_thread.is_started()) {
1281 dout(1) << __func__ << " waiting for compaction thread to stop" << dendl;
1282 compact_queue_stop = true;
1283 compact_queue_cond.notify_all();
1284 compact_queue_lock.unlock();
1285 compact_thread.join();
1286 dout(1) << __func__ << " compaction thread to stopped" << dendl;
1287 } else {
1288 compact_queue_lock.unlock();
1289 }
1290
1291 if (logger) {
1292 cct->get_perfcounters_collection()->remove(logger);
1293 delete logger;
1294 logger = nullptr;
1295 }
1296
1297 // Ensure db is destroyed before dependent db_cache and filterpolicy
1298 for (auto& p : cf_handles) {
1299 for (size_t i = 0; i < p.second.handles.size(); i++) {
1300 db->DestroyColumnFamilyHandle(p.second.handles[i]);
1301 }
1302 }
1303 cf_handles.clear();
1304 if (must_close_default_cf) {
1305 db->DestroyColumnFamilyHandle(default_cf);
1306 must_close_default_cf = false;
1307 }
1308 default_cf = nullptr;
1309 delete db;
1310 db = nullptr;
1311 }
1312
/**
 * Attempt to repair a damaged database via rocksdb::RepairDB().
 *
 * The sharding definition file is saved first because RepairDB() removes
 * files it does not recognize; afterwards the definition and the
 * "recreate columns" marker are written back and the DB is opened once
 * read-write so missing shard columns get recreated.
 *
 * @return 0 on success, the load_rocksdb_options() error, or -1.
 */
int RocksDBStore::repair(std::ostream &out)
{
  rocksdb::Status status;
  rocksdb::Options opt;
  int r = load_rocksdb_options(false, opt);
  if (r) {
    dout(1) << __func__ << " load rocksdb options failed" << dendl;
    out << "load rocksdb options failed" << std::endl;
    return r;
  }
  //need to save sharding definition, repairDB will delete files it does not know
  std::string stored_sharding_text;
  status = opt.env->FileExists(sharding_def_file);
  if (status.ok()) {
    status = rocksdb::ReadFileToString(opt.env,
				       sharding_def_file,
				       &stored_sharding_text);
    if (!status.ok()) {
      // treat an unreadable definition as "no sharding"
      stored_sharding_text.clear();
    }
  }
  dout(10) << __func__ << " stored_sharding: " << stored_sharding_text << dendl;
  status = rocksdb::RepairDB(path, opt);
  bool repaired = status.ok();
  if (!stored_sharding_text.empty()) {
    //recreate markers even if repair failed
    opt.env->CreateDir(sharding_def_dir);
    status = rocksdb::WriteStringToFile(opt.env, stored_sharding_text,
					sharding_def_file, true);
    if (!status.ok()) {
      derr << __func__ << " cannot write to " << sharding_def_file << dendl;
      return -1;
    }
    // marker telling do_open() to recreate missing shard columns
    status = rocksdb::WriteStringToFile(opt.env, "1",
					sharding_recreate, true);
    if (!status.ok()) {
      derr << __func__ << " cannot write to " << sharding_recreate << dendl;
      return -1;
    }
    // finalize sharding recreate by opening and closing the DB once
    if (do_open(out, false, false)) {
      derr << __func__ << " cannot finalize repair" << dendl;
      return -1;
    }
    close();
  }

  // NOTE(review): when a sharding definition was stored, `status` here holds
  // the last marker-write result rather than the RepairDB() result; the
  // RepairDB outcome itself is captured in `repaired`.
  if (repaired && status.ok()) {
    return 0;
  } else {
    out << "repair rocksdb failed : " << status.ToString() << std::endl;
    return -1;
  }
}
1367
1368 void RocksDBStore::split_stats(const std::string &s, char delim, std::vector<std::string> &elems) {
1369 std::stringstream ss;
1370 ss.str(s);
1371 std::string item;
1372 while (std::getline(ss, item, delim)) {
1373 elems.push_back(item);
1374 }
1375 }
1376
/**
 * Read an integer-valued RocksDB property (e.g. "rocksdb.estimate-num-keys")
 * into *out.
 *
 * @return true if the property exists and was retrieved, false otherwise.
 */
bool RocksDBStore::get_property(
  const std::string &property,
  uint64_t *out)
{
  return db->GetIntProperty(property, out);
}
1383
1384 int64_t RocksDBStore::estimate_prefix_size(const string& prefix,
1385 const string& key_prefix)
1386 {
1387 uint64_t size = 0;
1388 auto p_iter = cf_handles.find(prefix);
1389 if (p_iter != cf_handles.end()) {
1390 for (auto cf : p_iter->second.handles) {
1391 uint64_t s = 0;
1392 string start = key_prefix + string(1, '\x00');
1393 string limit = key_prefix + string("\xff\xff\xff\xff");
1394 rocksdb::Range r(start, limit);
1395 db->GetApproximateSizes(cf, &r, 1, &s);
1396 size += s;
1397 }
1398 } else {
1399 string start = combine_strings(prefix , key_prefix);
1400 string limit = combine_strings(prefix , key_prefix + "\xff\xff\xff\xff");
1401 rocksdb::Range r(start, limit);
1402 db->GetApproximateSizes(default_cf, &r, 1, &size);
1403 }
1404 return size;
1405 }
1406
1407 void RocksDBStore::get_statistics(Formatter *f)
1408 {
1409 if (!cct->_conf->rocksdb_perf) {
1410 dout(20) << __func__ << " RocksDB perf is disabled, can't probe for stats"
1411 << dendl;
1412 return;
1413 }
1414
1415 if (cct->_conf->rocksdb_collect_compaction_stats) {
1416 std::string stat_str;
1417 bool status = db->GetProperty("rocksdb.stats", &stat_str);
1418 if (status) {
1419 f->open_object_section("rocksdb_statistics");
1420 f->dump_string("rocksdb_compaction_statistics", "");
1421 vector<string> stats;
1422 split_stats(stat_str, '\n', stats);
1423 for (auto st :stats) {
1424 f->dump_string("", st);
1425 }
1426 f->close_section();
1427 }
1428 }
1429 if (cct->_conf->rocksdb_collect_extended_stats) {
1430 if (dbstats) {
1431 f->open_object_section("rocksdb_extended_statistics");
1432 string stat_str = dbstats->ToString();
1433 vector<string> stats;
1434 split_stats(stat_str, '\n', stats);
1435 f->dump_string("rocksdb_extended_statistics", "");
1436 for (auto st :stats) {
1437 f->dump_string(".", st);
1438 }
1439 f->close_section();
1440 }
1441 f->open_object_section("rocksdbstore_perf_counters");
1442 logger->dump_formatted(f,0);
1443 f->close_section();
1444 }
1445 if (cct->_conf->rocksdb_collect_memory_stats) {
1446 f->open_object_section("rocksdb_memtable_statistics");
1447 std::string str;
1448 if (!bbt_opts.no_block_cache) {
1449 str.append(stringify(bbt_opts.block_cache->GetUsage()));
1450 f->dump_string("block_cache_usage", str.data());
1451 str.clear();
1452 str.append(stringify(bbt_opts.block_cache->GetPinnedUsage()));
1453 f->dump_string("block_cache_pinned_blocks_usage", str);
1454 str.clear();
1455 }
1456 db->GetProperty("rocksdb.cur-size-all-mem-tables", &str);
1457 f->dump_string("rocksdb_memtable_usage", str);
1458 str.clear();
1459 db->GetProperty("rocksdb.estimate-table-readers-mem", &str);
1460 f->dump_string("rocksdb_index_filter_blocks_usage", str);
1461 f->close_section();
1462 }
1463 }
1464
// WriteBatch::Handler that renders a batch into a human-readable string for
// debug/error logging.  Iteration stops after 50 operations (Continue()).
struct RocksDBStore::RocksWBHandler: public rocksdb::WriteBatch::Handler {
  RocksWBHandler(const RocksDBStore& db) : db(db) {}
  const RocksDBStore& db;   // used to resolve CF ids and split default-CF keys
  std::stringstream seen;   // accumulated textual dump
  int num_seen = 0;         // operations dumped so far

  // Append one operation to `seen`.  column_family_id 0 is the default CF,
  // whose keys embed the prefix ("prefix\0key"); other ids are mapped back
  // to their prefix via cf_ids_to_prefix.  value == nullptr for deletes.
  void dump(const char* op_name,
	    uint32_t column_family_id,
	    const rocksdb::Slice& key_in,
	    const rocksdb::Slice* value = nullptr) {
    string prefix;
    string key;
    ssize_t size = value ? value->size() : -1;
    seen << std::endl << op_name << "(";

    if (column_family_id == 0) {
      db.split_key(key_in, &prefix, &key);
    } else {
      auto it = db.cf_ids_to_prefix.find(column_family_id);
      ceph_assert(it != db.cf_ids_to_prefix.end());
      prefix = it->second;
      key = key_in.ToString();
    }
    seen << " prefix = " << prefix;
    seen << " key = " << pretty_binary_string(key);
    if (size != -1)
      seen << " value size = " << std::to_string(size);
    seen << ")";
    num_seen++;
  }
  void Put(const rocksdb::Slice& key,
	   const rocksdb::Slice& value) override {
    dump("Put", 0, key, &value);
  }
  rocksdb::Status PutCF(uint32_t column_family_id, const rocksdb::Slice& key,
			const rocksdb::Slice& value) override {
    dump("PutCF", column_family_id, key, &value);
    return rocksdb::Status::OK();
  }
  void SingleDelete(const rocksdb::Slice& key) override {
    dump("SingleDelete", 0, key);
  }
  rocksdb::Status SingleDeleteCF(uint32_t column_family_id, const rocksdb::Slice& key) override {
    dump("SingleDeleteCF", column_family_id, key);
    return rocksdb::Status::OK();
  }
  void Delete(const rocksdb::Slice& key) override {
    dump("Delete", 0, key);
  }
  rocksdb::Status DeleteCF(uint32_t column_family_id, const rocksdb::Slice& key) override {
    dump("DeleteCF", column_family_id, key);
    return rocksdb::Status::OK();
  }
  void Merge(const rocksdb::Slice& key,
	     const rocksdb::Slice& value) override {
    dump("Merge", 0, key, &value);
  }
  rocksdb::Status MergeCF(uint32_t column_family_id, const rocksdb::Slice& key,
			  const rocksdb::Slice& value) override {
    dump("MergeCF", column_family_id, key, &value);
    return rocksdb::Status::OK();
  }
  // cap the dump at 50 operations to bound log size
  bool Continue() override { return num_seen < 50; }
};
1529
/**
 * Shared implementation of submit_transaction{,_sync}: write the batched
 * transaction to RocksDB and (optionally) record perf breakdown counters.
 *
 * @param woptions write options; sync/disableWAL are finalized here
 * @param t        transaction created by this store
 * @return 0 on success, -1 on write failure (the failed batch is dumped
 *         to the error log).
 */
int RocksDBStore::submit_common(rocksdb::WriteOptions& woptions, KeyValueDB::Transaction t)
{
  // enable rocksdb breakdown
  // considering performance overhead, default is disabled
  if (cct->_conf->rocksdb_perf) {
    rocksdb::SetPerfLevel(rocksdb::PerfLevel::kEnableTimeExceptForMutex);
    rocksdb::get_perf_context()->Reset();
  }

  RocksDBTransactionImpl * _t =
    static_cast<RocksDBTransactionImpl *>(t.get());
  woptions.disableWAL = disableWAL;
  // NOTE(review): the statements through dendl below appear to sit inside
  // the conditional block opened by the lgeneric_subdout macro, so the
  // (expensive) batch dump should only run at debug level >= 30 — confirm
  // against the dout macro definitions.
  lgeneric_subdout(cct, rocksdb, 30) << __func__;
  RocksWBHandler bat_txc(*this);
  _t->bat.Iterate(&bat_txc);
  *_dout << " Rocksdb transaction: " << bat_txc.seen.str() << dendl;

  rocksdb::Status s = db->Write(woptions, &_t->bat);
  if (!s.ok()) {
    // dump the failed batch into the error log for diagnosis
    RocksWBHandler rocks_txc(*this);
    _t->bat.Iterate(&rocks_txc);
    derr << __func__ << " error: " << s.ToString() << " code = " << s.code()
	 << " Rocksdb transaction: " << rocks_txc.seen.str() << dendl;
  }

  if (cct->_conf->rocksdb_perf) {
    // perf_context counters are in nanoseconds; convert to utime_t seconds
    utime_t write_memtable_time;
    utime_t write_delay_time;
    utime_t write_wal_time;
    utime_t write_pre_and_post_process_time;
    write_wal_time.set_from_double(
      static_cast<double>(rocksdb::get_perf_context()->write_wal_time)/1000000000);
    write_memtable_time.set_from_double(
      static_cast<double>(rocksdb::get_perf_context()->write_memtable_time)/1000000000);
    write_delay_time.set_from_double(
      static_cast<double>(rocksdb::get_perf_context()->write_delay_time)/1000000000);
    write_pre_and_post_process_time.set_from_double(
      static_cast<double>(rocksdb::get_perf_context()->write_pre_and_post_process_time)/1000000000);
    logger->tinc(l_rocksdb_write_memtable_time, write_memtable_time);
    logger->tinc(l_rocksdb_write_delay_time, write_delay_time);
    logger->tinc(l_rocksdb_write_wal_time, write_wal_time);
    logger->tinc(l_rocksdb_write_pre_and_post_process_time, write_pre_and_post_process_time);
  }

  return s.ok() ? 0 : -1;
}
1576
1577 int RocksDBStore::submit_transaction(KeyValueDB::Transaction t)
1578 {
1579 utime_t start = ceph_clock_now();
1580 rocksdb::WriteOptions woptions;
1581 woptions.sync = false;
1582
1583 int result = submit_common(woptions, t);
1584
1585 utime_t lat = ceph_clock_now() - start;
1586 logger->tinc(l_rocksdb_submit_latency, lat);
1587
1588 return result;
1589 }
1590
1591 int RocksDBStore::submit_transaction_sync(KeyValueDB::Transaction t)
1592 {
1593 utime_t start = ceph_clock_now();
1594 rocksdb::WriteOptions woptions;
1595 // if disableWAL, sync can't set
1596 woptions.sync = !disableWAL;
1597
1598 int result = submit_common(woptions, t);
1599
1600 utime_t lat = ceph_clock_now() - start;
1601 logger->tinc(l_rocksdb_submit_sync_latency, lat);
1602
1603 return result;
1604 }
1605
1606 RocksDBStore::RocksDBTransactionImpl::RocksDBTransactionImpl(RocksDBStore *_db)
1607 {
1608 db = _db;
1609 }
1610
/**
 * Append Put(key -> to_set_bl) to `bat`, targeting column family `cf`.
 *
 * A contiguous bufferlist is wrapped in a single Slice without copying;
 * a fragmented one is passed as SliceParts, one slice per buffer segment.
 */
void RocksDBStore::RocksDBTransactionImpl::put_bat(
  rocksdb::WriteBatch& bat,
  rocksdb::ColumnFamilyHandle *cf,
  const string &key,
  const bufferlist &to_set_bl)
{
  // bufferlist::c_str() is non-constant, so we can't call c_str()
  if (to_set_bl.is_contiguous() && to_set_bl.length() > 0) {
    bat.Put(cf,
	    rocksdb::Slice(key),
	    rocksdb::Slice(to_set_bl.buffers().front().c_str(),
			   to_set_bl.length()));
  } else {
    rocksdb::Slice key_slice(key);
    vector<rocksdb::Slice> value_slices(to_set_bl.get_num_buffers());
    bat.Put(cf,
	    rocksdb::SliceParts(&key_slice, 1),
	    prepare_sliceparts(to_set_bl, &value_slices));
  }
}
1631
1632 void RocksDBStore::RocksDBTransactionImpl::set(
1633 const string &prefix,
1634 const string &k,
1635 const bufferlist &to_set_bl)
1636 {
1637 auto cf = db->get_cf_handle(prefix, k);
1638 if (cf) {
1639 put_bat(bat, cf, k, to_set_bl);
1640 } else {
1641 string key = combine_strings(prefix, k);
1642 put_bat(bat, db->default_cf, key, to_set_bl);
1643 }
1644 }
1645
1646 void RocksDBStore::RocksDBTransactionImpl::set(
1647 const string &prefix,
1648 const char *k, size_t keylen,
1649 const bufferlist &to_set_bl)
1650 {
1651 auto cf = db->get_cf_handle(prefix, k, keylen);
1652 if (cf) {
1653 string key(k, keylen); // fixme?
1654 put_bat(bat, cf, key, to_set_bl);
1655 } else {
1656 string key;
1657 combine_strings(prefix, k, keylen, &key);
1658 put_bat(bat, cf, key, to_set_bl);
1659 }
1660 }
1661
1662 void RocksDBStore::RocksDBTransactionImpl::rmkey(const string &prefix,
1663 const string &k)
1664 {
1665 auto cf = db->get_cf_handle(prefix, k);
1666 if (cf) {
1667 bat.Delete(cf, rocksdb::Slice(k));
1668 } else {
1669 bat.Delete(db->default_cf, combine_strings(prefix, k));
1670 }
1671 }
1672
1673 void RocksDBStore::RocksDBTransactionImpl::rmkey(const string &prefix,
1674 const char *k,
1675 size_t keylen)
1676 {
1677 auto cf = db->get_cf_handle(prefix, k, keylen);
1678 if (cf) {
1679 bat.Delete(cf, rocksdb::Slice(k, keylen));
1680 } else {
1681 string key;
1682 combine_strings(prefix, k, keylen, &key);
1683 bat.Delete(db->default_cf, rocksdb::Slice(key));
1684 }
1685 }
1686
1687 void RocksDBStore::RocksDBTransactionImpl::rm_single_key(const string &prefix,
1688 const string &k)
1689 {
1690 auto cf = db->get_cf_handle(prefix, k);
1691 if (cf) {
1692 bat.SingleDelete(cf, k);
1693 } else {
1694 bat.SingleDelete(db->default_cf, combine_strings(prefix, k));
1695 }
1696 }
1697
/**
 * Queue deletion of every key under `prefix`.
 *
 * Strategy: enumerate and point-Delete() up to delete_range_threshold-1
 * keys; if at least that many exist, roll the point deletes back and issue
 * a single DeleteRange (a range tombstone) instead.
 */
void RocksDBStore::RocksDBTransactionImpl::rmkeys_by_prefix(const string &prefix)
{
  auto p_iter = db->cf_handles.find(prefix);
  if (p_iter == db->cf_handles.end()) {
    // prefix lives in the default CF; keys are "prefix\0key"
    uint64_t cnt = db->delete_range_threshold;
    bat.SetSavePoint();
    auto it = db->get_iterator(prefix);
    // --cnt in the condition: at most threshold-1 point deletes are queued;
    // cnt reaching 0 means the threshold was hit
    for (it->seek_to_first(); it->valid() && (--cnt) != 0; it->next()) {
      bat.Delete(db->default_cf, combine_strings(prefix, it->key()));
    }
    if (cnt == 0) {
      // too many keys: replace the point deletes with one range tombstone
      bat.RollbackToSavePoint();
      string endprefix = prefix;
      endprefix.push_back('\x01');
      bat.DeleteRange(db->default_cf,
		      combine_strings(prefix, string()),
		      combine_strings(endprefix, string()));
    } else {
      bat.PopSavePoint();
    }
  } else {
    // sharded prefix: apply the same strategy per shard CF
    ceph_assert(p_iter->second.handles.size() >= 1);
    for (auto cf : p_iter->second.handles) {
      uint64_t cnt = db->delete_range_threshold;
      bat.SetSavePoint();
      auto it = db->new_shard_iterator(cf);
      // NOTE(review): unlike rm_range_keys(), `it` is not explicitly
      // deleted here — confirm this new_shard_iterator overload returns a
      // managed (smart-pointer) iterator, otherwise this leaks
      for (it->SeekToFirst(); it->Valid() && (--cnt) != 0; it->Next()) {
	bat.Delete(cf, it->key());
      }
      if (cnt == 0) {
	bat.RollbackToSavePoint();
	string endprefix = "\xff\xff\xff\xff"; // FIXME: this is cheating...
	bat.DeleteRange(cf, string(), endprefix);
      } else {
	bat.PopSavePoint();
      }
    }
  }
}
1737
/**
 * Queue deletion of keys in [start, end) under `prefix`.
 *
 * Same threshold strategy as rmkeys_by_prefix(): up to
 * delete_range_threshold-1 point deletes, otherwise a single DeleteRange.
 */
void RocksDBStore::RocksDBTransactionImpl::rm_range_keys(const string &prefix,
							 const string &start,
							 const string &end)
{
  auto p_iter = db->cf_handles.find(prefix);
  if (p_iter == db->cf_handles.end()) {
    // prefix stored in the default CF under "prefix\0key"
    uint64_t cnt = db->delete_range_threshold;
    bat.SetSavePoint();
    auto it = db->get_iterator(prefix);
    // --cnt in the condition caps the point deletes at threshold-1
    for (it->lower_bound(start);
	 it->valid() && db->comparator->Compare(it->key(), end) < 0 && (--cnt) != 0;
	 it->next()) {
      bat.Delete(db->default_cf, combine_strings(prefix, it->key()));
    }
    if (cnt == 0) {
      // threshold hit: replace the point deletes with one range tombstone
      bat.RollbackToSavePoint();
      bat.DeleteRange(db->default_cf,
		      rocksdb::Slice(combine_strings(prefix, start)),
		      rocksdb::Slice(combine_strings(prefix, end)));
    } else {
      bat.PopSavePoint();
    }
  } else {
    // sharded prefix: repeat per shard; shard CF keys carry no prefix
    ceph_assert(p_iter->second.handles.size() >= 1);
    for (auto cf : p_iter->second.handles) {
      uint64_t cnt = db->delete_range_threshold;
      bat.SetSavePoint();
      // raw rocksdb iterator, owned (and deleted) in this loop body
      rocksdb::Iterator* it = db->new_shard_iterator(cf);
      ceph_assert(it != nullptr);
      for (it->Seek(start);
	   it->Valid() && db->comparator->Compare(it->key(), end) < 0 && (--cnt) != 0;
	   it->Next()) {
	bat.Delete(cf, it->key());
      }
      if (cnt == 0) {
	bat.RollbackToSavePoint();
	bat.DeleteRange(cf, rocksdb::Slice(start), rocksdb::Slice(end));
      } else {
	bat.PopSavePoint();
      }
      delete it;
    }
  }
}
1782
1783 void RocksDBStore::RocksDBTransactionImpl::merge(
1784 const string &prefix,
1785 const string &k,
1786 const bufferlist &to_set_bl)
1787 {
1788 auto cf = db->get_cf_handle(prefix, k);
1789 if (cf) {
1790 // bufferlist::c_str() is non-constant, so we can't call c_str()
1791 if (to_set_bl.is_contiguous() && to_set_bl.length() > 0) {
1792 bat.Merge(
1793 cf,
1794 rocksdb::Slice(k),
1795 rocksdb::Slice(to_set_bl.buffers().front().c_str(), to_set_bl.length()));
1796 } else {
1797 // make a copy
1798 rocksdb::Slice key_slice(k);
1799 vector<rocksdb::Slice> value_slices(to_set_bl.get_num_buffers());
1800 bat.Merge(cf, rocksdb::SliceParts(&key_slice, 1),
1801 prepare_sliceparts(to_set_bl, &value_slices));
1802 }
1803 } else {
1804 string key = combine_strings(prefix, k);
1805 // bufferlist::c_str() is non-constant, so we can't call c_str()
1806 if (to_set_bl.is_contiguous() && to_set_bl.length() > 0) {
1807 bat.Merge(
1808 db->default_cf,
1809 rocksdb::Slice(key),
1810 rocksdb::Slice(to_set_bl.buffers().front().c_str(), to_set_bl.length()));
1811 } else {
1812 // make a copy
1813 rocksdb::Slice key_slice(key);
1814 vector<rocksdb::Slice> value_slices(to_set_bl.get_num_buffers());
1815 bat.Merge(
1816 db->default_cf,
1817 rocksdb::SliceParts(&key_slice, 1),
1818 prepare_sliceparts(to_set_bl, &value_slices));
1819 }
1820 }
1821 }
1822
1823 int RocksDBStore::get(
1824 const string &prefix,
1825 const std::set<string> &keys,
1826 std::map<string, bufferlist> *out)
1827 {
1828 rocksdb::PinnableSlice value;
1829 utime_t start = ceph_clock_now();
1830 if (cf_handles.count(prefix) > 0) {
1831 for (auto& key : keys) {
1832 auto cf_handle = get_cf_handle(prefix, key);
1833 auto status = db->Get(rocksdb::ReadOptions(),
1834 cf_handle,
1835 rocksdb::Slice(key),
1836 &value);
1837 if (status.ok()) {
1838 (*out)[key].append(value.data(), value.size());
1839 } else if (status.IsIOError()) {
1840 ceph_abort_msg(status.getState());
1841 }
1842 value.Reset();
1843 }
1844 } else {
1845 for (auto& key : keys) {
1846 string k = combine_strings(prefix, key);
1847 auto status = db->Get(rocksdb::ReadOptions(),
1848 default_cf,
1849 rocksdb::Slice(k),
1850 &value);
1851 if (status.ok()) {
1852 (*out)[key].append(value.data(), value.size());
1853 } else if (status.IsIOError()) {
1854 ceph_abort_msg(status.getState());
1855 }
1856 value.Reset();
1857 }
1858 }
1859 utime_t lat = ceph_clock_now() - start;
1860 logger->tinc(l_rocksdb_get_latency, lat);
1861 return 0;
1862 }
1863
1864 int RocksDBStore::get(
1865 const string &prefix,
1866 const string &key,
1867 bufferlist *out)
1868 {
1869 ceph_assert(out && (out->length() == 0));
1870 utime_t start = ceph_clock_now();
1871 int r = 0;
1872 rocksdb::PinnableSlice value;
1873 rocksdb::Status s;
1874 auto cf = get_cf_handle(prefix, key);
1875 if (cf) {
1876 s = db->Get(rocksdb::ReadOptions(),
1877 cf,
1878 rocksdb::Slice(key),
1879 &value);
1880 } else {
1881 string k = combine_strings(prefix, key);
1882 s = db->Get(rocksdb::ReadOptions(),
1883 default_cf,
1884 rocksdb::Slice(k),
1885 &value);
1886 }
1887 if (s.ok()) {
1888 out->append(value.data(), value.size());
1889 } else if (s.IsNotFound()) {
1890 r = -ENOENT;
1891 } else {
1892 ceph_abort_msg(s.getState());
1893 }
1894 utime_t lat = ceph_clock_now() - start;
1895 logger->tinc(l_rocksdb_get_latency, lat);
1896 return r;
1897 }
1898
1899 int RocksDBStore::get(
1900 const string& prefix,
1901 const char *key,
1902 size_t keylen,
1903 bufferlist *out)
1904 {
1905 ceph_assert(out && (out->length() == 0));
1906 utime_t start = ceph_clock_now();
1907 int r = 0;
1908 rocksdb::PinnableSlice value;
1909 rocksdb::Status s;
1910 auto cf = get_cf_handle(prefix, key, keylen);
1911 if (cf) {
1912 s = db->Get(rocksdb::ReadOptions(),
1913 cf,
1914 rocksdb::Slice(key, keylen),
1915 &value);
1916 } else {
1917 string k;
1918 combine_strings(prefix, key, keylen, &k);
1919 s = db->Get(rocksdb::ReadOptions(),
1920 default_cf,
1921 rocksdb::Slice(k),
1922 &value);
1923 }
1924 if (s.ok()) {
1925 out->append(value.data(), value.size());
1926 } else if (s.IsNotFound()) {
1927 r = -ENOENT;
1928 } else {
1929 ceph_abort_msg(s.getState());
1930 }
1931 utime_t lat = ceph_clock_now() - start;
1932 logger->tinc(l_rocksdb_get_latency, lat);
1933 return r;
1934 }
1935
1936 int RocksDBStore::split_key(rocksdb::Slice in, string *prefix, string *key)
1937 {
1938 size_t prefix_len = 0;
1939
1940 // Find separator inside Slice
1941 char* separator = (char*) memchr(in.data(), 0, in.size());
1942 if (separator == NULL)
1943 return -EINVAL;
1944 prefix_len = size_t(separator - in.data());
1945 if (prefix_len >= in.size())
1946 return -EINVAL;
1947
1948 // Fetch prefix and/or key directly from Slice
1949 if (prefix)
1950 *prefix = string(in.data(), prefix_len);
1951 if (key)
1952 *key = string(separator+1, in.size()-prefix_len-1);
1953 return 0;
1954 }
1955
1956 void RocksDBStore::compact()
1957 {
1958 logger->inc(l_rocksdb_compact);
1959 rocksdb::CompactRangeOptions options;
1960 db->CompactRange(options, default_cf, nullptr, nullptr);
1961 for (auto cf : cf_handles) {
1962 for (auto shard_cf : cf.second.handles) {
1963 db->CompactRange(
1964 options,
1965 shard_cf,
1966 nullptr, nullptr);
1967 }
1968 }
1969 }
1970
void RocksDBStore::compact_thread_entry()
{
  // Background compaction worker: drains compact_queue until asked to stop.
  // The queue lock is held except while the actual compaction runs.
  std::unique_lock l{compact_queue_lock};
  dout(10) << __func__ << " enter" << dendl;
  while (!compact_queue_stop) {
    if (!compact_queue.empty()) {
      auto range = compact_queue.front();
      compact_queue.pop_front();
      logger->set(l_rocksdb_compact_queue_len, compact_queue.size());
      // Drop the lock while compacting so producers can keep queueing.
      l.unlock();
      logger->inc(l_rocksdb_compact_range);
      // An empty/empty pair is the sentinel for "compact everything".
      if (range.first.empty() && range.second.empty()) {
        compact();
      } else {
        compact_range(range.first, range.second);
      }
      l.lock();
      continue;
    }
    dout(10) << __func__ << " waiting" << dendl;
    // Sleep until compact_range_async() queues work or shutdown is signaled.
    compact_queue_cond.wait(l);
  }
  dout(10) << __func__ << " exit" << dendl;
}
1995
1996 void RocksDBStore::compact_range_async(const string& start, const string& end)
1997 {
1998 std::lock_guard l(compact_queue_lock);
1999
2000 // try to merge adjacent ranges. this is O(n), but the queue should
2001 // be short. note that we do not cover all overlap cases and merge
2002 // opportunities here, but we capture the ones we currently need.
2003 list< pair<string,string> >::iterator p = compact_queue.begin();
2004 while (p != compact_queue.end()) {
2005 if (p->first == start && p->second == end) {
2006 // dup; no-op
2007 return;
2008 }
2009 if (start <= p->first && p->first <= end) {
2010 // new region crosses start of existing range
2011 // select right bound that is bigger
2012 compact_queue.push_back(make_pair(start, end > p->second ? end : p->second));
2013 compact_queue.erase(p);
2014 logger->inc(l_rocksdb_compact_queue_merge);
2015 break;
2016 }
2017 if (start <= p->second && p->second <= end) {
2018 // new region crosses end of existing range
2019 //p->first < p->second and p->second <= end, so p->first <= end.
2020 //But we break if previous condition, so start > p->first.
2021 compact_queue.push_back(make_pair(p->first, end));
2022 compact_queue.erase(p);
2023 logger->inc(l_rocksdb_compact_queue_merge);
2024 break;
2025 }
2026 ++p;
2027 }
2028 if (p == compact_queue.end()) {
2029 // no merge, new entry.
2030 compact_queue.push_back(make_pair(start, end));
2031 logger->set(l_rocksdb_compact_queue_len, compact_queue.size());
2032 }
2033 compact_queue_cond.notify_all();
2034 if (!compact_thread.is_started()) {
2035 compact_thread.create("rstore_compact");
2036 }
2037 }
2038 bool RocksDBStore::check_omap_dir(string &omap_dir)
2039 {
2040 rocksdb::Options options;
2041 options.create_if_missing = true;
2042 rocksdb::DB *db;
2043 rocksdb::Status status = rocksdb::DB::Open(options, omap_dir, &db);
2044 delete db;
2045 db = nullptr;
2046 return status.ok();
2047 }
2048
void RocksDBStore::compact_range(const string& start, const string& end)
{
  // Compact the combined-key range [start, end] in the default column
  // family, then mirror the compaction onto any sharded column families
  // whose prefixes fall inside the range.
  rocksdb::CompactRangeOptions options;
  rocksdb::Slice cstart(start);
  rocksdb::Slice cend(end);
  string prefix_start, key_start;
  string prefix_end, key_end;
  string key_highest = "\xff\xff\xff\xff"; //cheating
  string key_lowest = "";

  // Helper (shadows the enclosing function's name): compact [start, end]
  // in every shard handle of one column family.
  auto compact_range = [&] (const decltype(cf_handles)::iterator column_it,
                            const std::string& start,
                            const std::string& end) {
    rocksdb::Slice cstart(start);
    rocksdb::Slice cend(end);
    for (const auto& shard_it : column_it->second.handles) {
      db->CompactRange(options, shard_it, &cstart, &cend);
    }
  };
  db->CompactRange(options, default_cf, &cstart, &cend);
  // Split the combined bounds back into (prefix, key) to address the
  // sharded column families.
  split_key(cstart, &prefix_start, &key_start);
  split_key(cend, &prefix_end, &key_end);
  if (prefix_start == prefix_end) {
    // Range confined to a single prefix.
    const auto& column = cf_handles.find(prefix_start);
    if (column != cf_handles.end()) {
      compact_range(column, key_start, key_end);
    }
  } else {
    // Range spans prefixes: partial first column, whole middle columns,
    // partial last column.
    // NOTE(review): if prefix_start has no column but prefix_end does,
    // `column` stays at end() and the while loop below increments past
    // end() — looks like undefined behavior; confirm prefixes here always
    // resolve to columns.
    auto column = cf_handles.find(prefix_start);
    if (column != cf_handles.end()) {
      compact_range(column, key_start, key_highest);
      ++column;
    }
    const auto& column_end = cf_handles.find(prefix_end);
    while (column != column_end) {
      compact_range(column, key_lowest, key_highest);
      column++;
    }
    if (column != cf_handles.end()) {
      compact_range(column, key_lowest, key_end);
    }
  }
}
2092
RocksDBStore::RocksDBWholeSpaceIteratorImpl::~RocksDBWholeSpaceIteratorImpl()
{
  // This iterator owns the underlying rocksdb iterator.
  delete dbiter;
}
2097 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_first()
2098 {
2099 dbiter->SeekToFirst();
2100 ceph_assert(!dbiter->status().IsIOError());
2101 return dbiter->status().ok() ? 0 : -1;
2102 }
2103 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_first(const string &prefix)
2104 {
2105 rocksdb::Slice slice_prefix(prefix);
2106 dbiter->Seek(slice_prefix);
2107 ceph_assert(!dbiter->status().IsIOError());
2108 return dbiter->status().ok() ? 0 : -1;
2109 }
2110 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_last()
2111 {
2112 dbiter->SeekToLast();
2113 ceph_assert(!dbiter->status().IsIOError());
2114 return dbiter->status().ok() ? 0 : -1;
2115 }
2116 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_last(const string &prefix)
2117 {
2118 string limit = past_prefix(prefix);
2119 rocksdb::Slice slice_limit(limit);
2120 dbiter->Seek(slice_limit);
2121
2122 if (!dbiter->Valid()) {
2123 dbiter->SeekToLast();
2124 } else {
2125 dbiter->Prev();
2126 }
2127 return dbiter->status().ok() ? 0 : -1;
2128 }
2129 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::upper_bound(const string &prefix, const string &after)
2130 {
2131 lower_bound(prefix, after);
2132 if (valid()) {
2133 pair<string,string> key = raw_key();
2134 if (key.first == prefix && key.second == after)
2135 next();
2136 }
2137 return dbiter->status().ok() ? 0 : -1;
2138 }
2139 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::lower_bound(const string &prefix, const string &to)
2140 {
2141 string bound = combine_strings(prefix, to);
2142 rocksdb::Slice slice_bound(bound);
2143 dbiter->Seek(slice_bound);
2144 return dbiter->status().ok() ? 0 : -1;
2145 }
bool RocksDBStore::RocksDBWholeSpaceIteratorImpl::valid()
{
  // True when the iterator currently points at an entry.
  return dbiter->Valid();
}
2150 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::next()
2151 {
2152 if (valid()) {
2153 dbiter->Next();
2154 }
2155 ceph_assert(!dbiter->status().IsIOError());
2156 return dbiter->status().ok() ? 0 : -1;
2157 }
2158 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::prev()
2159 {
2160 if (valid()) {
2161 dbiter->Prev();
2162 }
2163 ceph_assert(!dbiter->status().IsIOError());
2164 return dbiter->status().ok() ? 0 : -1;
2165 }
2166 string RocksDBStore::RocksDBWholeSpaceIteratorImpl::key()
2167 {
2168 string out_key;
2169 split_key(dbiter->key(), 0, &out_key);
2170 return out_key;
2171 }
2172 pair<string,string> RocksDBStore::RocksDBWholeSpaceIteratorImpl::raw_key()
2173 {
2174 string prefix, key;
2175 split_key(dbiter->key(), &prefix, &key);
2176 return make_pair(prefix, key);
2177 }
2178
2179 bool RocksDBStore::RocksDBWholeSpaceIteratorImpl::raw_key_is_prefixed(const string &prefix) {
2180 // Look for "prefix\0" right in rocksb::Slice
2181 rocksdb::Slice key = dbiter->key();
2182 if ((key.size() > prefix.length()) && (key[prefix.length()] == '\0')) {
2183 return memcmp(key.data(), prefix.c_str(), prefix.length()) == 0;
2184 } else {
2185 return false;
2186 }
2187 }
2188
bufferlist RocksDBStore::RocksDBWholeSpaceIteratorImpl::value()
{
  // Copy the current value into a bufferlist.
  return to_bufferlist(dbiter->value());
}
2193
size_t RocksDBStore::RocksDBWholeSpaceIteratorImpl::key_size()
{
  // Size of the raw (combined "prefix\0key") key, in bytes.
  return dbiter->key().size();
}
2198
size_t RocksDBStore::RocksDBWholeSpaceIteratorImpl::value_size()
{
  // Size of the current value, in bytes.
  return dbiter->value().size();
}
2203
2204 bufferptr RocksDBStore::RocksDBWholeSpaceIteratorImpl::value_as_ptr()
2205 {
2206 rocksdb::Slice val = dbiter->value();
2207 return bufferptr(val.data(), val.size());
2208 }
2209
int RocksDBStore::RocksDBWholeSpaceIteratorImpl::status()
{
  // 0 when the underlying iterator is healthy, -1 on any error status.
  return dbiter->status().ok() ? 0 : -1;
}
2214
2215 string RocksDBStore::past_prefix(const string &prefix)
2216 {
2217 string limit = prefix;
2218 limit.push_back(1);
2219 return limit;
2220 }
2221
// Iterator over a single column family (one shard of a sharded prefix).
// Keys inside a CF are stored bare (no "prefix\0" header), so key()
// returns the key as-is and raw_key() re-attaches the stored prefix.
class CFIteratorImpl : public KeyValueDB::IteratorImpl {
protected:
  string prefix;
  rocksdb::Iterator *dbiter;  // owned; freed in the destructor
  const KeyValueDB::IteratorBounds bounds;
  // These slices reference the strings held in `bounds`; member declaration
  // order matters — bounds must be initialized before the slices.
  const rocksdb::Slice iterate_lower_bound;
  const rocksdb::Slice iterate_upper_bound;
public:
  // Builds a rocksdb iterator over `cf`, optionally constrained to
  // [bounds.lower_bound, bounds.upper_bound) when iterator bounds are
  // enabled in configuration.
  explicit CFIteratorImpl(const RocksDBStore* db,
                          const std::string& p,
                          rocksdb::ColumnFamilyHandle* cf,
                          KeyValueDB::IteratorBounds bounds_)
    : prefix(p), bounds(std::move(bounds_)),
      iterate_lower_bound(make_slice(bounds.lower_bound)),
      iterate_upper_bound(make_slice(bounds.upper_bound))
    {
      auto options = rocksdb::ReadOptions();
      if (db->cct->_conf->osd_rocksdb_iterator_bounds_enabled) {
        if (bounds.lower_bound) {
          options.iterate_lower_bound = &iterate_lower_bound;
        }
        if (bounds.upper_bound) {
          options.iterate_upper_bound = &iterate_upper_bound;
        }
      }
      dbiter = db->db->NewIterator(options, cf);
  }
  ~CFIteratorImpl() {
    delete dbiter;
  }

  int seek_to_first() override {
    dbiter->SeekToFirst();
    return dbiter->status().ok() ? 0 : -1;
  }
  int seek_to_last() override {
    dbiter->SeekToLast();
    return dbiter->status().ok() ? 0 : -1;
  }
  // Position strictly after `after`: lower_bound then skip an exact match.
  int upper_bound(const string &after) override {
    lower_bound(after);
    if (valid() && (key() == after)) {
      next();
    }
    return dbiter->status().ok() ? 0 : -1;
  }
  // Position at the first key >= `to`.
  int lower_bound(const string &to) override {
    rocksdb::Slice slice_bound(to);
    dbiter->Seek(slice_bound);
    return dbiter->status().ok() ? 0 : -1;
  }
  int next() override {
    if (valid()) {
      dbiter->Next();
    }
    return dbiter->status().ok() ? 0 : -1;
  }
  int prev() override {
    if (valid()) {
      dbiter->Prev();
    }
    return dbiter->status().ok() ? 0 : -1;
  }
  bool valid() override {
    return dbiter->Valid();
  }
  string key() override {
    return dbiter->key().ToString();
  }
  // Re-attach the column family's prefix for whole-space callers.
  std::pair<std::string, std::string> raw_key() override {
    return make_pair(prefix, key());
  }
  bufferlist value() override {
    return to_bufferlist(dbiter->value());
  }
  bufferptr value_as_ptr() override {
    rocksdb::Slice val = dbiter->value();
    return bufferptr(val.data(), val.size());
  }
  int status() override {
    return dbiter->status().ok() ? 0 : -1;
  }
};
2305
2306
2307 //merge column iterators and rest iterator
// Whole-space iterator that merges the default column family ("main") with
// the per-prefix sharded column families. At any time exactly one side —
// main or the current shard — is designated `smaller` and is the entry the
// iterator currently exposes. Shards are visited in prefix (map) order.
class WholeMergeIteratorImpl : public KeyValueDB::WholeSpaceIteratorImpl {
private:
  RocksDBStore* db;
  KeyValueDB::WholeSpaceIterator main;                       // default CF
  std::map<std::string, KeyValueDB::Iterator> shards;        // prefix -> iterator
  std::map<std::string, KeyValueDB::Iterator>::iterator current_shard;
  enum {on_main, on_shard} smaller;  // which side holds the current entry

public:
  WholeMergeIteratorImpl(RocksDBStore* db)
    : db(db)
    , main(db->get_default_cf_iterator())
  {
    for (auto& e : db->cf_handles) {
      shards.emplace(e.first, db->get_iterator(e.first));
    }
  }

  // Returns true if the value in main is smaller than in shards.
  // An invalid iterator compares larger than any actual value; if neither
  // side is valid we report main as smaller so valid() signals end-of-data.
  bool is_main_smaller() {
    if (main->valid()) {
      if (current_shard != shards.end()) {
        auto main_rk = main->raw_key();
        ceph_assert(current_shard->second->valid());
        auto shards_rk = current_shard->second->raw_key();
        // Compare (prefix, key) lexicographically.
        if (main_rk.first < shards_rk.first)
          return true;
        if (main_rk.first > shards_rk.first)
          return false;
        return main_rk.second < shards_rk.second;
      } else {
        return true;
      }
    } else {
      if (current_shard != shards.end()) {
        return false;
      } else {
        // This means that neither is valid.
        // We select main to be smaller, so valid() will signal properly.
        return true;
      }
    }
  }

  int seek_to_first() override {
    int r0 = main->seek_to_first();
    int r1 = 0;
    // Find the first shard that has some data.
    current_shard = shards.begin();
    while (current_shard != shards.end()) {
      r1 = current_shard->second->seek_to_first();
      if (r1 != 0 || current_shard->second->valid()) {
        // This is the first shard that will yield some keys.
        break;
      }
      ++current_shard;
    }
    smaller = is_main_smaller() ? on_main : on_shard;
    return r0 == 0 && r1 == 0 ? 0 : -1;
  }

  int seek_to_first(const std::string &prefix) override {
    int r0 = main->seek_to_first(prefix);
    int r1 = 0;
    // Find the first shard at or after `prefix` that has some data.
    current_shard = shards.lower_bound(prefix);
    while (current_shard != shards.end()) {
      r1 = current_shard->second->seek_to_first();
      if (r1 != 0 || current_shard->second->valid()) {
        // This is the first shard that will yield some keys.
        break;
      }
      ++current_shard;
    }
    smaller = is_main_smaller() ? on_main : on_shard;
    return r0 == 0 && r1 == 0 ? 0 : -1;
  };

  int seek_to_last() override {
    int r0 = main->seek_to_last();
    int r1 = 0;
    r1 = shards_seek_to_last();
    // If we have 2 candidates, keep the larger one and advance the other
    // past the end so only the winner remains current.
    if (main->valid()) {
      if (shards_valid()) {
        if (is_main_smaller()) {
          smaller = on_shard;
          main->next();
        } else {
          smaller = on_main;
          shards_next();
        }
      } else {
        smaller = on_main;
      }
    } else {
      if (shards_valid()) {
        smaller = on_shard;
      } else {
        smaller = on_main;
      }
    }
    return r0 == 0 && r1 == 0 ? 0 : -1;
  }

  int seek_to_last(const std::string &prefix) override {
    int r0 = main->seek_to_last(prefix);
    int r1 = 0;
    // Find the last shard that has some data.
    // NOTE(review): this loop never modifies current_shard (no decrement),
    // so it can spin forever when seek_to_last() succeeds with an invalid
    // iterator; and shards.lower_bound(prefix) may be shards.end(), which
    // is then dereferenced. Confirm against upstream before relying on it.
    bool found = false;
    current_shard = shards.lower_bound(prefix);
    while (current_shard != shards.begin()) {
      r1 = current_shard->second->seek_to_last();
      if (r1 != 0)
        break;
      if (current_shard->second->valid()) {
        found = true;
        break;
      }
    }
    // If we have 2 candidates, we need to select.
    if (main->valid() && found) {
      if (is_main_smaller()) {
        main->next();
      } else {
        shards_next();
      }
    }
    if (!found) {
      // Set shards state that properly represents eof.
      current_shard = shards.end();
    }
    smaller = is_main_smaller() ? on_main : on_shard;
    return r0 == 0 && r1 == 0 ? 0 : -1;
  }

  int upper_bound(const std::string &prefix, const std::string &after) override {
    int r0 = main->upper_bound(prefix, after);
    int r1 = 0;
    if (r0 != 0)
      return r0;
    current_shard = shards.lower_bound(prefix);
    if (current_shard != shards.end()) {
      bool located = false;
      // Exact-prefix shard: honor `after` within it.
      if (current_shard->first == prefix) {
        r1 = current_shard->second->upper_bound(after);
        if (r1 != 0)
          return r1;
        if (current_shard->second->valid()) {
          located = true;
        }
      }
      // Otherwise scan forward for the first shard with any data.
      if (!located) {
        while (current_shard != shards.end()) {
          r1 = current_shard->second->seek_to_first();
          if (r1 != 0)
            return r1;
          if (current_shard->second->valid())
            break;
          ++current_shard;
        }
      }
    }
    smaller = is_main_smaller() ? on_main : on_shard;
    return 0;
  }

  int lower_bound(const std::string &prefix, const std::string &to) override {
    int r0 = main->lower_bound(prefix, to);
    int r1 = 0;
    if (r0 != 0)
      return r0;
    current_shard = shards.lower_bound(prefix);
    if (current_shard != shards.end()) {
      bool located = false;
      // Exact-prefix shard: honor `to` within it.
      if (current_shard->first == prefix) {
        r1 = current_shard->second->lower_bound(to);
        if (r1 != 0)
          return r1;
        if (current_shard->second->valid()) {
          located = true;
        }
      }
      // Otherwise scan forward for the first shard with any data.
      if (!located) {
        while (current_shard != shards.end()) {
          r1 = current_shard->second->seek_to_first();
          if (r1 != 0)
            return r1;
          if (current_shard->second->valid())
            break;
          ++current_shard;
        }
      }
    }
    smaller = is_main_smaller() ? on_main : on_shard;
    return 0;
  }

  bool valid() override {
    // Validity of whichever side currently holds the entry.
    if (smaller == on_main) {
      return main->valid();
    } else {
      if (current_shard == shards.end())
        return false;
      return current_shard->second->valid();
    }
  };

  int next() override {
    // Advance the side we exposed, then recompute which side is smaller.
    int r;
    if (smaller == on_main) {
      r = main->next();
    } else {
      r = shards_next();
    }
    if (r != 0)
      return r;
    smaller = is_main_smaller() ? on_main : on_shard;
    return 0;
  }

  // Step both sides back (resurrecting an exhausted side via seek_to_last),
  // keep the larger of the two candidates as current, and re-advance the
  // loser (or kill a resurrected side) so the merge invariant holds.
  int prev() override {
    int r;
    bool main_was_valid = false;
    if (main->valid()) {
      main_was_valid = true;
      r = main->prev();
    } else {
      r = main->seek_to_last();
    }
    if (r != 0)
      return r;

    bool shards_was_valid = false;
    if (shards_valid()) {
      shards_was_valid = true;
      r = shards_prev();
    } else {
      r = shards_seek_to_last();
    }
    if (r != 0)
      return r;

    if (!main->valid() && !shards_valid()) {
      // End, no previous. Set marker so valid() can work.
      smaller = on_main;
      return 0;
    }

    // If 1 is valid, select it.
    // If 2 are valid, select larger and advance the other.
    if (main->valid()) {
      if (shards_valid()) {
        if (is_main_smaller()) {
          smaller = on_shard;
          if (main_was_valid) {
            if (main->valid()) {
              r = main->next();
            } else {
              r = main->seek_to_first();
            }
          } else {
            // If we have resurrected main, kill it.
            if (main->valid()) {
              main->next();
            }
          }
        } else {
          smaller = on_main;
          if (shards_was_valid) {
            if (shards_valid()) {
              r = shards_next();
            } else {
              r = shards_seek_to_first();
            }
          } else {
            // If we have resurrected shards, kill it.
            if (shards_valid()) {
              shards_next();
            }
          }
        }
      } else {
        smaller = on_main;
        r = shards_seek_to_first();
      }
    } else {
      smaller = on_shard;
      r = main->seek_to_first();
    }
    return r;
  }

  std::string key() override
  {
    if (smaller == on_main) {
      return main->key();
    } else {
      return current_shard->second->key();
    }
  }

  std::pair<std::string,std::string> raw_key() override
  {
    if (smaller == on_main) {
      return main->raw_key();
    } else {
      // Shard iterators return bare keys; prepend the shard's prefix.
      return { current_shard->first, current_shard->second->key() };
    }
  }

  bool raw_key_is_prefixed(const std::string &prefix) override
  {
    if (smaller == on_main) {
      return main->raw_key_is_prefixed(prefix);
    } else {
      return current_shard->first == prefix;
    }
  }

  ceph::buffer::list value() override
  {
    if (smaller == on_main) {
      return main->value();
    } else {
      return current_shard->second->value();
    }
  }

  int status() override
  {
    // Because we already had to inspect the key, it must be ok.
    return 0;
  }

  size_t key_size() override
  {
    if (smaller == on_main) {
      return main->key_size();
    } else {
      return current_shard->second->key().size();
    }
  }
  size_t value_size() override
  {
    if (smaller == on_main) {
      return main->value_size();
    } else {
      return current_shard->second->value().length();
    }
  }

  // True when the current shard iterator points at an entry.
  int shards_valid() {
    if (current_shard == shards.end())
      return false;
    return current_shard->second->valid();
  }

  // Advance within the current shard; on exhaustion move forward to the
  // next shard that yields a key. Running out of shards is still success.
  int shards_next() {
    if (current_shard == shards.end()) {
      // Illegal to next() on !valid().
      return -1;
    }
    int r = 0;
    r = current_shard->second->next();
    if (r != 0)
      return r;
    if (current_shard->second->valid())
      return 0;
    // Current shard exhausted, search for next key.
    ++current_shard;
    while (current_shard != shards.end()) {
      r = current_shard->second->seek_to_first();
      if (r != 0)
        return r;
      if (current_shard->second->valid())
        break;
      ++current_shard;
    }
    // Either we found a key or not, but it is success.
    return 0;
  }

  // Step back within the current shard; on underflow move backward to the
  // previous shard with data. Stopping at shards.begin() leaves the
  // iterator !valid() but guarantees next() lands on the first element.
  int shards_prev() {
    if (current_shard == shards.end()) {
      // Illegal to prev() on !valid().
      return -1;
    }
    int r = current_shard->second->prev();
    while (r == 0) {
      if (current_shard->second->valid()) {
        break;
      }
      if (current_shard == shards.begin()) {
        // We have reached the pre-first element.
        // This makes it !valid(), but guarantees next() moves to the
        // first element.
        break;
      }
      --current_shard;
      r = current_shard->second->seek_to_last();
    }
    return r;
  }

  // Position on the last key across all shards; shards.end() signals that
  // no shard holds any key.
  int shards_seek_to_last() {
    int r = 0;
    current_shard = shards.end();
    if (current_shard == shards.begin()) {
      // No shards at all.
      return 0;
    }
    while (current_shard != shards.begin()) {
      --current_shard;
      r = current_shard->second->seek_to_last();
      if (r != 0)
        return r;
      if (current_shard->second->valid()) {
        return 0;
      }
    }
    // No keys at all.
    current_shard = shards.end();
    return r;
  }

  // Position on the first key across all shards (first shard with data).
  int shards_seek_to_first() {
    int r = 0;
    current_shard = shards.begin();
    while (current_shard != shards.end()) {
      r = current_shard->second->seek_to_first();
      if (r != 0)
        break;
      if (current_shard->second->valid()) {
        // This is the first shard that will yield some keys.
        break;
      }
      ++current_shard;
    }
    return r;
  }
};
2750
// K-way merge iterator over the shard column families of one prefix.
// Invariant maintained by every positioning call: `iters` is ordered by
// KeyLess, so iters[0] always holds the smallest current key (or is
// invalid when the whole merge is exhausted).
class ShardMergeIteratorImpl : public KeyValueDB::IteratorImpl {
private:
  // Orders iterators by current key; an invalid iterator sorts after every
  // valid one, and two invalid iterators compare equal (both orders false).
  struct KeyLess {
  private:
    const rocksdb::Comparator* comparator;
  public:
    KeyLess(const rocksdb::Comparator* comparator) : comparator(comparator) { };

    bool operator()(rocksdb::Iterator* a, rocksdb::Iterator* b) const
    {
      if (a->Valid()) {
        if (b->Valid()) {
          return comparator->Compare(a->key(), b->key()) < 0;
        } else {
          return true;
        }
      } else {
        if (b->Valid()) {
          return false;
        } else {
          return false;
        }
      }
    }
  };

  const RocksDBStore* db;
  KeyLess keyless;
  string prefix;
  const KeyValueDB::IteratorBounds bounds;
  // Slices reference strings inside `bounds`; declaration order matters.
  const rocksdb::Slice iterate_lower_bound;
  const rocksdb::Slice iterate_upper_bound;
  std::vector<rocksdb::Iterator*> iters;  // owned; freed in the destructor
public:
  // Creates one rocksdb iterator per shard, optionally bounded when
  // iterator bounds are enabled in configuration.
  explicit ShardMergeIteratorImpl(const RocksDBStore* db,
                                  const std::string& prefix,
                                  const std::vector<rocksdb::ColumnFamilyHandle*>& shards,
                                  KeyValueDB::IteratorBounds bounds_)
    : db(db), keyless(db->comparator), prefix(prefix), bounds(std::move(bounds_)),
      iterate_lower_bound(make_slice(bounds.lower_bound)),
      iterate_upper_bound(make_slice(bounds.upper_bound))
  {
    iters.reserve(shards.size());
    auto options = rocksdb::ReadOptions();
    if (db->cct->_conf->osd_rocksdb_iterator_bounds_enabled) {
      if (bounds.lower_bound) {
        options.iterate_lower_bound = &iterate_lower_bound;
      }
      if (bounds.upper_bound) {
        options.iterate_upper_bound = &iterate_upper_bound;
      }
    }
    for (auto& s : shards) {
      iters.push_back(db->db->NewIterator(options, s));
    }
  }
  ~ShardMergeIteratorImpl() {
    for (auto& it : iters) {
      delete it;
    }
  }
  int seek_to_first() override {
    for (auto& it : iters) {
      it->SeekToFirst();
      if (!it->status().ok()) {
        return -1;
      }
    }
    // All iterators seeked; sort to restore the merge invariant.
    std::sort(iters.begin(), iters.end(), keyless);
    return 0;
  }
  // Position on the globally last key: move the single largest iterator to
  // slot 0 and push every other iterator past its end.
  int seek_to_last() override {
    for (auto& it : iters) {
      it->SeekToLast();
      if (!it->status().ok()) {
        return -1;
      }
    }
    for (size_t i = 1; i < iters.size(); i++) {
      if (iters[0]->Valid()) {
        if (iters[i]->Valid()) {
          // Keep the larger of the two in slot 0.
          if (keyless(iters[0], iters[i])) {
            std::swap(iters[0], iters[i]);
          }
        } else {
          // iters[i] empty
        }
      } else {
        if (iters[i]->Valid()) {
          std::swap(iters[0], iters[i]);
        }
      }
      // It might happen that the cf was empty.
      if (iters[i]->Valid()) {
        iters[i]->Next();
      }
    }
    // No need to sort, as at most 1 iterator is valid now.
    return 0;
  }
  int upper_bound(const string &after) override {
    rocksdb::Slice slice_bound(after);
    for (auto& it : iters) {
      it->Seek(slice_bound);
      // Skip an exact match to make the bound strict.
      if (it->Valid() && it->key() == after) {
        it->Next();
      }
      if (!it->status().ok()) {
        return -1;
      }
    }
    std::sort(iters.begin(), iters.end(), keyless);
    return 0;
  }
  int lower_bound(const string &to) override {
    rocksdb::Slice slice_bound(to);
    for (auto& it : iters) {
      it->Seek(slice_bound);
      if (!it->status().ok()) {
        return -1;
      }
    }
    std::sort(iters.begin(), iters.end(), keyless);
    return 0;
  }
  int next() override {
    int r = -1;
    if (iters[0]->Valid()) {
      iters[0]->Next();
      if (iters[0]->status().ok()) {
        r = 0;
        // Bubble the advanced iterator down to its sorted position; the
        // rest of the vector is already ordered.
        for (size_t i = 0; i < iters.size() - 1; i++) {
          if (keyless(iters[i], iters[i + 1])) {
            // Matches; fixed.
            break;
          }
          std::swap(iters[i], iters[i + 1]);
        }
      }
    }
    return r;
  }
  // iters are sorted, so
  // a[0] < b[0] < c[0] < d[0]
  // a[0] > a[-1], a[0] > b[-1], a[0] > c[-1], a[0] > d[-1]
  // so, prev() will be one of:
  // a[-1], b[-1], c[-1], d[-1]
  // prev() will be the one that is *largest* of them
  //
  // alg:
  // 1. go prev() on each iterator we can
  // 2. select largest key from those iterators
  // 3. go next() on all iterators except (2)
  // 4. sort
  int prev() override {
    std::vector<rocksdb::Iterator*> prev_done;
    //1
    for (auto it: iters) {
      if (it->Valid()) {
        it->Prev();
        if (it->Valid()) {
          prev_done.push_back(it);
        } else {
          // Underflowed: park back on the first element.
          it->SeekToFirst();
        }
      } else {
        // Was exhausted; its last element is a prev() candidate.
        it->SeekToLast();
        if (it->Valid()) {
          prev_done.push_back(it);
        }
      }
    }
    if (prev_done.size() == 0) {
      /* there is no previous element */
      if (iters[0]->Valid()) {
        iters[0]->Prev();
        ceph_assert(!iters[0]->Valid());
      }
      return 0;
    }
    //2,3
    rocksdb::Iterator* highest = prev_done[0];
    for (size_t i = 1; i < prev_done.size(); i++) {
      if (keyless(highest, prev_done[i])) {
        highest->Next();
        highest = prev_done[i];
      } else {
        prev_done[i]->Next();
      }
    }
    //4
    // Insert highest at the beginning, shifting values until we reach the
    // slot highest previously occupied; the untouched tail stays sorted
    // because we only prev()/next()-ed its entries.
    rocksdb::Iterator* hold = highest;
    for (size_t i = 0; i < iters.size(); i++) {
      std::swap(hold, iters[i]);
      if (hold == highest) break;
    }
    ceph_assert(hold == highest);
    return 0;
  }
  bool valid() override {
    // Invariant: iters[0] is the smallest; if it is invalid, all are.
    return iters[0]->Valid();
  }
  string key() override {
    return iters[0]->key().ToString();
  }
  std::pair<std::string, std::string> raw_key() override {
    return make_pair(prefix, key());
  }
  bufferlist value() override {
    return to_bufferlist(iters[0]->value());
  }
  bufferptr value_as_ptr() override {
    rocksdb::Slice val = iters[0]->value();
    return bufferptr(val.data(), val.size());
  }
  int status() override {
    return iters[0]->status().ok() ? 0 : -1;
  }
};
2974
2975 KeyValueDB::Iterator RocksDBStore::get_iterator(const std::string& prefix, IteratorOpts opts, IteratorBounds bounds)
2976 {
2977 auto cf_it = cf_handles.find(prefix);
2978 if (cf_it != cf_handles.end()) {
2979 rocksdb::ColumnFamilyHandle* cf = nullptr;
2980 if (cf_it->second.handles.size() == 1) {
2981 cf = cf_it->second.handles[0];
2982 } else if (cct->_conf->osd_rocksdb_iterator_bounds_enabled) {
2983 cf = get_cf_handle(prefix, bounds);
2984 }
2985 if (cf) {
2986 return std::make_shared<CFIteratorImpl>(
2987 this,
2988 prefix,
2989 cf,
2990 std::move(bounds));
2991 } else {
2992 return std::make_shared<ShardMergeIteratorImpl>(
2993 this,
2994 prefix,
2995 cf_it->second.handles,
2996 std::move(bounds));
2997 }
2998 } else {
2999 return KeyValueDB::get_iterator(prefix, opts);
3000 }
3001 }
3002
rocksdb::Iterator* RocksDBStore::new_shard_iterator(rocksdb::ColumnFamilyHandle* cf)
{
  // Raw rocksdb iterator over one shard column family; caller owns it.
  return db->NewIterator(rocksdb::ReadOptions(), cf);
}
3007
3008 RocksDBStore::WholeSpaceIterator RocksDBStore::get_wholespace_iterator(IteratorOpts opts)
3009 {
3010 if (cf_handles.size() == 0) {
3011 return std::make_shared<RocksDBWholeSpaceIteratorImpl>(
3012 this, default_cf, opts);
3013 } else {
3014 return std::make_shared<WholeMergeIteratorImpl>(this);
3015 }
3016 }
3017
RocksDBStore::WholeSpaceIterator RocksDBStore::get_default_cf_iterator()
{
  // Iterator restricted to the default column family (no shard merging),
  // used e.g. as the "main" leg of WholeMergeIteratorImpl.
  return std::make_shared<RocksDBWholeSpaceIteratorImpl>(this, default_cf, 0);
}
3022
// Transition helper for reshard(): re-opens the database with every
// existing column family, creates the columns the new sharding needs
// but that do not exist yet, and fills |to_process_columns| with a
// handle for each column (old and new) so reshard() can move keys
// between them.  Member state (db, default_cf, cf_handles) is wired up
// to match the *new* sharding scheme.
// Returns 0 on success, negative errno on failure.
int RocksDBStore::prepare_for_reshard(const std::string& new_sharding,
				      RocksDBStore::columns_t& to_process_columns)
{
  //0. lock db from opening
  //1. list existing columns
  //2. apply merge operator to (main + columns) opts
  //3. prepare std::vector<rocksdb::ColumnFamilyDescriptor> existing_cfs
  //4. open db, acquire existing column handles
  //5. calculate missing columns
  //6. create missing columns
  //7. construct cf_handles according to new sharding
  //8. check if all cf_handles are filled

  bool b;
  std::vector<ColumnFamily> new_sharding_def;
  char const* error_position;
  std::string error_msg;
  b = parse_sharding_def(new_sharding, new_sharding_def, &error_position, &error_msg);
  if (!b) {
    dout(1) << __func__ << " bad sharding: " << dendl;
    dout(1) << __func__ << new_sharding << dendl;
    // pad with spaces so the caret points at the offending character
    dout(1) << __func__ << std::string(error_position - &new_sharding[0], ' ') << "^" << error_msg << dendl;
    return -EINVAL;
  }

  //0. lock db from opening
  // Append a lock marker to the stored sharding definition so an
  // interrupted reshard can be detected on the next open.
  std::string stored_sharding_text;
  // status intentionally unchecked: if the file is absent,
  // stored_sharding_text simply stays empty
  rocksdb::ReadFileToString(env,
			    sharding_def_file,
			    &stored_sharding_text);
  if (stored_sharding_text.find(resharding_column_lock) == string::npos) {
    rocksdb::Status status;
    if (stored_sharding_text.size() != 0)
      stored_sharding_text += " ";
    stored_sharding_text += resharding_column_lock;
    env->CreateDir(sharding_def_dir);
    status = rocksdb::WriteStringToFile(env, stored_sharding_text,
					sharding_def_file, true);
    if (!status.ok()) {
      derr << __func__ << " cannot write to " << sharding_def_file << dendl;
      return -EIO;
    }
  }

  //1. list existing columns

  rocksdb::Status status;
  std::vector<std::string> existing_columns;
  rocksdb::Options opt;
  int r = load_rocksdb_options(false, opt);
  if (r) {
    dout(1) << __func__ << " load rocksdb options failed" << dendl;
    return r;
  }
  status = rocksdb::DB::ListColumnFamilies(rocksdb::DBOptions(opt), path, &existing_columns);
  if (!status.ok()) {
    derr << "Unable to list column families: " << status.ToString() << dendl;
    return -EINVAL;
  }
  dout(5) << "existing columns = " << existing_columns << dendl;

  //2. apply merge operator to (main + columns) opts
  //3. prepare std::vector<rocksdb::ColumnFamilyDescriptor> cfs_to_open

  std::vector<rocksdb::ColumnFamilyDescriptor> cfs_to_open;
  for (const auto& full_name : existing_columns) {
    //split col_name to <prefix>-<number>
    std::string base_name;
    size_t pos = full_name.find('-');
    if (std::string::npos == pos)
      base_name = full_name;
    else
      base_name = full_name.substr(0,pos);

    rocksdb::ColumnFamilyOptions cf_opt(opt);
    // search if we have options for this column
    std::string options;
    for (const auto& nsd : new_sharding_def) {
      if (nsd.name == base_name) {
	options = nsd.options;
	break;
      }
    }
    int r = update_column_family_options(base_name, options, &cf_opt);
    if (r != 0) {
      return r;
    }
    cfs_to_open.emplace_back(full_name, cf_opt);
  }

  //4. open db, acquire existing column handles
  std::vector<rocksdb::ColumnFamilyHandle*> handles;
  status = rocksdb::DB::Open(rocksdb::DBOptions(opt),
			     path, cfs_to_open, &handles, &db);
  if (!status.ok()) {
    derr << status.ToString() << dendl;
    return -EINVAL;
  }
  for (size_t i = 0; i < cfs_to_open.size(); i++) {
    dout(10) << "column " << cfs_to_open[i].name << " handle " << (void*)handles[i] << dendl;
  }

  //5. calculate missing columns
  std::vector<std::string> new_sharding_columns;
  std::vector<std::string> missing_columns;
  sharding_def_to_columns(new_sharding_def,
			  new_sharding_columns);
  dout(5) << "target columns = " << new_sharding_columns << dendl;
  for (const auto& n : new_sharding_columns) {
    bool found = false;
    for (const auto& e : existing_columns) {
      if (n == e) {
	found = true;
	break;
      }
    }
    if (!found) {
      missing_columns.push_back(n);
    }
  }
  dout(5) << "missing columns = " << missing_columns << dendl;

  //6. create missing columns
  for (const auto& full_name : missing_columns) {
    std::string base_name;
    size_t pos = full_name.find('-');
    if (std::string::npos == pos)
      base_name = full_name;
    else
      base_name = full_name.substr(0,pos);

    rocksdb::ColumnFamilyOptions cf_opt(opt);
    // search if we have options for this column
    std::string options;
    for (const auto& nsd : new_sharding_def) {
      if (nsd.name == base_name) {
	options = nsd.options;
	break;
      }
    }
    int r = update_column_family_options(base_name, options, &cf_opt);
    if (r != 0) {
      return r;
    }
    rocksdb::ColumnFamilyHandle *cf;
    status = db->CreateColumnFamily(cf_opt, full_name, &cf);
    if (!status.ok()) {
      derr << __func__ << " Failed to create rocksdb column family: "
	   << full_name << dendl;
      return -EINVAL;
    }
    dout(10) << "created column " << full_name << " handle = " << (void*)cf << dendl;
    // keep existing_columns/handles index-aligned for step 7
    existing_columns.push_back(full_name);
    handles.push_back(cf);
  }

  //7. construct cf_handles according to new sharding
  for (size_t i = 0; i < existing_columns.size(); i++) {
    std::string full_name = existing_columns[i];
    rocksdb::ColumnFamilyHandle *cf = handles[i];
    std::string base_name;
    size_t shard_idx = 0;
    size_t pos = full_name.find('-');
    dout(10) << "processing column " << full_name << dendl;
    if (std::string::npos == pos) {
      base_name = full_name;
    } else {
      // column names are "<base>-<shard index>"
      base_name = full_name.substr(0,pos);
      shard_idx = atoi(full_name.substr(pos+1).c_str());
    }
    if (rocksdb::kDefaultColumnFamilyName == base_name) {
      default_cf = handles[i];
      must_close_default_cf = true;
      // no-op deleter: the default CF handle must stay alive and is
      // closed separately (must_close_default_cf)
      std::unique_ptr<rocksdb::ColumnFamilyHandle, cf_deleter_t> ptr{
	cf, [](rocksdb::ColumnFamilyHandle*) {}};
      to_process_columns.emplace(full_name, std::move(ptr));
    } else {
      for (const auto& nsd : new_sharding_def) {
	if (nsd.name == base_name) {
	  if (shard_idx < nsd.shard_cnt) {
	    add_column_family(base_name, nsd.hash_l, nsd.hash_h, shard_idx, cf);
	  } else {
	    //ignore columns with index larger then shard count
	  }
	  break;
	}
      }
      // these handles are owned by to_process_columns and destroyed
      // when it goes out of scope
      std::unique_ptr<rocksdb::ColumnFamilyHandle, cf_deleter_t> ptr{
	cf, [this](rocksdb::ColumnFamilyHandle* handle) {
	  db->DestroyColumnFamilyHandle(handle);
	}};
      to_process_columns.emplace(full_name, std::move(ptr));
    }
  }

  //8. check if all cf_handles are filled
  for (const auto& col : cf_handles) {
    for (size_t i = 0; i < col.second.handles.size(); i++) {
      if (col.second.handles[i] == nullptr) {
	derr << "missing handle for column " << col.first << " shard " << i << dendl;
	return -EIO;
      }
    }
  }
  return 0;
}
3229
3230 int RocksDBStore::reshard_cleanup(const RocksDBStore::columns_t& current_columns)
3231 {
3232 std::vector<std::string> new_sharding_columns;
3233 for (const auto& [name, handle] : cf_handles) {
3234 if (handle.handles.size() == 1) {
3235 new_sharding_columns.push_back(name);
3236 } else {
3237 for (size_t i = 0; i < handle.handles.size(); i++) {
3238 new_sharding_columns.push_back(name + "-" + std::to_string(i));
3239 }
3240 }
3241 }
3242
3243 for (auto& [name, handle] : current_columns) {
3244 auto found = std::find(new_sharding_columns.begin(),
3245 new_sharding_columns.end(),
3246 name) != new_sharding_columns.end();
3247 if (found || name == rocksdb::kDefaultColumnFamilyName) {
3248 dout(5) << "Column " << name << " is part of new sharding." << dendl;
3249 continue;
3250 }
3251 dout(5) << "Column " << name << " not part of new sharding. Deleting." << dendl;
3252
3253 // verify that column is empty
3254 std::unique_ptr<rocksdb::Iterator> it{
3255 db->NewIterator(rocksdb::ReadOptions(), handle.get())};
3256 ceph_assert(it);
3257 it->SeekToFirst();
3258 ceph_assert(!it->Valid());
3259
3260 if (rocksdb::Status status = db->DropColumnFamily(handle.get()); !status.ok()) {
3261 derr << __func__ << " Failed to drop column: " << name << dendl;
3262 return -EINVAL;
3263 }
3264 }
3265 return 0;
3266 }
3267
// Rewrite the database into the layout described by |new_sharding|:
// open all columns via prepare_for_reshard(), move every key into the
// column the new sharding dictates, drop emptied columns, and finally
// persist the new sharding definition (replacing the lock marker
// written by prepare_for_reshard).  |ctrl_in| tunes batch/iterator
// limits and unit-test failure injection; nullptr means defaults.
// Returns 0 on success, negative errno (or -1000/-1001/-1002 from
// test-injection points) on failure.  The DB is closed on exit.
int RocksDBStore::reshard(const std::string& new_sharding, const RocksDBStore::resharding_ctrl* ctrl_in)
{

  resharding_ctrl ctrl = ctrl_in ? *ctrl_in : resharding_ctrl();
  // counters shared by the lambdas below
  size_t bytes_in_batch = 0;
  size_t keys_in_batch = 0;
  size_t bytes_per_iterator = 0;
  size_t keys_per_iterator = 0;
  size_t keys_processed = 0;
  size_t keys_moved = 0;

  // Synchronously commit the accumulated batch and reset its counters.
  auto flush_batch = [&](rocksdb::WriteBatch* batch) {
    dout(10) << "flushing batch, " << keys_in_batch << " keys, for "
	     << bytes_in_batch << " bytes" << dendl;
    rocksdb::WriteOptions woptions;
    woptions.sync = true;
    rocksdb::Status s = db->Write(woptions, batch);
    ceph_assert(s.ok());
    bytes_in_batch = 0;
    keys_in_batch = 0;
    batch->Clear();
  };

  // Walk one column and move each key to the column the new sharding
  // assigns it.  |fixed_prefix| is empty for the default CF (keys there
  // embed the prefix and must be split); otherwise it is the column's
  // prefix and the raw key is the user key.
  auto process_column = [&](rocksdb::ColumnFamilyHandle* handle,
			    const std::string& fixed_prefix)
  {
    dout(5) << " column=" << (void*)handle << " prefix=" << fixed_prefix << dendl;
    std::unique_ptr<rocksdb::Iterator> it{
      db->NewIterator(rocksdb::ReadOptions(), handle)};
    ceph_assert(it);

    rocksdb::WriteBatch bat;
    for (it->SeekToFirst(); it->Valid(); it->Next()) {
      rocksdb::Slice raw_key = it->key();
      dout(30) << "key=" << pretty_binary_string(raw_key.ToString()) << dendl;
      //check if need to refresh iterator
      // (recreated after ctrl limits are hit, then re-seeked to the
      // current key; presumably bounds resources pinned by a
      // long-lived iterator)
      if (bytes_per_iterator >= ctrl.bytes_per_iterator ||
	  keys_per_iterator >= ctrl.keys_per_iterator) {
	dout(8) << "refreshing iterator" << dendl;
	bytes_per_iterator = 0;
	keys_per_iterator = 0;
	// copy the key before dropping the iterator that backs it
	std::string raw_key_str = raw_key.ToString();
	it.reset(db->NewIterator(rocksdb::ReadOptions(), handle));
	ceph_assert(it);
	it->Seek(raw_key_str);
	ceph_assert(it->Valid());
	raw_key = it->key();
      }
      rocksdb::Slice value = it->value();
      std::string prefix, key;
      if (fixed_prefix.size() == 0) {
	split_key(raw_key, &prefix, &key);
      } else {
	prefix = fixed_prefix;
	key = raw_key.ToString();
      }
      keys_processed++;
      if ((keys_processed % 10000) == 0) {
	dout(10) << "processed " << keys_processed << " keys, moved " << keys_moved << dendl;
      }
      rocksdb::ColumnFamilyHandle* new_handle = get_cf_handle(prefix, key);
      if (new_handle == nullptr) {
	new_handle = default_cf;
      }
      // key already lives in the right column; nothing to do
      if (handle == new_handle) {
	continue;
      }
      std::string new_raw_key;
      if (new_handle == default_cf) {
	// default CF stores prefix+key combined in the raw key
	new_raw_key = combine_strings(prefix, key);
      } else {
	new_raw_key = key;
      }
      bat.Delete(handle, raw_key);
      bat.Put(new_handle, new_raw_key, value);
      dout(25) << "moving " << (void*)handle << "/" << pretty_binary_string(raw_key.ToString()) <<
	" to " << (void*)new_handle << "/" << pretty_binary_string(new_raw_key) <<
	" size " << value.size() << dendl;
      keys_moved++;
      bytes_in_batch += new_raw_key.size() * 2 + value.size();
      keys_in_batch++;
      bytes_per_iterator += new_raw_key.size() * 2 + value.size();
      keys_per_iterator++;

      //check if need to write batch
      if (bytes_in_batch >= ctrl.bytes_per_batch ||
	  keys_in_batch >= ctrl.keys_per_batch) {
	flush_batch(&bat);
	if (ctrl.unittest_fail_after_first_batch) {
	  return -1000;
	}
      }
    }
    // commit whatever is left in the final partial batch
    if (bat.Count() > 0) {
      flush_batch(&bat);
    }
    return 0;
  };

  // always close the DB on exit, success or failure
  auto close_column_handles = make_scope_guard([this] {
    cf_handles.clear();
    close();
  });
  columns_t to_process_columns;
  int r = prepare_for_reshard(new_sharding, to_process_columns);
  if (r != 0) {
    dout(1) << "failed to prepare db for reshard" << dendl;
    return r;
  }

  for (auto& [name, handle] : to_process_columns) {
    dout(5) << "Processing column=" << name
	    << " handle=" << handle.get() << dendl;
    if (name == rocksdb::kDefaultColumnFamilyName) {
      ceph_assert(handle.get() == default_cf);
      r = process_column(default_cf, std::string());
    } else {
      // column name is "<prefix>" or "<prefix>-<shard>"
      std::string fixed_prefix = name.substr(0, name.find('-'));
      dout(10) << "Prefix: " << fixed_prefix << dendl;
      r = process_column(handle.get(), fixed_prefix);
    }
    if (r != 0) {
      derr << "Error processing column " << name << dendl;
      return r;
    }
    if (ctrl.unittest_fail_after_processing_column) {
      return -1001;
    }
  }

  r = reshard_cleanup(to_process_columns);
  if (r != 0) {
    dout(5) << "failed to cleanup after reshard" << dendl;
    return r;
  }

  if (ctrl.unittest_fail_after_successful_processing) {
    return -1002;
  }
  // persist the new sharding definition; overwrites the lock marker
  // set in prepare_for_reshard, marking the reshard complete
  env->CreateDir(sharding_def_dir);
  if (auto status = rocksdb::WriteStringToFile(env, new_sharding,
					       sharding_def_file, true);
      !status.ok()) {
    derr << __func__ << " cannot write to " << sharding_def_file << dendl;
    return -EIO;
  }

  return r;
}
3417
3418 bool RocksDBStore::get_sharding(std::string& sharding) {
3419 rocksdb::Status status;
3420 std::string stored_sharding_text;
3421 bool result = false;
3422 sharding.clear();
3423
3424 status = env->FileExists(sharding_def_file);
3425 if (status.ok()) {
3426 status = rocksdb::ReadFileToString(env,
3427 sharding_def_file,
3428 &stored_sharding_text);
3429 if(status.ok()) {
3430 result = true;
3431 sharding = stored_sharding_text;
3432 }
3433 }
3434 return result;
3435 }