]> git.proxmox.com Git - ceph.git/blame - ceph/src/kv/RocksDBStore.cc
import ceph quincy 17.2.4
[ceph.git] / ceph / src / kv / RocksDBStore.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
20effc67 4#include <filesystem>
7c673cae 5#include <map>
7c673cae 6#include <memory>
20effc67
TL
7#include <set>
8#include <string>
7c673cae
FG
9#include <errno.h>
10#include <unistd.h>
11#include <sys/types.h>
12#include <sys/stat.h>
13
14#include "rocksdb/db.h"
15#include "rocksdb/table.h"
16#include "rocksdb/env.h"
17#include "rocksdb/slice.h"
18#include "rocksdb/cache.h"
19#include "rocksdb/filter_policy.h"
20#include "rocksdb/utilities/convenience.h"
21#include "rocksdb/merge_operator.h"
91327a77 22
7c673cae 23#include "common/perf_counters.h"
91327a77 24#include "common/PriorityCache.h"
9f95a23c 25#include "include/common_fwd.h"
f67539c2 26#include "include/scope_guard.h"
7c673cae
FG
27#include "include/str_list.h"
28#include "include/stringify.h"
29#include "include/str_map.h"
30#include "KeyValueDB.h"
31#include "RocksDBStore.h"
32
33#include "common/debug.h"
34
35#define dout_context cct
36#define dout_subsys ceph_subsys_rocksdb
37#undef dout_prefix
38#define dout_prefix *_dout << "rocksdb: "
39
20effc67
TL
40namespace fs = std::filesystem;
41
f67539c2
TL
42using std::function;
43using std::list;
44using std::map;
45using std::ostream;
46using std::pair;
47using std::set;
48using std::string;
49using std::unique_ptr;
50using std::vector;
51
52using ceph::bufferlist;
53using ceph::bufferptr;
54using ceph::Formatter;
55
56static const char* sharding_def_dir = "sharding";
57static const char* sharding_def_file = "sharding/def";
58static const char* sharding_recreate = "sharding/recreate_columns";
59static const char* resharding_column_lock = "reshardingXcommencingXlocked";
60
11fdf7f2
TL
// Copy a rocksdb::Slice into a freshly allocated bufferlist.
// Note: this copies the bytes (bufferptr(data, len) allocates and copies),
// so the returned bufferlist does not alias rocksdb-owned memory.
static bufferlist to_bufferlist(rocksdb::Slice in) {
  bufferlist bl;
  bl.append(bufferptr(in.data(), in.size()));
  return bl;
}
66
c07f9fc5
FG
// Build a rocksdb::SliceParts referencing the buffers of `bl` without copying.
// PRECONDITION: *slices must already be sized to at least the number of
// buffers in `bl` — elements are assigned by index, never appended.
// The returned SliceParts aliases both `bl`'s memory and `slices`' storage;
// both must outlive its use.
static rocksdb::SliceParts prepare_sliceparts(const bufferlist &bl,
					      vector<rocksdb::Slice> *slices)
{
  unsigned n = 0;
  // point each pre-allocated slice at the corresponding bufferlist segment
  for (auto& buf : bl.buffers()) {
    (*slices)[n].data_ = buf.c_str();
    (*slices)[n].size_ = buf.length();
    n++;
  }
  return rocksdb::SliceParts(slices->data(), slices->size());
}
78
11fdf7f2 79
7c673cae 80//
11fdf7f2
TL
81// One of these for the default rocksdb column family, routing each prefix
82// to the appropriate MergeOperator.
7c673cae 83//
11fdf7f2
TL
84class RocksDBStore::MergeOperatorRouter
85 : public rocksdb::AssociativeMergeOperator
86{
7c673cae 87 RocksDBStore& store;
11fdf7f2 88public:
7c673cae
FG
89 const char *Name() const override {
90 // Construct a name that rocksDB will validate against. We want to
91 // do this in a way that doesn't constrain the ordering of calls
92 // to set_merge_operator, so sort the merge operators and then
93 // construct a name from all of those parts.
94 store.assoc_name.clear();
95 map<std::string,std::string> names;
11fdf7f2
TL
96
97 for (auto& p : store.merge_ops) {
98 names[p.first] = p.second->name();
99 }
7c673cae
FG
100 for (auto& p : names) {
101 store.assoc_name += '.';
102 store.assoc_name += p.first;
103 store.assoc_name += ':';
104 store.assoc_name += p.second;
105 }
106 return store.assoc_name.c_str();
107 }
108
11fdf7f2 109 explicit MergeOperatorRouter(RocksDBStore &_store) : store(_store) {}
7c673cae
FG
110
111 bool Merge(const rocksdb::Slice& key,
11fdf7f2
TL
112 const rocksdb::Slice* existing_value,
113 const rocksdb::Slice& value,
114 std::string* new_value,
115 rocksdb::Logger* logger) const override {
116 // for default column family
117 // extract prefix from key and compare against each registered merge op;
118 // even though merge operator for explicit CF is included in merge_ops,
119 // it won't be picked up, since it won't match.
7c673cae
FG
120 for (auto& p : store.merge_ops) {
121 if (p.first.compare(0, p.first.length(),
122 key.data(), p.first.length()) == 0 &&
123 key.data()[p.first.length()] == 0) {
11fdf7f2
TL
124 if (existing_value) {
125 p.second->merge(existing_value->data(), existing_value->size(),
7c673cae
FG
126 value.data(), value.size(),
127 new_value);
11fdf7f2
TL
128 } else {
129 p.second->merge_nonexistent(value.data(), value.size(), new_value);
130 }
131 break;
7c673cae
FG
132 }
133 }
134 return true; // OK :)
135 }
11fdf7f2
TL
136};
137
//
// One of these per non-default column family, linked directly to the
// merge operator for that CF/prefix (if any).
//
class RocksDBStore::MergeOperatorLinker
  : public rocksdb::AssociativeMergeOperator
{
private:
  // The single operator serving this column family; shared with
  // RocksDBStore::merge_ops.
  std::shared_ptr<KeyValueDB::MergeOperator> mop;
public:
  explicit MergeOperatorLinker(const std::shared_ptr<KeyValueDB::MergeOperator> &o) : mop(o) {}

  const char *Name() const override {
    return mop->name();
  }

  // Unlike MergeOperatorRouter, no prefix dispatch is needed here: every
  // key in this CF belongs to the one registered operator.
  bool Merge(const rocksdb::Slice& key,
	     const rocksdb::Slice* existing_value,
	     const rocksdb::Slice& value,
	     std::string* new_value,
	     rocksdb::Logger* logger) const override {
    if (existing_value) {
      mop->merge(existing_value->data(), existing_value->size(),
		 value.data(), value.size(),
		 new_value);
    } else {
      mop->merge_nonexistent(value.data(), value.size(), new_value);
    }
    return true;
  }
};
169
170int RocksDBStore::set_merge_operator(
171 const string& prefix,
172 std::shared_ptr<KeyValueDB::MergeOperator> mop)
173{
174 // If you fail here, it's because you can't do this on an open database
11fdf7f2 175 ceph_assert(db == nullptr);
7c673cae
FG
176 merge_ops.push_back(std::make_pair(prefix,mop));
177 return 0;
178}
179
// Adapter that forwards RocksDB's internal log output into the Ceph
// logging subsystem (dout), mapping RocksDB log levels onto dout levels.
class CephRocksdbLogger : public rocksdb::Logger {
  CephContext *cct;
public:
  // Hold a reference on the CephContext for the logger's lifetime.
  explicit CephRocksdbLogger(CephContext *c) : cct(c) {
    cct->get();
  }
  ~CephRocksdbLogger() override {
    cct->put();
  }

  // Write an entry to the log file with the specified format.
  void Logv(const char* format, va_list ap) override {
    Logv(rocksdb::INFO_LEVEL, format, ap);
  }

  // Write an entry to the log file with the specified log level
  // and format. Any log with level under the internal log level
  // of *this (see @SetInfoLogLevel and @GetInfoLogLevel) will not be
  // printed.
  void Logv(const rocksdb::InfoLogLevel log_level, const char* format,
	    va_list ap) override {
    // Invert rocksdb's level ordering: higher rocksdb severity maps to a
    // lower (more likely to be emitted) dout level.
    int v = rocksdb::NUM_INFO_LOG_LEVELS - log_level - 1;
    dout(ceph::dout::need_dynamic(v));
    // Messages longer than 64 KiB are truncated by vsnprintf.
    char buf[65536];
    vsnprintf(buf, sizeof(buf), format, ap);
    *_dout << buf << dendl;
  }
};
208
// Factory used by callers that only have the global context; the returned
// logger routes RocksDB log output into Ceph's log (caller owns it).
rocksdb::Logger *create_rocksdb_ceph_logger()
{
  return new CephRocksdbLogger(g_ceph_context);
}
213
c07f9fc5 214static int string2bool(const string &val, bool &b_val)
7c673cae
FG
215{
216 if (strcasecmp(val.c_str(), "false") == 0) {
217 b_val = false;
218 return 0;
219 } else if (strcasecmp(val.c_str(), "true") == 0) {
220 b_val = true;
221 return 0;
222 } else {
223 std::string err;
224 int b = strict_strtol(val.c_str(), 10, &err);
225 if (!err.empty())
226 return -EINVAL;
227 b_val = !!b;
228 return 0;
229 }
230}
f67539c2
TL
231
232namespace rocksdb {
233extern std::string trim(const std::string& str);
234}
235
// this function is a modification of rocksdb's StringToMap:
// 1) accepts ' \n ; as separators
// 2) leaves compound options with enclosing { and }
//
// Splits "k1=v1;k2={a=1;b=2};k3=v3" into {k1:"v1", k2:"{a=1;b=2}", k3:"v3"}.
// Returns InvalidArgument if a '=' is missing, a key is empty, braces are
// unbalanced, or stray characters follow a nested "{...}" value.
rocksdb::Status StringToMap(const std::string& opts_str,
			    std::unordered_map<std::string, std::string>* opts_map)
{
  using rocksdb::Status;
  using rocksdb::trim;
  assert(opts_map);
  // Example:
  // opts_str = "write_buffer_size=1024;max_write_buffer_number=2;"
  // "nested_opt={opt1=1;opt2=2};max_bytes_for_level_base=100"
  size_t pos = 0;
  std::string opts = trim(opts_str);
  while (pos < opts.size()) {
    size_t eq_pos = opts.find('=', pos);
    if (eq_pos == std::string::npos) {
      return Status::InvalidArgument("Mismatched key value pair, '=' expected");
    }
    std::string key = trim(opts.substr(pos, eq_pos - pos));
    if (key.empty()) {
      return Status::InvalidArgument("Empty key found");
    }

    // skip space after '=' and look for '{' for possible nested options
    pos = eq_pos + 1;
    while (pos < opts.size() && isspace(opts[pos])) {
      ++pos;
    }
    // Empty value at the end
    if (pos >= opts.size()) {
      (*opts_map)[key] = "";
      break;
    }
    if (opts[pos] == '{') {
      // scan forward counting brace depth to find the matching '}'
      int count = 1;
      size_t brace_pos = pos + 1;
      while (brace_pos < opts.size()) {
	if (opts[brace_pos] == '{') {
	  ++count;
	} else if (opts[brace_pos] == '}') {
	  --count;
	  if (count == 0) {
	    break;
	  }
	}
	++brace_pos;
      }
      // found the matching closing brace
      if (count == 0) {
	//include both '{' and '}'
	(*opts_map)[key] = trim(opts.substr(pos, brace_pos - pos + 1));
	// skip all whitespace and move to the next ';,'
	// brace_pos points to the matching '}'
	pos = brace_pos + 1;
	while (pos < opts.size() && isspace(opts[pos])) {
	  ++pos;
	}
	if (pos < opts.size() && opts[pos] != ';' && opts[pos] != ',') {
	  return Status::InvalidArgument(
	      "Unexpected chars after nested options");
	}
	++pos;
      } else {
	return Status::InvalidArgument(
	    "Mismatched curly braces for nested options");
      }
    } else {
      // plain value: runs until the next ',' or ';' (or end of string)
      size_t sc_pos = opts.find_first_of(",;", pos);
      if (sc_pos == std::string::npos) {
	(*opts_map)[key] = trim(opts.substr(pos));
	// It either ends with a trailing , ; or the last key-value pair
	break;
      } else {
	(*opts_map)[key] = trim(opts.substr(pos, sc_pos - pos));
      }
      pos = sc_pos + 1;
    }
  }
  return Status::OK();
}
317
c07f9fc5 318int RocksDBStore::tryInterpret(const string &key, const string &val, rocksdb::Options &opt)
7c673cae
FG
319{
320 if (key == "compaction_threads") {
321 std::string err;
20effc67 322 int f = strict_iecstrtoll(val, &err);
7c673cae
FG
323 if (!err.empty())
324 return -EINVAL;
325 //Low priority threadpool is used for compaction
326 opt.env->SetBackgroundThreads(f, rocksdb::Env::Priority::LOW);
327 } else if (key == "flusher_threads") {
328 std::string err;
20effc67 329 int f = strict_iecstrtoll(val, &err);
7c673cae
FG
330 if (!err.empty())
331 return -EINVAL;
332 //High priority threadpool is used for flusher
333 opt.env->SetBackgroundThreads(f, rocksdb::Env::Priority::HIGH);
334 } else if (key == "compact_on_mount") {
335 int ret = string2bool(val, compact_on_mount);
336 if (ret != 0)
337 return ret;
338 } else if (key == "disableWAL") {
339 int ret = string2bool(val, disableWAL);
340 if (ret != 0)
341 return ret;
342 } else {
343 //unrecognize config options.
344 return -EINVAL;
345 }
346 return 0;
347}
348
c07f9fc5 349int RocksDBStore::ParseOptionsFromString(const string &opt_str, rocksdb::Options &opt)
7c673cae 350{
9f95a23c
TL
351 return ParseOptionsFromStringStatic(cct, opt_str, opt,
352 [&](const string& k, const string& v, rocksdb::Options& o) {
353 return tryInterpret(k, v, o);
354 }
355 );
356}
357
// Parse "k=v" option pairs and apply them to `opt`.  Keys that RocksDB's
// GetOptionsFromString rejects are handed to `interp` (if provided);
// without an interpreter, unknown keys are only tolerated when they are
// in need_interp_keys (i.e. keys tryInterpret would have handled).
// Returns 0 on success, -EINVAL on any parse or interpretation failure.
int RocksDBStore::ParseOptionsFromStringStatic(
  CephContext *cct,
  const string& opt_str,
  rocksdb::Options& opt,
  function<int(const string&, const string&, rocksdb::Options&)> interp)
{
  // keep aligned with func tryInterpret
  const set<string> need_interp_keys = {"compaction_threads", "flusher_threads", "compact_on_mount", "disableWAL"};
  rocksdb::Status status;
  std::unordered_map<std::string, std::string> str_map;
  status = StringToMap(opt_str, &str_map);
  if (!status.ok()) {
    dout(5) << __func__ << " error '" << status.getState() <<
      "' while parsing options '" << opt_str << "'" << dendl;
    return -EINVAL;
  }

  for (auto it = str_map.begin(); it != str_map.end(); ++it) {
    string this_opt = it->first + "=" + it->second;
    // try RocksDB's native parser first
    rocksdb::Status status =
      rocksdb::GetOptionsFromString(opt, this_opt, &opt);
    int r = 0;
    if (!status.ok()) {
      if (interp != nullptr) {
	r = interp(it->first, it->second, opt);
      } else if (!need_interp_keys.count(it->first)) {
	// no interpreter and not a known ceph-specific key => hard error
	r = -1;
      }
      if (r < 0) {
	derr << status.ToString() << dendl;
	return -EINVAL;
      }
    }
    lgeneric_dout(cct, 1) << " set rocksdb option " << it->first
			  << " = " << it->second << dendl;
  }
  return 0;
}
396
397int RocksDBStore::init(string _options_str)
398{
399 options_str = _options_str;
400 rocksdb::Options opt;
401 //try parse options
402 if (options_str.length()) {
403 int r = ParseOptionsFromString(options_str, opt);
404 if (r != 0) {
405 return -EINVAL;
406 }
407 }
408 return 0;
409}
410
// Ensure the database directory exists.  With a custom rocksdb::Env the
// directory creation is delegated to it; otherwise the local filesystem
// is used and the directory is created with mode 0755.
// Returns 0 on success or a negative errno on failure.
int RocksDBStore::create_db_dir()
{
  if (env) {
    // Custom Env (e.g. BlueRocksEnv): let it manage the directory.
    // NOTE(review): the NewDirectory status is not checked here — presumably
    // failures surface later at open; confirm before relying on it.
    unique_ptr<rocksdb::Directory> dir;
    env->NewDirectory(path, &dir);
  } else {
    if (!fs::exists(path)) {
      std::error_code ec;
      if (!fs::create_directory(path, ec)) {
	derr << __func__ << " failed to create " << path
	     << ": " << ec.message() << dendl;
	return -ec.value();
      }
      // rwxr-xr-x; this overload throws on error rather than reporting
      // through an error_code.
      fs::permissions(path,
		      fs::perms::owner_all |
		      fs::perms::group_read | fs::perms::group_exec |
		      fs::perms::others_read | fs::perms::others_exec);
    }
  }
  return 0;
}
432
433int RocksDBStore::install_cf_mergeop(
f67539c2 434 const string &key_prefix,
11fdf7f2
TL
435 rocksdb::ColumnFamilyOptions *cf_opt)
436{
437 ceph_assert(cf_opt != nullptr);
438 cf_opt->merge_operator.reset();
439 for (auto& i : merge_ops) {
f67539c2 440 if (i.first == key_prefix) {
11fdf7f2
TL
441 cf_opt->merge_operator.reset(new MergeOperatorLinker(i.second));
442 }
443 }
444 return 0;
7c673cae
FG
445}
446
11fdf7f2 447int RocksDBStore::create_and_open(ostream &out,
f67539c2 448 const std::string& cfs)
11fdf7f2
TL
449{
450 int r = create_db_dir();
451 if (r < 0)
452 return r;
f67539c2
TL
453 return do_open(out, true, false, cfs);
454}
455
456std::shared_ptr<rocksdb::Cache> RocksDBStore::create_block_cache(
457 const std::string& cache_type, size_t cache_size, double cache_prio_high) {
458 std::shared_ptr<rocksdb::Cache> cache;
459 auto shard_bits = cct->_conf->rocksdb_cache_shard_bits;
460 if (cache_type == "binned_lru") {
461 cache = rocksdb_cache::NewBinnedLRUCache(cct, cache_size, shard_bits, false, cache_prio_high);
462 } else if (cache_type == "lru") {
463 cache = rocksdb::NewLRUCache(cache_size, shard_bits);
464 } else if (cache_type == "clock") {
465 cache = rocksdb::NewClockCache(cache_size, shard_bits);
466 if (!cache) {
467 derr << "rocksdb_cache_type '" << cache
468 << "' chosen, but RocksDB not compiled with LibTBB. "
469 << dendl;
470 }
11fdf7f2 471 } else {
f67539c2 472 derr << "unrecognized rocksdb_cache_type '" << cache_type << "'" << dendl;
11fdf7f2 473 }
f67539c2 474 return cache;
11fdf7f2
TL
475}
476
// Populate `opt` from (in order): the stored option string, ceph config
// values, kv_options overrides, cache construction, and the block-based
// table settings.  Also wires in logging, the Env, and the default-CF
// merge-operator router.  Returns 0 or a negative errno.
int RocksDBStore::load_rocksdb_options(bool create_if_missing, rocksdb::Options& opt)
{
  rocksdb::Status status;

  // 1) user-provided option string takes effect first
  if (options_str.length()) {
    int r = ParseOptionsFromString(options_str, opt);
    if (r != 0) {
      return -EINVAL;
    }
  }

  if (cct->_conf->rocksdb_perf) {
    dbstats = rocksdb::CreateDBStatistics();
    opt.statistics = dbstats;
  }

  opt.create_if_missing = create_if_missing;
  if (kv_options.count("separate_wal_dir")) {
    // WAL lives beside the db dir with a ".wal" suffix
    opt.wal_dir = path + ".wal";
  }

  // Since ceph::for_each_substr doesn't return a value and
  // std::stoull does throw, we may as well just catch everything here.
  try {
    if (kv_options.count("db_paths")) {
      // each item is "<path>,<size>"; size of 0 (unparsable) is rejected
      list<string> paths;
      get_str_list(kv_options["db_paths"], "; \t", paths);
      for (auto& p : paths) {
	size_t pos = p.find(',');
	if (pos == std::string::npos) {
	  derr << __func__ << " invalid db path item " << p << " in "
	       << kv_options["db_paths"] << dendl;
	  return -EINVAL;
	}
	// NOTE: this local `path` intentionally shadows the member `path`
	string path = p.substr(0, pos);
	string size_str = p.substr(pos + 1);
	uint64_t size = atoll(size_str.c_str());
	if (!size) {
	  derr << __func__ << " invalid db path item " << p << " in "
	       << kv_options["db_paths"] << dendl;
	  return -EINVAL;
	}
	opt.db_paths.push_back(rocksdb::DbPath(path, size));
	dout(10) << __func__ << " db_path " << path << " size " << size << dendl;
      }
    }
  } catch (const std::system_error& e) {
    return -e.code().value();
  }

  if (cct->_conf->rocksdb_log_to_ceph_log) {
    opt.info_log.reset(new CephRocksdbLogger(cct));
  }

  if (priv) {
    // caller supplied a custom Env via priv (e.g. BlueStore's BlueRocksEnv)
    dout(10) << __func__ << " using custom Env " << priv << dendl;
    opt.env = static_cast<rocksdb::Env*>(priv);
  } else {
    // remember the default Env so create_db_dir() and friends can use it
    env = opt.env;
  }

  opt.env->SetAllowNonOwnerAccess(false);

  // caches: total budget split between row cache and block cache per
  // rocksdb_cache_row_ratio
  if (!set_cache_flag) {
    cache_size = cct->_conf->rocksdb_cache_size;
  }
  uint64_t row_cache_size = cache_size * cct->_conf->rocksdb_cache_row_ratio;
  uint64_t block_cache_size = cache_size - row_cache_size;

  bbt_opts.block_cache = create_block_cache(cct->_conf->rocksdb_cache_type, block_cache_size);
  if (!bbt_opts.block_cache) {
    return -EINVAL;
  }
  bbt_opts.block_size = cct->_conf->rocksdb_block_size;

  if (row_cache_size > 0)
    opt.row_cache = rocksdb::NewLRUCache(row_cache_size,
				     cct->_conf->rocksdb_cache_shard_bits);
  uint64_t bloom_bits = cct->_conf.get_val<uint64_t>("rocksdb_bloom_bits_per_key");
  if (bloom_bits > 0) {
    dout(10) << __func__ << " set bloom filter bits per key to "
	     << bloom_bits << dendl;
    bbt_opts.filter_policy.reset(rocksdb::NewBloomFilterPolicy(bloom_bits));
  }
  // map the rocksdb_index_type config value onto the BlockBasedTable enum
  using std::placeholders::_1;
  if (cct->_conf.with_val<std::string>("rocksdb_index_type",
				       std::bind(std::equal_to<std::string>(), _1,
						 "binary_search")))
    bbt_opts.index_type = rocksdb::BlockBasedTableOptions::IndexType::kBinarySearch;
  if (cct->_conf.with_val<std::string>("rocksdb_index_type",
				       std::bind(std::equal_to<std::string>(), _1,
						 "hash_search")))
    bbt_opts.index_type = rocksdb::BlockBasedTableOptions::IndexType::kHashSearch;
  if (cct->_conf.with_val<std::string>("rocksdb_index_type",
				       std::bind(std::equal_to<std::string>(), _1,
						 "two_level")))
    bbt_opts.index_type = rocksdb::BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
  if (!bbt_opts.no_block_cache) {
    bbt_opts.cache_index_and_filter_blocks =
        cct->_conf.get_val<bool>("rocksdb_cache_index_and_filter_blocks");
    bbt_opts.cache_index_and_filter_blocks_with_high_priority =
        cct->_conf.get_val<bool>("rocksdb_cache_index_and_filter_blocks_with_high_priority");
    bbt_opts.pin_l0_filter_and_index_blocks_in_cache =
      cct->_conf.get_val<bool>("rocksdb_pin_l0_filter_and_index_blocks_in_cache");
  }
  bbt_opts.partition_filters = cct->_conf.get_val<bool>("rocksdb_partition_filters");
  if (cct->_conf.get_val<Option::size_t>("rocksdb_metadata_block_size") > 0)
    bbt_opts.metadata_block_size = cct->_conf.get_val<Option::size_t>("rocksdb_metadata_block_size");

  opt.table_factory.reset(rocksdb::NewBlockBasedTableFactory(bbt_opts));
  dout(10) << __func__ << " block size " << cct->_conf->rocksdb_block_size
	   << ", block_cache size " << byte_u_t(block_cache_size)
	   << ", row_cache size " << byte_u_t(row_cache_size)
	   << "; shards "
	   << (1 << cct->_conf->rocksdb_cache_shard_bits)
	   << ", type " << cct->_conf->rocksdb_cache_type
	   << dendl;

  // default CF routes merges by key prefix (see MergeOperatorRouter)
  opt.merge_operator.reset(new MergeOperatorRouter(*this));
  comparator = opt.comparator;
  return 0;
}
600
// Record a column family (shard) handle under its logical column name.
// For an already-known column the hash bounds must match the original
// registration; for a new one the bounds are stored.  The shard vector is
// grown on demand and the handle's CF id is mapped back to the prefix.
void RocksDBStore::add_column_family(const std::string& cf_name, uint32_t hash_l, uint32_t hash_h,
				     size_t shard_idx, rocksdb::ColumnFamilyHandle *handle) {
  dout(10) << __func__ << " column_name=" << cf_name << " shard_idx=" << shard_idx <<
    " hash_l=" << hash_l << " hash_h=" << hash_h << " handle=" << (void*) handle << dendl;
  bool exists = cf_handles.count(cf_name) > 0;
  // operator[] creates the entry when it does not exist yet
  auto& column = cf_handles[cf_name];
  if (exists) {
    // all shards of one column must agree on the hash window
    ceph_assert(hash_l == column.hash_l);
    ceph_assert(hash_h == column.hash_h);
  } else {
    ceph_assert(hash_l < hash_h);
    column.hash_l = hash_l;
    column.hash_h = hash_h;
  }
  if (column.handles.size() <= shard_idx)
    column.handles.resize(shard_idx + 1);
  column.handles[shard_idx] = handle;
  // reverse mapping: rocksdb CF id -> logical prefix/column name
  cf_ids_to_prefix.emplace(handle->GetID(), cf_name);
}
620
621bool RocksDBStore::is_column_family(const std::string& prefix) {
622 return cf_handles.count(prefix);
623}
624
33c7a0ef
TL
625std::string_view RocksDBStore::get_key_hash_view(const prefix_shards& shards, const char* key, const size_t keylen) {
626 uint32_t hash_l = std::min<uint32_t>(shards.hash_l, keylen);
627 uint32_t hash_h = std::min<uint32_t>(shards.hash_h, keylen);
628 return { key + hash_l, hash_h - hash_l };
629}
630
// Pick the shard handle for a key: hash the configured key substring
// (rjenkins) and distribute modulo the number of shards.
rocksdb::ColumnFamilyHandle *RocksDBStore::get_key_cf(const prefix_shards& shards, const char* key, const size_t keylen) {
  auto sv = get_key_hash_view(shards, key, keylen);
  uint32_t hash = ceph_str_hash_rjenkins(sv.data(), sv.size());
  return shards.handles[hash % shards.handles.size()];
}
636
f67539c2
TL
637rocksdb::ColumnFamilyHandle *RocksDBStore::get_cf_handle(const std::string& prefix, const std::string& key) {
638 auto iter = cf_handles.find(prefix);
639 if (iter == cf_handles.end()) {
640 return nullptr;
641 } else {
642 if (iter->second.handles.size() == 1) {
643 return iter->second.handles[0];
644 } else {
33c7a0ef 645 return get_key_cf(iter->second, key.data(), key.size());
f67539c2
TL
646 }
647 }
648}
649
650rocksdb::ColumnFamilyHandle *RocksDBStore::get_cf_handle(const std::string& prefix, const char* key, size_t keylen) {
651 auto iter = cf_handles.find(prefix);
652 if (iter == cf_handles.end()) {
653 return nullptr;
654 } else {
655 if (iter->second.handles.size() == 1) {
656 return iter->second.handles[0];
657 } else {
33c7a0ef 658 return get_key_cf(iter->second, key, keylen);
f67539c2
TL
659 }
660 }
661}
662
33c7a0ef
TL
/**
 * If the specified IteratorBounds arg has both an upper and a lower bound defined, and they have equal placement hash
 * strings, we can be sure that the entire iteration range exists in a single CF. In that case, we return the relevant
 * CF handle. In all other cases, we return a nullptr to indicate that the specified bounds cannot necessarily be mapped
 * to a single CF.
 */
rocksdb::ColumnFamilyHandle *RocksDBStore::get_cf_handle(const std::string& prefix, const IteratorBounds& bounds) {
  if (!bounds.lower_bound || !bounds.upper_bound) {
    return nullptr;
  }
  auto iter = cf_handles.find(prefix);
  // callers must only use this for prefixes that are sharded CFs
  ceph_assert(iter != cf_handles.end());
  ceph_assert(iter->second.handles.size() != 1);
  if (iter->second.hash_l != 0) {
    // hash window does not start at the key's beginning; a shared prefix of
    // the bounds says nothing about shard placement, so give up.
    return nullptr;
  }
  auto lower_bound_hash_str = get_key_hash_view(iter->second, bounds.lower_bound->data(), bounds.lower_bound->size());
  auto upper_bound_hash_str = get_key_hash_view(iter->second, bounds.upper_bound->data(), bounds.upper_bound->size());
  if (lower_bound_hash_str == upper_bound_hash_str) {
    // identical placement strings => both bounds (and everything between,
    // given hash_l == 0) land in the same shard
    auto key = *bounds.lower_bound;
    return get_key_cf(iter->second, key.data(), key.size());
  } else {
    return nullptr;
  }
}
688
f67539c2
TL
/**
 * Definition of sharding:
 * space-separated list of: column_def [ '=' options ]
 * column_def := column_name '(' shard_count ')'
 * column_def := column_name '(' shard_count ',' hash_begin '-' ')'
 * column_def := column_name '(' shard_count ',' hash_begin '-' hash_end ')'
 * I=write_buffer_size=1048576 O(6) m(7,10-) prefix(4,0-10)=disable_auto_compactions=true,max_bytes_for_level_base=1048576
 *
 * Returns true on success; on failure *error_position points into
 * text_def_in at the offending character and *error_msg describes it.
 */
bool RocksDBStore::parse_sharding_def(const std::string_view text_def_in,
				     std::vector<ColumnFamily>& sharding_def,
				     char const* *error_position,
				     std::string *error_msg)
{
  std::string_view text_def = text_def_in;
  char const* error_position_local = nullptr;
  std::string error_msg_local;
  // make the out-params optional by falling back to locals
  if (error_position == nullptr) {
    error_position = &error_position_local;
  }
  *error_position = nullptr;
  if (error_msg == nullptr) {
    error_msg = &error_msg_local;
    error_msg->clear();
  }

  sharding_def.clear();
  while (!text_def.empty()) {
    std::string_view options;
    std::string_view name;
    size_t shard_cnt = 1;
    // default hash window covers the whole key
    uint32_t l_bound = 0;
    uint32_t h_bound = std::numeric_limits<uint32_t>::max();

    // split off the next space-separated column_def
    std::string_view column_def;
    size_t spos = text_def.find(' ');
    if (spos == std::string_view::npos) {
      column_def = text_def;
      text_def = std::string_view(text_def.end(), 0);
    } else {
      column_def = text_def.substr(0, spos);
      text_def = text_def.substr(spos + 1);
    }
    // everything after the first '=' is per-column rocksdb options
    size_t eqpos = column_def.find('=');
    if (eqpos != std::string_view::npos) {
      options = column_def.substr(eqpos + 1);
      column_def = column_def.substr(0, eqpos);
    }

    size_t bpos = column_def.find('(');
    if (bpos != std::string_view::npos) {
      name = column_def.substr(0, bpos);
      // NOTE(review): strtol reads the underlying buffer, relying on it
      // extending past the view (the enclosing string) — confirm callers
      // always pass views into NUL-terminated storage.
      const char* nptr = &column_def[bpos + 1];
      char* endptr;
      shard_cnt = strtol(nptr, &endptr, 10);
      if (nptr == endptr) {
	*error_position = nptr;
	*error_msg = "expecting integer";
	break;
      }
      nptr = endptr;
      if (*nptr == ',') {
	nptr++;
	l_bound = strtol(nptr, &endptr, 10);
	if (nptr == endptr) {
	  *error_position = nptr;
	  *error_msg = "expecting integer";
	  break;
	}
	nptr = endptr;
	if (*nptr != '-') {
	  *error_position = nptr;
	  *error_msg = "expecting '-'";
	  break;
	}
	nptr++;
	h_bound = strtol(nptr, &endptr, 10);
	if (nptr == endptr) {
	  // omitted upper bound means "to end of key"
	  h_bound = std::numeric_limits<uint32_t>::max();
	}
	nptr = endptr;
      }
      if (*nptr != ')') {
	*error_position = nptr;
	*error_msg = "expecting ')'";
	break;
      }
    } else {
      // bare name: one shard, full-key hash window
      name = column_def;
    }
    sharding_def.emplace_back(std::string(name), shard_cnt, std::string(options), l_bound, h_bound);
  }
  return *error_position == nullptr;
}
782
783void RocksDBStore::sharding_def_to_columns(const std::vector<ColumnFamily>& sharding_def,
784 std::vector<std::string>& columns)
785{
786 columns.clear();
787 for (size_t i = 0; i < sharding_def.size(); i++) {
788 if (sharding_def[i].shard_cnt == 1) {
789 columns.push_back(sharding_def[i].name);
790 } else {
791 for (size_t j = 0; j < sharding_def[i].shard_cnt; j++) {
20effc67 792 columns.push_back(sharding_def[i].name + "-" + std::to_string(j));
f67539c2
TL
793 }
794 }
795 }
796}
797
// Create all column families described by `sharding_def` in the (open)
// database.  Each CF starts from the default options in `opt`, with the
// per-column option string and the matching merge operator applied, and
// is then registered via add_column_family.  Returns 0 or a negative errno.
int RocksDBStore::create_shards(const rocksdb::Options& opt,
				const std::vector<ColumnFamily>& sharding_def)
{
  for (auto& p : sharding_def) {
    // copy default CF settings, block cache, merge operators as
    // the base for new CF
    rocksdb::ColumnFamilyOptions cf_opt(opt);
    rocksdb::Status status;
    // apply options to column family
    int r = update_column_family_options(p.name, p.options, &cf_opt);
    if (r != 0) {
      return r;
    }
    for (size_t idx = 0; idx < p.shard_cnt; idx++) {
      // shard naming must match sharding_def_to_columns: plain name when
      // unsharded, "<name>-<idx>" otherwise
      std::string cf_name;
      if (p.shard_cnt == 1)
	cf_name = p.name;
      else
	cf_name = p.name + "-" + std::to_string(idx);
      rocksdb::ColumnFamilyHandle *cf;
      status = db->CreateColumnFamily(cf_opt, cf_name, &cf);
      if (!status.ok()) {
	derr << __func__ << " Failed to create rocksdb column family: "
	     << cf_name << dendl;
	return -EINVAL;
      }
      // store the new CF handle
      add_column_family(p.name, p.hash_l, p.hash_h, idx, cf);
    }
  }
  return 0;
}
830
// Apply a sharding specification to a freshly created database: parse it,
// create the column families, and persist the spec under "sharding/def"
// so later opens can reconstruct the layout.  An empty spec removes any
// persisted definition.  Returns 0, -EINVAL (bad spec) or -EIO (persist).
int RocksDBStore::apply_sharding(const rocksdb::Options& opt,
				 const std::string& sharding_text)
{
  // create and open column families
  if (!sharding_text.empty()) {
    bool b;
    int r;
    rocksdb::Status status;
    std::vector<ColumnFamily> sharding_def;
    char const* error_position;
    std::string error_msg;
    b = parse_sharding_def(sharding_text, sharding_def, &error_position, &error_msg);
    if (!b) {
      // print a caret under the offending character of the spec
      dout(1) << __func__ << " bad sharding: " << dendl;
      dout(1) << __func__ << sharding_text << dendl;
      dout(1) << __func__ << std::string(error_position - &sharding_text[0], ' ') << "^" << error_msg << dendl;
      return -EINVAL;
    }
    r = create_shards(opt, sharding_def);
    if (r != 0 ) {
      derr << __func__ << " create_shards failed error=" << r << dendl;
      return r;
    }
    // persist the spec (through the Env, so it works for custom Envs too)
    opt.env->CreateDir(sharding_def_dir);
    status = rocksdb::WriteStringToFile(opt.env, sharding_text,
					sharding_def_file, true);
    if (!status.ok()) {
      derr << __func__ << " cannot write to " << sharding_def_file << dendl;
      return -EIO;
    }
  } else {
    opt.env->DeleteFile(sharding_def_file);
  }
  return 0;
}
522d829b 866
f67539c2
TL
867// linking to rocksdb function defined in options_helper.cc
868// it can parse nested params like "nested_opt={opt1=1;opt2=2}"
f67539c2
TL
869extern rocksdb::Status rocksdb::StringToMap(const std::string& opts_str,
870 std::unordered_map<std::string, std::string>* opts_map);
871
522d829b
TL
872// Splits column family options from single string into name->value column_opts_map.
873// The split is done using RocksDB parser that understands "{" and "}", so it
874// properly extracts compound options.
875// If non-RocksDB option "block_cache" is defined it is extracted to block_cache_opt.
876int RocksDBStore::split_column_family_options(const std::string& options,
877 std::unordered_map<std::string, std::string>* opt_map,
f67539c2
TL
878 std::string* block_cache_opt)
879{
522d829b
TL
880 dout(20) << __func__ << " options=" << options << dendl;
881 rocksdb::Status status = rocksdb::StringToMap(options, opt_map);
f67539c2 882 if (!status.ok()) {
522d829b
TL
883 dout(5) << __func__ << " error '" << status.getState()
884 << "' while parsing options '" << options << "'" << dendl;
f67539c2
TL
885 return -EINVAL;
886 }
522d829b
TL
887 // if "block_cache" option exists, then move it to separate string
888 if (auto it = opt_map->find("block_cache"); it != opt_map->end()) {
f67539c2 889 *block_cache_opt = it->second;
522d829b 890 opt_map->erase(it);
f67539c2
TL
891 } else {
892 block_cache_opt->clear();
893 }
894 return 0;
895}
896
522d829b
TL
897// Updates column family options.
898// Take options from more_options and apply them to cf_opt.
899// Allowed options are exactly the same as allowed for column families in RocksDB.
900// Ceph addition is "block_cache" option that is translated to block_cache and
901// allows to specialize separate block cache for O column family.
902//
903// base_name - name of column without shard suffix: "-"+number
904// options - additional options to apply
905// cf_opt - column family options to update
906int RocksDBStore::update_column_family_options(const std::string& base_name,
907 const std::string& more_options,
908 rocksdb::ColumnFamilyOptions* cf_opt)
909{
910 std::unordered_map<std::string, std::string> options_map;
911 std::string block_cache_opt;
912 rocksdb::Status status;
913 int r = split_column_family_options(more_options, &options_map, &block_cache_opt);
914 if (r != 0) {
915 dout(5) << __func__ << " failed to parse options; column family=" << base_name
916 << " options=" << more_options << dendl;
917 return r;
918 }
919 status = rocksdb::GetColumnFamilyOptionsFromMap(*cf_opt, options_map, cf_opt);
920 if (!status.ok()) {
921 dout(5) << __func__ << " invalid column family optionsp; column family="
922 << base_name << " options=" << more_options << dendl;
923 dout(5) << __func__ << " RocksDB error='" << status.getState() << "'" << dendl;
924 return -EINVAL;
925 }
926 if (base_name != rocksdb::kDefaultColumnFamilyName) {
927 // default cf has its merge operator defined in load_rocksdb_options, should not override it
928 install_cf_mergeop(base_name, cf_opt);
929 }
930 if (!block_cache_opt.empty()) {
931 r = apply_block_cache_options(base_name, block_cache_opt, cf_opt);
932 if (r != 0) {
933 // apply_block_cache_options already does all necessary douts
934 return r;
935 }
936 }
937 return 0;
938}
939
940int RocksDBStore::apply_block_cache_options(const std::string& column_name,
941 const std::string& block_cache_opt,
942 rocksdb::ColumnFamilyOptions* cf_opt)
943{
944 rocksdb::Status status;
945 std::unordered_map<std::string, std::string> cache_options_map;
946 status = rocksdb::StringToMap(block_cache_opt, &cache_options_map);
947 if (!status.ok()) {
948 dout(5) << __func__ << " invalid block cache options; column=" << column_name
949 << " options=" << block_cache_opt << dendl;
950 dout(5) << __func__ << " RocksDB error='" << status.getState() << "'" << dendl;
951 return -EINVAL;
952 }
953 bool require_new_block_cache = false;
954 std::string cache_type = cct->_conf->rocksdb_cache_type;
955 if (const auto it = cache_options_map.find("type"); it != cache_options_map.end()) {
956 cache_type = it->second;
957 cache_options_map.erase(it);
958 require_new_block_cache = true;
959 }
960 size_t cache_size = cct->_conf->rocksdb_cache_size;
961 if (auto it = cache_options_map.find("size"); it != cache_options_map.end()) {
962 std::string error;
963 cache_size = strict_iecstrtoll(it->second.c_str(), &error);
964 if (!error.empty()) {
965 dout(10) << __func__ << " invalid size: '" << it->second << "'" << dendl;
966 return -EINVAL;
967 }
968 cache_options_map.erase(it);
969 require_new_block_cache = true;
970 }
971 double high_pri_pool_ratio = 0.0;
972 if (auto it = cache_options_map.find("high_ratio"); it != cache_options_map.end()) {
973 std::string error;
974 high_pri_pool_ratio = strict_strtod(it->second.c_str(), &error);
975 if (!error.empty()) {
976 dout(10) << __func__ << " invalid high_pri (float): '" << it->second << "'" << dendl;
977 return -EINVAL;
978 }
979 cache_options_map.erase(it);
980 require_new_block_cache = true;
981 }
f67539c2 982
522d829b
TL
983 rocksdb::BlockBasedTableOptions column_bbt_opts;
984 status = GetBlockBasedTableOptionsFromMap(bbt_opts, cache_options_map, &column_bbt_opts);
985 if (!status.ok()) {
986 dout(5) << __func__ << " invalid block cache options; column=" << column_name
987 << " options=" << block_cache_opt << dendl;
988 dout(5) << __func__ << " RocksDB error='" << status.getState() << "'" << dendl;
989 return -EINVAL;
990 }
991 std::shared_ptr<rocksdb::Cache> block_cache;
992 if (column_bbt_opts.no_block_cache) {
993 // clear all settings except no_block_cache
994 // rocksdb does not like then
995 column_bbt_opts = rocksdb::BlockBasedTableOptions();
996 column_bbt_opts.no_block_cache = true;
997 } else {
998 if (require_new_block_cache) {
999 block_cache = create_block_cache(cache_type, cache_size, high_pri_pool_ratio);
1000 if (!block_cache) {
1001 dout(5) << __func__ << " failed to create block cache for params: " << block_cache_opt << dendl;
1002 return -EINVAL;
1003 }
1004 } else {
1005 block_cache = bbt_opts.block_cache;
1006 }
1007 }
1008 column_bbt_opts.block_cache = block_cache;
1009 cf_bbt_opts[column_name] = column_bbt_opts;
1010 cf_opt->table_factory.reset(NewBlockBasedTableFactory(cf_bbt_opts[column_name]));
1011 return 0;
1012}
f67539c2
TL
1013
1014int RocksDBStore::verify_sharding(const rocksdb::Options& opt,
1015 std::vector<rocksdb::ColumnFamilyDescriptor>& existing_cfs,
1016 std::vector<std::pair<size_t, RocksDBStore::ColumnFamily> >& existing_cfs_shard,
1017 std::vector<rocksdb::ColumnFamilyDescriptor>& missing_cfs,
1018 std::vector<std::pair<size_t, RocksDBStore::ColumnFamily> >& missing_cfs_shard)
1019{
1020 rocksdb::Status status;
1021 std::string stored_sharding_text;
1022 status = opt.env->FileExists(sharding_def_file);
1023 if (status.ok()) {
1024 status = rocksdb::ReadFileToString(opt.env,
1025 sharding_def_file,
1026 &stored_sharding_text);
1027 if(!status.ok()) {
1028 derr << __func__ << " cannot read from " << sharding_def_file << dendl;
1029 return -EIO;
1030 }
1031 dout(20) << __func__ << " sharding=" << stored_sharding_text << dendl;
1032 } else {
1033 dout(30) << __func__ << " no sharding" << dendl;
1034 //no "sharding_def" present
1035 }
1036 //check if sharding_def matches stored_sharding_def
1037 std::vector<ColumnFamily> stored_sharding_def;
1038 parse_sharding_def(stored_sharding_text, stored_sharding_def);
1039
1040 std::sort(stored_sharding_def.begin(), stored_sharding_def.end(),
1041 [](ColumnFamily& a, ColumnFamily& b) { return a.name < b.name; } );
1042
1043 std::vector<string> rocksdb_cfs;
1044 status = rocksdb::DB::ListColumnFamilies(rocksdb::DBOptions(opt),
1045 path, &rocksdb_cfs);
1046 if (!status.ok()) {
522d829b 1047 derr << __func__ << " unable to list column families: " << status.ToString() << dendl;
f67539c2
TL
1048 return -EIO;
1049 }
1050 dout(5) << __func__ << " column families from rocksdb: " << rocksdb_cfs << dendl;
1051
1052 auto emplace_cf = [&] (const RocksDBStore::ColumnFamily& column,
1053 int32_t shard_id,
1054 const std::string& shard_name,
1055 const rocksdb::ColumnFamilyOptions& opt) {
1056 if (std::find(rocksdb_cfs.begin(), rocksdb_cfs.end(), shard_name) != rocksdb_cfs.end()) {
1057 existing_cfs.emplace_back(shard_name, opt);
1058 existing_cfs_shard.emplace_back(shard_id, column);
1059 } else {
1060 missing_cfs.emplace_back(shard_name, opt);
1061 missing_cfs_shard.emplace_back(shard_id, column);
1062 }
1063 };
1064
1065 for (auto& column : stored_sharding_def) {
1066 rocksdb::ColumnFamilyOptions cf_opt(opt);
522d829b 1067 int r = update_column_family_options(column.name, column.options, &cf_opt);
f67539c2 1068 if (r != 0) {
522d829b 1069 return r;
f67539c2
TL
1070 }
1071 if (column.shard_cnt == 1) {
1072 emplace_cf(column, 0, column.name, cf_opt);
1073 } else {
1074 for (size_t i = 0; i < column.shard_cnt; i++) {
20effc67 1075 std::string cf_name = column.name + "-" + std::to_string(i);
f67539c2
TL
1076 emplace_cf(column, i, cf_name, cf_opt);
1077 }
1078 }
1079 }
1080 existing_cfs.emplace_back("default", opt);
11fdf7f2 1081
f67539c2
TL
1082 if (existing_cfs.size() != rocksdb_cfs.size()) {
1083 std::vector<std::string> columns_from_stored;
1084 sharding_def_to_columns(stored_sharding_def, columns_from_stored);
1085 derr << __func__ << " extra columns in rocksdb. rocksdb columns = " << rocksdb_cfs
1086 << " target columns = " << columns_from_stored << dendl;
1087 return -EIO;
1088 }
11fdf7f2
TL
1089 return 0;
1090}
1091
f67539c2
TL
1092std::ostream& operator<<(std::ostream& out, const RocksDBStore::ColumnFamily& cf)
1093{
1094 out << "(";
1095 out << cf.name;
1096 out << ",";
1097 out << cf.shard_cnt;
1098 out << ",";
1099 out << cf.hash_l;
1100 out << "-";
1101 if (cf.hash_h != std::numeric_limits<uint32_t>::max()) {
1102 out << cf.hash_h;
1103 }
1104 out << ",";
1105 out << cf.options;
1106 out << ")";
1107 return out;
1108}
1109
11fdf7f2
TL
// Opens the RocksDB database at `path` and wires up the column family
// handles, sharding state and perf counters.
//
// out               - stream for human-readable error output (passed through
//                     from callers; not written to directly here)
// create_if_missing - create a fresh DB and apply `sharding_text`;
//                     mutually exclusive with open_readonly (asserted)
// open_readonly     - open without write access
// sharding_text     - sharding definition, used only on first creation
// Returns 0 on success, negative errno (-EINVAL/-EIO/...) on failure.
1110int RocksDBStore::do_open(ostream &out,
 1111			  bool create_if_missing,
 1112			  bool open_readonly,
f67539c2 1113			  const std::string& sharding_text)
11fdf7f2
TL
 1114{
 1115  ceph_assert(!(create_if_missing && open_readonly));
 1116  rocksdb::Options opt;
 1117  int r = load_rocksdb_options(create_if_missing, opt);
 1118  if (r) {
 1119    dout(1) << __func__ << " load rocksdb options failed" << dendl;
 1120    return r;
 1121  }
 1122  rocksdb::Status status;
 1123  if (create_if_missing) {
  // brand-new database: create it, then lay down the sharding definition
 1124    status = rocksdb::DB::Open(opt, path, &db);
 1125    if (!status.ok()) {
 1126      derr << status.ToString() << dendl;
 1127      return -EINVAL;
 1128    }
f67539c2
TL
 1129    r = apply_sharding(opt, sharding_text);
 1130    if (r < 0) {
 1131      return r;
11fdf7f2
TL
 1132    }
 1133    default_cf = db->DefaultColumnFamily();
 1134  } else {
f67539c2
TL
  // existing database: reconcile the stored sharding definition with the
  // column families RocksDB actually has
 1135    std::vector<rocksdb::ColumnFamilyDescriptor> existing_cfs;
 1136    std::vector<std::pair<size_t, RocksDBStore::ColumnFamily> > existing_cfs_shard;
 1137    std::vector<rocksdb::ColumnFamilyDescriptor> missing_cfs;
 1138    std::vector<std::pair<size_t, RocksDBStore::ColumnFamily> > missing_cfs_shard;
 1139
 1140    r = verify_sharding(opt,
 1141			existing_cfs, existing_cfs_shard,
 1142			missing_cfs, missing_cfs_shard);
 1143    if (r < 0) {
 1144      return r;
 1145    }
 1146    std::string sharding_recreate_text;
 1147    status = rocksdb::ReadFileToString(opt.env,
 1148				       sharding_recreate,
 1149				       &sharding_recreate_text);
  // the recreate marker file holding "1" (written e.g. by repair()) requests
  // that missing column families be recreated on this open
 1150    bool recreate_mode = status.ok() && sharding_recreate_text == "1";
 1151
 1152    ceph_assert(!recreate_mode || !open_readonly);
 1153    if (recreate_mode == false && missing_cfs.size() != 0) {
 1154      // We do not accept when there are missing column families, except case that we are during resharding.
 1155      // We can get into this case if resharding was interrupted. It gives a chance to continue.
 1156      // Opening DB is only allowed in read-only mode.
 1157      if (open_readonly == false &&
 1158	  std::find_if(missing_cfs.begin(), missing_cfs.end(),
 1159		       [](const rocksdb::ColumnFamilyDescriptor& c) { return c.name == resharding_column_lock; }
 1160	  ) != missing_cfs.end()) {
 1161	derr << __func__ << " missing column families: " << missing_cfs_shard << dendl;
 1162	return -EIO;
 1163      }
 1164    }
 1165
11fdf7f2
TL
 1166    if (existing_cfs.empty()) {
 1167      // no column families
 1168      if (open_readonly) {
f67539c2 1169	status = rocksdb::DB::OpenForReadOnly(opt, path, &db);
11fdf7f2 1170      } else {
f67539c2 1171	status = rocksdb::DB::Open(opt, path, &db);
11fdf7f2
TL
 1172      }
 1173      if (!status.ok()) {
 1174	derr << status.ToString() << dendl;
 1175	return -EINVAL;
 1176      }
 1177      default_cf = db->DefaultColumnFamily();
 1178    } else {
11fdf7f2
TL
 1179      std::vector<rocksdb::ColumnFamilyHandle*> handles;
 1180      if (open_readonly) {
 1181        status = rocksdb::DB::OpenForReadOnly(rocksdb::DBOptions(opt),
f67539c2 1182			  path, existing_cfs,
11fdf7f2
TL
 1183			  &handles, &db);
 1184      } else {
 1185	status = rocksdb::DB::Open(rocksdb::DBOptions(opt),
f67539c2 1186		 path, existing_cfs, &handles, &db);
11fdf7f2
TL
 1187      }
 1188      if (!status.ok()) {
 1189	derr << status.ToString() << dendl;
 1190	return -EINVAL;
 1191      }
f67539c2
TL
  // handles[] is parallel to existing_cfs; verify_sharding appended the
  // "default" CF last, hence the "+ 1" and the final handle taken below
 1192      ceph_assert(existing_cfs.size() == existing_cfs_shard.size() + 1);
 1193      ceph_assert(handles.size() == existing_cfs.size());
 1194      dout(10) << __func__ << " existing_cfs=" << existing_cfs.size() << dendl;
 1195      for (size_t i = 0; i < existing_cfs_shard.size(); i++) {
 1196	add_column_family(existing_cfs_shard[i].second.name,
 1197			  existing_cfs_shard[i].second.hash_l,
 1198			  existing_cfs_shard[i].second.hash_h,
 1199			  existing_cfs_shard[i].first,
 1200			  handles[i]);
 1201      }
 1202      default_cf = handles[handles.size() - 1];
 1203      must_close_default_cf = true;
 1204
  // recreate any missing shards, unless the resharding lock column is among
  // the missing ones (interrupted resharding owns them)
 1205      if (missing_cfs.size() > 0 &&
 1206	  std::find_if(missing_cfs.begin(), missing_cfs.end(),
 1207		       [](const rocksdb::ColumnFamilyDescriptor& c) { return c.name == resharding_column_lock; }
 1208	     ) == missing_cfs.end())
 1209      {
 1210	dout(10) << __func__ << " missing_cfs=" << missing_cfs.size() << dendl;
 1211	ceph_assert(recreate_mode);
 1212	ceph_assert(missing_cfs.size() == missing_cfs_shard.size());
 1213	for (size_t i = 0; i < missing_cfs.size(); i++) {
 1214	  rocksdb::ColumnFamilyHandle *cf;
 1215	  status = db->CreateColumnFamily(missing_cfs[i].options, missing_cfs[i].name, &cf);
 1216	  if (!status.ok()) {
 1217	    derr << __func__ << " Failed to create rocksdb column family: "
 1218		 << missing_cfs[i].name << dendl;
 1219	    return -EINVAL;
 1220	  }
 1221	  add_column_family(missing_cfs_shard[i].second.name,
 1222			    missing_cfs_shard[i].second.hash_l,
 1223			    missing_cfs_shard[i].second.hash_h,
 1224			    missing_cfs_shard[i].first,
 1225			    cf);
11fdf7f2 1226	}
  // recreation done; drop the marker so the next open is strict again
f67539c2 1227	opt.env->DeleteFile(sharding_recreate);
11fdf7f2
TL
 1228      }
 1229    }
7c673cae 1230  }
11fdf7f2 1231  ceph_assert(default_cf != nullptr);
7c673cae 1232
  // register perf counters now that the DB is usable
f67539c2 1233  PerfCountersBuilder plb(cct, "rocksdb", l_rocksdb_first, l_rocksdb_last);
7c673cae
FG
 1234  plb.add_time_avg(l_rocksdb_get_latency, "get_latency", "Get latency");
 1235  plb.add_time_avg(l_rocksdb_submit_latency, "submit_latency", "Submit Latency");
 1236  plb.add_time_avg(l_rocksdb_submit_sync_latency, "submit_sync_latency", "Submit Sync Latency");
 1237  plb.add_u64_counter(l_rocksdb_compact, "compact", "Compactions");
 1238  plb.add_u64_counter(l_rocksdb_compact_range, "compact_range", "Compactions by range");
 1239  plb.add_u64_counter(l_rocksdb_compact_queue_merge, "compact_queue_merge", "Mergings of ranges in compaction queue");
 1240  plb.add_u64(l_rocksdb_compact_queue_len, "compact_queue_len", "Length of compaction queue");
 1241  plb.add_time_avg(l_rocksdb_write_wal_time, "rocksdb_write_wal_time", "Rocksdb write wal time");
 1242  plb.add_time_avg(l_rocksdb_write_memtable_time, "rocksdb_write_memtable_time", "Rocksdb write memtable time");
 1243  plb.add_time_avg(l_rocksdb_write_delay_time, "rocksdb_write_delay_time", "Rocksdb write delay time");
 1244  plb.add_time_avg(l_rocksdb_write_pre_and_post_process_time,
 1245	    "rocksdb_write_pre_and_post_time", "total time spent on writing a record, excluding write process");
 1246  logger = plb.create_perf_counters();
 1247  cct->get_perfcounters_collection()->add(logger);
 1248
 1249  if (compact_on_mount) {
 1250    derr << "Compacting rocksdb store..." << dendl;
 1251    compact();
 1252    derr << "Finished compacting rocksdb store" << dendl;
 1253  }
 1254  return 0;
 1255}
1256
1257int RocksDBStore::_test_init(const string& dir)
1258{
1259 rocksdb::Options options;
1260 options.create_if_missing = true;
1261 rocksdb::DB *db;
1262 rocksdb::Status status = rocksdb::DB::Open(options, dir, &db);
1263 delete db;
1264 db = nullptr;
1265 return status.ok() ? 0 : -EIO;
1266}
1267
1268RocksDBStore::~RocksDBStore()
1269{
1270 close();
7c673cae
FG
1271 if (priv) {
1272 delete static_cast<rocksdb::Env*>(priv);
1273 }
1274}
1275
1276void RocksDBStore::close()
1277{
1278 // stop compaction thread
9f95a23c 1279 compact_queue_lock.lock();
7c673cae 1280 if (compact_thread.is_started()) {
92f5a8d4 1281 dout(1) << __func__ << " waiting for compaction thread to stop" << dendl;
7c673cae 1282 compact_queue_stop = true;
9f95a23c
TL
1283 compact_queue_cond.notify_all();
1284 compact_queue_lock.unlock();
7c673cae 1285 compact_thread.join();
f67539c2 1286 dout(1) << __func__ << " compaction thread to stopped" << dendl;
7c673cae 1287 } else {
9f95a23c 1288 compact_queue_lock.unlock();
7c673cae
FG
1289 }
1290
f67539c2 1291 if (logger) {
7c673cae 1292 cct->get_perfcounters_collection()->remove(logger);
f67539c2
TL
1293 delete logger;
1294 logger = nullptr;
1295 }
1296
1297 // Ensure db is destroyed before dependent db_cache and filterpolicy
1298 for (auto& p : cf_handles) {
1299 for (size_t i = 0; i < p.second.handles.size(); i++) {
1300 db->DestroyColumnFamilyHandle(p.second.handles[i]);
1301 }
1302 }
1303 cf_handles.clear();
1304 if (must_close_default_cf) {
1305 db->DestroyColumnFamilyHandle(default_cf);
1306 must_close_default_cf = false;
1307 }
1308 default_cf = nullptr;
1309 delete db;
1310 db = nullptr;
7c673cae
FG
1311}
1312
11fdf7f2
TL
// Runs rocksdb::RepairDB on the store, preserving and re-applying the Ceph
// sharding definition (RepairDB deletes files it does not recognize).
// After a successful marker rewrite, a full open/close cycle recreates any
// column families the repair lost.
// Returns 0 on success, -1 on failure (details go to `out` / the log).
 1313int RocksDBStore::repair(std::ostream &out)
 1314{
f67539c2 1315  rocksdb::Status status;
11fdf7f2
TL
 1316  rocksdb::Options opt;
 1317  int r = load_rocksdb_options(false, opt);
 1318  if (r) {
 1319    dout(1) << __func__ << " load rocksdb options failed" << dendl;
 1320    out << "load rocksdb options failed" << std::endl;
 1321    return r;
 1322  }
f67539c2
TL
 1323  //need to save sharding definition, repairDB will delete files it does not know
 1324  std::string stored_sharding_text;
 1325  status = opt.env->FileExists(sharding_def_file);
11fdf7f2 1326  if (status.ok()) {
f67539c2
TL
 1327    status = rocksdb::ReadFileToString(opt.env,
 1328				       sharding_def_file,
 1329				       &stored_sharding_text);
 1330    if (!status.ok()) {
      // unreadable definition is treated as "no sharding"
 1331      stored_sharding_text.clear();
 1332    }
 1333  }
 1334  dout(10) << __func__ << " stored_sharding: " << stored_sharding_text << dendl;
 1335  status = rocksdb::RepairDB(path, opt);
 1336  bool repaired = status.ok();
 1337  if (!stored_sharding_text.empty()) {
 1338    //recreate markers even if repair failed
 1339    opt.env->CreateDir(sharding_def_dir);
 1340    status = rocksdb::WriteStringToFile(opt.env, stored_sharding_text,
 1341					sharding_def_file, true);
 1342    if (!status.ok()) {
 1343      derr << __func__ << " cannot write to " << sharding_def_file << dendl;
 1344      return -1;
 1345    }
    // the "1" marker tells the next do_open() to recreate missing CFs
 1346    status = rocksdb::WriteStringToFile(opt.env, "1",
 1347					sharding_recreate, true);
 1348    if (!status.ok()) {
 1349      derr << __func__ << " cannot write to " << sharding_recreate << dendl;
 1350      return -1;
 1351    }
 1352    // finalize sharding recreate
 1353    if (do_open(out, false, false)) {
 1354      derr << __func__ << " cannot finalize repair" << dendl;
 1355      return -1;
 1356    }
 1357    close();
 1358  }
 1359
  // NOTE(review): `status` may at this point be from WriteStringToFile, not
  // RepairDB, so the error text below can reflect the last marker write —
  // confirm whether that is intended.
 1360  if (repaired && status.ok()) {
11fdf7f2
TL
 1361    return 0;
 1362  } else {
 1363    out << "repair rocksdb failed : " << status.ToString() << std::endl;
f67539c2 1364    return -1;
11fdf7f2
TL
 1365  }
 1366}
1367
7c673cae
FG
1368void RocksDBStore::split_stats(const std::string &s, char delim, std::vector<std::string> &elems) {
1369 std::stringstream ss;
1370 ss.str(s);
1371 std::string item;
1372 while (std::getline(ss, item, delim)) {
1373 elems.push_back(item);
1374 }
1375}
1376
9f95a23c
TL
// Thin wrapper over rocksdb::DB::GetIntProperty: fetches an integer-valued
// DB property into *out. Returns true if the property was retrieved.
1377bool RocksDBStore::get_property(
 1378  const std::string &property,
 1379  uint64_t *out)
 1380{
 1381  return db->GetIntProperty(property, out);
 1382}
1383
1384int64_t RocksDBStore::estimate_prefix_size(const string& prefix,
1385 const string& key_prefix)
11fdf7f2 1386{
11fdf7f2 1387 uint64_t size = 0;
f67539c2
TL
1388 auto p_iter = cf_handles.find(prefix);
1389 if (p_iter != cf_handles.end()) {
1390 for (auto cf : p_iter->second.handles) {
1391 uint64_t s = 0;
1392 string start = key_prefix + string(1, '\x00');
1393 string limit = key_prefix + string("\xff\xff\xff\xff");
1394 rocksdb::Range r(start, limit);
33c7a0ef 1395 db->GetApproximateSizes(cf, &r, 1, &s);
f67539c2
TL
1396 size += s;
1397 }
11fdf7f2 1398 } else {
9f95a23c
TL
1399 string start = combine_strings(prefix , key_prefix);
1400 string limit = combine_strings(prefix , key_prefix + "\xff\xff\xff\xff");
1401 rocksdb::Range r(start, limit);
33c7a0ef 1402 db->GetApproximateSizes(default_cf, &r, 1, &size);
11fdf7f2
TL
1403 }
1404 return size;
1405}
1406
7c673cae
FG
// Dumps RocksDB statistics into the Formatter. Output is gated by config:
// rocksdb_perf must be enabled at all, then each of
// rocksdb_collect_compaction_stats / _extended_stats / _memory_stats
// selects a section of the dump.
 1407void RocksDBStore::get_statistics(Formatter *f)
 1408{
f67539c2 1409  if (!cct->_conf->rocksdb_perf) {
11fdf7f2 1410    dout(20) << __func__ << " RocksDB perf is disabled, can't probe for stats"
7c673cae
FG
 1411	     << dendl;
 1412    return;
 1413  }
 1414
f67539c2 1415  if (cct->_conf->rocksdb_collect_compaction_stats) {
7c673cae
FG
 1416    std::string stat_str;
 1417    bool status = db->GetProperty("rocksdb.stats", &stat_str);
 1418    if (status) {
 1419      f->open_object_section("rocksdb_statistics");
 1420      f->dump_string("rocksdb_compaction_statistics", "");
      // the property is one big multi-line string; emit it line by line
 1421      vector<string> stats;
 1422      split_stats(stat_str, '\n', stats);
 1423      for (auto st :stats) {
 1424	f->dump_string("", st);
 1425      }
 1426      f->close_section();
 1427    }
 1428  }
f67539c2 1429  if (cct->_conf->rocksdb_collect_extended_stats) {
7c673cae
FG
 1430    if (dbstats) {
 1431      f->open_object_section("rocksdb_extended_statistics");
 1432      string stat_str = dbstats->ToString();
 1433      vector<string> stats;
 1434      split_stats(stat_str, '\n', stats);
 1435      f->dump_string("rocksdb_extended_statistics", "");
 1436      for (auto st :stats) {
 1437	f->dump_string(".", st);
 1438      }
 1439      f->close_section();
 1440    }
    // also include this store's own perf counters
 1441    f->open_object_section("rocksdbstore_perf_counters");
 1442    logger->dump_formatted(f,0);
 1443    f->close_section();
 1444  }
f67539c2 1445  if (cct->_conf->rocksdb_collect_memory_stats) {
7c673cae 1446    f->open_object_section("rocksdb_memtable_statistics");
11fdf7f2
TL
 1447    std::string str;
 1448    if (!bbt_opts.no_block_cache) {
 1449      str.append(stringify(bbt_opts.block_cache->GetUsage()));
 1450      f->dump_string("block_cache_usage", str.data());
 1451      str.clear();
 1452      str.append(stringify(bbt_opts.block_cache->GetPinnedUsage()));
 1453      f->dump_string("block_cache_pinned_blocks_usage", str);
 1454      str.clear();
 1455    }
7c673cae
FG
 1456    db->GetProperty("rocksdb.cur-size-all-mem-tables", &str);
 1457    f->dump_string("rocksdb_memtable_usage", str);
11fdf7f2
TL
 1458    str.clear();
 1459    db->GetProperty("rocksdb.estimate-table-readers-mem", &str);
 1460    f->dump_string("rocksdb_index_filter_blocks_usage", str);
7c673cae
FG
 1461    f->close_section();
 1462  }
 1463}
1464
f67539c2
TL
// WriteBatch::Handler that pretty-prints the operations contained in a
// transaction batch into `seen`, for debug logging of submitted/failed
// transactions. Output is capped at 50 operations via Continue().
 1465struct RocksDBStore::RocksWBHandler: public rocksdb::WriteBatch::Handler {
 1466  RocksWBHandler(const RocksDBStore& db) : db(db) {}
 1467  const RocksDBStore& db;
 1468  std::stringstream seen;   // accumulated human-readable dump
 1469  int num_seen = 0;         // operations dumped so far (drives Continue())
 1470
  // Formats one batch operation. value == nullptr means the op carries no
  // value (deletes); then no "value size" is printed.
 1471  void dump(const char* op_name,
 1472	    uint32_t column_family_id,
 1473	    const rocksdb::Slice& key_in,
 1474	    const rocksdb::Slice* value = nullptr) {
 1475    string prefix;
 1476    string key;
 1477    ssize_t size = value ? value->size() : -1;
 1478    seen << std::endl << op_name << "(";
 1479
 1480    if (column_family_id == 0) {
      // default CF: keys are "prefix\0key" combined strings
 1481      db.split_key(key_in, &prefix, &key);
 1482    } else {
      // sharded CF: the prefix is implied by the column family id
 1483      auto it = db.cf_ids_to_prefix.find(column_family_id);
 1484      ceph_assert(it != db.cf_ids_to_prefix.end());
 1485      prefix = it->second;
 1486      key = key_in.ToString();
 1487    }
 1488    seen << " prefix = " << prefix;
 1489    seen << " key = " << pretty_binary_string(key);
 1490    if (size != -1)
 1491      seen << " value size = " << std::to_string(size);
 1492    seen << ")";
 1493    num_seen++;
 1494  }
 1495  void Put(const rocksdb::Slice& key,
 1496	   const rocksdb::Slice& value) override {
 1497    dump("Put", 0, key, &value);
 1498  }
 1499  rocksdb::Status PutCF(uint32_t column_family_id, const rocksdb::Slice& key,
 1500			const rocksdb::Slice& value) override {
 1501    dump("PutCF", column_family_id, key, &value);
 1502    return rocksdb::Status::OK();
 1503  }
 1504  void SingleDelete(const rocksdb::Slice& key) override {
 1505    dump("SingleDelete", 0, key);
 1506  }
 1507  rocksdb::Status SingleDeleteCF(uint32_t column_family_id, const rocksdb::Slice& key) override {
 1508    dump("SingleDeleteCF", column_family_id, key);
 1509    return rocksdb::Status::OK();
 1510  }
 1511  void Delete(const rocksdb::Slice& key) override {
 1512    dump("Delete", 0, key);
 1513  }
 1514  rocksdb::Status DeleteCF(uint32_t column_family_id, const rocksdb::Slice& key) override {
 1515    dump("DeleteCF", column_family_id, key);
 1516    return rocksdb::Status::OK();
 1517  }
 1518  void Merge(const rocksdb::Slice& key,
 1519	     const rocksdb::Slice& value) override {
 1520    dump("Merge", 0, key, &value);
 1521  }
 1522  rocksdb::Status MergeCF(uint32_t column_family_id, const rocksdb::Slice& key,
 1523			  const rocksdb::Slice& value) override {
 1524    dump("MergeCF", column_family_id, key, &value);
 1525    return rocksdb::Status::OK();
 1526  }
  // stop iterating once 50 ops have been dumped, to bound log size
 1527  bool Continue() override { return num_seen < 50; }
 1528};
1529
// Shared implementation of submit_transaction{,_sync}: optionally enables
// the RocksDB perf context, logs the batch contents at debug level 30,
// writes the batch, and (when perf is on) harvests per-write timings into
// the perf counters. Returns 0 on success, -1 on write failure.
11fdf7f2 1530int RocksDBStore::submit_common(rocksdb::WriteOptions& woptions, KeyValueDB::Transaction t)
7c673cae 1531{
7c673cae
FG
 1532  // enable rocksdb breakdown
 1533  // considering performance overhead, default is disabled
f67539c2 1534  if (cct->_conf->rocksdb_perf) {
7c673cae 1535    rocksdb::SetPerfLevel(rocksdb::PerfLevel::kEnableTimeExceptForMutex);
11fdf7f2 1536    rocksdb::get_perf_context()->Reset();
7c673cae
FG
 1537  }
 1538
 1539  RocksDBTransactionImpl * _t =
 1540    static_cast<RocksDBTransactionImpl *>(t.get());
7c673cae
FG
 1541  woptions.disableWAL = disableWAL;
  // lgeneric_subdout opens the log statement; the *_dout below continues it
 1542  lgeneric_subdout(cct, rocksdb, 30) << __func__;
f67539c2 1543  RocksWBHandler bat_txc(*this);
7c673cae 1544  _t->bat.Iterate(&bat_txc);
f67539c2 1545  *_dout << " Rocksdb transaction: " << bat_txc.seen.str() << dendl;
7c673cae
FG
 1546
 1547  rocksdb::Status s = db->Write(woptions, &_t->bat);
 1548  if (!s.ok()) {
    // dump the failed batch so the error is actionable
f67539c2 1549    RocksWBHandler rocks_txc(*this);
7c673cae
FG
 1550    _t->bat.Iterate(&rocks_txc);
 1551    derr << __func__ << " error: " << s.ToString() << " code = " << s.code()
f67539c2 1552	 << " Rocksdb transaction: " << rocks_txc.seen.str() << dendl;
7c673cae 1553  }
7c673cae 1554
f67539c2 1555  if (cct->_conf->rocksdb_perf) {
7c673cae
FG
 1556    utime_t write_memtable_time;
 1557    utime_t write_delay_time;
 1558    utime_t write_wal_time;
 1559    utime_t write_pre_and_post_process_time;
    // perf context counters are in nanoseconds; convert to seconds
 1560    write_wal_time.set_from_double(
11fdf7f2 1561      static_cast<double>(rocksdb::get_perf_context()->write_wal_time)/1000000000);
7c673cae 1562    write_memtable_time.set_from_double(
11fdf7f2 1563      static_cast<double>(rocksdb::get_perf_context()->write_memtable_time)/1000000000);
7c673cae 1564    write_delay_time.set_from_double(
11fdf7f2 1565      static_cast<double>(rocksdb::get_perf_context()->write_delay_time)/1000000000);
7c673cae 1566    write_pre_and_post_process_time.set_from_double(
11fdf7f2 1567      static_cast<double>(rocksdb::get_perf_context()->write_pre_and_post_process_time)/1000000000);
7c673cae
FG
 1568    logger->tinc(l_rocksdb_write_memtable_time, write_memtable_time);
 1569    logger->tinc(l_rocksdb_write_delay_time, write_delay_time);
 1570    logger->tinc(l_rocksdb_write_wal_time, write_wal_time);
 1571    logger->tinc(l_rocksdb_write_pre_and_post_process_time, write_pre_and_post_process_time);
 1572  }
 1573
7c673cae
FG
 1574  return s.ok() ? 0 : -1;
 1575}
1576
11fdf7f2 1577int RocksDBStore::submit_transaction(KeyValueDB::Transaction t)
7c673cae
FG
1578{
1579 utime_t start = ceph_clock_now();
7c673cae 1580 rocksdb::WriteOptions woptions;
11fdf7f2 1581 woptions.sync = false;
7c673cae 1582
11fdf7f2 1583 int result = submit_common(woptions, t);
7c673cae 1584
11fdf7f2 1585 utime_t lat = ceph_clock_now() - start;
11fdf7f2
TL
1586 logger->tinc(l_rocksdb_submit_latency, lat);
1587
1588 return result;
1589}
7c673cae 1590
11fdf7f2
TL
1591int RocksDBStore::submit_transaction_sync(KeyValueDB::Transaction t)
1592{
1593 utime_t start = ceph_clock_now();
1594 rocksdb::WriteOptions woptions;
1595 // if disableWAL, sync can't set
1596 woptions.sync = !disableWAL;
1597
1598 int result = submit_common(woptions, t);
1599
1600 utime_t lat = ceph_clock_now() - start;
7c673cae
FG
1601 logger->tinc(l_rocksdb_submit_sync_latency, lat);
1602
11fdf7f2 1603 return result;
7c673cae
FG
1604}
1605
1606RocksDBStore::RocksDBTransactionImpl::RocksDBTransactionImpl(RocksDBStore *_db)
1607{
1608 db = _db;
1609}
1610
11fdf7f2
TL
1611void RocksDBStore::RocksDBTransactionImpl::put_bat(
1612 rocksdb::WriteBatch& bat,
1613 rocksdb::ColumnFamilyHandle *cf,
1614 const string &key,
7c673cae
FG
1615 const bufferlist &to_set_bl)
1616{
7c673cae
FG
1617 // bufferlist::c_str() is non-constant, so we can't call c_str()
1618 if (to_set_bl.is_contiguous() && to_set_bl.length() > 0) {
11fdf7f2
TL
1619 bat.Put(cf,
1620 rocksdb::Slice(key),
1621 rocksdb::Slice(to_set_bl.buffers().front().c_str(),
1622 to_set_bl.length()));
7c673cae 1623 } else {
31f18b77 1624 rocksdb::Slice key_slice(key);
9f95a23c 1625 vector<rocksdb::Slice> value_slices(to_set_bl.get_num_buffers());
11fdf7f2
TL
1626 bat.Put(cf,
1627 rocksdb::SliceParts(&key_slice, 1),
c07f9fc5 1628 prepare_sliceparts(to_set_bl, &value_slices));
7c673cae
FG
1629 }
1630}
1631
1632void RocksDBStore::RocksDBTransactionImpl::set(
1633 const string &prefix,
11fdf7f2 1634 const string &k,
7c673cae
FG
1635 const bufferlist &to_set_bl)
1636{
f67539c2 1637 auto cf = db->get_cf_handle(prefix, k);
11fdf7f2
TL
1638 if (cf) {
1639 put_bat(bat, cf, k, to_set_bl);
1640 } else {
1641 string key = combine_strings(prefix, k);
1642 put_bat(bat, db->default_cf, key, to_set_bl);
1643 }
1644}
7c673cae 1645
11fdf7f2
TL
1646void RocksDBStore::RocksDBTransactionImpl::set(
1647 const string &prefix,
1648 const char *k, size_t keylen,
1649 const bufferlist &to_set_bl)
1650{
f67539c2 1651 auto cf = db->get_cf_handle(prefix, k, keylen);
11fdf7f2
TL
1652 if (cf) {
1653 string key(k, keylen); // fixme?
1654 put_bat(bat, cf, key, to_set_bl);
7c673cae 1655 } else {
11fdf7f2
TL
1656 string key;
1657 combine_strings(prefix, k, keylen, &key);
1658 put_bat(bat, cf, key, to_set_bl);
7c673cae
FG
1659 }
1660}
1661
1662void RocksDBStore::RocksDBTransactionImpl::rmkey(const string &prefix,
1663 const string &k)
1664{
f67539c2 1665 auto cf = db->get_cf_handle(prefix, k);
11fdf7f2
TL
1666 if (cf) {
1667 bat.Delete(cf, rocksdb::Slice(k));
1668 } else {
1669 bat.Delete(db->default_cf, combine_strings(prefix, k));
1670 }
7c673cae
FG
1671}
1672
1673void RocksDBStore::RocksDBTransactionImpl::rmkey(const string &prefix,
1674 const char *k,
1675 size_t keylen)
1676{
f67539c2 1677 auto cf = db->get_cf_handle(prefix, k, keylen);
11fdf7f2
TL
1678 if (cf) {
1679 bat.Delete(cf, rocksdb::Slice(k, keylen));
1680 } else {
1681 string key;
1682 combine_strings(prefix, k, keylen, &key);
1683 bat.Delete(db->default_cf, rocksdb::Slice(key));
1684 }
7c673cae
FG
1685}
1686
1687void RocksDBStore::RocksDBTransactionImpl::rm_single_key(const string &prefix,
1688 const string &k)
1689{
f67539c2 1690 auto cf = db->get_cf_handle(prefix, k);
11fdf7f2
TL
1691 if (cf) {
1692 bat.SingleDelete(cf, k);
1693 } else {
1694 bat.SingleDelete(db->default_cf, combine_strings(prefix, k));
1695 }
7c673cae
FG
1696}
1697
// Queues deletion of every key under `prefix`. Up to roughly
// delete_range_threshold keys are deleted individually inside a batch
// savepoint; if the threshold is hit, those deletes are rolled back and a
// single DeleteRange covering the whole prefix is queued instead. Handles
// both sharded prefixes (own column families) and the default-CF layout.
 1698void RocksDBStore::RocksDBTransactionImpl::rmkeys_by_prefix(const string &prefix)
 1699{
f67539c2
TL
 1700  auto p_iter = db->cf_handles.find(prefix);
 1701  if (p_iter == db->cf_handles.end()) {
 1702    uint64_t cnt = db->delete_range_threshold;
 1703    bat.SetSavePoint();
 1704    auto it = db->get_iterator(prefix);
    // NOTE(review): the pre-decrement means at most threshold-1 individual
    // deletes; if the iterator ends exactly as cnt reaches 0 the DeleteRange
    // path is still taken — confirm that this edge case is intended.
 1705    for (it->seek_to_first(); it->valid() && (--cnt) != 0; it->next()) {
 1706      bat.Delete(db->default_cf, combine_strings(prefix, it->key()));
 1707    }
 1708    if (cnt == 0) {
      // too many keys: drop the individual deletes and use a range delete
 1709      bat.RollbackToSavePoint();
 1710      string endprefix = prefix;
9f95a23c
TL
 1711      endprefix.push_back('\x01');
 1712      bat.DeleteRange(db->default_cf,
 1713		      combine_strings(prefix, string()),
 1714		      combine_strings(endprefix, string()));
11fdf7f2 1715    } else {
f67539c2
TL
 1716      bat.PopSavePoint();
 1717    }
 1718  } else {
 1719    ceph_assert(p_iter->second.handles.size() >= 1);
 1720    for (auto cf : p_iter->second.handles) {
 1721      uint64_t cnt = db->delete_range_threshold;
 1722      bat.SetSavePoint();
 1723      auto it = db->new_shard_iterator(cf);
 1724      for (it->SeekToFirst(); it->Valid() && (--cnt) != 0; it->Next()) {
 1725	bat.Delete(cf, it->key());
 1726      }
 1727      if (cnt == 0) {
 1728	bat.RollbackToSavePoint();
 1729	string endprefix = "\xff\xff\xff\xff"; // FIXME: this is cheating...
 1730	bat.DeleteRange(cf, string(), endprefix);
 1731      } else {
 1732	bat.PopSavePoint();
 1733      }
31f18b77 1734    }
7c673cae
FG
 1735  }
 1736}
1737
// Queues deletion of all keys in [start, end) under `prefix`, using the
// same strategy as rmkeys_by_prefix(): individual deletes under a batch
// savepoint up to delete_range_threshold, falling back to a single
// DeleteRange when the threshold is reached. Covers both sharded and
// default-CF layouts.
 1738void RocksDBStore::RocksDBTransactionImpl::rm_range_keys(const string &prefix,
 1739                                                         const string &start,
 1740                                                         const string &end)
 1741{
2a845540
TL
 1742  ldout(db->cct, 10) << __func__ << " enter start=" << start
 1743		     << " end=" << end << dendl;
f67539c2
TL
 1744  auto p_iter = db->cf_handles.find(prefix);
 1745  if (p_iter == db->cf_handles.end()) {
 1746    uint64_t cnt = db->delete_range_threshold;
 1747    bat.SetSavePoint();
 1748    auto it = db->get_iterator(prefix);
 1749    for (it->lower_bound(start);
 1750	 it->valid() && db->comparator->Compare(it->key(), end) < 0 && (--cnt) != 0;
 1751	 it->next()) {
 1752      bat.Delete(db->default_cf, combine_strings(prefix, it->key()));
11fdf7f2 1753    }
 1754    if (cnt == 0) {
2a845540
TL
 1755      ldout(db->cct, 10) << __func__ << " p_iter == end(), resorting to DeleteRange"
 1756			 << dendl;
      // threshold hit: replace the individual deletes with one range delete
9f95a23c 1757      bat.RollbackToSavePoint();
f67539c2
TL
 1758      bat.DeleteRange(db->default_cf,
 1759		      rocksdb::Slice(combine_strings(prefix, start)),
 1760		      rocksdb::Slice(combine_strings(prefix, end)));
 1761    } else {
 1762      bat.PopSavePoint();
 1763    }
 1764  } else {
 1765    ceph_assert(p_iter->second.handles.size() >= 1);
 1766    for (auto cf : p_iter->second.handles) {
 1767      uint64_t cnt = db->delete_range_threshold;
 1768      bat.SetSavePoint();
      // raw RocksDB iterator; must be deleted manually below
 1769      rocksdb::Iterator* it = db->new_shard_iterator(cf);
 1770      ceph_assert(it != nullptr);
 1771      for (it->Seek(start);
 1772	   it->Valid() && db->comparator->Compare(it->key(), end) < 0 && (--cnt) != 0;
 1773	   it->Next()) {
 1774	bat.Delete(cf, it->key());
 1775      }
 1776      if (cnt == 0) {
2a845540
TL
 1777	ldout(db->cct, 10) << __func__ << " p_iter != end(), resorting to DeleteRange"
 1778			   << dendl;
f67539c2
TL
 1779	bat.RollbackToSavePoint();
 1780	bat.DeleteRange(cf, rocksdb::Slice(start), rocksdb::Slice(end));
494da23a 1781      } else {
f67539c2 1782	bat.PopSavePoint();
494da23a 1783      }
f67539c2 1784      delete it;
9f95a23c 1785    }
7c673cae 1786  }
2a845540 1787  ldout(db->cct, 10) << __func__ << " end" << dendl;
7c673cae
FG
 1788}
1789
1790void RocksDBStore::RocksDBTransactionImpl::merge(
1791 const string &prefix,
1792 const string &k,
1793 const bufferlist &to_set_bl)
1794{
f67539c2 1795 auto cf = db->get_cf_handle(prefix, k);
11fdf7f2
TL
1796 if (cf) {
1797 // bufferlist::c_str() is non-constant, so we can't call c_str()
1798 if (to_set_bl.is_contiguous() && to_set_bl.length() > 0) {
1799 bat.Merge(
1800 cf,
1801 rocksdb::Slice(k),
1802 rocksdb::Slice(to_set_bl.buffers().front().c_str(), to_set_bl.length()));
1803 } else {
1804 // make a copy
1805 rocksdb::Slice key_slice(k);
9f95a23c 1806 vector<rocksdb::Slice> value_slices(to_set_bl.get_num_buffers());
11fdf7f2
TL
1807 bat.Merge(cf, rocksdb::SliceParts(&key_slice, 1),
1808 prepare_sliceparts(to_set_bl, &value_slices));
1809 }
7c673cae 1810 } else {
11fdf7f2
TL
1811 string key = combine_strings(prefix, k);
1812 // bufferlist::c_str() is non-constant, so we can't call c_str()
1813 if (to_set_bl.is_contiguous() && to_set_bl.length() > 0) {
1814 bat.Merge(
1815 db->default_cf,
1816 rocksdb::Slice(key),
1817 rocksdb::Slice(to_set_bl.buffers().front().c_str(), to_set_bl.length()));
1818 } else {
1819 // make a copy
1820 rocksdb::Slice key_slice(key);
9f95a23c 1821 vector<rocksdb::Slice> value_slices(to_set_bl.get_num_buffers());
11fdf7f2
TL
1822 bat.Merge(
1823 db->default_cf,
1824 rocksdb::SliceParts(&key_slice, 1),
1825 prepare_sliceparts(to_set_bl, &value_slices));
1826 }
7c673cae
FG
1827 }
1828}
1829
7c673cae
FG
1830int RocksDBStore::get(
1831 const string &prefix,
1832 const std::set<string> &keys,
1833 std::map<string, bufferlist> *out)
1834{
f67539c2 1835 rocksdb::PinnableSlice value;
7c673cae 1836 utime_t start = ceph_clock_now();
f67539c2 1837 if (cf_handles.count(prefix) > 0) {
11fdf7f2 1838 for (auto& key : keys) {
f67539c2 1839 auto cf_handle = get_cf_handle(prefix, key);
11fdf7f2 1840 auto status = db->Get(rocksdb::ReadOptions(),
f67539c2 1841 cf_handle,
11fdf7f2
TL
1842 rocksdb::Slice(key),
1843 &value);
1844 if (status.ok()) {
f67539c2 1845 (*out)[key].append(value.data(), value.size());
11fdf7f2
TL
1846 } else if (status.IsIOError()) {
1847 ceph_abort_msg(status.getState());
1848 }
f67539c2 1849 value.Reset();
11fdf7f2
TL
1850 }
1851 } else {
1852 for (auto& key : keys) {
11fdf7f2
TL
1853 string k = combine_strings(prefix, key);
1854 auto status = db->Get(rocksdb::ReadOptions(),
1855 default_cf,
1856 rocksdb::Slice(k),
1857 &value);
1858 if (status.ok()) {
f67539c2 1859 (*out)[key].append(value.data(), value.size());
11fdf7f2
TL
1860 } else if (status.IsIOError()) {
1861 ceph_abort_msg(status.getState());
1862 }
f67539c2 1863 value.Reset();
224ce89b 1864 }
7c673cae
FG
1865 }
1866 utime_t lat = ceph_clock_now() - start;
7c673cae
FG
1867 logger->tinc(l_rocksdb_get_latency, lat);
1868 return 0;
1869}
1870
1871int RocksDBStore::get(
1872 const string &prefix,
1873 const string &key,
1874 bufferlist *out)
1875{
11fdf7f2 1876 ceph_assert(out && (out->length() == 0));
7c673cae
FG
1877 utime_t start = ceph_clock_now();
1878 int r = 0;
f67539c2 1879 rocksdb::PinnableSlice value;
7c673cae 1880 rocksdb::Status s;
f67539c2 1881 auto cf = get_cf_handle(prefix, key);
11fdf7f2
TL
1882 if (cf) {
1883 s = db->Get(rocksdb::ReadOptions(),
1884 cf,
1885 rocksdb::Slice(key),
1886 &value);
1887 } else {
1888 string k = combine_strings(prefix, key);
1889 s = db->Get(rocksdb::ReadOptions(),
1890 default_cf,
1891 rocksdb::Slice(k),
1892 &value);
1893 }
7c673cae 1894 if (s.ok()) {
f67539c2 1895 out->append(value.data(), value.size());
224ce89b 1896 } else if (s.IsNotFound()) {
7c673cae 1897 r = -ENOENT;
224ce89b 1898 } else {
11fdf7f2 1899 ceph_abort_msg(s.getState());
7c673cae
FG
1900 }
1901 utime_t lat = ceph_clock_now() - start;
7c673cae
FG
1902 logger->tinc(l_rocksdb_get_latency, lat);
1903 return r;
1904}
1905
1906int RocksDBStore::get(
1907 const string& prefix,
1908 const char *key,
1909 size_t keylen,
1910 bufferlist *out)
1911{
11fdf7f2 1912 ceph_assert(out && (out->length() == 0));
7c673cae
FG
1913 utime_t start = ceph_clock_now();
1914 int r = 0;
f67539c2 1915 rocksdb::PinnableSlice value;
7c673cae 1916 rocksdb::Status s;
f67539c2 1917 auto cf = get_cf_handle(prefix, key, keylen);
11fdf7f2
TL
1918 if (cf) {
1919 s = db->Get(rocksdb::ReadOptions(),
1920 cf,
1921 rocksdb::Slice(key, keylen),
1922 &value);
1923 } else {
1924 string k;
1925 combine_strings(prefix, key, keylen, &k);
1926 s = db->Get(rocksdb::ReadOptions(),
1927 default_cf,
1928 rocksdb::Slice(k),
1929 &value);
1930 }
7c673cae 1931 if (s.ok()) {
f67539c2 1932 out->append(value.data(), value.size());
224ce89b 1933 } else if (s.IsNotFound()) {
7c673cae 1934 r = -ENOENT;
224ce89b 1935 } else {
11fdf7f2 1936 ceph_abort_msg(s.getState());
7c673cae
FG
1937 }
1938 utime_t lat = ceph_clock_now() - start;
7c673cae
FG
1939 logger->tinc(l_rocksdb_get_latency, lat);
1940 return r;
1941}
1942
1943int RocksDBStore::split_key(rocksdb::Slice in, string *prefix, string *key)
1944{
1945 size_t prefix_len = 0;
f67539c2 1946
7c673cae
FG
1947 // Find separator inside Slice
1948 char* separator = (char*) memchr(in.data(), 0, in.size());
1949 if (separator == NULL)
1950 return -EINVAL;
1951 prefix_len = size_t(separator - in.data());
1952 if (prefix_len >= in.size())
1953 return -EINVAL;
1954
1955 // Fetch prefix and/or key directly from Slice
1956 if (prefix)
1957 *prefix = string(in.data(), prefix_len);
1958 if (key)
1959 *key = string(separator+1, in.size()-prefix_len-1);
1960 return 0;
1961}
1962
1963void RocksDBStore::compact()
1964{
1965 logger->inc(l_rocksdb_compact);
1966 rocksdb::CompactRangeOptions options;
11fdf7f2
TL
1967 db->CompactRange(options, default_cf, nullptr, nullptr);
1968 for (auto cf : cf_handles) {
f67539c2
TL
1969 for (auto shard_cf : cf.second.handles) {
1970 db->CompactRange(
1971 options,
1972 shard_cf,
1973 nullptr, nullptr);
1974 }
11fdf7f2 1975 }
7c673cae
FG
1976}
1977
7c673cae
FG
// Background compaction worker: drains compact_queue, running whole-DB
// or ranged compactions with the queue lock dropped, and sleeps on the
// condvar when the queue is empty.  Exits when compact_queue_stop is
// set (under the lock) and the condvar is signalled.
void RocksDBStore::compact_thread_entry()
{
  std::unique_lock l{compact_queue_lock};
  dout(10) << __func__ << " enter" << dendl;
  while (!compact_queue_stop) {
    if (!compact_queue.empty()) {
      auto range = compact_queue.front();
      compact_queue.pop_front();
      logger->set(l_rocksdb_compact_queue_len, compact_queue.size());
      // drop the lock for the (slow) compaction so producers can keep
      // queueing work meanwhile
      l.unlock();
      logger->inc(l_rocksdb_compact_range);
      // an empty/empty pair is the sentinel meaning "compact everything"
      if (range.first.empty() && range.second.empty()) {
        compact();
      } else {
        compact_range(range.first, range.second);
      }
      l.lock();
      continue;
    }
    dout(10) << __func__ << " waiting" << dendl;
    compact_queue_cond.wait(l);
  }
  dout(10) << __func__ << " exit" << dendl;
}
2002
// Queue an asynchronous compaction of [start, end] and make sure the
// worker thread is running.  Adjacent/overlapping queued ranges are
// merged opportunistically (O(n) scan of the queue, which stays short).
void RocksDBStore::compact_range_async(const string& start, const string& end)
{
  std::lock_guard l(compact_queue_lock);

  // try to merge adjacent ranges. this is O(n), but the queue should
  // be short. note that we do not cover all overlap cases and merge
  // opportunities here, but we capture the ones we currently need.
  list< pair<string,string> >::iterator p = compact_queue.begin();
  while (p != compact_queue.end()) {
    if (p->first == start && p->second == end) {
      // dup; no-op
      return;
    }
    if (start <= p->first && p->first <= end) {
      // new region crosses start of existing range
      // select right bound that is bigger
      compact_queue.push_back(make_pair(start, end > p->second ? end : p->second));
      compact_queue.erase(p);
      logger->inc(l_rocksdb_compact_queue_merge);
      break;
    }
    if (start <= p->second && p->second <= end) {
      // new region crosses end of existing range
      //p->first < p->second and p->second <= end, so p->first <= end.
      //But we break if previous condition, so start > p->first.
      compact_queue.push_back(make_pair(p->first, end));
      compact_queue.erase(p);
      logger->inc(l_rocksdb_compact_queue_merge);
      break;
    }
    ++p;
  }
  if (p == compact_queue.end()) {
    // no merge, new entry.
    compact_queue.push_back(make_pair(start, end));
    logger->set(l_rocksdb_compact_queue_len, compact_queue.size());
  }
  compact_queue_cond.notify_all();
  // lazily start the worker on first use
  if (!compact_thread.is_started()) {
    compact_thread.create("rstore_compact");
  }
}
2045bool RocksDBStore::check_omap_dir(string &omap_dir)
2046{
2047 rocksdb::Options options;
2048 options.create_if_missing = true;
2049 rocksdb::DB *db;
2050 rocksdb::Status status = rocksdb::DB::Open(options, omap_dir, &db);
2051 delete db;
2052 db = nullptr;
2053 return status.ok();
2054}
f67539c2 2055
7c673cae
FG
// Compact the combined-key range [start, end] in the default CF, then
// mirror the compaction onto any sharded column families whose
// prefixes fall inside the range.
// NOTE(review): start/end are assumed to be combined "prefix\0key"
// forms; if split_key() fails the prefix strings stay empty — confirm
// callers always pass combined keys.
void RocksDBStore::compact_range(const string& start, const string& end)
{
  rocksdb::CompactRangeOptions options;
  rocksdb::Slice cstart(start);
  rocksdb::Slice cend(end);
  string prefix_start, key_start;
  string prefix_end, key_end;
  string key_highest = "\xff\xff\xff\xff"; //cheating
  string key_lowest = "";

  // compact [start, end] in every shard of one sharded column family
  auto compact_range = [&] (const decltype(cf_handles)::iterator column_it,
			    const std::string& start,
			    const std::string& end) {
    rocksdb::Slice cstart(start);
    rocksdb::Slice cend(end);
    for (const auto& shard_it : column_it->second.handles) {
      db->CompactRange(options, shard_it, &cstart, &cend);
    }
  };
  db->CompactRange(options, default_cf, &cstart, &cend);
  split_key(cstart, &prefix_start, &key_start);
  split_key(cend, &prefix_end, &key_end);
  if (prefix_start == prefix_end) {
    // whole range lives under a single prefix
    const auto& column = cf_handles.find(prefix_start);
    if (column != cf_handles.end()) {
      compact_range(column, key_start, key_end);
    }
  } else {
    // range spans prefixes: partial first prefix, full middles,
    // partial last prefix.
    // NOTE(review): if prefix_start has no sharded CF, `column` stays
    // at end() yet the while-loop below can still run when prefix_end
    // IS present, incrementing an end() iterator — verify cf_handles
    // cannot contain prefix_end without prefix_start, or guard this.
    auto column = cf_handles.find(prefix_start);
    if (column != cf_handles.end()) {
      compact_range(column, key_start, key_highest);
      ++column;
    }
    const auto& column_end = cf_handles.find(prefix_end);
    while (column != column_end) {
      compact_range(column, key_lowest, key_highest);
      column++;
    }
    if (column != cf_handles.end()) {
      compact_range(column, key_lowest, key_end);
    }
  }
}
91327a77 2099
7c673cae
FG
// The whole-space iterator owns the underlying RocksDB iterator.
RocksDBStore::RocksDBWholeSpaceIteratorImpl::~RocksDBWholeSpaceIteratorImpl()
{
  delete dbiter;
}
// Position at the very first key of the key space; abort on I/O error.
int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_first()
{
  dbiter->SeekToFirst();
  ceph_assert(!dbiter->status().IsIOError());
  return dbiter->status().ok() ? 0 : -1;
}
// Position at the first key >= prefix (combined keys sort so that
// "prefix" itself precedes every "prefix\0key" entry).
int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_first(const string &prefix)
{
  rocksdb::Slice slice_prefix(prefix);
  dbiter->Seek(slice_prefix);
  ceph_assert(!dbiter->status().IsIOError());
  return dbiter->status().ok() ? 0 : -1;
}
// Position at the very last key of the key space; abort on I/O error.
int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_last()
{
  dbiter->SeekToLast();
  ceph_assert(!dbiter->status().IsIOError());
  return dbiter->status().ok() ? 0 : -1;
}
2123int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_last(const string &prefix)
2124{
2125 string limit = past_prefix(prefix);
2126 rocksdb::Slice slice_limit(limit);
2127 dbiter->Seek(slice_limit);
2128
2129 if (!dbiter->Valid()) {
2130 dbiter->SeekToLast();
2131 } else {
2132 dbiter->Prev();
2133 }
2134 return dbiter->status().ok() ? 0 : -1;
2135}
2136int RocksDBStore::RocksDBWholeSpaceIteratorImpl::upper_bound(const string &prefix, const string &after)
2137{
2138 lower_bound(prefix, after);
2139 if (valid()) {
2140 pair<string,string> key = raw_key();
2141 if (key.first == prefix && key.second == after)
2142 next();
2143 }
2144 return dbiter->status().ok() ? 0 : -1;
2145}
2146int RocksDBStore::RocksDBWholeSpaceIteratorImpl::lower_bound(const string &prefix, const string &to)
2147{
2148 string bound = combine_strings(prefix, to);
2149 rocksdb::Slice slice_bound(bound);
2150 dbiter->Seek(slice_bound);
2151 return dbiter->status().ok() ? 0 : -1;
2152}
// True while the cursor points at an entry.
bool RocksDBStore::RocksDBWholeSpaceIteratorImpl::valid()
{
  return dbiter->Valid();
}
// Advance one entry (no-op when already past the end); abort on I/O error.
int RocksDBStore::RocksDBWholeSpaceIteratorImpl::next()
{
  if (valid()) {
    dbiter->Next();
  }
  ceph_assert(!dbiter->status().IsIOError());
  return dbiter->status().ok() ? 0 : -1;
}
// Step back one entry (no-op when not valid); abort on I/O error.
int RocksDBStore::RocksDBWholeSpaceIteratorImpl::prev()
{
  if (valid()) {
    dbiter->Prev();
  }
  ceph_assert(!dbiter->status().IsIOError());
  return dbiter->status().ok() ? 0 : -1;
}
// Return the current key with its "<prefix>\0" part stripped.
string RocksDBStore::RocksDBWholeSpaceIteratorImpl::key()
{
  string out_key;
  split_key(dbiter->key(), 0, &out_key);
  return out_key;
}
// Return the current entry as a (prefix, key) pair.
pair<string,string> RocksDBStore::RocksDBWholeSpaceIteratorImpl::raw_key()
{
  string prefix, key;
  split_key(dbiter->key(), &prefix, &key);
  return make_pair(prefix, key);
}
2185
2186bool RocksDBStore::RocksDBWholeSpaceIteratorImpl::raw_key_is_prefixed(const string &prefix) {
2187 // Look for "prefix\0" right in rocksb::Slice
2188 rocksdb::Slice key = dbiter->key();
2189 if ((key.size() > prefix.length()) && (key[prefix.length()] == '\0')) {
2190 return memcmp(key.data(), prefix.c_str(), prefix.length()) == 0;
2191 } else {
2192 return false;
2193 }
2194}
2195
// Current value copied into a bufferlist.
bufferlist RocksDBStore::RocksDBWholeSpaceIteratorImpl::value()
{
  return to_bufferlist(dbiter->value());
}
2200
// Size of the raw (combined) key, including the prefix and separator.
size_t RocksDBStore::RocksDBWholeSpaceIteratorImpl::key_size()
{
  return dbiter->key().size();
}
2205
// Size of the current value in bytes.
size_t RocksDBStore::RocksDBWholeSpaceIteratorImpl::value_size()
{
  return dbiter->value().size();
}
2210
// Current value as a bufferptr copy of the slice contents.
bufferptr RocksDBStore::RocksDBWholeSpaceIteratorImpl::value_as_ptr()
{
  rocksdb::Slice val = dbiter->value();
  return bufferptr(val.data(), val.size());
}
2216
// 0 while the underlying iterator reports no error, -1 otherwise.
int RocksDBStore::RocksDBWholeSpaceIteratorImpl::status()
{
  return dbiter->status().ok() ? 0 : -1;
}
2221
2222string RocksDBStore::past_prefix(const string &prefix)
2223{
2224 string limit = prefix;
2225 limit.push_back(1);
2226 return limit;
2227}
2228
11fdf7f2
TL
// Iterator over a single (non-sharded) column family.  Keys in a CF
// are stored bare (no "prefix\0" marker); `prefix` is kept only to
// synthesize raw_key() results.  Optional iteration bounds are passed
// down to RocksDB so it can prune the scan.
class CFIteratorImpl : public KeyValueDB::IteratorImpl {
protected:
  string prefix;
  rocksdb::Iterator *dbiter;   // owned; deleted in the destructor
  // NOTE: `bounds` must be declared before the two slices — they are
  // initialized from it in declaration order by the constructor.
  const KeyValueDB::IteratorBounds bounds;
  const rocksdb::Slice iterate_lower_bound;
  const rocksdb::Slice iterate_upper_bound;
public:
  explicit CFIteratorImpl(const RocksDBStore* db,
			  const std::string& p,
			  rocksdb::ColumnFamilyHandle* cf,
			  KeyValueDB::IteratorBounds bounds_)
    : prefix(p), bounds(std::move(bounds_)),
      iterate_lower_bound(make_slice(bounds.lower_bound)),
      iterate_upper_bound(make_slice(bounds.upper_bound))
    {
      auto options = rocksdb::ReadOptions();
      if (db->cct->_conf->osd_rocksdb_iterator_bounds_enabled) {
	if (bounds.lower_bound) {
	  options.iterate_lower_bound = &iterate_lower_bound;
	}
	if (bounds.upper_bound) {
	  options.iterate_upper_bound = &iterate_upper_bound;
	}
      }
      dbiter = db->db->NewIterator(options, cf);
  }
  ~CFIteratorImpl() {
    delete dbiter;
  }

  int seek_to_first() override {
    dbiter->SeekToFirst();
    return dbiter->status().ok() ? 0 : -1;
  }
  int seek_to_last() override {
    dbiter->SeekToLast();
    return dbiter->status().ok() ? 0 : -1;
  }
  // Strict upper bound: skip over an exact match left by lower_bound.
  int upper_bound(const string &after) override {
    lower_bound(after);
    if (valid() && (key() == after)) {
      next();
    }
    return dbiter->status().ok() ? 0 : -1;
  }
  int lower_bound(const string &to) override {
    rocksdb::Slice slice_bound(to);
    dbiter->Seek(slice_bound);
    return dbiter->status().ok() ? 0 : -1;
  }
  int next() override {
    if (valid()) {
      dbiter->Next();
    }
    return dbiter->status().ok() ? 0 : -1;
  }
  int prev() override {
    if (valid()) {
      dbiter->Prev();
    }
    return dbiter->status().ok() ? 0 : -1;
  }
  bool valid() override {
    return dbiter->Valid();
  }
  string key() override {
    return dbiter->key().ToString();
  }
  // Re-attach the column family's prefix for whole-space callers.
  std::pair<std::string, std::string> raw_key() override {
    return make_pair(prefix, key());
  }
  bufferlist value() override {
    return to_bufferlist(dbiter->value());
  }
  bufferptr value_as_ptr() override {
    rocksdb::Slice val = dbiter->value();
    return bufferptr(val.data(), val.size());
  }
  int status() override {
    return dbiter->status().ok() ? 0 : -1;
  }
};
2312
f67539c2
TL
2313
// Merge iterator presenting the union of the default CF ("main",
// holding combined "prefix\0key" entries) and all sharded column
// families ("shards", one KeyValueDB::Iterator per prefix, ordered by
// prefix) as a single whole-space iterator.  `smaller` records which
// side currently holds the smaller — i.e. current — element.
class WholeMergeIteratorImpl : public KeyValueDB::WholeSpaceIteratorImpl {
private:
  RocksDBStore* db;
  KeyValueDB::WholeSpaceIterator main;
  std::map<std::string, KeyValueDB::Iterator> shards;
  std::map<std::string, KeyValueDB::Iterator>::iterator current_shard;
  enum {on_main, on_shard} smaller;

public:
  WholeMergeIteratorImpl(RocksDBStore* db)
    : db(db)
    , main(db->get_default_cf_iterator())
  {
    // one iterator per sharded prefix
    for (auto& e : db->cf_handles) {
      shards.emplace(e.first, db->get_iterator(e.first));
    }
  }

  // returns true if value in main is smaller than in shards;
  // an invalid position sorts after every actual value
  bool is_main_smaller() {
    if (main->valid()) {
      if (current_shard != shards.end()) {
	auto main_rk = main->raw_key();
	ceph_assert(current_shard->second->valid());
	auto shards_rk = current_shard->second->raw_key();
	if (main_rk.first < shards_rk.first)
	  return true;
	if (main_rk.first > shards_rk.first)
	  return false;
	return main_rk.second < shards_rk.second;
      } else {
	return true;
      }
    } else {
      if (current_shard != shards.end()) {
	return false;
      } else {
	//this means that neither is valid
	//we select main to be smaller, so valid() will signal properly
	return true;
      }
    }
  }

  int seek_to_first() override {
    int r0 = main->seek_to_first();
    int r1 = 0;
    // find first shard that has some data
    current_shard = shards.begin();
    while (current_shard != shards.end()) {
      r1 = current_shard->second->seek_to_first();
      if (r1 != 0 || current_shard->second->valid()) {
	//this is the first shard that will yield some keys
	break;
      }
      ++current_shard;
    }
    smaller = is_main_smaller() ? on_main : on_shard;
    return r0 == 0 && r1 == 0 ? 0 : -1;
  }

  int seek_to_first(const std::string &prefix) override {
    int r0 = main->seek_to_first(prefix);
    int r1 = 0;
    // find first shard at-or-after prefix that has some data
    current_shard = shards.lower_bound(prefix);
    while (current_shard != shards.end()) {
      r1 = current_shard->second->seek_to_first();
      if (r1 != 0 || current_shard->second->valid()) {
	//this is the first shard that will yield some keys
	break;
      }
      ++current_shard;
    }
    smaller = is_main_smaller() ? on_main : on_shard;
    return r0 == 0 && r1 == 0 ? 0 : -1;
  };

  int seek_to_last() override {
    int r0 = main->seek_to_last();
    int r1 = 0;
    r1 = shards_seek_to_last();
    //if we have 2 candidates, we need to select
    // (the last element is the LARGER of the two; the other side is
    // advanced past-the-end so it does not shadow it)
    if (main->valid()) {
      if (shards_valid()) {
	if (is_main_smaller()) {
	  smaller = on_shard;
	  main->next();
	} else {
	  smaller = on_main;
	  shards_next();
	}
      } else {
	smaller = on_main;
      }
    } else {
      if (shards_valid()) {
	smaller = on_shard;
      } else {
	smaller = on_main;
      }
    }
    return r0 == 0 && r1 == 0 ? 0 : -1;
  }

  // NOTE(review): the while-loop below never moves current_shard
  // (no --current_shard), so an empty-but-error-free shard at
  // lower_bound(prefix) spins forever; and when lower_bound(prefix)
  // returns shards.end() the loop dereferences the end iterator.
  // Verify against callers — this path looks unreachable/untested.
  int seek_to_last(const std::string &prefix) override {
    int r0 = main->seek_to_last(prefix);
    int r1 = 0;
    // find last shard that has some data
    bool found = false;
    current_shard = shards.lower_bound(prefix);
    while (current_shard != shards.begin()) {
      r1 = current_shard->second->seek_to_last();
      if (r1 != 0)
	break;
      if (current_shard->second->valid()) {
	found = true;
	break;
      }
    }
    //if we have 2 candidates, we need to select
    if (main->valid() && found) {
      if (is_main_smaller()) {
	main->next();
      } else {
	shards_next();
      }
    }
    if (!found) {
      //set shards state that properly represents eof
      current_shard = shards.end();
    }
    smaller = is_main_smaller() ? on_main : on_shard;
    return r0 == 0 && r1 == 0 ? 0 : -1;
  }

  int upper_bound(const std::string &prefix, const std::string &after) override {
    int r0 = main->upper_bound(prefix, after);
    int r1 = 0;
    if (r0 != 0)
      return r0;
    current_shard = shards.lower_bound(prefix);
    if (current_shard != shards.end()) {
      bool located = false;
      if (current_shard->first == prefix) {
	r1 = current_shard->second->upper_bound(after);
	if (r1 != 0)
	  return r1;
	if (current_shard->second->valid()) {
	  located = true;
	}
      }
      if (!located) {
	// prefix shard exhausted (or absent): first key of the next
	// non-empty shard is the shard-side candidate
	while (current_shard != shards.end()) {
	  r1 = current_shard->second->seek_to_first();
	  if (r1 != 0)
	    return r1;
	  if (current_shard->second->valid())
	    break;
	  ++current_shard;
	}
      }
    }
    smaller = is_main_smaller() ? on_main : on_shard;
    return 0;
  }

  int lower_bound(const std::string &prefix, const std::string &to) override {
    int r0 = main->lower_bound(prefix, to);
    int r1 = 0;
    if (r0 != 0)
      return r0;
    current_shard = shards.lower_bound(prefix);
    if (current_shard != shards.end()) {
      bool located = false;
      if (current_shard->first == prefix) {
	r1 = current_shard->second->lower_bound(to);
	if (r1 != 0)
	  return r1;
	if (current_shard->second->valid()) {
	  located = true;
	}
      }
      if (!located) {
	// prefix shard exhausted (or absent): fall forward to the next
	// non-empty shard
	while (current_shard != shards.end()) {
	  r1 = current_shard->second->seek_to_first();
	  if (r1 != 0)
	    return r1;
	  if (current_shard->second->valid())
	    break;
	  ++current_shard;
	}
      }
    }
    smaller = is_main_smaller() ? on_main : on_shard;
    return 0;
  }

  bool valid() override {
    if (smaller == on_main) {
      return main->valid();
    } else {
      if (current_shard == shards.end())
	return false;
      return current_shard->second->valid();
    }
  };

  int next() override {
    int r;
    // advance whichever side holds the current element, then re-pick
    if (smaller == on_main) {
      r = main->next();
    } else {
      r = shards_next();
    }
    if (r != 0)
      return r;
    smaller = is_main_smaller() ? on_main : on_shard;
    return 0;
  }

  // Step back: move both sides back one (reviving an exhausted side via
  // seek_to_last), keep the LARGER of the two candidates as current and
  // re-advance the other; a side that was only revived to probe is
  // pushed past-the-end again.
  int prev() override {
    int r;
    bool main_was_valid = false;
    if (main->valid()) {
      main_was_valid = true;
      r = main->prev();
    } else {
      r = main->seek_to_last();
    }
    if (r != 0)
      return r;

    bool shards_was_valid = false;
    if (shards_valid()) {
      shards_was_valid = true;
      r = shards_prev();
    } else {
      r = shards_seek_to_last();
    }
    if (r != 0)
      return r;

    if (!main->valid() && !shards_valid()) {
      //end, no previous. set marker so valid() can work
      smaller = on_main;
      return 0;
    }

    //if 1 is valid, select it
    //if 2 are valid select larger and advance the other
    if (main->valid()) {
      if (shards_valid()) {
	if (is_main_smaller()) {
	  smaller = on_shard;
	  if (main_was_valid) {
	    if (main->valid()) {
	      r = main->next();
	    } else {
	      r = main->seek_to_first();
	    }
	  } else {
	    //if we have resurrected main, kill it
	    if (main->valid()) {
	      main->next();
	    }
	  }
	} else {
	  smaller = on_main;
	  if (shards_was_valid) {
	    if (shards_valid()) {
	      r = shards_next();
	    } else {
	      r = shards_seek_to_first();
	    }
	  } else {
	    //if we have resurrected shards, kill it
	    if (shards_valid()) {
	      shards_next();
	    }
	  }
	}
      } else {
	smaller = on_main;
	r = shards_seek_to_first();
      }
    } else {
      smaller = on_shard;
      r = main->seek_to_first();
    }
    return r;
  }

  std::string key() override
  {
    if (smaller == on_main) {
      return main->key();
    } else {
      return current_shard->second->key();
    }
  }

  std::pair<std::string,std::string> raw_key() override
  {
    if (smaller == on_main) {
      return main->raw_key();
    } else {
      return { current_shard->first, current_shard->second->key() };
    }
  }

  bool raw_key_is_prefixed(const std::string &prefix) override
  {
    if (smaller == on_main) {
      return main->raw_key_is_prefixed(prefix);
    } else {
      return current_shard->first == prefix;
    }
  }

  ceph::buffer::list value() override
  {
    if (smaller == on_main) {
      return main->value();
    } else {
      return current_shard->second->value();
    }
  }

  int status() override
  {
    //because we already had to inspect key, it must be ok
    return 0;
  }

  size_t key_size() override
  {
    if (smaller == on_main) {
      return main->key_size();
    } else {
      return current_shard->second->key().size();
    }
  }
  size_t value_size() override
  {
    if (smaller == on_main) {
      return main->value_size();
    } else {
      return current_shard->second->value().length();
    }
  }

  // NOTE(review): declared int but used as a bool everywhere — returns
  // whether the shard-side cursor points at an entry.
  int shards_valid() {
    if (current_shard == shards.end())
      return false;
    return current_shard->second->valid();
  }

  // Advance the shard-side cursor; when the current shard runs out,
  // move to the first non-empty later shard.
  int shards_next() {
    if (current_shard == shards.end()) {
      //illegal to next() on !valid()
      return -1;
    }
    int r = 0;
    r = current_shard->second->next();
    if (r != 0)
      return r;
    if (current_shard->second->valid())
      return 0;
    //current shard exhausted, search for key
    ++current_shard;
    while (current_shard != shards.end()) {
      r = current_shard->second->seek_to_first();
      if (r != 0)
	return r;
      if (current_shard->second->valid())
	break;
      ++current_shard;
    }
    //either we found key or not, but it is success
    return 0;
  }

  // Step the shard-side cursor back; when the current shard is
  // exhausted, fall back to the last key of earlier shards.
  int shards_prev() {
    if (current_shard == shards.end()) {
      //illegal to prev() on !valid()
      return -1;
    }
    int r = current_shard->second->prev();
    while (r == 0) {
      if (current_shard->second->valid()) {
	break;
      }
      if (current_shard == shards.begin()) {
	//we have reached pre-first element
	//this makes it !valid(), but guarantees next() moves to first element
	break;
      }
      --current_shard;
      r = current_shard->second->seek_to_last();
    }
    return r;
  }

  // Position the shard-side cursor on the last key of the last
  // non-empty shard; current_shard == shards.end() means "no keys".
  int shards_seek_to_last() {
    int r = 0;
    current_shard = shards.end();
    if (current_shard == shards.begin()) {
      //no shards at all
      return 0;
    }
    while (current_shard != shards.begin()) {
      --current_shard;
      r = current_shard->second->seek_to_last();
      if (r != 0)
	return r;
      if (current_shard->second->valid()) {
	return 0;
      }
    }
    //no keys at all
    current_shard = shards.end();
    return r;
  }

  // Position the shard-side cursor on the first key of the first
  // non-empty shard.
  int shards_seek_to_first() {
    int r = 0;
    current_shard = shards.begin();
    while (current_shard != shards.end()) {
      r = current_shard->second->seek_to_first();
      if (r != 0)
	break;
      if (current_shard->second->valid()) {
	//this is the first shard that will yield some keys
	break;
      }
      ++current_shard;
    }
    return r;
  }
};
2757
2758class ShardMergeIteratorImpl : public KeyValueDB::IteratorImpl {
2759private:
2760 struct KeyLess {
2761 private:
2762 const rocksdb::Comparator* comparator;
2763 public:
2764 KeyLess(const rocksdb::Comparator* comparator) : comparator(comparator) { };
2765
2766 bool operator()(rocksdb::Iterator* a, rocksdb::Iterator* b) const
2767 {
2768 if (a->Valid()) {
2769 if (b->Valid()) {
2770 return comparator->Compare(a->key(), b->key()) < 0;
2771 } else {
2772 return true;
2773 }
2774 } else {
2775 if (b->Valid()) {
2776 return false;
2777 } else {
2778 return false;
2779 }
2780 }
2781 }
2782 };
2783
2784 const RocksDBStore* db;
2785 KeyLess keyless;
2786 string prefix;
33c7a0ef
TL
2787 const KeyValueDB::IteratorBounds bounds;
2788 const rocksdb::Slice iterate_lower_bound;
2789 const rocksdb::Slice iterate_upper_bound;
f67539c2
TL
2790 std::vector<rocksdb::Iterator*> iters;
2791public:
2792 explicit ShardMergeIteratorImpl(const RocksDBStore* db,
2793 const std::string& prefix,
33c7a0ef
TL
2794 const std::vector<rocksdb::ColumnFamilyHandle*>& shards,
2795 KeyValueDB::IteratorBounds bounds_)
2796 : db(db), keyless(db->comparator), prefix(prefix), bounds(std::move(bounds_)),
2797 iterate_lower_bound(make_slice(bounds.lower_bound)),
2798 iterate_upper_bound(make_slice(bounds.upper_bound))
f67539c2
TL
2799 {
2800 iters.reserve(shards.size());
33c7a0ef
TL
2801 auto options = rocksdb::ReadOptions();
2802 if (db->cct->_conf->osd_rocksdb_iterator_bounds_enabled) {
2803 if (bounds.lower_bound) {
2804 options.iterate_lower_bound = &iterate_lower_bound;
2805 }
2806 if (bounds.upper_bound) {
2807 options.iterate_upper_bound = &iterate_upper_bound;
2808 }
2809 }
f67539c2 2810 for (auto& s : shards) {
33c7a0ef 2811 iters.push_back(db->db->NewIterator(options, s));
f67539c2
TL
2812 }
2813 }
2814 ~ShardMergeIteratorImpl() {
2815 for (auto& it : iters) {
2816 delete it;
2817 }
2818 }
2819 int seek_to_first() override {
2820 for (auto& it : iters) {
2821 it->SeekToFirst();
2822 if (!it->status().ok()) {
2823 return -1;
2824 }
2825 }
2826 //all iterators seeked, sort
2827 std::sort(iters.begin(), iters.end(), keyless);
2828 return 0;
2829 }
2830 int seek_to_last() override {
2831 for (auto& it : iters) {
2832 it->SeekToLast();
2833 if (!it->status().ok()) {
2834 return -1;
2835 }
2836 }
2837 for (size_t i = 1; i < iters.size(); i++) {
2838 if (iters[0]->Valid()) {
2839 if (iters[i]->Valid()) {
2840 if (keyless(iters[0], iters[i])) {
20effc67 2841 std::swap(iters[0], iters[i]);
f67539c2
TL
2842 }
2843 } else {
2844 //iters[i] empty
2845 }
2846 } else {
2847 if (iters[i]->Valid()) {
20effc67 2848 std::swap(iters[0], iters[i]);
f67539c2
TL
2849 }
2850 }
2851 //it might happen that cf was empty
2852 if (iters[i]->Valid()) {
2853 iters[i]->Next();
2854 }
2855 }
2856 //no need to sort, as at most 1 iterator is valid now
2857 return 0;
2858 }
2859 int upper_bound(const string &after) override {
2860 rocksdb::Slice slice_bound(after);
2861 for (auto& it : iters) {
2862 it->Seek(slice_bound);
2863 if (it->Valid() && it->key() == after) {
2864 it->Next();
2865 }
2866 if (!it->status().ok()) {
2867 return -1;
2868 }
2869 }
2870 std::sort(iters.begin(), iters.end(), keyless);
2871 return 0;
2872 }
2873 int lower_bound(const string &to) override {
2874 rocksdb::Slice slice_bound(to);
2875 for (auto& it : iters) {
2876 it->Seek(slice_bound);
2877 if (!it->status().ok()) {
2878 return -1;
2879 }
2880 }
2881 std::sort(iters.begin(), iters.end(), keyless);
2882 return 0;
2883 }
  // Advance the merged view to the next key.  Only iters[0] (the current
  // position) is stepped; it is then "bubbled" towards the back until the
  // sorted order of iters is restored, so a single swap pass suffices.
  // Returns 0 on success, -1 if already invalid or on iterator error.
  int next() override {
    int r = -1;
    if (iters[0]->Valid()) {
      iters[0]->Next();
      if (iters[0]->status().ok()) {
        r = 0;
        //bubble up the stepped iterator until order is restored
        for (size_t i = 0; i < iters.size() - 1; i++) {
          if (keyless(iters[i], iters[i + 1])) {
            //matches, fixed
            break;
          }
          std::swap(iters[i], iters[i + 1]);
        }
      }
    }
    return r;
  }
  // iters are sorted, so
  // a[0] < b[0] < c[0] < d[0]
  // a[0] > a[-1], a[0] > b[-1], a[0] > c[-1], a[0] > d[-1]
  // so, prev() will be one of:
  // a[-1], b[-1], c[-1], d[-1]
  // prev() will be the one that is *largest* of them
  //
  // alg:
  // 1. go prev() on each iterator we can
  // 2. select largest key from those iterators
  // 3. go next() on all iterators except (2)
  // 4. sort
  int prev() override {
    std::vector<rocksdb::Iterator*> prev_done;
    //1. step every shard back where possible; a shard that cannot go
    //   back is re-positioned so the class invariant (non-current
    //   iterators sit one past the merged position) is preserved
    for (auto it: iters) {
      if (it->Valid()) {
        it->Prev();
        if (it->Valid()) {
          prev_done.push_back(it);
        } else {
          it->SeekToFirst();
        }
      } else {
        it->SeekToLast();
        if (it->Valid()) {
          prev_done.push_back(it);
        }
      }
    }
    if (prev_done.size() == 0) {
      /* there is no previous element */
      if (iters[0]->Valid()) {
        iters[0]->Prev();
        ceph_assert(!iters[0]->Valid());
      }
      return 0;
    }
    //2,3. pick the candidate with the largest key; every loser steps
    //     forward again so only the winner moved net backwards
    rocksdb::Iterator* highest = prev_done[0];
    for (size_t i = 1; i < prev_done.size(); i++) {
      if (keyless(highest, prev_done[i])) {
        highest->Next();
        highest = prev_done[i];
      } else {
        prev_done[i]->Next();
      }
    }
    //4
    //insert highest in the beginning, and shift values until we pick highest
    //untouched rest is sorted - we just prev()/next() them
    rocksdb::Iterator* hold = highest;
    for (size_t i = 0; i < iters.size(); i++) {
      std::swap(hold, iters[i]);
      if (hold == highest) break;
    }
    ceph_assert(hold == highest);
    return 0;
  }
2961 bool valid() override {
2962 return iters[0]->Valid();
2963 }
2964 string key() override {
2965 return iters[0]->key().ToString();
2966 }
2967 std::pair<std::string, std::string> raw_key() override {
2968 return make_pair(prefix, key());
2969 }
2970 bufferlist value() override {
2971 return to_bufferlist(iters[0]->value());
2972 }
2973 bufferptr value_as_ptr() override {
2974 rocksdb::Slice val = iters[0]->value();
2975 return bufferptr(val.data(), val.size());
2976 }
2977 int status() override {
2978 return iters[0]->status().ok() ? 0 : -1;
2979 }
2980};
2981
33c7a0ef 2982KeyValueDB::Iterator RocksDBStore::get_iterator(const std::string& prefix, IteratorOpts opts, IteratorBounds bounds)
f67539c2
TL
2983{
2984 auto cf_it = cf_handles.find(prefix);
2985 if (cf_it != cf_handles.end()) {
33c7a0ef 2986 rocksdb::ColumnFamilyHandle* cf = nullptr;
f67539c2 2987 if (cf_it->second.handles.size() == 1) {
33c7a0ef
TL
2988 cf = cf_it->second.handles[0];
2989 } else if (cct->_conf->osd_rocksdb_iterator_bounds_enabled) {
2990 cf = get_cf_handle(prefix, bounds);
2991 }
2992 if (cf) {
f67539c2 2993 return std::make_shared<CFIteratorImpl>(
33c7a0ef
TL
2994 this,
2995 prefix,
2996 cf,
2997 std::move(bounds));
f67539c2
TL
2998 } else {
2999 return std::make_shared<ShardMergeIteratorImpl>(
3000 this,
3001 prefix,
33c7a0ef
TL
3002 cf_it->second.handles,
3003 std::move(bounds));
f67539c2
TL
3004 }
3005 } else {
3006 return KeyValueDB::get_iterator(prefix, opts);
3007 }
3008}
3009
3010rocksdb::Iterator* RocksDBStore::new_shard_iterator(rocksdb::ColumnFamilyHandle* cf)
3011{
3012 return db->NewIterator(rocksdb::ReadOptions(), cf);
3013}
3014
3015RocksDBStore::WholeSpaceIterator RocksDBStore::get_wholespace_iterator(IteratorOpts opts)
11fdf7f2 3016{
f67539c2 3017 if (cf_handles.size() == 0) {
f67539c2 3018 return std::make_shared<RocksDBWholeSpaceIteratorImpl>(
33c7a0ef 3019 this, default_cf, opts);
11fdf7f2 3020 } else {
f67539c2
TL
3021 return std::make_shared<WholeMergeIteratorImpl>(this);
3022 }
3023}
3024
// Whole-space iterator restricted to the default column family only
// (ignores sharded columns); opts = 0 means no special iterator options.
RocksDBStore::WholeSpaceIterator RocksDBStore::get_default_cf_iterator()
{
  return std::make_shared<RocksDBWholeSpaceIteratorImpl>(this, default_cf, 0);
}
3029
// Open the DB and prepare column-family state for resharding to
// 'new_sharding'.  On success, 'to_process_columns' owns a handle for every
// existing column (the default CF entry has a no-op deleter since it is
// tracked separately via default_cf/must_close_default_cf) and cf_handles
// reflects the target sharding.  Also writes a lock marker into the
// sharding definition file so a half-done reshard blocks normal open.
// Returns 0 on success, negative errno on failure.
int RocksDBStore::prepare_for_reshard(const std::string& new_sharding,
				      RocksDBStore::columns_t& to_process_columns)
{
  //0. lock db from opening
  //1. list existing columns
  //2. apply merge operator to (main + columns) opts
  //3. prepare std::vector<rocksdb::ColumnFamilyDescriptor> existing_cfs
  //4. open db, acquire existing column handles
  //5. calculate missing columns
  //6. create missing columns
  //7. construct cf_handles according to new sharding
  //8. check if all cf_handles are filled

  bool b;
  std::vector<ColumnFamily> new_sharding_def;
  char const* error_position;
  std::string error_msg;
  b = parse_sharding_def(new_sharding, new_sharding_def, &error_position, &error_msg);
  if (!b) {
    // print the sharding text and a caret under the offending position
    dout(1) << __func__ << " bad sharding: " << dendl;
    dout(1) << __func__ << new_sharding << dendl;
    dout(1) << __func__ << std::string(error_position - &new_sharding[0], ' ') << "^" << error_msg << dendl;
    return -EINVAL;
  }

  //0. lock db from opening
  std::string stored_sharding_text;
  rocksdb::ReadFileToString(env,
			    sharding_def_file,
			    &stored_sharding_text);
  if (stored_sharding_text.find(resharding_column_lock) == string::npos) {
    // append the lock token to the stored sharding definition
    rocksdb::Status status;
    if (stored_sharding_text.size() != 0)
      stored_sharding_text += " ";
    stored_sharding_text += resharding_column_lock;
    env->CreateDir(sharding_def_dir);
    status = rocksdb::WriteStringToFile(env, stored_sharding_text,
					sharding_def_file, true);
    if (!status.ok()) {
      derr << __func__ << " cannot write to " << sharding_def_file << dendl;
      return -EIO;
    }
  }

  //1. list existing columns

  rocksdb::Status status;
  std::vector<std::string> existing_columns;
  rocksdb::Options opt;
  int r = load_rocksdb_options(false, opt);
  if (r) {
    dout(1) << __func__ << " load rocksdb options failed" << dendl;
    return r;
  }
  status = rocksdb::DB::ListColumnFamilies(rocksdb::DBOptions(opt), path, &existing_columns);
  if (!status.ok()) {
    derr << "Unable to list column families: " << status.ToString() << dendl;
    return -EINVAL;
  }
  dout(5) << "existing columns = " << existing_columns << dendl;

  //2. apply merge operator to (main + columns) opts
  //3. prepare std::vector<rocksdb::ColumnFamilyDescriptor> cfs_to_open

  std::vector<rocksdb::ColumnFamilyDescriptor> cfs_to_open;
  for (const auto& full_name : existing_columns) {
    //split col_name to <prefix>-<number>
    std::string base_name;
    size_t pos = full_name.find('-');
    if (std::string::npos == pos)
      base_name = full_name;
    else
      base_name = full_name.substr(0,pos);

    rocksdb::ColumnFamilyOptions cf_opt(opt);
    // search if we have options for this column
    std::string options;
    for (const auto& nsd : new_sharding_def) {
      if (nsd.name == base_name) {
	options = nsd.options;
	break;
      }
    }
    int r = update_column_family_options(base_name, options, &cf_opt);
    if (r != 0) {
      return r;
    }
    cfs_to_open.emplace_back(full_name, cf_opt);
  }

  //4. open db, acquire existing column handles
  std::vector<rocksdb::ColumnFamilyHandle*> handles;
  status = rocksdb::DB::Open(rocksdb::DBOptions(opt),
			     path, cfs_to_open, &handles, &db);
  if (!status.ok()) {
    derr << status.ToString() << dendl;
    return -EINVAL;
  }
  for (size_t i = 0; i < cfs_to_open.size(); i++) {
    dout(10) << "column " << cfs_to_open[i].name << " handle " << (void*)handles[i] << dendl;
  }

  //5. calculate missing columns
  std::vector<std::string> new_sharding_columns;
  std::vector<std::string> missing_columns;
  sharding_def_to_columns(new_sharding_def,
			  new_sharding_columns);
  dout(5) << "target columns = " << new_sharding_columns << dendl;
  for (const auto& n : new_sharding_columns) {
    bool found = false;
    for (const auto& e : existing_columns) {
      if (n == e) {
	found = true;
	break;
      }
    }
    if (!found) {
      missing_columns.push_back(n);
    }
  }
  dout(5) << "missing columns = " << missing_columns << dendl;

  //6. create missing columns
  for (const auto& full_name : missing_columns) {
    std::string base_name;
    size_t pos = full_name.find('-');
    if (std::string::npos == pos)
      base_name = full_name;
    else
      base_name = full_name.substr(0,pos);

    rocksdb::ColumnFamilyOptions cf_opt(opt);
    // search if we have options for this column
    std::string options;
    for (const auto& nsd : new_sharding_def) {
      if (nsd.name == base_name) {
	options = nsd.options;
	break;
      }
    }
    int r = update_column_family_options(base_name, options, &cf_opt);
    if (r != 0) {
      return r;
    }
    rocksdb::ColumnFamilyHandle *cf;
    status = db->CreateColumnFamily(cf_opt, full_name, &cf);
    if (!status.ok()) {
      derr << __func__ << " Failed to create rocksdb column family: "
	   << full_name << dendl;
      return -EINVAL;
    }
    dout(10) << "created column " << full_name << " handle = " << (void*)cf << dendl;
    existing_columns.push_back(full_name);
    handles.push_back(cf);
  }

  //7. construct cf_handles according to new sharding
  for (size_t i = 0; i < existing_columns.size(); i++) {
    std::string full_name = existing_columns[i];
    rocksdb::ColumnFamilyHandle *cf = handles[i];
    std::string base_name;
    size_t shard_idx = 0;
    size_t pos = full_name.find('-');
    dout(10) << "processing column " << full_name << dendl;
    if (std::string::npos == pos) {
      base_name = full_name;
    } else {
      base_name = full_name.substr(0,pos);
      shard_idx = atoi(full_name.substr(pos+1).c_str());
    }
    if (rocksdb::kDefaultColumnFamilyName == base_name) {
      // default CF is closed via must_close_default_cf, so the
      // to_process_columns entry gets a no-op deleter
      default_cf = handles[i];
      must_close_default_cf = true;
      std::unique_ptr<rocksdb::ColumnFamilyHandle, cf_deleter_t> ptr{
        cf, [](rocksdb::ColumnFamilyHandle*) {}};
      to_process_columns.emplace(full_name, std::move(ptr));
    } else {
      for (const auto& nsd : new_sharding_def) {
	if (nsd.name == base_name) {
	  if (shard_idx < nsd.shard_cnt) {
	    add_column_family(base_name, nsd.hash_l, nsd.hash_h, shard_idx, cf);
	  } else {
	    //ignore columns with index larger then shard count
	  }
	  break;
	}
      }
      std::unique_ptr<rocksdb::ColumnFamilyHandle, cf_deleter_t> ptr{
	cf, [this](rocksdb::ColumnFamilyHandle* handle) {
	  db->DestroyColumnFamilyHandle(handle);
	}};
      to_process_columns.emplace(full_name, std::move(ptr));
    }
  }

  //8. check if all cf_handles are filled
  for (const auto& col : cf_handles) {
    for (size_t i = 0; i < col.second.handles.size(); i++) {
      if (col.second.handles[i] == nullptr) {
	derr << "missing handle for column " << col.first << " shard " << i << dendl;
	return -EIO;
      }
    }
  }
  return 0;
}
3236
3237int RocksDBStore::reshard_cleanup(const RocksDBStore::columns_t& current_columns)
3238{
3239 std::vector<std::string> new_sharding_columns;
3240 for (const auto& [name, handle] : cf_handles) {
3241 if (handle.handles.size() == 1) {
3242 new_sharding_columns.push_back(name);
3243 } else {
3244 for (size_t i = 0; i < handle.handles.size(); i++) {
20effc67 3245 new_sharding_columns.push_back(name + "-" + std::to_string(i));
f67539c2
TL
3246 }
3247 }
3248 }
3249
3250 for (auto& [name, handle] : current_columns) {
3251 auto found = std::find(new_sharding_columns.begin(),
3252 new_sharding_columns.end(),
3253 name) != new_sharding_columns.end();
3254 if (found || name == rocksdb::kDefaultColumnFamilyName) {
3255 dout(5) << "Column " << name << " is part of new sharding." << dendl;
3256 continue;
3257 }
3258 dout(5) << "Column " << name << " not part of new sharding. Deleting." << dendl;
3259
3260 // verify that column is empty
3261 std::unique_ptr<rocksdb::Iterator> it{
3262 db->NewIterator(rocksdb::ReadOptions(), handle.get())};
3263 ceph_assert(it);
3264 it->SeekToFirst();
3265 ceph_assert(!it->Valid());
3266
3267 if (rocksdb::Status status = db->DropColumnFamily(handle.get()); !status.ok()) {
3268 derr << __func__ << " Failed to drop column: " << name << dendl;
3269 return -EINVAL;
3270 }
3271 }
3272 return 0;
3273}
3274
// Migrate all keys to the column layout described by 'new_sharding'.
// Opens the DB via prepare_for_reshard, moves every key that lands in a
// different column under the new sharding, drops now-unused columns, and
// finally rewrites the sharding definition file (which also clears the
// resharding lock marker).  'ctrl_in' tunes batch/iterator refresh sizes
// and unit-test failure injection; nullptr means defaults.
// Returns 0 on success, negative on error (-1000/-1001/-1002 are
// unit-test injected failures).  The DB is closed on exit either way.
int RocksDBStore::reshard(const std::string& new_sharding, const RocksDBStore::resharding_ctrl* ctrl_in)
{

  resharding_ctrl ctrl = ctrl_in ? *ctrl_in : resharding_ctrl();
  size_t bytes_in_batch = 0;
  size_t keys_in_batch = 0;
  size_t bytes_per_iterator = 0;
  size_t keys_per_iterator = 0;
  size_t keys_processed = 0;
  size_t keys_moved = 0;

  // synchronously write out the accumulated batch and reset counters
  auto flush_batch = [&](rocksdb::WriteBatch* batch) {
    dout(10) << "flushing batch, " << keys_in_batch << " keys, for "
	     << bytes_in_batch << " bytes" << dendl;
    rocksdb::WriteOptions woptions;
    woptions.sync = true;
    rocksdb::Status s = db->Write(woptions, batch);
    ceph_assert(s.ok());
    bytes_in_batch = 0;
    keys_in_batch = 0;
    batch->Clear();
  };

  // walk one source column and move each key to its target column under
  // the new sharding; fixed_prefix is empty only for the default CF,
  // whose raw keys embed the prefix
  auto process_column = [&](rocksdb::ColumnFamilyHandle* handle,
			    const std::string& fixed_prefix)
  {
    dout(5) << " column=" << (void*)handle << " prefix=" << fixed_prefix << dendl;
    std::unique_ptr<rocksdb::Iterator> it{
      db->NewIterator(rocksdb::ReadOptions(), handle)};
    ceph_assert(it);

    rocksdb::WriteBatch bat;
    for (it->SeekToFirst(); it->Valid(); it->Next()) {
      rocksdb::Slice raw_key = it->key();
      dout(30) << "key=" << pretty_binary_string(raw_key.ToString()) << dendl;
      //check if need to refresh iterator
      //(a long-lived iterator pins resources; recreate it periodically
      // and re-seek to the key we were on)
      if (bytes_per_iterator >= ctrl.bytes_per_iterator ||
	  keys_per_iterator >= ctrl.keys_per_iterator) {
	dout(8) << "refreshing iterator" << dendl;
	bytes_per_iterator = 0;
	keys_per_iterator = 0;
	std::string raw_key_str = raw_key.ToString();
	it.reset(db->NewIterator(rocksdb::ReadOptions(), handle));
	ceph_assert(it);
	it->Seek(raw_key_str);
	ceph_assert(it->Valid());
	raw_key = it->key();
      }
      rocksdb::Slice value = it->value();
      std::string prefix, key;
      if (fixed_prefix.size() == 0) {
	split_key(raw_key, &prefix, &key);
      } else {
	prefix = fixed_prefix;
	key = raw_key.ToString();
      }
      keys_processed++;
      if ((keys_processed % 10000) == 0) {
	dout(10) << "processed " << keys_processed << " keys, moved " << keys_moved << dendl;
      }
      // where does this key live under the new sharding?
      rocksdb::ColumnFamilyHandle* new_handle = get_cf_handle(prefix, key);
      if (new_handle == nullptr) {
	new_handle = default_cf;
      }
      if (handle == new_handle) {
	continue;
      }
      // default CF stores prefix+key combined; sharded CFs store key only
      std::string new_raw_key;
      if (new_handle == default_cf) {
	new_raw_key = combine_strings(prefix, key);
      } else {
	new_raw_key = key;
      }
      bat.Delete(handle, raw_key);
      bat.Put(new_handle, new_raw_key, value);
      dout(25) << "moving " << (void*)handle << "/" << pretty_binary_string(raw_key.ToString()) <<
	" to " << (void*)new_handle << "/" << pretty_binary_string(new_raw_key) <<
	" size " << value.size() << dendl;
      keys_moved++;
      bytes_in_batch += new_raw_key.size() * 2 + value.size();
      keys_in_batch++;
      bytes_per_iterator += new_raw_key.size() * 2 + value.size();
      keys_per_iterator++;

      //check if need to write batch
      if (bytes_in_batch >= ctrl.bytes_per_batch ||
	  keys_in_batch >= ctrl.keys_per_batch) {
	flush_batch(&bat);
	if (ctrl.unittest_fail_after_first_batch) {
	  return -1000;
	}
      }
    }
    // flush whatever remains for this column
    if (bat.Count() > 0) {
      flush_batch(&bat);
    }
    return 0;
  };

  // ensure the DB is closed on every exit path
  auto close_column_handles = make_scope_guard([this] {
    cf_handles.clear();
    close();
  });
  columns_t to_process_columns;
  int r = prepare_for_reshard(new_sharding, to_process_columns);
  if (r != 0) {
    dout(1) << "failed to prepare db for reshard" << dendl;
    return r;
  }

  for (auto& [name, handle] : to_process_columns) {
    dout(5) << "Processing column=" << name
	    << " handle=" << handle.get() << dendl;
    if (name == rocksdb::kDefaultColumnFamilyName) {
      ceph_assert(handle.get() == default_cf);
      r = process_column(default_cf, std::string());
    } else {
      // column name is "<prefix>" or "<prefix>-<shard>"
      std::string fixed_prefix = name.substr(0, name.find('-'));
      dout(10) << "Prefix: " << fixed_prefix << dendl;
      r = process_column(handle.get(), fixed_prefix);
    }
    if (r != 0) {
      derr << "Error processing column " << name << dendl;
      return r;
    }
    if (ctrl.unittest_fail_after_processing_column) {
      return -1001;
    }
  }

  r = reshard_cleanup(to_process_columns);
  if (r != 0) {
    dout(5) << "failed to cleanup after reshard" << dendl;
    return r;
  }

  if (ctrl.unittest_fail_after_successful_processing) {
    return -1002;
  }
  // persist the new sharding definition; this replaces the lock marker
  // written by prepare_for_reshard, marking the reshard complete
  env->CreateDir(sharding_def_dir);
  if (auto status = rocksdb::WriteStringToFile(env, new_sharding,
					       sharding_def_file, true);
      !status.ok()) {
    derr << __func__ << " cannot write to " << sharding_def_file << dendl;
    return -EIO;
  }

  return r;
}
3424
3425bool RocksDBStore::get_sharding(std::string& sharding) {
3426 rocksdb::Status status;
3427 std::string stored_sharding_text;
3428 bool result = false;
3429 sharding.clear();
3430
3431 status = env->FileExists(sharding_def_file);
3432 if (status.ok()) {
3433 status = rocksdb::ReadFileToString(env,
3434 sharding_def_file,
3435 &stored_sharding_text);
3436 if(status.ok()) {
3437 result = true;
3438 sharding = stored_sharding_text;
3439 }
3440 }
3441 return result;
11fdf7f2 3442}