]> git.proxmox.com Git - ceph.git/blob - ceph/src/kv/RocksDBStore.cc
bump version to 18.2.2-pve1
[ceph.git] / ceph / src / kv / RocksDBStore.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include <filesystem>
5 #include <map>
6 #include <memory>
7 #include <set>
8 #include <string>
9 #include <errno.h>
10 #include <unistd.h>
11 #include <sys/types.h>
12 #include <sys/stat.h>
13
14 #include "rocksdb/db.h"
15 #include "rocksdb/table.h"
16 #include "rocksdb/env.h"
17 #include "rocksdb/slice.h"
18 #include "rocksdb/cache.h"
19 #include "rocksdb/filter_policy.h"
20 #include "rocksdb/utilities/convenience.h"
21 #include "rocksdb/utilities/table_properties_collectors.h"
22 #include "rocksdb/merge_operator.h"
23
24 #include "common/perf_counters.h"
25 #include "common/PriorityCache.h"
26 #include "include/common_fwd.h"
27 #include "include/scope_guard.h"
28 #include "include/str_list.h"
29 #include "include/stringify.h"
30 #include "include/str_map.h"
31 #include "KeyValueDB.h"
32 #include "RocksDBStore.h"
33
34 #include "common/debug.h"
35
36 #define dout_context cct
37 #define dout_subsys ceph_subsys_rocksdb
38 #undef dout_prefix
39 #define dout_prefix *_dout << "rocksdb: "
40
41 namespace fs = std::filesystem;
42
43 using std::function;
44 using std::list;
45 using std::map;
46 using std::ostream;
47 using std::pair;
48 using std::set;
49 using std::string;
50 using std::unique_ptr;
51 using std::vector;
52
53 using ceph::bufferlist;
54 using ceph::bufferptr;
55 using ceph::Formatter;
56
// Locations (relative to the DB directory) of the persisted sharding
// definition, plus the sentinel name used while resharding is in progress.
static const char* sharding_def_dir = "sharding";
static const char* sharding_def_file = "sharding/def";
static const char* sharding_recreate = "sharding/recreate_columns";
// presumably a lock/marker column name used during resharding — see callers
static const char* resharding_column_lock = "reshardingXcommencingXlocked";
61
62 static bufferlist to_bufferlist(rocksdb::Slice in) {
63 bufferlist bl;
64 bl.append(bufferptr(in.data(), in.size()));
65 return bl;
66 }
67
// Build a rocksdb::SliceParts describing the segments of bl without
// copying any data: one Slice per bufferlist buffer.
//
// NOTE(review): the caller must pre-size *slices to exactly the number
// of buffers in bl — the SliceParts length is taken from slices->size(),
// not from the count actually written; confirm at call sites.
static rocksdb::SliceParts prepare_sliceparts(const bufferlist &bl,
					      vector<rocksdb::Slice> *slices)
{
  unsigned n = 0;
  // point each slice at the corresponding buffer segment (zero-copy)
  for (auto& buf : bl.buffers()) {
    (*slices)[n].data_ = buf.c_str();
    (*slices)[n].size_ = buf.length();
    n++;
  }
  return rocksdb::SliceParts(slices->data(), slices->size());
}
79
80
//
// One of these for the default rocksdb column family, routing each prefix
// to the appropriate MergeOperator.
//
class RocksDBStore::MergeOperatorRouter
  : public rocksdb::AssociativeMergeOperator
{
  RocksDBStore& store;
public:
  const char *Name() const override {
    // Construct a name that rocksDB will validate against. We want to
    // do this in a way that doesn't constrain the ordering of calls
    // to set_merge_operator, so sort the merge operators and then
    // construct a name from all of those parts.
    // NOTE: rebuilt on every call; std::map keeps the prefixes sorted
    // so the result is independent of registration order.
    store.assoc_name.clear();
    map<std::string,std::string> names;

    for (auto& p : store.merge_ops) {
      names[p.first] = p.second->name();
    }
    for (auto& p : names) {
      store.assoc_name += '.';
      store.assoc_name += p.first;
      store.assoc_name += ':';
      store.assoc_name += p.second;
    }
    return store.assoc_name.c_str();
  }

  explicit MergeOperatorRouter(RocksDBStore &_store) : store(_store) {}

  bool Merge(const rocksdb::Slice& key,
	     const rocksdb::Slice* existing_value,
	     const rocksdb::Slice& value,
	     std::string* new_value,
	     rocksdb::Logger* logger) const override {
    // for default column family
    // extract prefix from key and compare against each registered merge op;
    // even though merge operator for explicit CF is included in merge_ops,
    // it won't be picked up, since it won't match.
    for (auto& p : store.merge_ops) {
      // match "<prefix>\0..." at the start of the key
      // NOTE(review): reads key.data()[prefix length]; assumes every key
      // is strictly longer than the prefix (prefix + NUL separator) —
      // confirm against how keys are composed for the default CF.
      if (p.first.compare(0, p.first.length(),
			  key.data(), p.first.length()) == 0 &&
	  key.data()[p.first.length()] == 0) {
	if (existing_value) {
	  p.second->merge(existing_value->data(), existing_value->size(),
			  value.data(), value.size(),
			  new_value);
	} else {
	  p.second->merge_nonexistent(value.data(), value.size(), new_value);
	}
	break;
      }
    }
    return true; // OK :)
  }
};
138
139 //
140 // One of these per non-default column family, linked directly to the
141 // merge operator for that CF/prefix (if any).
142 //
143 class RocksDBStore::MergeOperatorLinker
144 : public rocksdb::AssociativeMergeOperator
145 {
146 private:
147 std::shared_ptr<KeyValueDB::MergeOperator> mop;
148 public:
149 explicit MergeOperatorLinker(const std::shared_ptr<KeyValueDB::MergeOperator> &o) : mop(o) {}
150
151 const char *Name() const override {
152 return mop->name();
153 }
154
155 bool Merge(const rocksdb::Slice& key,
156 const rocksdb::Slice* existing_value,
157 const rocksdb::Slice& value,
158 std::string* new_value,
159 rocksdb::Logger* logger) const override {
160 if (existing_value) {
161 mop->merge(existing_value->data(), existing_value->size(),
162 value.data(), value.size(),
163 new_value);
164 } else {
165 mop->merge_nonexistent(value.data(), value.size(), new_value);
166 }
167 return true;
168 }
169 };
170
171 int RocksDBStore::set_merge_operator(
172 const string& prefix,
173 std::shared_ptr<KeyValueDB::MergeOperator> mop)
174 {
175 // If you fail here, it's because you can't do this on an open database
176 ceph_assert(db == nullptr);
177 merge_ops.push_back(std::make_pair(prefix,mop));
178 return 0;
179 }
180
// Adapter that routes RocksDB's internal log messages into the Ceph
// debug log subsystem (dout). Holds a reference on the CephContext for
// its whole lifetime.
class CephRocksdbLogger : public rocksdb::Logger {
  CephContext *cct;
public:
  explicit CephRocksdbLogger(CephContext *c) : cct(c) {
    cct->get();   // pin the context while RocksDB may still log
  }
  ~CephRocksdbLogger() override {
    cct->put();
  }

  // Write an entry to the log file with the specified format.
  void Logv(const char* format, va_list ap) override {
    Logv(rocksdb::INFO_LEVEL, format, ap);
  }

  // Write an entry to the log file with the specified log level
  // and format. Any log with level under the internal log level
  // of *this (see @SetInfoLogLevel and @GetInfoLogLevel) will not be
  // printed.
  void Logv(const rocksdb::InfoLogLevel log_level, const char* format,
	    va_list ap) override {
    // invert RocksDB's severity scale so higher-severity messages map
    // to lower (more likely to be emitted) Ceph debug levels
    int v = rocksdb::NUM_INFO_LOG_LEVELS - log_level - 1;
    dout(ceph::dout::need_dynamic(v));
    // fixed-size stack buffer: longer messages are silently truncated
    char buf[65536];
    vsnprintf(buf, sizeof(buf), format, ap);
    *_dout << buf << dendl;
  }
};
209
210 rocksdb::Logger *create_rocksdb_ceph_logger()
211 {
212 return new CephRocksdbLogger(g_ceph_context);
213 }
214
215 static int string2bool(const string &val, bool &b_val)
216 {
217 if (strcasecmp(val.c_str(), "false") == 0) {
218 b_val = false;
219 return 0;
220 } else if (strcasecmp(val.c_str(), "true") == 0) {
221 b_val = true;
222 return 0;
223 } else {
224 std::string err;
225 int b = strict_strtol(val.c_str(), 10, &err);
226 if (!err.empty())
227 return -EINVAL;
228 b_val = !!b;
229 return 0;
230 }
231 }
232
233 namespace rocksdb {
234 extern std::string trim(const std::string& str);
235 }
236
237 // this function is a modification of rocksdb's StringToMap:
238 // 1) accepts ' \n ; as separators
239 // 2) leaves compound options with enclosing { and }
240 rocksdb::Status StringToMap(const std::string& opts_str,
241 std::unordered_map<std::string, std::string>* opts_map)
242 {
243 using rocksdb::Status;
244 using rocksdb::trim;
245 assert(opts_map);
246 // Example:
247 // opts_str = "write_buffer_size=1024;max_write_buffer_number=2;"
248 // "nested_opt={opt1=1;opt2=2};max_bytes_for_level_base=100"
249 size_t pos = 0;
250 std::string opts = trim(opts_str);
251 while (pos < opts.size()) {
252 size_t eq_pos = opts.find('=', pos);
253 if (eq_pos == std::string::npos) {
254 return Status::InvalidArgument("Mismatched key value pair, '=' expected");
255 }
256 std::string key = trim(opts.substr(pos, eq_pos - pos));
257 if (key.empty()) {
258 return Status::InvalidArgument("Empty key found");
259 }
260
261 // skip space after '=' and look for '{' for possible nested options
262 pos = eq_pos + 1;
263 while (pos < opts.size() && isspace(opts[pos])) {
264 ++pos;
265 }
266 // Empty value at the end
267 if (pos >= opts.size()) {
268 (*opts_map)[key] = "";
269 break;
270 }
271 if (opts[pos] == '{') {
272 int count = 1;
273 size_t brace_pos = pos + 1;
274 while (brace_pos < opts.size()) {
275 if (opts[brace_pos] == '{') {
276 ++count;
277 } else if (opts[brace_pos] == '}') {
278 --count;
279 if (count == 0) {
280 break;
281 }
282 }
283 ++brace_pos;
284 }
285 // found the matching closing brace
286 if (count == 0) {
287 //include both '{' and '}'
288 (*opts_map)[key] = trim(opts.substr(pos, brace_pos - pos + 1));
289 // skip all whitespace and move to the next ';,'
290 // brace_pos points to the matching '}'
291 pos = brace_pos + 1;
292 while (pos < opts.size() && isspace(opts[pos])) {
293 ++pos;
294 }
295 if (pos < opts.size() && opts[pos] != ';' && opts[pos] != ',') {
296 return Status::InvalidArgument(
297 "Unexpected chars after nested options");
298 }
299 ++pos;
300 } else {
301 return Status::InvalidArgument(
302 "Mismatched curly braces for nested options");
303 }
304 } else {
305 size_t sc_pos = opts.find_first_of(",;", pos);
306 if (sc_pos == std::string::npos) {
307 (*opts_map)[key] = trim(opts.substr(pos));
308 // It either ends with a trailing , ; or the last key-value pair
309 break;
310 } else {
311 (*opts_map)[key] = trim(opts.substr(pos, sc_pos - pos));
312 }
313 pos = sc_pos + 1;
314 }
315 }
316 return Status::OK();
317 }
318
319 int RocksDBStore::tryInterpret(const string &key, const string &val, rocksdb::Options &opt)
320 {
321 if (key == "compaction_threads") {
322 std::string err;
323 int f = strict_iecstrtoll(val, &err);
324 if (!err.empty())
325 return -EINVAL;
326 //Low priority threadpool is used for compaction
327 opt.env->SetBackgroundThreads(f, rocksdb::Env::Priority::LOW);
328 } else if (key == "flusher_threads") {
329 std::string err;
330 int f = strict_iecstrtoll(val, &err);
331 if (!err.empty())
332 return -EINVAL;
333 //High priority threadpool is used for flusher
334 opt.env->SetBackgroundThreads(f, rocksdb::Env::Priority::HIGH);
335 } else if (key == "compact_on_mount") {
336 int ret = string2bool(val, compact_on_mount);
337 if (ret != 0)
338 return ret;
339 } else if (key == "disableWAL") {
340 int ret = string2bool(val, disableWAL);
341 if (ret != 0)
342 return ret;
343 } else {
344 //unrecognize config options.
345 return -EINVAL;
346 }
347 return 0;
348 }
349
350 int RocksDBStore::ParseOptionsFromString(const string &opt_str, rocksdb::Options &opt)
351 {
352 return ParseOptionsFromStringStatic(cct, opt_str, opt,
353 [&](const string& k, const string& v, rocksdb::Options& o) {
354 return tryInterpret(k, v, o);
355 }
356 );
357 }
358
359 int RocksDBStore::ParseOptionsFromStringStatic(
360 CephContext *cct,
361 const string& opt_str,
362 rocksdb::Options& opt,
363 function<int(const string&, const string&, rocksdb::Options&)> interp)
364 {
365 // keep aligned with func tryInterpret
366 const set<string> need_interp_keys = {"compaction_threads", "flusher_threads", "compact_on_mount", "disableWAL"};
367 rocksdb::Status status;
368 std::unordered_map<std::string, std::string> str_map;
369 status = StringToMap(opt_str, &str_map);
370 if (!status.ok()) {
371 dout(5) << __func__ << " error '" << status.getState() <<
372 "' while parsing options '" << opt_str << "'" << dendl;
373 return -EINVAL;
374 }
375
376 for (auto it = str_map.begin(); it != str_map.end(); ++it) {
377 string this_opt = it->first + "=" + it->second;
378 rocksdb::Status status =
379 rocksdb::GetOptionsFromString(opt, this_opt, &opt);
380 int r = 0;
381 if (!status.ok()) {
382 if (interp != nullptr) {
383 r = interp(it->first, it->second, opt);
384 } else if (!need_interp_keys.count(it->first)) {
385 r = -1;
386 }
387 if (r < 0) {
388 derr << status.ToString() << dendl;
389 return -EINVAL;
390 }
391 }
392 lgeneric_dout(cct, 1) << " set rocksdb option " << it->first
393 << " = " << it->second << dendl;
394 }
395 return 0;
396 }
397
398 int RocksDBStore::init(string _options_str)
399 {
400 options_str = _options_str;
401 rocksdb::Options opt;
402 //try parse options
403 if (options_str.length()) {
404 int r = ParseOptionsFromString(options_str, opt);
405 if (r != 0) {
406 return -EINVAL;
407 }
408 }
409 return 0;
410 }
411
412 int RocksDBStore::create_db_dir()
413 {
414 if (env) {
415 unique_ptr<rocksdb::Directory> dir;
416 env->NewDirectory(path, &dir);
417 } else {
418 if (!fs::exists(path)) {
419 std::error_code ec;
420 if (!fs::create_directory(path, ec)) {
421 derr << __func__ << " failed to create " << path
422 << ": " << ec.message() << dendl;
423 return -ec.value();
424 }
425 fs::permissions(path,
426 fs::perms::owner_all |
427 fs::perms::group_read | fs::perms::group_exec |
428 fs::perms::others_read | fs::perms::others_exec);
429 }
430 }
431 return 0;
432 }
433
434 int RocksDBStore::install_cf_mergeop(
435 const string &key_prefix,
436 rocksdb::ColumnFamilyOptions *cf_opt)
437 {
438 ceph_assert(cf_opt != nullptr);
439 cf_opt->merge_operator.reset();
440 for (auto& i : merge_ops) {
441 if (i.first == key_prefix) {
442 cf_opt->merge_operator.reset(new MergeOperatorLinker(i.second));
443 }
444 }
445 return 0;
446 }
447
448 int RocksDBStore::create_and_open(ostream &out,
449 const std::string& cfs)
450 {
451 int r = create_db_dir();
452 if (r < 0)
453 return r;
454 return do_open(out, true, false, cfs);
455 }
456
457 std::shared_ptr<rocksdb::Cache> RocksDBStore::create_block_cache(
458 const std::string& cache_type, size_t cache_size, double cache_prio_high) {
459 std::shared_ptr<rocksdb::Cache> cache;
460 auto shard_bits = cct->_conf->rocksdb_cache_shard_bits;
461 if (cache_type == "binned_lru") {
462 cache = rocksdb_cache::NewBinnedLRUCache(cct, cache_size, shard_bits, false, cache_prio_high);
463 } else if (cache_type == "lru") {
464 cache = rocksdb::NewLRUCache(cache_size, shard_bits);
465 } else if (cache_type == "clock") {
466 cache = rocksdb::NewClockCache(cache_size, shard_bits);
467 if (!cache) {
468 derr << "rocksdb_cache_type '" << cache
469 << "' chosen, but RocksDB not compiled with LibTBB. "
470 << dendl;
471 }
472 } else {
473 derr << "unrecognized rocksdb_cache_type '" << cache_type << "'" << dendl;
474 }
475 return cache;
476 }
477
// Assemble the rocksdb::Options used to open the DB: applies the
// configured option string, statistics, WAL placement, extra db_paths,
// logging, Env selection, cache/table settings, and the default-CF
// merge operator. Returns 0 or a negative errno.
int RocksDBStore::load_rocksdb_options(bool create_if_missing, rocksdb::Options& opt)
{
  rocksdb::Status status;

  // user-provided option string is applied first; later explicit
  // settings below override it
  if (options_str.length()) {
    int r = ParseOptionsFromString(options_str, opt);
    if (r != 0) {
      return -EINVAL;
    }
  }

  if (cct->_conf->rocksdb_perf) {
    dbstats = rocksdb::CreateDBStatistics();
    opt.statistics = dbstats;
  }

  opt.create_if_missing = create_if_missing;
  if (kv_options.count("separate_wal_dir")) {
    // keep the write-ahead log beside (not inside) the DB directory
    opt.wal_dir = path + ".wal";
  }

  // Since ceph::for_each_substr doesn't return a value and
  // std::stoull does throw, we may as well just catch everything here.
  try {
    if (kv_options.count("db_paths")) {
      list<string> paths;
      get_str_list(kv_options["db_paths"], "; \t", paths);
      // each item is "<path>,<size>"
      for (auto& p : paths) {
	size_t pos = p.find(',');
	if (pos == std::string::npos) {
	  derr << __func__ << " invalid db path item " << p << " in "
	       << kv_options["db_paths"] << dendl;
	  return -EINVAL;
	}
	// NOTE: this local 'path' shadows the member variable of the
	// same name for the rest of the loop body
	string path = p.substr(0, pos);
	string size_str = p.substr(pos + 1);
	uint64_t size = atoll(size_str.c_str());
	if (!size) {
	  // atoll returning 0 covers both "0" and unparseable input
	  derr << __func__ << " invalid db path item " << p << " in "
	       << kv_options["db_paths"] << dendl;
	  return -EINVAL;
	}
	opt.db_paths.push_back(rocksdb::DbPath(path, size));
	dout(10) << __func__ << " db_path " << path << " size " << size << dendl;
      }
    }
  } catch (const std::system_error& e) {
    return -e.code().value();
  }

  if (cct->_conf->rocksdb_log_to_ceph_log) {
    opt.info_log.reset(new CephRocksdbLogger(cct));
  }

  if (priv) {
    // priv carries a caller-supplied rocksdb::Env (e.g. BlueRocksEnv)
    dout(10) << __func__ << " using custom Env " << priv << dendl;
    opt.env = static_cast<rocksdb::Env*>(priv);
  } else {
    env = opt.env;
  }

  // do not allow created files to be accessible beyond the owner
  opt.env->SetAllowNonOwnerAccess(false);

  // caches
  if (!set_cache_flag) {
    cache_size = cct->_conf->rocksdb_cache_size;
  }
  // split the cache budget between a row cache and the block cache
  uint64_t row_cache_size = cache_size * cct->_conf->rocksdb_cache_row_ratio;
  uint64_t block_cache_size = cache_size - row_cache_size;

  bbt_opts.block_cache = create_block_cache(cct->_conf->rocksdb_cache_type, block_cache_size);
  if (!bbt_opts.block_cache) {
    return -EINVAL;
  }
  bbt_opts.block_size = cct->_conf->rocksdb_block_size;

  if (row_cache_size > 0)
    opt.row_cache = rocksdb::NewLRUCache(row_cache_size,
				     cct->_conf->rocksdb_cache_shard_bits);
  uint64_t bloom_bits = cct->_conf.get_val<uint64_t>("rocksdb_bloom_bits_per_key");
  if (bloom_bits > 0) {
    dout(10) << __func__ << " set bloom filter bits per key to "
	     << bloom_bits << dendl;
    bbt_opts.filter_policy.reset(rocksdb::NewBloomFilterPolicy(bloom_bits));
  }
  // index type selection: binary_search | hash_search | two_level
  using std::placeholders::_1;
  if (cct->_conf.with_val<std::string>("rocksdb_index_type",
				       std::bind(std::equal_to<std::string>(), _1,
						 "binary_search")))
    bbt_opts.index_type = rocksdb::BlockBasedTableOptions::IndexType::kBinarySearch;
  if (cct->_conf.with_val<std::string>("rocksdb_index_type",
				       std::bind(std::equal_to<std::string>(), _1,
						 "hash_search")))
    bbt_opts.index_type = rocksdb::BlockBasedTableOptions::IndexType::kHashSearch;
  if (cct->_conf.with_val<std::string>("rocksdb_index_type",
				       std::bind(std::equal_to<std::string>(), _1,
						 "two_level")))
    bbt_opts.index_type = rocksdb::BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
  if (!bbt_opts.no_block_cache) {
    bbt_opts.cache_index_and_filter_blocks =
	cct->_conf.get_val<bool>("rocksdb_cache_index_and_filter_blocks");
    bbt_opts.cache_index_and_filter_blocks_with_high_priority =
	cct->_conf.get_val<bool>("rocksdb_cache_index_and_filter_blocks_with_high_priority");
    bbt_opts.pin_l0_filter_and_index_blocks_in_cache =
      cct->_conf.get_val<bool>("rocksdb_pin_l0_filter_and_index_blocks_in_cache");
  }
  bbt_opts.partition_filters = cct->_conf.get_val<bool>("rocksdb_partition_filters");
  if (cct->_conf.get_val<Option::size_t>("rocksdb_metadata_block_size") > 0)
    bbt_opts.metadata_block_size = cct->_conf.get_val<Option::size_t>("rocksdb_metadata_block_size");

  opt.table_factory.reset(rocksdb::NewBlockBasedTableFactory(bbt_opts));
  dout(10) << __func__ << " block size " << cct->_conf->rocksdb_block_size
	   << ", block_cache size " << byte_u_t(block_cache_size)
	   << ", row_cache size " << byte_u_t(row_cache_size)
	   << "; shards "
	   << (1 << cct->_conf->rocksdb_cache_shard_bits)
	   << ", type " << cct->_conf->rocksdb_cache_type
	   << dendl;

  // prefix-routing merge operator for the default column family
  opt.merge_operator.reset(new MergeOperatorRouter(*this));
  comparator = opt.comparator;
  return 0;
}
601
// Register (or extend) the shard table for a column family.
//
// cf_name   - base column name (the key prefix)
// hash_l/h  - bounds of the key substring used for shard placement
// shard_idx - index of this shard within the column
// handle    - RocksDB handle for the shard's column family
void RocksDBStore::add_column_family(const std::string& cf_name, uint32_t hash_l, uint32_t hash_h,
				     size_t shard_idx, rocksdb::ColumnFamilyHandle *handle) {
  dout(10) << __func__ << " column_name=" << cf_name << " shard_idx=" << shard_idx <<
    " hash_l=" << hash_l << " hash_h=" << hash_h << " handle=" << (void*) handle << dendl;
  bool exists = cf_handles.count(cf_name) > 0;
  auto& column = cf_handles[cf_name];
  if (exists) {
    // every shard of one column must agree on the placement range
    ceph_assert(hash_l == column.hash_l);
    ceph_assert(hash_h == column.hash_h);
  } else {
    ceph_assert(hash_l < hash_h);
    column.hash_l = hash_l;
    column.hash_h = hash_h;
  }
  // shards may be registered out of order; grow the table as needed
  if (column.handles.size() <= shard_idx)
    column.handles.resize(shard_idx + 1);
  column.handles[shard_idx] = handle;
  // reverse mapping: CF id -> prefix, used when translating iterators
  cf_ids_to_prefix.emplace(handle->GetID(), cf_name);
}
621
622 bool RocksDBStore::is_column_family(const std::string& prefix) {
623 return cf_handles.count(prefix);
624 }
625
626 std::string_view RocksDBStore::get_key_hash_view(const prefix_shards& shards, const char* key, const size_t keylen) {
627 uint32_t hash_l = std::min<uint32_t>(shards.hash_l, keylen);
628 uint32_t hash_h = std::min<uint32_t>(shards.hash_h, keylen);
629 return { key + hash_l, hash_h - hash_l };
630 }
631
632 rocksdb::ColumnFamilyHandle *RocksDBStore::get_key_cf(const prefix_shards& shards, const char* key, const size_t keylen) {
633 auto sv = get_key_hash_view(shards, key, keylen);
634 uint32_t hash = ceph_str_hash_rjenkins(sv.data(), sv.size());
635 return shards.handles[hash % shards.handles.size()];
636 }
637
638 rocksdb::ColumnFamilyHandle *RocksDBStore::get_cf_handle(const std::string& prefix, const std::string& key) {
639 auto iter = cf_handles.find(prefix);
640 if (iter == cf_handles.end()) {
641 return nullptr;
642 } else {
643 if (iter->second.handles.size() == 1) {
644 return iter->second.handles[0];
645 } else {
646 return get_key_cf(iter->second, key.data(), key.size());
647 }
648 }
649 }
650
651 rocksdb::ColumnFamilyHandle *RocksDBStore::get_cf_handle(const std::string& prefix, const char* key, size_t keylen) {
652 auto iter = cf_handles.find(prefix);
653 if (iter == cf_handles.end()) {
654 return nullptr;
655 } else {
656 if (iter->second.handles.size() == 1) {
657 return iter->second.handles[0];
658 } else {
659 return get_key_cf(iter->second, key, keylen);
660 }
661 }
662 }
663
664 /**
665 * If the specified IteratorBounds arg has both an upper and a lower bound defined, and they have equal placement hash
666 * strings, we can be sure that the entire iteration range exists in a single CF. In that case, we return the relevant
667 * CF handle. In all other cases, we return a nullptr to indicate that the specified bounds cannot necessarily be mapped
668 * to a single CF.
669 */
670 rocksdb::ColumnFamilyHandle *RocksDBStore::check_cf_handle_bounds(const cf_handles_iterator& iter, const IteratorBounds& bounds) {
671 if (!bounds.lower_bound || !bounds.upper_bound) {
672 return nullptr;
673 }
674 ceph_assert(iter != cf_handles.end());
675 ceph_assert(iter->second.handles.size() != 1);
676 if (iter->second.hash_l != 0) {
677 return nullptr;
678 }
679 auto lower_bound_hash_str = get_key_hash_view(iter->second, bounds.lower_bound->data(), bounds.lower_bound->size());
680 auto upper_bound_hash_str = get_key_hash_view(iter->second, bounds.upper_bound->data(), bounds.upper_bound->size());
681 if (lower_bound_hash_str == upper_bound_hash_str) {
682 auto key = *bounds.lower_bound;
683 return get_key_cf(iter->second, key.data(), key.size());
684 } else {
685 return nullptr;
686 }
687 }
688
/**
 * Definition of sharding:
 * space-separated list of: column_def [ '=' options ]
 * column_def := column_name '(' shard_count ')'
 * column_def := column_name '(' shard_count ',' hash_begin '-' ')'
 * column_def := column_name '(' shard_count ',' hash_begin '-' hash_end ')'
 * I=write_buffer_size=1048576 O(6) m(7,10-) prefix(4,0-10)=disable_auto_compactions=true,max_bytes_for_level_base=1048576
 *
 * Parses text_def_in into sharding_def. On a syntax error, returns
 * false and (when the out-params are given) sets *error_position to the
 * offending character and *error_msg to a short description.
 *
 * NOTE(review): the numeric fields are read with strtol on pointers
 * into the string_view, which assumes the underlying buffer is
 * NUL-terminated — callers appear to pass std::strings; confirm.
 */
bool RocksDBStore::parse_sharding_def(const std::string_view text_def_in,
				     std::vector<ColumnFamily>& sharding_def,
				     char const* *error_position,
				     std::string *error_msg)
{
  std::string_view text_def = text_def_in;
  // route error reporting to locals when the caller passed nullptrs
  char const* error_position_local = nullptr;
  std::string error_msg_local;
  if (error_position == nullptr) {
    error_position = &error_position_local;
  }
  *error_position = nullptr;
  if (error_msg == nullptr) {
    error_msg = &error_msg_local;
    error_msg->clear();
  }

  sharding_def.clear();
  // consume one space-separated column_def per iteration
  while (!text_def.empty()) {
    std::string_view options;
    std::string_view name;
    size_t shard_cnt = 1;
    // defaults: hash the whole key
    uint32_t l_bound = 0;
    uint32_t h_bound = std::numeric_limits<uint32_t>::max();

    std::string_view column_def;
    size_t spos = text_def.find(' ');
    if (spos == std::string_view::npos) {
      // last definition: consume the remainder
      column_def = text_def;
      text_def = std::string_view(text_def.end(), 0);
    } else {
      column_def = text_def.substr(0, spos);
      text_def = text_def.substr(spos + 1);
    }
    // split off "=options" suffix, if present
    size_t eqpos = column_def.find('=');
    if (eqpos != std::string_view::npos) {
      options = column_def.substr(eqpos + 1);
      column_def = column_def.substr(0, eqpos);
    }

    // "(shard_cnt[,l_bound-[h_bound]])" suffix
    size_t bpos = column_def.find('(');
    if (bpos != std::string_view::npos) {
      name = column_def.substr(0, bpos);
      const char* nptr = &column_def[bpos + 1];
      char* endptr;
      shard_cnt = strtol(nptr, &endptr, 10);
      if (nptr == endptr) {
	*error_position = nptr;
	*error_msg = "expecting integer";
	break;
      }
      nptr = endptr;
      if (*nptr == ',') {
	nptr++;
	l_bound = strtol(nptr, &endptr, 10);
	if (nptr == endptr) {
	  *error_position = nptr;
	  *error_msg = "expecting integer";
	  break;
	}
	nptr = endptr;
	if (*nptr != '-') {
	  *error_position = nptr;
	  *error_msg = "expecting '-'";
	  break;
	}
	nptr++;
	h_bound = strtol(nptr, &endptr, 10);
	if (nptr == endptr) {
	  // omitted upper bound means "to the end of the key"
	  h_bound = std::numeric_limits<uint32_t>::max();
	}
	nptr = endptr;
      }
      if (*nptr != ')') {
	*error_position = nptr;
	*error_msg = "expecting ')'";
	break;
      }
    } else {
      // bare name: one shard, full-key hash range
      name = column_def;
    }
    sharding_def.emplace_back(std::string(name), shard_cnt, std::string(options), l_bound, h_bound);
  }
  // a break above leaves *error_position set -> failure
  return *error_position == nullptr;
}
782
783 void RocksDBStore::sharding_def_to_columns(const std::vector<ColumnFamily>& sharding_def,
784 std::vector<std::string>& columns)
785 {
786 columns.clear();
787 for (size_t i = 0; i < sharding_def.size(); i++) {
788 if (sharding_def[i].shard_cnt == 1) {
789 columns.push_back(sharding_def[i].name);
790 } else {
791 for (size_t j = 0; j < sharding_def[i].shard_cnt; j++) {
792 columns.push_back(sharding_def[i].name + "-" + std::to_string(j));
793 }
794 }
795 }
796 }
797
798 int RocksDBStore::create_shards(const rocksdb::Options& opt,
799 const std::vector<ColumnFamily>& sharding_def)
800 {
801 for (auto& p : sharding_def) {
802 // copy default CF settings, block cache, merge operators as
803 // the base for new CF
804 rocksdb::ColumnFamilyOptions cf_opt(opt);
805 rocksdb::Status status;
806 // apply options to column family
807 int r = update_column_family_options(p.name, p.options, &cf_opt);
808 if (r != 0) {
809 return r;
810 }
811 for (size_t idx = 0; idx < p.shard_cnt; idx++) {
812 std::string cf_name;
813 if (p.shard_cnt == 1)
814 cf_name = p.name;
815 else
816 cf_name = p.name + "-" + std::to_string(idx);
817 rocksdb::ColumnFamilyHandle *cf;
818 status = db->CreateColumnFamily(cf_opt, cf_name, &cf);
819 if (!status.ok()) {
820 derr << __func__ << " Failed to create rocksdb column family: "
821 << cf_name << dendl;
822 return -EINVAL;
823 }
824 // store the new CF handle
825 add_column_family(p.name, p.hash_l, p.hash_h, idx, cf);
826 }
827 }
828 return 0;
829 }
830
// Apply a sharding definition to a freshly created DB: parse it, create
// the column families, and persist the definition text so later opens
// can reconstruct the layout. An empty definition removes any persisted
// one. Returns 0, -EINVAL (bad definition), or -EIO (write failure).
int RocksDBStore::apply_sharding(const rocksdb::Options& opt,
				 const std::string& sharding_text)
{
  // create and open column families
  if (!sharding_text.empty()) {
    bool b;
    int r;
    rocksdb::Status status;
    std::vector<ColumnFamily> sharding_def;
    char const* error_position;
    std::string error_msg;
    b = parse_sharding_def(sharding_text, sharding_def, &error_position, &error_msg);
    if (!b) {
      // print the definition and a caret under the failing character
      // (alignment is approximate: the __func__ prefix shifts the line)
      dout(1) << __func__ << " bad sharding: " << dendl;
      dout(1) << __func__ << sharding_text << dendl;
      dout(1) << __func__ << std::string(error_position - &sharding_text[0], ' ') << "^" << error_msg << dendl;
      return -EINVAL;
    }
    r = create_shards(opt, sharding_def);
    if (r != 0 ) {
      derr << __func__ << " create_shards failed error=" << r << dendl;
      return r;
    }
    // persist the definition so do_open() can recreate the mapping
    opt.env->CreateDir(sharding_def_dir);
    status = rocksdb::WriteStringToFile(opt.env, sharding_text,
					sharding_def_file, true);
    if (!status.ok()) {
      derr << __func__ << " cannot write to " << sharding_def_file << dendl;
      return -EIO;
    }
  } else {
    // no sharding requested: drop any stale persisted definition
    opt.env->DeleteFile(sharding_def_file);
  }
  return 0;
}
866
867 // linking to rocksdb function defined in options_helper.cc
868 // it can parse nested params like "nested_opt={opt1=1;opt2=2}"
869 extern rocksdb::Status rocksdb::StringToMap(const std::string& opts_str,
870 std::unordered_map<std::string, std::string>* opts_map);
871
872 // Splits column family options from single string into name->value column_opts_map.
873 // The split is done using RocksDB parser that understands "{" and "}", so it
874 // properly extracts compound options.
875 // If non-RocksDB option "block_cache" is defined it is extracted to block_cache_opt.
876 int RocksDBStore::split_column_family_options(const std::string& options,
877 std::unordered_map<std::string, std::string>* opt_map,
878 std::string* block_cache_opt)
879 {
880 dout(20) << __func__ << " options=" << options << dendl;
881 rocksdb::Status status = rocksdb::StringToMap(options, opt_map);
882 if (!status.ok()) {
883 dout(5) << __func__ << " error '" << status.getState()
884 << "' while parsing options '" << options << "'" << dendl;
885 return -EINVAL;
886 }
887 // if "block_cache" option exists, then move it to separate string
888 if (auto it = opt_map->find("block_cache"); it != opt_map->end()) {
889 *block_cache_opt = it->second;
890 opt_map->erase(it);
891 } else {
892 block_cache_opt->clear();
893 }
894 return 0;
895 }
896
897 // Updates column family options.
898 // Take options from more_options and apply them to cf_opt.
899 // Allowed options are exactly the same as allowed for column families in RocksDB.
900 // Ceph addition is "block_cache" option that is translated to block_cache and
901 // allows to specialize separate block cache for O column family.
902 //
903 // base_name - name of column without shard suffix: "-"+number
904 // options - additional options to apply
905 // cf_opt - column family options to update
906 int RocksDBStore::update_column_family_options(const std::string& base_name,
907 const std::string& more_options,
908 rocksdb::ColumnFamilyOptions* cf_opt)
909 {
910 std::unordered_map<std::string, std::string> options_map;
911 std::string block_cache_opt;
912 rocksdb::Status status;
913 int r = split_column_family_options(more_options, &options_map, &block_cache_opt);
914 if (r != 0) {
915 dout(5) << __func__ << " failed to parse options; column family=" << base_name
916 << " options=" << more_options << dendl;
917 return r;
918 }
919 status = rocksdb::GetColumnFamilyOptionsFromMap(*cf_opt, options_map, cf_opt);
920 if (!status.ok()) {
921 dout(5) << __func__ << " invalid column family optionsp; column family="
922 << base_name << " options=" << more_options << dendl;
923 dout(5) << __func__ << " RocksDB error='" << status.getState() << "'" << dendl;
924 return -EINVAL;
925 }
926 if (base_name != rocksdb::kDefaultColumnFamilyName) {
927 // default cf has its merge operator defined in load_rocksdb_options, should not override it
928 install_cf_mergeop(base_name, cf_opt);
929 }
930 if (!block_cache_opt.empty()) {
931 r = apply_block_cache_options(base_name, block_cache_opt, cf_opt);
932 if (r != 0) {
933 // apply_block_cache_options already does all necessary douts
934 return r;
935 }
936 }
937
938 // Set Compact on Deletion Factory
939 if (cct->_conf->rocksdb_cf_compact_on_deletion) {
940 size_t sliding_window = cct->_conf->rocksdb_cf_compact_on_deletion_sliding_window;
941 size_t trigger = cct->_conf->rocksdb_cf_compact_on_deletion_trigger;
942 cf_opt->table_properties_collector_factories.emplace_back(
943 rocksdb::NewCompactOnDeletionCollectorFactory(sliding_window, trigger));
944 }
945 return 0;
946 }
947
// Applies per-column-family block-cache / block-based-table options parsed
// from `block_cache_opt` (a RocksDB "k=v;k=v" option string) onto *cf_opt.
//
// Recognizes three pseudo-options that are consumed here rather than passed
// to RocksDB: "type" (cache implementation), "size" (IEC-suffixed byte
// count), and "high_ratio" (high-priority pool fraction).  If any of them is
// present a dedicated block cache is created for this column family;
// otherwise the store-wide bbt_opts.block_cache is shared.
//
// Returns 0 on success, -EINVAL on any parse/creation failure (details are
// logged via dout).
int RocksDBStore::apply_block_cache_options(const std::string& column_name,
                                            const std::string& block_cache_opt,
                                            rocksdb::ColumnFamilyOptions* cf_opt)
{
  rocksdb::Status status;
  std::unordered_map<std::string, std::string> cache_options_map;
  status = rocksdb::StringToMap(block_cache_opt, &cache_options_map);
  if (!status.ok()) {
    dout(5) << __func__ << " invalid block cache options; column=" << column_name
            << " options=" << block_cache_opt << dendl;
    dout(5) << __func__ << " RocksDB error='" << status.getState() << "'" << dendl;
    return -EINVAL;
  }
  // Any of the custom keys below forces a column-private cache instance.
  bool require_new_block_cache = false;
  std::string cache_type = cct->_conf->rocksdb_cache_type;
  if (const auto it = cache_options_map.find("type"); it != cache_options_map.end()) {
    cache_type = it->second;
    cache_options_map.erase(it);  // consumed; must not reach RocksDB's parser
    require_new_block_cache = true;
  }
  size_t cache_size = cct->_conf->rocksdb_cache_size;
  if (auto it = cache_options_map.find("size"); it != cache_options_map.end()) {
    std::string error;
    // IEC-suffixed ("128M", "4G", ...) byte count.
    cache_size = strict_iecstrtoll(it->second.c_str(), &error);
    if (!error.empty()) {
      dout(10) << __func__ << " invalid size: '" << it->second << "'" << dendl;
      return -EINVAL;
    }
    cache_options_map.erase(it);
    require_new_block_cache = true;
  }
  double high_pri_pool_ratio = 0.0;
  if (auto it = cache_options_map.find("high_ratio"); it != cache_options_map.end()) {
    std::string error;
    high_pri_pool_ratio = strict_strtod(it->second.c_str(), &error);
    if (!error.empty()) {
      dout(10) << __func__ << " invalid high_pri (float): '" << it->second << "'" << dendl;
      return -EINVAL;
    }
    cache_options_map.erase(it);
    require_new_block_cache = true;
  }

  // Remaining entries are genuine BlockBasedTableOptions; layer them on top
  // of the store-wide defaults (bbt_opts).
  rocksdb::BlockBasedTableOptions column_bbt_opts;
  status = GetBlockBasedTableOptionsFromMap(bbt_opts, cache_options_map, &column_bbt_opts);
  if (!status.ok()) {
    dout(5) << __func__ << " invalid block cache options; column=" << column_name
            << " options=" << block_cache_opt << dendl;
    dout(5) << __func__ << " RocksDB error='" << status.getState() << "'" << dendl;
    return -EINVAL;
  }
  std::shared_ptr<rocksdb::Cache> block_cache;
  if (column_bbt_opts.no_block_cache) {
    // clear all settings except no_block_cache
    // rocksdb does not like them
    column_bbt_opts = rocksdb::BlockBasedTableOptions();
    column_bbt_opts.no_block_cache = true;
  } else {
    if (require_new_block_cache) {
      block_cache = create_block_cache(cache_type, cache_size, high_pri_pool_ratio);
      if (!block_cache) {
        dout(5) << __func__ << " failed to create block cache for params: " << block_cache_opt << dendl;
        return -EINVAL;
      }
    } else {
      // No cache-specific overrides: share the store-wide block cache.
      block_cache = bbt_opts.block_cache;
    }
  }
  column_bbt_opts.block_cache = block_cache;
  // Remember the per-column table options (used elsewhere, e.g. stats) and
  // install a table factory built from them on the CF options.
  cf_bbt_opts[column_name] = column_bbt_opts;
  cf_opt->table_factory.reset(NewBlockBasedTableFactory(cf_bbt_opts[column_name]));
  return 0;
}
1021
// Reconciles the sharding definition stored inside the DB ("sharding/def")
// with the column families RocksDB actually reports for `path`.
//
// Each column family described by the stored definition is classified into
// either existing_cfs/existing_cfs_shard (present in RocksDB) or
// missing_cfs/missing_cfs_shard (absent).  The two *_shard vectors run
// parallel to their descriptor vectors and carry (shard index, ColumnFamily)
// pairs.  The "default" CF is appended to existing_cfs last, with no shard
// entry — callers rely on that ordering.
//
// Returns 0 on success; -EIO if the stored definition cannot be read, the CF
// list cannot be obtained, or RocksDB contains columns the definition does
// not know about.  Returns the error from update_column_family_options() on
// bad per-column options.
int RocksDBStore::verify_sharding(const rocksdb::Options& opt,
                                  std::vector<rocksdb::ColumnFamilyDescriptor>& existing_cfs,
                                  std::vector<std::pair<size_t, RocksDBStore::ColumnFamily> >& existing_cfs_shard,
                                  std::vector<rocksdb::ColumnFamilyDescriptor>& missing_cfs,
                                  std::vector<std::pair<size_t, RocksDBStore::ColumnFamily> >& missing_cfs_shard)
{
  rocksdb::Status status;
  std::string stored_sharding_text;
  status = opt.env->FileExists(sharding_def_file);
  if (status.ok()) {
    status = rocksdb::ReadFileToString(opt.env,
                                       sharding_def_file,
                                       &stored_sharding_text);
    if(!status.ok()) {
      derr << __func__ << " cannot read from " << sharding_def_file << dendl;
      return -EIO;
    }
    dout(20) << __func__ << " sharding=" << stored_sharding_text << dendl;
  } else {
    dout(30) << __func__ << " no sharding" << dendl;
    //no "sharding_def" present
  }
  //check if sharding_def matches stored_sharding_def
  std::vector<ColumnFamily> stored_sharding_def;
  // Empty text yields an empty definition; only the "default" CF remains.
  parse_sharding_def(stored_sharding_text, stored_sharding_def);

  // Deterministic processing order regardless of definition-file order.
  std::sort(stored_sharding_def.begin(), stored_sharding_def.end(),
            [](ColumnFamily& a, ColumnFamily& b) { return a.name < b.name; } );

  std::vector<string> rocksdb_cfs;
  status = rocksdb::DB::ListColumnFamilies(rocksdb::DBOptions(opt),
                                           path, &rocksdb_cfs);
  if (!status.ok()) {
    derr << __func__ << " unable to list column families: " << status.ToString() << dendl;
    return -EIO;
  }
  dout(5) << __func__ << " column families from rocksdb: " << rocksdb_cfs << dendl;

  // Classify one (possibly sharded) CF as existing or missing.
  auto emplace_cf = [&] (const RocksDBStore::ColumnFamily& column,
                         int32_t shard_id,
                         const std::string& shard_name,
                         const rocksdb::ColumnFamilyOptions& opt) {
    if (std::find(rocksdb_cfs.begin(), rocksdb_cfs.end(), shard_name) != rocksdb_cfs.end()) {
      existing_cfs.emplace_back(shard_name, opt);
      existing_cfs_shard.emplace_back(shard_id, column);
    } else {
      missing_cfs.emplace_back(shard_name, opt);
      missing_cfs_shard.emplace_back(shard_id, column);
    }
  };

  for (auto& column : stored_sharding_def) {
    rocksdb::ColumnFamilyOptions cf_opt(opt);
    int r = update_column_family_options(column.name, column.options, &cf_opt);
    if (r != 0) {
      return r;
    }
    if (column.shard_cnt == 1) {
      // Unsharded: the CF name is used verbatim.
      emplace_cf(column, 0, column.name, cf_opt);
    } else {
      // Sharded: one CF per shard, named "<column>-<shard index>".
      for (size_t i = 0; i < column.shard_cnt; i++) {
        std::string cf_name = column.name + "-" + std::to_string(i);
        emplace_cf(column, i, cf_name, cf_opt);
      }
    }
  }
  // "default" always exists in an opened DB; keep it last (callers assume
  // existing_cfs.back() / handles.back() is the default CF).
  existing_cfs.emplace_back("default", opt);

  if (existing_cfs.size() != rocksdb_cfs.size()) {
    // Every RocksDB CF must be accounted for by the stored definition;
    // extras mean the definition and the DB have diverged.
    std::vector<std::string> columns_from_stored;
    sharding_def_to_columns(stored_sharding_def, columns_from_stored);
    derr << __func__ << " extra columns in rocksdb. rocksdb columns = " << rocksdb_cfs
         << " target columns = " << columns_from_stored << dendl;
    return -EIO;
  }
  return 0;
}
1099
1100 std::ostream& operator<<(std::ostream& out, const RocksDBStore::ColumnFamily& cf)
1101 {
1102 out << "(";
1103 out << cf.name;
1104 out << ",";
1105 out << cf.shard_cnt;
1106 out << ",";
1107 out << cf.hash_l;
1108 out << "-";
1109 if (cf.hash_h != std::numeric_limits<uint32_t>::max()) {
1110 out << cf.hash_h;
1111 }
1112 out << ",";
1113 out << cf.options;
1114 out << ")";
1115 return out;
1116 }
1117
// Opens (or creates) the RocksDB instance at `path`.
//
// create_if_missing: create a fresh DB and apply `sharding_text` to it.
// open_readonly:     open without write access (incompatible with create).
//
// For an existing DB, the stored sharding definition is verified against the
// actual column families; CFs that the definition expects but the DB lacks
// are tolerated only while a resharding recreate is pending (marked by the
// "sharding/recreate_columns" file), in which case they are created here.
// On success, registers perf counters and optionally compacts the store.
// Returns 0 on success or a negative errno.
int RocksDBStore::do_open(ostream &out,
                          bool create_if_missing,
                          bool open_readonly,
                          const std::string& sharding_text)
{
  // Creating a brand-new DB read-only makes no sense.
  ceph_assert(!(create_if_missing && open_readonly));
  rocksdb::Options opt;
  int r = load_rocksdb_options(create_if_missing, opt);
  if (r) {
    dout(1) << __func__ << " load rocksdb options failed" << dendl;
    return r;
  }
  rocksdb::Status status;
  if (create_if_missing) {
    // Fresh DB: open, then create column families per sharding_text.
    status = rocksdb::DB::Open(opt, path, &db);
    if (!status.ok()) {
      derr << status.ToString() << dendl;
      return -EINVAL;
    }
    r = apply_sharding(opt, sharding_text);
    if (r < 0) {
      return r;
    }
    default_cf = db->DefaultColumnFamily();
  } else {
    // Existing DB: reconcile stored sharding definition with actual CFs.
    std::vector<rocksdb::ColumnFamilyDescriptor> existing_cfs;
    std::vector<std::pair<size_t, RocksDBStore::ColumnFamily> > existing_cfs_shard;
    std::vector<rocksdb::ColumnFamilyDescriptor> missing_cfs;
    std::vector<std::pair<size_t, RocksDBStore::ColumnFamily> > missing_cfs_shard;

    r = verify_sharding(opt,
                        existing_cfs, existing_cfs_shard,
                        missing_cfs, missing_cfs_shard);
    if (r < 0) {
      return r;
    }
    // "recreate mode": a prior reshard left a marker asking us to (re)create
    // the missing column families on this open.
    std::string sharding_recreate_text;
    status = rocksdb::ReadFileToString(opt.env,
                                       sharding_recreate,
                                       &sharding_recreate_text);
    bool recreate_mode = status.ok() && sharding_recreate_text == "1";

    ceph_assert(!recreate_mode || !open_readonly);
    if (recreate_mode == false && missing_cfs.size() != 0) {
      // We do not accept when there are missing column families, except case that we are during resharding.
      // We can get into this case if resharding was interrupted. It gives a chance to continue.
      // Opening DB is only allowed in read-only mode.
      // NOTE(review): the error below fires only when opening read-write AND
      // the resharding lock column is among the missing ones — verify this
      // matches the intent described above.
      if (open_readonly == false &&
          std::find_if(missing_cfs.begin(), missing_cfs.end(),
                       [](const rocksdb::ColumnFamilyDescriptor& c) { return c.name == resharding_column_lock; }
                       ) != missing_cfs.end()) {
        derr << __func__ << " missing column families: " << missing_cfs_shard << dendl;
        return -EIO;
      }
    }

    if (existing_cfs.empty()) {
      // no column families
      if (open_readonly) {
        status = rocksdb::DB::OpenForReadOnly(opt, path, &db);
      } else {
        status = rocksdb::DB::Open(opt, path, &db);
      }
      if (!status.ok()) {
        derr << status.ToString() << dendl;
        return -EINVAL;
      }
      default_cf = db->DefaultColumnFamily();
    } else {
      // Open with explicit CF list; `handles` comes back parallel to
      // existing_cfs, with the default CF last (see verify_sharding).
      std::vector<rocksdb::ColumnFamilyHandle*> handles;
      if (open_readonly) {
        status = rocksdb::DB::OpenForReadOnly(rocksdb::DBOptions(opt),
                                              path, existing_cfs,
                                              &handles, &db);
      } else {
        status = rocksdb::DB::Open(rocksdb::DBOptions(opt),
                                   path, existing_cfs, &handles, &db);
      }
      if (!status.ok()) {
        derr << status.ToString() << dendl;
        return -EINVAL;
      }
      // +1: existing_cfs carries "default" without a shard entry.
      ceph_assert(existing_cfs.size() == existing_cfs_shard.size() + 1);
      ceph_assert(handles.size() == existing_cfs.size());
      dout(10) << __func__ << " existing_cfs=" << existing_cfs.size() << dendl;
      for (size_t i = 0; i < existing_cfs_shard.size(); i++) {
        add_column_family(existing_cfs_shard[i].second.name,
                          existing_cfs_shard[i].second.hash_l,
                          existing_cfs_shard[i].second.hash_h,
                          existing_cfs_shard[i].first,
                          handles[i]);
      }
      default_cf = handles[handles.size() - 1];
      // Handles obtained via Open() (unlike DefaultColumnFamily()) must be
      // destroyed explicitly in close().
      must_close_default_cf = true;

      // Recreate missing CFs — but not while a resharding lock is among the
      // missing ones (that means a reshard is mid-flight).
      if (missing_cfs.size() > 0 &&
          std::find_if(missing_cfs.begin(), missing_cfs.end(),
                       [](const rocksdb::ColumnFamilyDescriptor& c) { return c.name == resharding_column_lock; }
                       ) == missing_cfs.end())
        {
          dout(10) << __func__ << " missing_cfs=" << missing_cfs.size() << dendl;
          ceph_assert(recreate_mode);
          ceph_assert(missing_cfs.size() == missing_cfs_shard.size());
          for (size_t i = 0; i < missing_cfs.size(); i++) {
            rocksdb::ColumnFamilyHandle *cf;
            status = db->CreateColumnFamily(missing_cfs[i].options, missing_cfs[i].name, &cf);
            if (!status.ok()) {
              derr << __func__ << " Failed to create rocksdb column family: "
                   << missing_cfs[i].name << dendl;
              return -EINVAL;
            }
            add_column_family(missing_cfs_shard[i].second.name,
                              missing_cfs_shard[i].second.hash_l,
                              missing_cfs_shard[i].second.hash_h,
                              missing_cfs_shard[i].first,
                              cf);
          }
          // All columns recreated: clear the marker.
          opt.env->DeleteFile(sharding_recreate);
        }
    }
  }
  ceph_assert(default_cf != nullptr);

  // Register perf counters for this store instance (removed in close()).
  PerfCountersBuilder plb(cct, "rocksdb", l_rocksdb_first, l_rocksdb_last);
  plb.add_time_avg(l_rocksdb_get_latency, "get_latency", "Get latency");
  plb.add_time_avg(l_rocksdb_submit_latency, "submit_latency", "Submit Latency");
  plb.add_time_avg(l_rocksdb_submit_sync_latency, "submit_sync_latency", "Submit Sync Latency");
  plb.add_u64_counter(l_rocksdb_compact, "compact", "Compactions");
  plb.add_u64_counter(l_rocksdb_compact_range, "compact_range", "Compactions by range");
  plb.add_u64_counter(l_rocksdb_compact_queue_merge, "compact_queue_merge", "Mergings of ranges in compaction queue");
  plb.add_u64(l_rocksdb_compact_queue_len, "compact_queue_len", "Length of compaction queue");
  plb.add_time_avg(l_rocksdb_write_wal_time, "rocksdb_write_wal_time", "Rocksdb write wal time");
  plb.add_time_avg(l_rocksdb_write_memtable_time, "rocksdb_write_memtable_time", "Rocksdb write memtable time");
  plb.add_time_avg(l_rocksdb_write_delay_time, "rocksdb_write_delay_time", "Rocksdb write delay time");
  plb.add_time_avg(l_rocksdb_write_pre_and_post_process_time,
                   "rocksdb_write_pre_and_post_time", "total time spent on writing a record, excluding write process");
  logger = plb.create_perf_counters();
  cct->get_perfcounters_collection()->add(logger);

  if (compact_on_mount) {
    derr << "Compacting rocksdb store..." << dendl;
    compact();
    derr << "Finished compacting rocksdb store" << dendl;
  }
  return 0;
}
1264
1265 int RocksDBStore::_test_init(const string& dir)
1266 {
1267 rocksdb::Options options;
1268 options.create_if_missing = true;
1269 rocksdb::DB *db;
1270 rocksdb::Status status = rocksdb::DB::Open(options, dir, &db);
1271 delete db;
1272 db = nullptr;
1273 return status.ok() ? 0 : -EIO;
1274 }
1275
1276 RocksDBStore::~RocksDBStore()
1277 {
1278 close();
1279 if (priv) {
1280 delete static_cast<rocksdb::Env*>(priv);
1281 }
1282 }
1283
1284 void RocksDBStore::close()
1285 {
1286 // stop compaction thread
1287 compact_queue_lock.lock();
1288 if (compact_thread.is_started()) {
1289 dout(1) << __func__ << " waiting for compaction thread to stop" << dendl;
1290 compact_queue_stop = true;
1291 compact_queue_cond.notify_all();
1292 compact_queue_lock.unlock();
1293 compact_thread.join();
1294 dout(1) << __func__ << " compaction thread to stopped" << dendl;
1295 } else {
1296 compact_queue_lock.unlock();
1297 }
1298
1299 if (logger) {
1300 cct->get_perfcounters_collection()->remove(logger);
1301 delete logger;
1302 logger = nullptr;
1303 }
1304
1305 // Ensure db is destroyed before dependent db_cache and filterpolicy
1306 for (auto& p : cf_handles) {
1307 for (size_t i = 0; i < p.second.handles.size(); i++) {
1308 db->DestroyColumnFamilyHandle(p.second.handles[i]);
1309 }
1310 }
1311 cf_handles.clear();
1312 if (must_close_default_cf) {
1313 db->DestroyColumnFamilyHandle(default_cf);
1314 must_close_default_cf = false;
1315 }
1316 default_cf = nullptr;
1317 delete db;
1318 db = nullptr;
1319 }
1320
// Runs rocksdb::RepairDB on the store, preserving the sharding definition
// across the repair (RepairDB deletes files it does not recognize, which
// would include our sharding markers).  After a repair of a sharded store,
// the DB is re-opened once in recreate mode to rebuild any column families
// the repair dropped.  Returns 0 on success, -1 on failure.
int RocksDBStore::repair(std::ostream &out)
{
  rocksdb::Status status;
  rocksdb::Options opt;
  int r = load_rocksdb_options(false, opt);
  if (r) {
    dout(1) << __func__ << " load rocksdb options failed" << dendl;
    out << "load rocksdb options failed" << std::endl;
    return r;
  }
  //need to save sharding definition, repairDB will delete files it does not know
  std::string stored_sharding_text;
  status = opt.env->FileExists(sharding_def_file);
  if (status.ok()) {
    status = rocksdb::ReadFileToString(opt.env,
                                       sharding_def_file,
                                       &stored_sharding_text);
    if (!status.ok()) {
      // Treat an unreadable definition like an absent one: repair proceeds
      // without sharding restoration.
      stored_sharding_text.clear();
    }
  }
  dout(10) << __func__ << " stored_sharding: " << stored_sharding_text << dendl;
  status = rocksdb::RepairDB(path, opt);
  // Remember the repair outcome; `status` is reused for the marker writes
  // below, and the final check requires both to have succeeded.
  bool repaired = status.ok();
  if (!stored_sharding_text.empty()) {
    //recreate markers even if repair failed
    opt.env->CreateDir(sharding_def_dir);
    status = rocksdb::WriteStringToFile(opt.env, stored_sharding_text,
                                        sharding_def_file, true);
    if (!status.ok()) {
      derr << __func__ << " cannot write to " << sharding_def_file << dendl;
      return -1;
    }
    // "1" arms recreate mode for the next open (see do_open()).
    status = rocksdb::WriteStringToFile(opt.env, "1",
                                        sharding_recreate, true);
    if (!status.ok()) {
      derr << __func__ << " cannot write to " << sharding_recreate << dendl;
      return -1;
    }
    // finalize sharding recreate: a full open/close cycle recreates any
    // column families RepairDB dropped and clears the marker.
    if (do_open(out, false, false)) {
      derr << __func__ << " cannot finalize repair" << dendl;
      return -1;
    }
    close();
  }

  if (repaired && status.ok()) {
    return 0;
  } else {
    out << "repair rocksdb failed : " << status.ToString() << std::endl;
    return -1;
  }
}
1375
1376 void RocksDBStore::split_stats(const std::string &s, char delim, std::vector<std::string> &elems) {
1377 std::stringstream ss;
1378 ss.str(s);
1379 std::string item;
1380 while (std::getline(ss, item, delim)) {
1381 elems.push_back(item);
1382 }
1383 }
1384
// Thin pass-through to RocksDB's integer-property interface (e.g.
// "rocksdb.estimate-live-data-size").  Returns false when the property
// name is unknown or not an integer property.
bool RocksDBStore::get_property(
  const std::string &property,
  uint64_t *out)
{
  return db->GetIntProperty(property, out);
}
1391
// Approximates the on-disk size of all keys under (prefix, key_prefix*),
// using RocksDB's GetApproximateSizes over a synthetic key range.
// For a sharded prefix, sums the estimate across all shard CFs; otherwise
// queries the default CF using the combined "prefix\0key" encoding.
// NOTE(review): the upper bound appends "\xff\xff\xff\xff", so keys sorting
// beyond that suffix are excluded — acceptable for an estimate.
int64_t RocksDBStore::estimate_prefix_size(const string& prefix,
                                           const string& key_prefix)
{
  uint64_t size = 0;
  auto p_iter = cf_handles.find(prefix);
  if (p_iter != cf_handles.end()) {
    // Sharded prefix: keys live bare (no prefix) in each shard CF.
    for (auto cf : p_iter->second.handles) {
      uint64_t s = 0;
      string start = key_prefix + string(1, '\x00');
      string limit = key_prefix + string("\xff\xff\xff\xff");
      rocksdb::Range r(start, limit);
      db->GetApproximateSizes(cf, &r, 1, &s);
      size += s;
    }
  } else {
    // Unsharded: keys are stored combined in the default CF.
    string start = combine_strings(prefix , key_prefix);
    string limit = combine_strings(prefix , key_prefix + "\xff\xff\xff\xff");
    rocksdb::Range r(start, limit);
    db->GetApproximateSizes(default_cf, &r, 1, &size);
  }
  return size;
}
1414
1415 void RocksDBStore::get_statistics(Formatter *f)
1416 {
1417 if (!cct->_conf->rocksdb_perf) {
1418 dout(20) << __func__ << " RocksDB perf is disabled, can't probe for stats"
1419 << dendl;
1420 return;
1421 }
1422
1423 if (cct->_conf->rocksdb_collect_compaction_stats) {
1424 std::string stat_str;
1425 bool status = db->GetProperty("rocksdb.stats", &stat_str);
1426 if (status) {
1427 f->open_object_section("rocksdb_statistics");
1428 f->dump_string("rocksdb_compaction_statistics", "");
1429 vector<string> stats;
1430 split_stats(stat_str, '\n', stats);
1431 for (auto st :stats) {
1432 f->dump_string("", st);
1433 }
1434 f->close_section();
1435 }
1436 }
1437 if (cct->_conf->rocksdb_collect_extended_stats) {
1438 if (dbstats) {
1439 f->open_object_section("rocksdb_extended_statistics");
1440 string stat_str = dbstats->ToString();
1441 vector<string> stats;
1442 split_stats(stat_str, '\n', stats);
1443 f->dump_string("rocksdb_extended_statistics", "");
1444 for (auto st :stats) {
1445 f->dump_string(".", st);
1446 }
1447 f->close_section();
1448 }
1449 f->open_object_section("rocksdbstore_perf_counters");
1450 logger->dump_formatted(f, false, false);
1451 f->close_section();
1452 }
1453 if (cct->_conf->rocksdb_collect_memory_stats) {
1454 f->open_object_section("rocksdb_memtable_statistics");
1455 std::string str;
1456 if (!bbt_opts.no_block_cache) {
1457 str.append(stringify(bbt_opts.block_cache->GetUsage()));
1458 f->dump_string("block_cache_usage", str.data());
1459 str.clear();
1460 str.append(stringify(bbt_opts.block_cache->GetPinnedUsage()));
1461 f->dump_string("block_cache_pinned_blocks_usage", str);
1462 str.clear();
1463 }
1464 db->GetProperty("rocksdb.cur-size-all-mem-tables", &str);
1465 f->dump_string("rocksdb_memtable_usage", str);
1466 str.clear();
1467 db->GetProperty("rocksdb.estimate-table-readers-mem", &str);
1468 f->dump_string("rocksdb_index_filter_blocks_usage", str);
1469 f->close_section();
1470 }
1471 }
1472
// WriteBatch visitor that renders a human-readable summary of a batch's
// operations into `seen` (used for level-30 tracing and error reporting).
// Output is capped at 50 entries via Continue().
struct RocksDBStore::RocksWBHandler: public rocksdb::WriteBatch::Handler {
  RocksWBHandler(const RocksDBStore& db) : db(db) {}
  const RocksDBStore& db;       // used to map CF ids / split combined keys
  std::stringstream seen;       // accumulated textual dump
  int num_seen = 0;             // entries dumped so far (cap: 50)

  // Common formatter: "<op>( prefix = P key = K [value size = N])".
  // For the default CF (id 0) the key is the combined "prefix\0key" form
  // and must be split; for other CFs the prefix is looked up by CF id.
  void dump(const char* op_name,
            uint32_t column_family_id,
            const rocksdb::Slice& key_in,
            const rocksdb::Slice* value = nullptr) {
    string prefix;
    string key;
    // -1 marks "no value" (delete ops).
    ssize_t size = value ? value->size() : -1;
    seen << std::endl << op_name << "(";

    if (column_family_id == 0) {
      db.split_key(key_in, &prefix, &key);
    } else {
      auto it = db.cf_ids_to_prefix.find(column_family_id);
      ceph_assert(it != db.cf_ids_to_prefix.end());
      prefix = it->second;
      key = key_in.ToString();
    }
    seen << " prefix = " << prefix;
    seen << " key = " << pretty_binary_string(key);
    if (size != -1)
      seen << " value size = " << std::to_string(size);
    seen << ")";
    num_seen++;
  }
  // Non-CF variants route through CF id 0 (the default column family).
  void Put(const rocksdb::Slice& key,
           const rocksdb::Slice& value) override {
    dump("Put", 0, key, &value);
  }
  rocksdb::Status PutCF(uint32_t column_family_id, const rocksdb::Slice& key,
                        const rocksdb::Slice& value) override {
    dump("PutCF", column_family_id, key, &value);
    return rocksdb::Status::OK();
  }
  void SingleDelete(const rocksdb::Slice& key) override {
    dump("SingleDelete", 0, key);
  }
  rocksdb::Status SingleDeleteCF(uint32_t column_family_id, const rocksdb::Slice& key) override {
    dump("SingleDeleteCF", column_family_id, key);
    return rocksdb::Status::OK();
  }
  void Delete(const rocksdb::Slice& key) override {
    dump("Delete", 0, key);
  }
  rocksdb::Status DeleteCF(uint32_t column_family_id, const rocksdb::Slice& key) override {
    dump("DeleteCF", column_family_id, key);
    return rocksdb::Status::OK();
  }
  void Merge(const rocksdb::Slice& key,
             const rocksdb::Slice& value) override {
    dump("Merge", 0, key, &value);
  }
  rocksdb::Status MergeCF(uint32_t column_family_id, const rocksdb::Slice& key,
                          const rocksdb::Slice& value) override {
    dump("MergeCF", column_family_id, key, &value);
    return rocksdb::Status::OK();
  }
  // Stop iterating once 50 operations have been dumped.
  bool Continue() override { return num_seen < 50; }
};
1537
// Shared commit path for submit_transaction{,_sync}: writes the batch via
// db->Write() with the caller's WriteOptions, optionally collecting RocksDB
// perf-context timings into our perf counters.  Returns 0 on success, -1 on
// any write failure (after logging a dump of the failed batch).
int RocksDBStore::submit_common(rocksdb::WriteOptions& woptions, KeyValueDB::Transaction t)
{
  // enable rocksdb breakdown
  // considering performance overhead, default is disabled
  if (cct->_conf->rocksdb_perf) {
    rocksdb::SetPerfLevel(rocksdb::PerfLevel::kEnableTimeExceptForMutex);
    rocksdb::get_perf_context()->Reset();
  }

  RocksDBTransactionImpl * _t =
    static_cast<RocksDBTransactionImpl *>(t.get());
  woptions.disableWAL = disableWAL;
  // NOTE(review): the dout macro opens a conditional block that dendl
  // closes, so the batch dump below presumably only runs when subsys
  // debug level >= 30 — confirm against the dout macro definition.
  lgeneric_subdout(cct, rocksdb, 30) << __func__;
  RocksWBHandler bat_txc(*this);
  _t->bat.Iterate(&bat_txc);
  *_dout << " Rocksdb transaction: " << bat_txc.seen.str() << dendl;

  rocksdb::Status s = db->Write(woptions, &_t->bat);
  if (!s.ok()) {
    // On failure, dump the whole batch for diagnosis.
    RocksWBHandler rocks_txc(*this);
    _t->bat.Iterate(&rocks_txc);
    derr << __func__ << " error: " << s.ToString() << " code = " << s.code()
         << " Rocksdb transaction: " << rocks_txc.seen.str() << dendl;
  }

  if (cct->_conf->rocksdb_perf) {
    // perf_context values are nanoseconds; convert to seconds for utime_t.
    utime_t write_memtable_time;
    utime_t write_delay_time;
    utime_t write_wal_time;
    utime_t write_pre_and_post_process_time;
    write_wal_time.set_from_double(
      static_cast<double>(rocksdb::get_perf_context()->write_wal_time)/1000000000);
    write_memtable_time.set_from_double(
      static_cast<double>(rocksdb::get_perf_context()->write_memtable_time)/1000000000);
    write_delay_time.set_from_double(
      static_cast<double>(rocksdb::get_perf_context()->write_delay_time)/1000000000);
    write_pre_and_post_process_time.set_from_double(
      static_cast<double>(rocksdb::get_perf_context()->write_pre_and_post_process_time)/1000000000);
    logger->tinc(l_rocksdb_write_memtable_time, write_memtable_time);
    logger->tinc(l_rocksdb_write_delay_time, write_delay_time);
    logger->tinc(l_rocksdb_write_wal_time, write_wal_time);
    logger->tinc(l_rocksdb_write_pre_and_post_process_time, write_pre_and_post_process_time);
  }

  return s.ok() ? 0 : -1;
}
1584
1585 int RocksDBStore::submit_transaction(KeyValueDB::Transaction t)
1586 {
1587 utime_t start = ceph_clock_now();
1588 rocksdb::WriteOptions woptions;
1589 woptions.sync = false;
1590
1591 int result = submit_common(woptions, t);
1592
1593 utime_t lat = ceph_clock_now() - start;
1594 logger->tinc(l_rocksdb_submit_latency, lat);
1595
1596 return result;
1597 }
1598
1599 int RocksDBStore::submit_transaction_sync(KeyValueDB::Transaction t)
1600 {
1601 utime_t start = ceph_clock_now();
1602 rocksdb::WriteOptions woptions;
1603 // if disableWAL, sync can't set
1604 woptions.sync = !disableWAL;
1605
1606 int result = submit_common(woptions, t);
1607
1608 utime_t lat = ceph_clock_now() - start;
1609 logger->tinc(l_rocksdb_submit_sync_latency, lat);
1610
1611 return result;
1612 }
1613
1614 RocksDBStore::RocksDBTransactionImpl::RocksDBTransactionImpl(RocksDBStore *_db)
1615 {
1616 db = _db;
1617 }
1618
// Appends a Put of (key -> to_set_bl) into `bat` for column family `cf`.
// A contiguous bufferlist is passed as a single slice (zero-copy into the
// batch encoding); a fragmented one is passed via SliceParts, one slice per
// buffer segment.
void RocksDBStore::RocksDBTransactionImpl::put_bat(
  rocksdb::WriteBatch& bat,
  rocksdb::ColumnFamilyHandle *cf,
  const string &key,
  const bufferlist &to_set_bl)
{
  // bufferlist::c_str() is non-constant, so we can't call c_str()
  if (to_set_bl.is_contiguous() && to_set_bl.length() > 0) {
    bat.Put(cf,
            rocksdb::Slice(key),
            rocksdb::Slice(to_set_bl.buffers().front().c_str(),
                           to_set_bl.length()));
  } else {
    // Fragmented (or empty) value: hand each segment to RocksDB separately.
    rocksdb::Slice key_slice(key);
    vector<rocksdb::Slice> value_slices(to_set_bl.get_num_buffers());
    bat.Put(cf,
            rocksdb::SliceParts(&key_slice, 1),
            prepare_sliceparts(to_set_bl, &value_slices));
  }
}
1639
1640 void RocksDBStore::RocksDBTransactionImpl::set(
1641 const string &prefix,
1642 const string &k,
1643 const bufferlist &to_set_bl)
1644 {
1645 auto cf = db->get_cf_handle(prefix, k);
1646 if (cf) {
1647 put_bat(bat, cf, k, to_set_bl);
1648 } else {
1649 string key = combine_strings(prefix, k);
1650 put_bat(bat, db->default_cf, key, to_set_bl);
1651 }
1652 }
1653
1654 void RocksDBStore::RocksDBTransactionImpl::set(
1655 const string &prefix,
1656 const char *k, size_t keylen,
1657 const bufferlist &to_set_bl)
1658 {
1659 auto cf = db->get_cf_handle(prefix, k, keylen);
1660 if (cf) {
1661 string key(k, keylen); // fixme?
1662 put_bat(bat, cf, key, to_set_bl);
1663 } else {
1664 string key;
1665 combine_strings(prefix, k, keylen, &key);
1666 put_bat(bat, cf, key, to_set_bl);
1667 }
1668 }
1669
1670 void RocksDBStore::RocksDBTransactionImpl::rmkey(const string &prefix,
1671 const string &k)
1672 {
1673 auto cf = db->get_cf_handle(prefix, k);
1674 if (cf) {
1675 bat.Delete(cf, rocksdb::Slice(k));
1676 } else {
1677 bat.Delete(db->default_cf, combine_strings(prefix, k));
1678 }
1679 }
1680
1681 void RocksDBStore::RocksDBTransactionImpl::rmkey(const string &prefix,
1682 const char *k,
1683 size_t keylen)
1684 {
1685 auto cf = db->get_cf_handle(prefix, k, keylen);
1686 if (cf) {
1687 bat.Delete(cf, rocksdb::Slice(k, keylen));
1688 } else {
1689 string key;
1690 combine_strings(prefix, k, keylen, &key);
1691 bat.Delete(db->default_cf, rocksdb::Slice(key));
1692 }
1693 }
1694
1695 void RocksDBStore::RocksDBTransactionImpl::rm_single_key(const string &prefix,
1696 const string &k)
1697 {
1698 auto cf = db->get_cf_handle(prefix, k);
1699 if (cf) {
1700 bat.SingleDelete(cf, k);
1701 } else {
1702 bat.SingleDelete(db->default_cf, combine_strings(prefix, k));
1703 }
1704 }
1705
// Queues deletion of every key under `prefix`.  Strategy: individually
// Delete up to the configured threshold of keys (cheaper for small counts);
// if the threshold is hit, roll back to a savepoint and issue a single
// DeleteRange over the whole prefix instead.
// NOTE(review): with the pre-decrement in the loop condition, the fallback
// triggers after threshold-1 individual deletes; a threshold of 0 would
// wrap — presumably the config guarantees a sane value.
void RocksDBStore::RocksDBTransactionImpl::rmkeys_by_prefix(const string &prefix)
{
  auto p_iter = db->cf_handles.find(prefix);
  if (p_iter == db->cf_handles.end()) {
    // Unsharded prefix: keys live combined in the default CF.
    uint64_t cnt = db->get_delete_range_threshold();
    bat.SetSavePoint();
    auto it = db->get_iterator(prefix);
    for (it->seek_to_first(); it->valid() && (--cnt) != 0; it->next()) {
      bat.Delete(db->default_cf, combine_strings(prefix, it->key()));
    }
    if (cnt == 0) {
      // Too many keys: discard the per-key deletes, use one DeleteRange.
      bat.RollbackToSavePoint();
      // '\x01' is the smallest prefix strictly greater than `prefix` in the
      // combined encoding (which joins prefix and key with '\0').
      string endprefix = prefix;
      endprefix.push_back('\x01');
      bat.DeleteRange(db->default_cf,
                      combine_strings(prefix, string()),
                      combine_strings(endprefix, string()));
    } else {
      bat.PopSavePoint();
    }
  } else {
    // Sharded prefix: each shard CF contains only this prefix's keys, so
    // the same threshold logic runs per shard.
    ceph_assert(p_iter->second.handles.size() >= 1);
    for (auto cf : p_iter->second.handles) {
      uint64_t cnt = db->get_delete_range_threshold();
      bat.SetSavePoint();
      auto it = db->new_shard_iterator(cf);
      for (it->seek_to_first(); it->valid() && (--cnt) != 0; it->next()) {
        bat.Delete(cf, it->key());
      }
      if (cnt == 0) {
        bat.RollbackToSavePoint();
        string endprefix = "\xff\xff\xff\xff"; // FIXME: this is cheating...
        bat.DeleteRange(cf, string(), endprefix);
      } else {
        bat.PopSavePoint();
      }
    }
  }
}
1745
// Queues deletion of all keys in [start, end) under `prefix`.  Like
// rmkeys_by_prefix(): delete individually up to the threshold, then fall
// back to DeleteRange (per shard CF when the prefix is sharded).
void RocksDBStore::RocksDBTransactionImpl::rm_range_keys(const string &prefix,
                                                         const string &start,
                                                         const string &end)
{
  ldout(db->cct, 10) << __func__
                     << " enter prefix=" << prefix
                     << " start=" << pretty_binary_string(start)
                     << " end=" << pretty_binary_string(end) << dendl;
  auto p_iter = db->cf_handles.find(prefix);
  uint64_t cnt = db->get_delete_range_threshold();
  if (p_iter == db->cf_handles.end()) {
    // Unsharded: iterate the combined keys in the default CF, bounded by
    // the comparator check against `end`.
    uint64_t cnt0 = cnt;
    bat.SetSavePoint();
    auto it = db->get_iterator(prefix);
    for (it->lower_bound(start);
         it->valid() && db->comparator->Compare(it->key(), end) < 0 && (--cnt) != 0;
         it->next()) {
      bat.Delete(db->default_cf, combine_strings(prefix, it->key()));
    }
    ldout(db->cct, 15) << __func__
                       << " count = " << cnt0 - cnt
                       << dendl;
    if (cnt == 0) {
      ldout(db->cct, 10) << __func__ << " p_iter == end(), resorting to DeleteRange"
                         << dendl;
      bat.RollbackToSavePoint();
      bat.DeleteRange(db->default_cf,
                      rocksdb::Slice(combine_strings(prefix, start)),
                      rocksdb::Slice(combine_strings(prefix, end)));
    } else {
      bat.PopSavePoint();
    }
  } else if (cnt == 0) {
    // Threshold is 0: skip per-key deletes entirely and DeleteRange each
    // shard directly.
    ceph_assert(p_iter->second.handles.size() >= 1);
    for (auto cf : p_iter->second.handles) {
      ldout(db->cct, 10) << __func__ << " p_iter != end(), resorting to DeleteRange"
                         << dendl;
      bat.DeleteRange(cf, rocksdb::Slice(start), rocksdb::Slice(end));
    }
  } else {
    // Sharded with a positive threshold: bounded iterator per shard.
    auto bounds = KeyValueDB::IteratorBounds();
    bounds.lower_bound = start;
    bounds.upper_bound = end;
    ceph_assert(p_iter->second.handles.size() >= 1);
    for (auto cf : p_iter->second.handles) {
      cnt = db->get_delete_range_threshold();
      uint64_t cnt0 = cnt;
      bat.SetSavePoint();
      auto it = db->new_shard_iterator(cf, prefix, bounds);
      for (it->lower_bound(start);
           it->valid() && (--cnt) != 0;
           it->next()) {
        bat.Delete(cf, it->key());
      }
      ldout(db->cct, 10) << __func__
                         << " count = " << cnt0 - cnt
                         << dendl;
      if (cnt == 0) {
        ldout(db->cct, 10) << __func__ << " p_iter != end(), resorting to DeleteRange"
                           << dendl;
        bat.RollbackToSavePoint();
        bat.DeleteRange(cf, rocksdb::Slice(start), rocksdb::Slice(end));
      } else {
        bat.PopSavePoint();
      }
    }
  }
  ldout(db->cct, 10) << __func__ << " end" << dendl;
}
1815
// Queues a Merge of (prefix, k) with to_set_bl, using the CF's configured
// merge operator.  Four paths: {shard CF, default CF} x {contiguous,
// fragmented bufferlist} — same key/value encoding rules as put_bat().
void RocksDBStore::RocksDBTransactionImpl::merge(
  const string &prefix,
  const string &k,
  const bufferlist &to_set_bl)
{
  auto cf = db->get_cf_handle(prefix, k);
  if (cf) {
    // bufferlist::c_str() is non-constant, so we can't call c_str()
    if (to_set_bl.is_contiguous() && to_set_bl.length() > 0) {
      bat.Merge(
        cf,
        rocksdb::Slice(k),
        rocksdb::Slice(to_set_bl.buffers().front().c_str(), to_set_bl.length()));
    } else {
      // make a copy
      rocksdb::Slice key_slice(k);
      vector<rocksdb::Slice> value_slices(to_set_bl.get_num_buffers());
      bat.Merge(cf, rocksdb::SliceParts(&key_slice, 1),
                prepare_sliceparts(to_set_bl, &value_slices));
    }
  } else {
    // Default CF: key stored in combined "prefix\0key" form.
    string key = combine_strings(prefix, k);
    // bufferlist::c_str() is non-constant, so we can't call c_str()
    if (to_set_bl.is_contiguous() && to_set_bl.length() > 0) {
      bat.Merge(
        db->default_cf,
        rocksdb::Slice(key),
        rocksdb::Slice(to_set_bl.buffers().front().c_str(), to_set_bl.length()));
    } else {
      // make a copy
      rocksdb::Slice key_slice(key);
      vector<rocksdb::Slice> value_slices(to_set_bl.get_num_buffers());
      bat.Merge(
        db->default_cf,
        rocksdb::SliceParts(&key_slice, 1),
        prepare_sliceparts(to_set_bl, &value_slices));
    }
  }
}
1855
// Multi-key point lookup: for each key in `keys`, fetch (prefix, key) and
// append the value into (*out)[key].  Missing keys are simply absent from
// *out; an I/O error aborts the process.  Always returns 0.
int RocksDBStore::get(
  const string &prefix,
  const std::set<string> &keys,
  std::map<string, bufferlist> *out)
{
  rocksdb::PinnableSlice value;
  utime_t start = ceph_clock_now();
  if (cf_handles.count(prefix) > 0) {
    // Sharded prefix: each key hashes to its own shard CF.
    for (auto& key : keys) {
      auto cf_handle = get_cf_handle(prefix, key);
      auto status = db->Get(rocksdb::ReadOptions(),
                            cf_handle,
                            rocksdb::Slice(key),
                            &value);
      if (status.ok()) {
        (*out)[key].append(value.data(), value.size());
      } else if (status.IsIOError()) {
        ceph_abort_msg(status.getState());
      }
      // Release the pinned block before reusing the slice.
      value.Reset();
    }
  } else {
    // Unsharded: combined "prefix\0key" lookup in the default CF.
    for (auto& key : keys) {
      string k = combine_strings(prefix, key);
      auto status = db->Get(rocksdb::ReadOptions(),
                            default_cf,
                            rocksdb::Slice(k),
                            &value);
      if (status.ok()) {
        (*out)[key].append(value.data(), value.size());
      } else if (status.IsIOError()) {
        ceph_abort_msg(status.getState());
      }
      value.Reset();
    }
  }
  utime_t lat = ceph_clock_now() - start;
  logger->tinc(l_rocksdb_get_latency, lat);
  return 0;
}
1896
1897 int RocksDBStore::get(
1898 const string &prefix,
1899 const string &key,
1900 bufferlist *out)
1901 {
1902 ceph_assert(out && (out->length() == 0));
1903 utime_t start = ceph_clock_now();
1904 int r = 0;
1905 rocksdb::PinnableSlice value;
1906 rocksdb::Status s;
1907 auto cf = get_cf_handle(prefix, key);
1908 if (cf) {
1909 s = db->Get(rocksdb::ReadOptions(),
1910 cf,
1911 rocksdb::Slice(key),
1912 &value);
1913 } else {
1914 string k = combine_strings(prefix, key);
1915 s = db->Get(rocksdb::ReadOptions(),
1916 default_cf,
1917 rocksdb::Slice(k),
1918 &value);
1919 }
1920 if (s.ok()) {
1921 out->append(value.data(), value.size());
1922 } else if (s.IsNotFound()) {
1923 r = -ENOENT;
1924 } else {
1925 ceph_abort_msg(s.getState());
1926 }
1927 utime_t lat = ceph_clock_now() - start;
1928 logger->tinc(l_rocksdb_get_latency, lat);
1929 return r;
1930 }
1931
1932 int RocksDBStore::get(
1933 const string& prefix,
1934 const char *key,
1935 size_t keylen,
1936 bufferlist *out)
1937 {
1938 ceph_assert(out && (out->length() == 0));
1939 utime_t start = ceph_clock_now();
1940 int r = 0;
1941 rocksdb::PinnableSlice value;
1942 rocksdb::Status s;
1943 auto cf = get_cf_handle(prefix, key, keylen);
1944 if (cf) {
1945 s = db->Get(rocksdb::ReadOptions(),
1946 cf,
1947 rocksdb::Slice(key, keylen),
1948 &value);
1949 } else {
1950 string k;
1951 combine_strings(prefix, key, keylen, &k);
1952 s = db->Get(rocksdb::ReadOptions(),
1953 default_cf,
1954 rocksdb::Slice(k),
1955 &value);
1956 }
1957 if (s.ok()) {
1958 out->append(value.data(), value.size());
1959 } else if (s.IsNotFound()) {
1960 r = -ENOENT;
1961 } else {
1962 ceph_abort_msg(s.getState());
1963 }
1964 utime_t lat = ceph_clock_now() - start;
1965 logger->tinc(l_rocksdb_get_latency, lat);
1966 return r;
1967 }
1968
1969 int RocksDBStore::split_key(rocksdb::Slice in, string *prefix, string *key)
1970 {
1971 size_t prefix_len = 0;
1972
1973 // Find separator inside Slice
1974 char* separator = (char*) memchr(in.data(), 0, in.size());
1975 if (separator == NULL)
1976 return -EINVAL;
1977 prefix_len = size_t(separator - in.data());
1978 if (prefix_len >= in.size())
1979 return -EINVAL;
1980
1981 // Fetch prefix and/or key directly from Slice
1982 if (prefix)
1983 *prefix = string(in.data(), prefix_len);
1984 if (key)
1985 *key = string(separator+1, in.size()-prefix_len-1);
1986 return 0;
1987 }
1988
1989 void RocksDBStore::compact()
1990 {
1991 logger->inc(l_rocksdb_compact);
1992 rocksdb::CompactRangeOptions options;
1993 db->CompactRange(options, default_cf, nullptr, nullptr);
1994 for (auto cf : cf_handles) {
1995 for (auto shard_cf : cf.second.handles) {
1996 db->CompactRange(
1997 options,
1998 shard_cf,
1999 nullptr, nullptr);
2000 }
2001 }
2002 }
2003
// Background compaction thread main loop.  Drains compact_queue one range
// at a time; an empty range pair means "compact the whole db".  The queue
// lock is dropped while the (potentially long) compaction runs so
// producers can keep enqueueing; the thread sleeps on compact_queue_cond
// when the queue is empty and exits when compact_queue_stop is set.
void RocksDBStore::compact_thread_entry()
{
  std::unique_lock l{compact_queue_lock};
  dout(10) << __func__ << " enter" << dendl;
  while (!compact_queue_stop) {
    if (!compact_queue.empty()) {
      auto range = compact_queue.front();
      compact_queue.pop_front();
      logger->set(l_rocksdb_compact_queue_len, compact_queue.size());
      // release the lock for the duration of the compaction itself
      l.unlock();
      logger->inc(l_rocksdb_compact_range);
      if (range.first.empty() && range.second.empty()) {
        // sentinel: empty bounds request a full-db compaction
        compact();
      } else {
        compact_range(range.first, range.second);
      }
      l.lock();
      continue;
    }
    dout(10) << __func__ << " waiting" << dendl;
    compact_queue_cond.wait(l);
  }
  dout(10) << __func__ << " exit" << dendl;
}
2028
2029 void RocksDBStore::compact_range_async(const string& start, const string& end)
2030 {
2031 std::lock_guard l(compact_queue_lock);
2032
2033 // try to merge adjacent ranges. this is O(n), but the queue should
2034 // be short. note that we do not cover all overlap cases and merge
2035 // opportunities here, but we capture the ones we currently need.
2036 list< pair<string,string> >::iterator p = compact_queue.begin();
2037 while (p != compact_queue.end()) {
2038 if (p->first == start && p->second == end) {
2039 // dup; no-op
2040 return;
2041 }
2042 if (start <= p->first && p->first <= end) {
2043 // new region crosses start of existing range
2044 // select right bound that is bigger
2045 compact_queue.push_back(make_pair(start, end > p->second ? end : p->second));
2046 compact_queue.erase(p);
2047 logger->inc(l_rocksdb_compact_queue_merge);
2048 break;
2049 }
2050 if (start <= p->second && p->second <= end) {
2051 // new region crosses end of existing range
2052 //p->first < p->second and p->second <= end, so p->first <= end.
2053 //But we break if previous condition, so start > p->first.
2054 compact_queue.push_back(make_pair(p->first, end));
2055 compact_queue.erase(p);
2056 logger->inc(l_rocksdb_compact_queue_merge);
2057 break;
2058 }
2059 ++p;
2060 }
2061 if (p == compact_queue.end()) {
2062 // no merge, new entry.
2063 compact_queue.push_back(make_pair(start, end));
2064 logger->set(l_rocksdb_compact_queue_len, compact_queue.size());
2065 }
2066 compact_queue_cond.notify_all();
2067 if (!compact_thread.is_started()) {
2068 compact_thread.create("rstore_compact");
2069 }
2070 }
2071 bool RocksDBStore::check_omap_dir(string &omap_dir)
2072 {
2073 rocksdb::Options options;
2074 options.create_if_missing = true;
2075 rocksdb::DB *db;
2076 rocksdb::Status status = rocksdb::DB::Open(options, omap_dir, &db);
2077 delete db;
2078 db = nullptr;
2079 return status.ok();
2080 }
2081
// Synchronously compact the combined-key range [start, end]: the default
// column family first, then every sharded column whose prefix falls within
// the range.  start/end are "prefix\0key" strings as produced by
// combine_strings().
void RocksDBStore::compact_range(const string& start, const string& end)
{
  rocksdb::CompactRangeOptions options;
  rocksdb::Slice cstart(start);
  rocksdb::Slice cend(end);
  string prefix_start, key_start;
  string prefix_end, key_end;
  string key_highest = "\xff\xff\xff\xff"; //cheating
  string key_lowest = "";

  // compact all shard handles of one column over [start, end]
  auto compact_range = [&] (const decltype(cf_handles)::iterator column_it,
                            const std::string& start,
                            const std::string& end) {
    rocksdb::Slice cstart(start);
    rocksdb::Slice cend(end);
    for (const auto& shard_it : column_it->second.handles) {
      db->CompactRange(options, shard_it, &cstart, &cend);
    }
  };
  db->CompactRange(options, default_cf, &cstart, &cend);
  split_key(cstart, &prefix_start, &key_start);
  split_key(cend, &prefix_end, &key_end);
  if (prefix_start == prefix_end) {
    // whole range lies within a single prefix / column
    const auto& column = cf_handles.find(prefix_start);
    if (column != cf_handles.end()) {
      compact_range(column, key_start, key_end);
    }
  } else {
    // NOTE(review): the walk below assumes cf_handles iterates in prefix
    // order and that find(prefix_start) precedes find(prefix_end).  If
    // prefix_start has no matching column while prefix_end does, 'column'
    // starts at end() and the while loop increments past it — confirm the
    // container type/ordering guarantees of cf_handles.
    auto column = cf_handles.find(prefix_start);
    if (column != cf_handles.end()) {
      // first column: from key_start to the top of its keyspace
      compact_range(column, key_start, key_highest);
      ++column;
    }
    const auto& column_end = cf_handles.find(prefix_end);
    while (column != column_end) {
      // intermediate columns: their entire keyspace
      compact_range(column, key_lowest, key_highest);
      column++;
    }
    if (column != cf_handles.end()) {
      // last column: from the bottom of its keyspace up to key_end
      compact_range(column, key_lowest, key_end);
    }
  }
}
2125
// Destructor: release the owned rocksdb iterator.
RocksDBStore::RocksDBWholeSpaceIteratorImpl::~RocksDBWholeSpaceIteratorImpl()
{
  delete dbiter;
}
2130 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_first()
2131 {
2132 dbiter->SeekToFirst();
2133 ceph_assert(!dbiter->status().IsIOError());
2134 return dbiter->status().ok() ? 0 : -1;
2135 }
2136 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_first(const string &prefix)
2137 {
2138 rocksdb::Slice slice_prefix(prefix);
2139 dbiter->Seek(slice_prefix);
2140 ceph_assert(!dbiter->status().IsIOError());
2141 return dbiter->status().ok() ? 0 : -1;
2142 }
2143 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_last()
2144 {
2145 dbiter->SeekToLast();
2146 ceph_assert(!dbiter->status().IsIOError());
2147 return dbiter->status().ok() ? 0 : -1;
2148 }
2149 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_last(const string &prefix)
2150 {
2151 string limit = past_prefix(prefix);
2152 rocksdb::Slice slice_limit(limit);
2153 dbiter->Seek(slice_limit);
2154
2155 if (!dbiter->Valid()) {
2156 dbiter->SeekToLast();
2157 } else {
2158 dbiter->Prev();
2159 }
2160 return dbiter->status().ok() ? 0 : -1;
2161 }
2162 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::upper_bound(const string &prefix, const string &after)
2163 {
2164 lower_bound(prefix, after);
2165 if (valid()) {
2166 pair<string,string> key = raw_key();
2167 if (key.first == prefix && key.second == after)
2168 next();
2169 }
2170 return dbiter->status().ok() ? 0 : -1;
2171 }
2172 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::lower_bound(const string &prefix, const string &to)
2173 {
2174 string bound = combine_strings(prefix, to);
2175 rocksdb::Slice slice_bound(bound);
2176 dbiter->Seek(slice_bound);
2177 return dbiter->status().ok() ? 0 : -1;
2178 }
// True while the iterator is positioned on an entry.
bool RocksDBStore::RocksDBWholeSpaceIteratorImpl::valid()
{
  return dbiter->Valid();
}
2183 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::next()
2184 {
2185 if (valid()) {
2186 dbiter->Next();
2187 }
2188 ceph_assert(!dbiter->status().IsIOError());
2189 return dbiter->status().ok() ? 0 : -1;
2190 }
2191 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::prev()
2192 {
2193 if (valid()) {
2194 dbiter->Prev();
2195 }
2196 ceph_assert(!dbiter->status().IsIOError());
2197 return dbiter->status().ok() ? 0 : -1;
2198 }
2199 string RocksDBStore::RocksDBWholeSpaceIteratorImpl::key()
2200 {
2201 string out_key;
2202 split_key(dbiter->key(), 0, &out_key);
2203 return out_key;
2204 }
2205 pair<string,string> RocksDBStore::RocksDBWholeSpaceIteratorImpl::raw_key()
2206 {
2207 string prefix, key;
2208 split_key(dbiter->key(), &prefix, &key);
2209 return make_pair(prefix, key);
2210 }
2211
2212 bool RocksDBStore::RocksDBWholeSpaceIteratorImpl::raw_key_is_prefixed(const string &prefix) {
2213 // Look for "prefix\0" right in rocksb::Slice
2214 rocksdb::Slice key = dbiter->key();
2215 if ((key.size() > prefix.length()) && (key[prefix.length()] == '\0')) {
2216 return memcmp(key.data(), prefix.c_str(), prefix.length()) == 0;
2217 } else {
2218 return false;
2219 }
2220 }
2221
// Return the current value copied into a bufferlist.
bufferlist RocksDBStore::RocksDBWholeSpaceIteratorImpl::value()
{
  return to_bufferlist(dbiter->value());
}
2226
// Size in bytes of the current raw (combined) key.
size_t RocksDBStore::RocksDBWholeSpaceIteratorImpl::key_size()
{
  return dbiter->key().size();
}
2231
// Size in bytes of the current value.
size_t RocksDBStore::RocksDBWholeSpaceIteratorImpl::value_size()
{
  return dbiter->value().size();
}
2236
2237 bufferptr RocksDBStore::RocksDBWholeSpaceIteratorImpl::value_as_ptr()
2238 {
2239 rocksdb::Slice val = dbiter->value();
2240 return bufferptr(val.data(), val.size());
2241 }
2242
// 0 if the underlying iterator is healthy, -1 on any error status.
int RocksDBStore::RocksDBWholeSpaceIteratorImpl::status()
{
  return dbiter->status().ok() ? 0 : -1;
}
2247
2248 string RocksDBStore::past_prefix(const string &prefix)
2249 {
2250 string limit = prefix;
2251 limit.push_back(1);
2252 return limit;
2253 }
2254
// Iterator over a single RocksDB column family (one shard of a sharded
// prefix, or a whole column).  Keys inside a column family are stored
// without the "prefix\0" header, so key() returns the bare user key and
// raw_key() re-attaches the logical prefix for callers.
class CFIteratorImpl : public KeyValueDB::IteratorImpl {
protected:
  string prefix;              // logical KeyValueDB prefix served by this cf
  rocksdb::Iterator *dbiter;  // owned; released in the destructor
  // 'bounds' must be declared before the two slices: the slices are built
  // from it in the initializer list, and rocksdb retains pointers to them
  // for the iterator's whole lifetime.
  const KeyValueDB::IteratorBounds bounds;
  const rocksdb::Slice iterate_lower_bound;
  const rocksdb::Slice iterate_upper_bound;
public:
  explicit CFIteratorImpl(const RocksDBStore* db,
                          const std::string& p,
                          rocksdb::ColumnFamilyHandle* cf,
                          KeyValueDB::IteratorBounds bounds_)
    : prefix(p), bounds(std::move(bounds_)),
      iterate_lower_bound(make_slice(bounds.lower_bound)),
      iterate_upper_bound(make_slice(bounds.upper_bound))
    {
      auto options = rocksdb::ReadOptions();
      if (db->cct->_conf->osd_rocksdb_iterator_bounds_enabled) {
        // restrict iteration to [lower_bound, upper_bound) when supplied
        if (bounds.lower_bound) {
          options.iterate_lower_bound = &iterate_lower_bound;
        }
        if (bounds.upper_bound) {
          options.iterate_upper_bound = &iterate_upper_bound;
        }
      }
      dbiter = db->db->NewIterator(options, cf);
  }
  ~CFIteratorImpl() {
    delete dbiter;
  }

  // All movement methods return 0 on success, -1 on iterator error.
  int seek_to_first() override {
    dbiter->SeekToFirst();
    return dbiter->status().ok() ? 0 : -1;
  }
  int seek_to_last() override {
    dbiter->SeekToLast();
    return dbiter->status().ok() ? 0 : -1;
  }
  int upper_bound(const string &after) override {
    // first key strictly greater than 'after'
    lower_bound(after);
    if (valid() && (key() == after)) {
      next();
    }
    return dbiter->status().ok() ? 0 : -1;
  }
  int lower_bound(const string &to) override {
    // first key greater than or equal to 'to'
    rocksdb::Slice slice_bound(to);
    dbiter->Seek(slice_bound);
    return dbiter->status().ok() ? 0 : -1;
  }
  int next() override {
    if (valid()) {
      dbiter->Next();
    }
    return dbiter->status().ok() ? 0 : -1;
  }
  int prev() override {
    if (valid()) {
      dbiter->Prev();
    }
    return dbiter->status().ok() ? 0 : -1;
  }
  bool valid() override {
    return dbiter->Valid();
  }
  string key() override {
    // bare user key; the cf itself stands for the prefix
    return dbiter->key().ToString();
  }
  std::pair<std::string, std::string> raw_key() override {
    return make_pair(prefix, key());
  }
  bufferlist value() override {
    return to_bufferlist(dbiter->value());
  }
  bufferptr value_as_ptr() override {
    rocksdb::Slice val = dbiter->value();
    return bufferptr(val.data(), val.size());
  }
  int status() override {
    return dbiter->status().ok() ? 0 : -1;
  }
};
2338
2339
//merge column iterators and rest iterator
//
// Presents the union of the default column family ("main", which holds
// every prefix without a dedicated column) and all per-prefix sharded
// columns as one ordered whole-space iterator.  Invariant: 'smaller'
// records which side (main vs current_shard) currently holds the smaller
// key; that side is the iterator's current element, the other side is
// parked at the next larger key.
class WholeMergeIteratorImpl : public KeyValueDB::WholeSpaceIteratorImpl {
private:
  RocksDBStore* db;
  KeyValueDB::WholeSpaceIterator main;                 // default cf iterator
  std::map<std::string, KeyValueDB::Iterator> shards;  // prefix -> column iterator
  std::map<std::string, KeyValueDB::Iterator>::iterator current_shard;
  enum {on_main, on_shard} smaller;  // which side owns the current element

public:
  WholeMergeIteratorImpl(RocksDBStore* db)
    : db(db)
    , main(db->get_default_cf_iterator())
  {
    // one sub-iterator per sharded prefix; shards is ordered by prefix
    for (auto& e : db->cf_handles) {
      shards.emplace(e.first, db->get_iterator(e.first));
    }
  }

  // returns true if value in main is smaller then in shards
  // invalid is larger then actual value
  bool is_main_smaller() {
    if (main->valid()) {
      if (current_shard != shards.end()) {
        // both sides valid: compare (prefix, key) lexicographically
        auto main_rk = main->raw_key();
        ceph_assert(current_shard->second->valid());
        auto shards_rk = current_shard->second->raw_key();
        if (main_rk.first < shards_rk.first)
          return true;
        if (main_rk.first > shards_rk.first)
          return false;
        return main_rk.second < shards_rk.second;
      } else {
        return true;
      }
    } else {
      if (current_shard != shards.end()) {
        return false;
      } else {
        //this means that neither is valid
        //we select main to be smaller, so valid() will signal properly
        return true;
      }
    }
  }

  int seek_to_first() override {
    int r0 = main->seek_to_first();
    int r1 = 0;
    // find first shard that has some data
    current_shard = shards.begin();
    while (current_shard != shards.end()) {
      r1 = current_shard->second->seek_to_first();
      if (r1 != 0 || current_shard->second->valid()) {
        //this is the first shard that will yield some keys
        break;
      }
      ++current_shard;
    }
    smaller = is_main_smaller() ? on_main : on_shard;
    return r0 == 0 && r1 == 0 ? 0 : -1;
  }

  int seek_to_first(const std::string &prefix) override {
    int r0 = main->seek_to_first(prefix);
    int r1 = 0;
    // find first shard that has some data
    current_shard = shards.lower_bound(prefix);
    while (current_shard != shards.end()) {
      r1 = current_shard->second->seek_to_first();
      if (r1 != 0 || current_shard->second->valid()) {
        //this is the first shard that will yield some keys
        break;
      }
      ++current_shard;
    }
    smaller = is_main_smaller() ? on_main : on_shard;
    return r0 == 0 && r1 == 0 ? 0 : -1;
  };

  int seek_to_last() override {
    int r0 = main->seek_to_last();
    int r1 = 0;
    r1 = shards_seek_to_last();
    //if we have 2 candidates, we need to select
    // the LARGER one is the last element, so advance the smaller side off
    // its position and mark the larger as current
    if (main->valid()) {
      if (shards_valid()) {
        if (is_main_smaller()) {
          smaller = on_shard;
          main->next();
        } else {
          smaller = on_main;
          shards_next();
        }
      } else {
        smaller = on_main;
      }
    } else {
      if (shards_valid()) {
        smaller = on_shard;
      } else {
        smaller = on_main;
      }
    }
    return r0 == 0 && r1 == 0 ? 0 : -1;
  }

  int seek_to_last(const std::string &prefix) override {
    int r0 = main->seek_to_last(prefix);
    int r1 = 0;
    // find last shard that has some data
    bool found = false;
    current_shard = shards.lower_bound(prefix);
    // NOTE(review): this loop never advances current_shard, so it can spin
    // forever on an empty shard, and lower_bound() may return end(), which
    // is then dereferenced — verify the intended shard-selection logic.
    while (current_shard != shards.begin()) {
      r1 = current_shard->second->seek_to_last();
      if (r1 != 0)
        break;
      if (current_shard->second->valid()) {
        found = true;
        break;
      }
    }
    //if we have 2 candidates, we need to select
    if (main->valid() && found) {
      if (is_main_smaller()) {
        main->next();
      } else {
        shards_next();
      }
    }
    if (!found) {
      //set shards state that properly represents eof
      current_shard = shards.end();
    }
    smaller = is_main_smaller() ? on_main : on_shard;
    return r0 == 0 && r1 == 0 ? 0 : -1;
  }

  int upper_bound(const std::string &prefix, const std::string &after) override {
    int r0 = main->upper_bound(prefix, after);
    int r1 = 0;
    if (r0 != 0)
      return r0;
    current_shard = shards.lower_bound(prefix);
    if (current_shard != shards.end()) {
      bool located = false;
      if (current_shard->first == prefix) {
        // exact prefix match: bound within that shard
        r1 = current_shard->second->upper_bound(after);
        if (r1 != 0)
          return r1;
        if (current_shard->second->valid()) {
          located = true;
        }
      }
      if (!located) {
        // otherwise start from the first non-empty later shard
        while (current_shard != shards.end()) {
          r1 = current_shard->second->seek_to_first();
          if (r1 != 0)
            return r1;
          if (current_shard->second->valid())
            break;
          ++current_shard;
        }
      }
    }
    smaller = is_main_smaller() ? on_main : on_shard;
    return 0;
  }

  int lower_bound(const std::string &prefix, const std::string &to) override {
    int r0 = main->lower_bound(prefix, to);
    int r1 = 0;
    if (r0 != 0)
      return r0;
    current_shard = shards.lower_bound(prefix);
    if (current_shard != shards.end()) {
      bool located = false;
      if (current_shard->first == prefix) {
        // exact prefix match: bound within that shard
        r1 = current_shard->second->lower_bound(to);
        if (r1 != 0)
          return r1;
        if (current_shard->second->valid()) {
          located = true;
        }
      }
      if (!located) {
        // otherwise start from the first non-empty later shard
        while (current_shard != shards.end()) {
          r1 = current_shard->second->seek_to_first();
          if (r1 != 0)
            return r1;
          if (current_shard->second->valid())
            break;
          ++current_shard;
        }
      }
    }
    smaller = is_main_smaller() ? on_main : on_shard;
    return 0;
  }

  bool valid() override {
    if (smaller == on_main) {
      return main->valid();
    } else {
      if (current_shard == shards.end())
        return false;
      return current_shard->second->valid();
    }
  };

  int next() override {
    // advance whichever side owns the current element, then re-select
    int r;
    if (smaller == on_main) {
      r = main->next();
    } else {
      r = shards_next();
    }
    if (r != 0)
      return r;
    smaller = is_main_smaller() ? on_main : on_shard;
    return 0;
  }

  // Step both sides back (reviving an exhausted side via seek_to_last),
  // pick the LARGER of the two candidates as the new current element, and
  // re-advance the other side so the merge invariant holds again.
  int prev() override {
    int r;
    bool main_was_valid = false;
    if (main->valid()) {
      main_was_valid = true;
      r = main->prev();
    } else {
      r = main->seek_to_last();
    }
    if (r != 0)
      return r;

    bool shards_was_valid = false;
    if (shards_valid()) {
      shards_was_valid = true;
      r = shards_prev();
    } else {
      r = shards_seek_to_last();
    }
    if (r != 0)
      return r;

    if (!main->valid() && !shards_valid()) {
      //end, no previous. set marker so valid() can work
      smaller = on_main;
      return 0;
    }

    //if 1 is valid, select it
    //if 2 are valid select larger and advance the other
    if (main->valid()) {
      if (shards_valid()) {
        if (is_main_smaller()) {
          smaller = on_shard;
          if (main_was_valid) {
            if (main->valid()) {
              r = main->next();
            } else {
              r = main->seek_to_first();
            }
          } else {
            //if we have resurrected main, kill it
            if (main->valid()) {
              main->next();
            }
          }
        } else {
          smaller = on_main;
          if (shards_was_valid) {
            if (shards_valid()) {
              r = shards_next();
            } else {
              r = shards_seek_to_first();
            }
          } else {
            //if we have resurected shards, kill it
            if (shards_valid()) {
              shards_next();
            }
          }
        }
      } else {
        smaller = on_main;
        r = shards_seek_to_first();
      }
    } else {
      smaller = on_shard;
      r = main->seek_to_first();
    }
    return r;
  }

  std::string key() override
  {
    if (smaller == on_main) {
      return main->key();
    } else {
      return current_shard->second->key();
    }
  }

  std::pair<std::string,std::string> raw_key() override
  {
    if (smaller == on_main) {
      return main->raw_key();
    } else {
      // shard iterators see bare keys; the shard's map key is the prefix
      return { current_shard->first, current_shard->second->key() };
    }
  }

  bool raw_key_is_prefixed(const std::string &prefix) override
  {
    if (smaller == on_main) {
      return main->raw_key_is_prefixed(prefix);
    } else {
      return current_shard->first == prefix;
    }
  }

  ceph::buffer::list value() override
  {
    if (smaller == on_main) {
      return main->value();
    } else {
      return current_shard->second->value();
    }
  }

  int status() override
  {
    //because we already had to inspect key, it must be ok
    return 0;
  }

  size_t key_size() override
  {
    if (smaller == on_main) {
      return main->key_size();
    } else {
      return current_shard->second->key().size();
    }
  }
  size_t value_size() override
  {
    if (smaller == on_main) {
      return main->value_size();
    } else {
      return current_shard->second->value().length();
    }
  }

  // True when current_shard points at a shard positioned on an entry.
  // (Returns int but is used as a bool throughout.)
  int shards_valid() {
    if (current_shard == shards.end())
      return false;
    return current_shard->second->valid();
  }

  // Advance the shard side; when the current shard is exhausted, move on
  // to the first later shard that yields a key.
  int shards_next() {
    if (current_shard == shards.end()) {
      //illegal to next() on !valid()
      return -1;
    }
    int r = 0;
    r = current_shard->second->next();
    if (r != 0)
      return r;
    if (current_shard->second->valid())
      return 0;
    //current shard exhaused, search for key
    ++current_shard;
    while (current_shard != shards.end()) {
      r = current_shard->second->seek_to_first();
      if (r != 0)
        return r;
      if (current_shard->second->valid())
        break;
      ++current_shard;
    }
    //either we found key or not, but it is success
    return 0;
  }

  // Step the shard side back; when the current shard runs out, fall back
  // through earlier shards via seek_to_last.
  int shards_prev() {
    if (current_shard == shards.end()) {
      //illegal to prev() on !valid()
      return -1;
    }
    int r = current_shard->second->prev();
    while (r == 0) {
      if (current_shard->second->valid()) {
        break;
      }
      if (current_shard == shards.begin()) {
        //we have reached pre-first element
        //this makes it !valid(), but guarantees next() moves to first element
        break;
      }
      --current_shard;
      r = current_shard->second->seek_to_last();
    }
    return r;
  }

  // Position the shard side on the last key of the last non-empty shard;
  // leaves current_shard == end() when every shard is empty.
  int shards_seek_to_last() {
    int r = 0;
    current_shard = shards.end();
    if (current_shard == shards.begin()) {
      //no shards at all
      return 0;
    }
    while (current_shard != shards.begin()) {
      --current_shard;
      r = current_shard->second->seek_to_last();
      if (r != 0)
        return r;
      if (current_shard->second->valid()) {
        return 0;
      }
    }
    //no keys at all
    current_shard = shards.end();
    return r;
  }

  // Position the shard side on the first key of the first non-empty shard.
  int shards_seek_to_first() {
    int r = 0;
    current_shard = shards.begin();
    while (current_shard != shards.end()) {
      r = current_shard->second->seek_to_first();
      if (r != 0)
        break;
      if (current_shard->second->valid()) {
        //this is the first shard that will yield some keys
        break;
      }
      ++current_shard;
    }
    return r;
  }
};
2783
// Merges the per-shard rocksdb iterators of one sharded column into a
// single ordered iterator.  Invariant after every positioning operation:
// iters[0] is the valid iterator with the smallest key (valid iterators
// sort before invalid ones), so current-element accessors only ever look
// at iters[0].
class ShardMergeIteratorImpl : public KeyValueDB::IteratorImpl {
private:
  // Strict-weak ordering over rocksdb iterators: compare by current key;
  // any valid iterator sorts before an invalid one.
  struct KeyLess {
  private:
    const rocksdb::Comparator* comparator;
  public:
    KeyLess(const rocksdb::Comparator* comparator) : comparator(comparator) { };

    bool operator()(rocksdb::Iterator* a, rocksdb::Iterator* b) const
    {
      if (a->Valid()) {
        if (b->Valid()) {
          return comparator->Compare(a->key(), b->key()) < 0;
        } else {
          return true;
        }
      } else {
        if (b->Valid()) {
          return false;
        } else {
          return false;
        }
      }
    }
  };

  const RocksDBStore* db;
  KeyLess keyless;
  string prefix;
  // bounds must outlive the iterators: the slices below point into it and
  // rocksdb retains the pointers for the iterators' lifetime
  const KeyValueDB::IteratorBounds bounds;
  const rocksdb::Slice iterate_lower_bound;
  const rocksdb::Slice iterate_upper_bound;
  std::vector<rocksdb::Iterator*> iters;  // owned; one per shard handle
public:
  explicit ShardMergeIteratorImpl(const RocksDBStore* db,
                                  const std::string& prefix,
                                  const std::vector<rocksdb::ColumnFamilyHandle*>& shards,
                                  KeyValueDB::IteratorBounds bounds_)
    : db(db), keyless(db->comparator), prefix(prefix), bounds(std::move(bounds_)),
      iterate_lower_bound(make_slice(bounds.lower_bound)),
      iterate_upper_bound(make_slice(bounds.upper_bound))
  {
    iters.reserve(shards.size());
    auto options = rocksdb::ReadOptions();
    if (db->cct->_conf->osd_rocksdb_iterator_bounds_enabled) {
      if (bounds.lower_bound) {
        options.iterate_lower_bound = &iterate_lower_bound;
      }
      if (bounds.upper_bound) {
        options.iterate_upper_bound = &iterate_upper_bound;
      }
    }
    for (auto& s : shards) {
      iters.push_back(db->db->NewIterator(options, s));
    }
  }
  ~ShardMergeIteratorImpl() {
    for (auto& it : iters) {
      delete it;
    }
  }
  int seek_to_first() override {
    for (auto& it : iters) {
      it->SeekToFirst();
      if (!it->status().ok()) {
        return -1;
      }
    }
    //all iterators seeked, sort
    std::sort(iters.begin(), iters.end(), keyless);
    return 0;
  }
  int seek_to_last() override {
    for (auto& it : iters) {
      it->SeekToLast();
      if (!it->status().ok()) {
        return -1;
      }
    }
    // keep the globally-largest last element in iters[0] and push every
    // other iterator past its end (Next() on the last key invalidates it)
    for (size_t i = 1; i < iters.size(); i++) {
      if (iters[0]->Valid()) {
        if (iters[i]->Valid()) {
          if (keyless(iters[0], iters[i])) {
            std::swap(iters[0], iters[i]);
          }
        } else {
          //iters[i] empty
        }
      } else {
        if (iters[i]->Valid()) {
          std::swap(iters[0], iters[i]);
        }
      }
      //it might happen that cf was empty
      if (iters[i]->Valid()) {
        iters[i]->Next();
      }
    }
    //no need to sort, as at most 1 iterator is valid now
    return 0;
  }
  int upper_bound(const string &after) override {
    // first key strictly greater than 'after' across all shards
    rocksdb::Slice slice_bound(after);
    for (auto& it : iters) {
      it->Seek(slice_bound);
      if (it->Valid() && it->key() == after) {
        it->Next();
      }
      if (!it->status().ok()) {
        return -1;
      }
    }
    std::sort(iters.begin(), iters.end(), keyless);
    return 0;
  }
  int lower_bound(const string &to) override {
    // first key greater than or equal to 'to' across all shards
    rocksdb::Slice slice_bound(to);
    for (auto& it : iters) {
      it->Seek(slice_bound);
      if (!it->status().ok()) {
        return -1;
      }
    }
    std::sort(iters.begin(), iters.end(), keyless);
    return 0;
  }
  int next() override {
    int r = -1;
    if (iters[0]->Valid()) {
      iters[0]->Next();
      if (iters[0]->status().ok()) {
        r = 0;
        //bubble up
        // only iters[0] moved, so one pass of adjacent swaps restores order
        for (size_t i = 0; i < iters.size() - 1; i++) {
          if (keyless(iters[i], iters[i + 1])) {
            //matches, fixed
            break;
          }
          std::swap(iters[i], iters[i + 1]);
        }
      }
    }
    return r;
  }
  // iters are sorted, so
  // a[0] < b[0] < c[0] < d[0]
  // a[0] > a[-1], a[0] > b[-1], a[0] > c[-1], a[0] > d[-1]
  // so, prev() will be one of:
  // a[-1], b[-1], c[-1], d[-1]
  // prev() will be the one that is *largest* of them
  //
  // alg:
  // 1. go prev() on each iterator we can
  // 2. select largest key from those iterators
  // 3. go next() on all iterators except (2)
  // 4. sort
  int prev() override {
    std::vector<rocksdb::Iterator*> prev_done;
    //1
    for (auto it: iters) {
      if (it->Valid()) {
        it->Prev();
        if (it->Valid()) {
          prev_done.push_back(it);
        } else {
          it->SeekToFirst();
        }
      } else {
        // exhausted iterator: try to revive it at its last key
        it->SeekToLast();
        if (it->Valid()) {
          prev_done.push_back(it);
        }
      }
    }
    if (prev_done.size() == 0) {
      /* there is no previous element */
      if (iters[0]->Valid()) {
        iters[0]->Prev();
        ceph_assert(!iters[0]->Valid());
      }
      return 0;
    }
    //2,3
    rocksdb::Iterator* highest = prev_done[0];
    for (size_t i = 1; i < prev_done.size(); i++) {
      if (keyless(highest, prev_done[i])) {
        highest->Next();
        highest = prev_done[i];
      } else {
        prev_done[i]->Next();
      }
    }
    //4
    //insert highest in the beginning, and shift values until we pick highest
    //untouched rest is sorted - we just prev()/next() them
    rocksdb::Iterator* hold = highest;
    for (size_t i = 0; i < iters.size(); i++) {
      std::swap(hold, iters[i]);
      if (hold == highest) break;
    }
    ceph_assert(hold == highest);
    return 0;
  }
  bool valid() override {
    return iters[0]->Valid();
  }
  string key() override {
    return iters[0]->key().ToString();
  }
  std::pair<std::string, std::string> raw_key() override {
    return make_pair(prefix, key());
  }
  bufferlist value() override {
    return to_bufferlist(iters[0]->value());
  }
  bufferptr value_as_ptr() override {
    rocksdb::Slice val = iters[0]->value();
    return bufferptr(val.data(), val.size());
  }
  int status() override {
    return iters[0]->status().ok() ? 0 : -1;
  }
};
3007
3008 KeyValueDB::Iterator RocksDBStore::get_iterator(const std::string& prefix, IteratorOpts opts, IteratorBounds bounds)
3009 {
3010 auto cf_it = cf_handles.find(prefix);
3011 if (cf_it != cf_handles.end()) {
3012 rocksdb::ColumnFamilyHandle* cf = nullptr;
3013 if (cf_it->second.handles.size() == 1) {
3014 cf = cf_it->second.handles[0];
3015 } else if (cct->_conf->osd_rocksdb_iterator_bounds_enabled) {
3016 cf = check_cf_handle_bounds(cf_it, bounds);
3017 }
3018 if (cf) {
3019 return std::make_shared<CFIteratorImpl>(
3020 this,
3021 prefix,
3022 cf,
3023 std::move(bounds));
3024 } else {
3025 return std::make_shared<ShardMergeIteratorImpl>(
3026 this,
3027 prefix,
3028 cf_it->second.handles,
3029 std::move(bounds));
3030 }
3031 } else {
3032 // use wholespace engine if no cfs are configured
3033 // or use default cf otherwise as there is no
3034 // matching cf for the specified prefix.
3035 auto w_it = cf_handles.size() == 0 || prefix.empty() ?
3036 get_wholespace_iterator(opts) :
3037 get_default_cf_iterator();
3038 return KeyValueDB::make_iterator(prefix, w_it);
3039 }
3040 }
3041
3042 RocksDBStore::WholeSpaceIterator RocksDBStore::new_shard_iterator(rocksdb::ColumnFamilyHandle* cf)
3043 {
3044 return std::make_shared<RocksDBWholeSpaceIteratorImpl>(
3045 this,
3046 cf,
3047 0);
3048 }
3049
3050 KeyValueDB::Iterator RocksDBStore::new_shard_iterator(rocksdb::ColumnFamilyHandle* cf,
3051 const std::string& prefix,
3052 IteratorBounds bounds)
3053 {
3054 return std::make_shared<CFIteratorImpl>(
3055 this,
3056 prefix,
3057 cf,
3058 std::move(bounds));
3059 }
3060
3061 RocksDBStore::WholeSpaceIterator RocksDBStore::get_wholespace_iterator(IteratorOpts opts)
3062 {
3063 if (cf_handles.size() == 0) {
3064 return std::make_shared<RocksDBWholeSpaceIteratorImpl>(
3065 this, default_cf, opts);
3066 } else {
3067 return std::make_shared<WholeMergeIteratorImpl>(this);
3068 }
3069 }
3070
3071 RocksDBStore::WholeSpaceIterator RocksDBStore::get_default_cf_iterator()
3072 {
3073 return std::make_shared<RocksDBWholeSpaceIteratorImpl>(this, default_cf, 0);
3074 }
3075
// Transitional step of resharding.  Persists a resharding lock marker so a
// crash mid-reshard is detected on the next open, reopens the DB with every
// column family currently on disk, creates the columns required by
// `new_sharding` that are still missing, and wires cf_handles up to the new
// layout.  All opened/created handles are handed back via
// `to_process_columns` so the caller can move data between columns and then
// release the handles.
//
// Returns 0 on success, -EINVAL on parse/open/create failure, -EIO when the
// sharding lock file cannot be written or a shard handle is missing.
int RocksDBStore::prepare_for_reshard(const std::string& new_sharding,
				      RocksDBStore::columns_t& to_process_columns)
{
  //0. lock db from opening
  //1. list existing columns
  //2. apply merge operator to (main + columns) opts
  //3. prepare std::vector<rocksdb::ColumnFamilyDescriptor> existing_cfs
  //4. open db, acquire existing column handles
  //5. calculate missing columns
  //6. create missing columns
  //7. construct cf_handles according to new sharding
  //8. check if all cf_handles are filled

  bool b;
  std::vector<ColumnFamily> new_sharding_def;
  char const* error_position;
  std::string error_msg;
  b = parse_sharding_def(new_sharding, new_sharding_def, &error_position, &error_msg);
  if (!b) {
    dout(1) << __func__ << " bad sharding: " << dendl;
    dout(1) << __func__ << new_sharding << dendl;
    // print a caret under the character where parsing failed
    dout(1) << __func__ << std::string(error_position - &new_sharding[0], ' ') << "^" << error_msg << dendl;
    return -EINVAL;
  }

  //0. lock db from opening
  // Append the lock token to the stored sharding definition (idempotent:
  // skipped if a previous, interrupted reshard already left it there).
  std::string stored_sharding_text;
  rocksdb::ReadFileToString(env,
			    sharding_def_file,
			    &stored_sharding_text);
  if (stored_sharding_text.find(resharding_column_lock) == string::npos) {
    rocksdb::Status status;
    if (stored_sharding_text.size() != 0)
      stored_sharding_text += " ";
    stored_sharding_text += resharding_column_lock;
    env->CreateDir(sharding_def_dir);
    status = rocksdb::WriteStringToFile(env, stored_sharding_text,
					sharding_def_file, true);
    if (!status.ok()) {
      derr << __func__ << " cannot write to " << sharding_def_file << dendl;
      return -EIO;
    }
  }

  //1. list existing columns

  rocksdb::Status status;
  std::vector<std::string> existing_columns;
  rocksdb::Options opt;
  int r = load_rocksdb_options(false, opt);
  if (r) {
    dout(1) << __func__ << " load rocksdb options failed" << dendl;
    return r;
  }
  status = rocksdb::DB::ListColumnFamilies(rocksdb::DBOptions(opt), path, &existing_columns);
  if (!status.ok()) {
    derr << "Unable to list column families: " << status.ToString() << dendl;
    return -EINVAL;
  }
  dout(5) << "existing columns = " << existing_columns << dendl;

  //2. apply merge operator to (main + columns) opts
  //3. prepare std::vector<rocksdb::ColumnFamilyDescriptor> cfs_to_open

  std::vector<rocksdb::ColumnFamilyDescriptor> cfs_to_open;
  for (const auto& full_name : existing_columns) {
    //split col_name to <prefix>-<number>
    std::string base_name;
    size_t pos = full_name.find('-');
    if (std::string::npos == pos)
      base_name = full_name;
    else
      base_name = full_name.substr(0,pos);

    rocksdb::ColumnFamilyOptions cf_opt(opt);
    // search if we have options for this column
    std::string options;
    for (const auto& nsd : new_sharding_def) {
      if (nsd.name == base_name) {
	options = nsd.options;
	break;
      }
    }
    int r = update_column_family_options(base_name, options, &cf_opt);
    if (r != 0) {
      return r;
    }
    cfs_to_open.emplace_back(full_name, cf_opt);
  }

  //4. open db, acquire existing column handles
  std::vector<rocksdb::ColumnFamilyHandle*> handles;
  status = rocksdb::DB::Open(rocksdb::DBOptions(opt),
			     path, cfs_to_open, &handles, &db);
  if (!status.ok()) {
    derr << status.ToString() << dendl;
    return -EINVAL;
  }
  for (size_t i = 0; i < cfs_to_open.size(); i++) {
    dout(10) << "column " << cfs_to_open[i].name << " handle " << (void*)handles[i] << dendl;
  }

  //5. calculate missing columns
  std::vector<std::string> new_sharding_columns;
  std::vector<std::string> missing_columns;
  sharding_def_to_columns(new_sharding_def,
			  new_sharding_columns);
  dout(5) << "target columns = " << new_sharding_columns << dendl;
  for (const auto& n : new_sharding_columns) {
    bool found = false;
    for (const auto& e : existing_columns) {
      if (n == e) {
	found = true;
	break;
      }
    }
    if (!found) {
      missing_columns.push_back(n);
    }
  }
  dout(5) << "missing columns = " << missing_columns << dendl;

  //6. create missing columns
  for (const auto& full_name : missing_columns) {
    std::string base_name;
    size_t pos = full_name.find('-');
    if (std::string::npos == pos)
      base_name = full_name;
    else
      base_name = full_name.substr(0,pos);

    rocksdb::ColumnFamilyOptions cf_opt(opt);
    // search if we have options for this column
    std::string options;
    for (const auto& nsd : new_sharding_def) {
      if (nsd.name == base_name) {
	options = nsd.options;
	break;
      }
    }
    int r = update_column_family_options(base_name, options, &cf_opt);
    if (r != 0) {
      return r;
    }
    rocksdb::ColumnFamilyHandle *cf;
    status = db->CreateColumnFamily(cf_opt, full_name, &cf);
    if (!status.ok()) {
      derr << __func__ << " Failed to create rocksdb column family: "
	   << full_name << dendl;
      return -EINVAL;
    }
    dout(10) << "created column " << full_name << " handle = " << (void*)cf << dendl;
    // newly created columns join the pool processed in step 7
    existing_columns.push_back(full_name);
    handles.push_back(cf);
  }

  //7. construct cf_handles according to new sharding
  for (size_t i = 0; i < existing_columns.size(); i++) {
    std::string full_name = existing_columns[i];
    rocksdb::ColumnFamilyHandle *cf = handles[i];
    std::string base_name;
    size_t shard_idx = 0;
    size_t pos = full_name.find('-');
    dout(10) << "processing column " << full_name << dendl;
    if (std::string::npos == pos) {
      base_name = full_name;
    } else {
      base_name = full_name.substr(0,pos);
      shard_idx = atoi(full_name.substr(pos+1).c_str());
    }
    if (rocksdb::kDefaultColumnFamilyName == base_name) {
      default_cf = handles[i];
      must_close_default_cf = true;
      // no-op deleter: the default CF handle is closed via
      // must_close_default_cf, not through to_process_columns
      std::unique_ptr<rocksdb::ColumnFamilyHandle, cf_deleter_t> ptr{
	cf, [](rocksdb::ColumnFamilyHandle*) {}};
      to_process_columns.emplace(full_name, std::move(ptr));
    } else {
      for (const auto& nsd : new_sharding_def) {
	if (nsd.name == base_name) {
	  if (shard_idx < nsd.shard_cnt) {
	    add_column_family(base_name, nsd.hash_l, nsd.hash_h, shard_idx, cf);
	  } else {
	    //ignore columns with index larger than shard count
	  }
	  break;
	}
      }
      std::unique_ptr<rocksdb::ColumnFamilyHandle, cf_deleter_t> ptr{
	cf, [this](rocksdb::ColumnFamilyHandle* handle) {
	  db->DestroyColumnFamilyHandle(handle);
	}};
      to_process_columns.emplace(full_name, std::move(ptr));
    }
  }

  //8. check if all cf_handles are filled
  for (const auto& col : cf_handles) {
    for (size_t i = 0; i < col.second.handles.size(); i++) {
      if (col.second.handles[i] == nullptr) {
	derr << "missing handle for column " << col.first << " shard " << i << dendl;
	return -EIO;
      }
    }
  }
  return 0;
}
3282
3283 int RocksDBStore::reshard_cleanup(const RocksDBStore::columns_t& current_columns)
3284 {
3285 std::vector<std::string> new_sharding_columns;
3286 for (const auto& [name, handle] : cf_handles) {
3287 if (handle.handles.size() == 1) {
3288 new_sharding_columns.push_back(name);
3289 } else {
3290 for (size_t i = 0; i < handle.handles.size(); i++) {
3291 new_sharding_columns.push_back(name + "-" + std::to_string(i));
3292 }
3293 }
3294 }
3295
3296 for (auto& [name, handle] : current_columns) {
3297 auto found = std::find(new_sharding_columns.begin(),
3298 new_sharding_columns.end(),
3299 name) != new_sharding_columns.end();
3300 if (found || name == rocksdb::kDefaultColumnFamilyName) {
3301 dout(5) << "Column " << name << " is part of new sharding." << dendl;
3302 continue;
3303 }
3304 dout(5) << "Column " << name << " not part of new sharding. Deleting." << dendl;
3305
3306 // verify that column is empty
3307 std::unique_ptr<rocksdb::Iterator> it{
3308 db->NewIterator(rocksdb::ReadOptions(), handle.get())};
3309 ceph_assert(it);
3310 it->SeekToFirst();
3311 ceph_assert(!it->Valid());
3312
3313 if (rocksdb::Status status = db->DropColumnFamily(handle.get()); !status.ok()) {
3314 derr << __func__ << " Failed to drop column: " << name << dendl;
3315 return -EINVAL;
3316 }
3317 }
3318 return 0;
3319 }
3320
// Reshard the store to the layout described by `new_sharding`.  Prepares the
// DB (lock marker, open all columns, create missing ones), moves every key
// into the column family the new layout assigns it, drops now-empty columns,
// and finally rewrites the sharding definition file (which also clears the
// resharding lock marker).  `ctrl_in` tunes batch/iterator limits and lets
// unit tests inject failures at defined points (-1000/-1001/-1002 returns);
// nullptr means defaults.  Leaves the DB closed (see scope guard below).
// Returns 0 on success or a negative error code.
int RocksDBStore::reshard(const std::string& new_sharding, const RocksDBStore::resharding_ctrl* ctrl_in)
{

  resharding_ctrl ctrl = ctrl_in ? *ctrl_in : resharding_ctrl();
  size_t bytes_in_batch = 0;
  size_t keys_in_batch = 0;
  size_t bytes_per_iterator = 0;
  size_t keys_per_iterator = 0;
  size_t keys_processed = 0;
  size_t keys_moved = 0;

  // Synchronously commit the accumulated moves and reset batch counters.
  auto flush_batch = [&](rocksdb::WriteBatch* batch) {
    dout(10) << "flushing batch, " << keys_in_batch << " keys, for "
	     << bytes_in_batch << " bytes" << dendl;
    rocksdb::WriteOptions woptions;
    woptions.sync = true;
    rocksdb::Status s = db->Write(woptions, batch);
    ceph_assert(s.ok());
    bytes_in_batch = 0;
    keys_in_batch = 0;
    batch->Clear();
  };

  // Walk one source column and move each key to the column the new layout
  // assigns it.  `fixed_prefix` is empty only for the default CF, whose raw
  // keys still embed the prefix (split_key); sharded columns store bare keys.
  auto process_column = [&](rocksdb::ColumnFamilyHandle* handle,
			    const std::string& fixed_prefix)
  {
    dout(5) << " column=" << (void*)handle << " prefix=" << fixed_prefix << dendl;
    std::unique_ptr<rocksdb::Iterator> it{
      db->NewIterator(rocksdb::ReadOptions(), handle)};
    ceph_assert(it);

    rocksdb::WriteBatch bat;
    for (it->SeekToFirst(); it->Valid(); it->Next()) {
      rocksdb::Slice raw_key = it->key();
      dout(30) << "key=" << pretty_binary_string(raw_key.ToString()) << dendl;
      //check if need to refresh iterator
      // (a long-lived iterator pins resources; re-seek to the same key
      // after ctrl-configured amounts of data have been scanned)
      if (bytes_per_iterator >= ctrl.bytes_per_iterator ||
	  keys_per_iterator >= ctrl.keys_per_iterator) {
	dout(8) << "refreshing iterator" << dendl;
	bytes_per_iterator = 0;
	keys_per_iterator = 0;
	// copy the key before dropping the iterator that owns the slice
	std::string raw_key_str = raw_key.ToString();
	it.reset(db->NewIterator(rocksdb::ReadOptions(), handle));
	ceph_assert(it);
	it->Seek(raw_key_str);
	ceph_assert(it->Valid());
	raw_key = it->key();
      }
      rocksdb::Slice value = it->value();
      std::string prefix, key;
      if (fixed_prefix.size() == 0) {
	split_key(raw_key, &prefix, &key);
      } else {
	prefix = fixed_prefix;
	key = raw_key.ToString();
      }
      keys_processed++;
      if ((keys_processed % 10000) == 0) {
	dout(10) << "processed " << keys_processed << " keys, moved " << keys_moved << dendl;
      }
      // nullptr means the prefix has no dedicated CF -> default CF
      rocksdb::ColumnFamilyHandle* new_handle = get_cf_handle(prefix, key);
      if (new_handle == nullptr) {
	new_handle = default_cf;
      }
      // key already lives in the right column; nothing to move
      if (handle == new_handle) {
	continue;
      }
      // keys in the default CF carry the combined prefix+key form
      std::string new_raw_key;
      if (new_handle == default_cf) {
	new_raw_key = combine_strings(prefix, key);
      } else {
	new_raw_key = key;
      }
      bat.Delete(handle, raw_key);
      bat.Put(new_handle, new_raw_key, value);
      dout(25) << "moving " << (void*)handle << "/" << pretty_binary_string(raw_key.ToString()) <<
	" to " << (void*)new_handle << "/" << pretty_binary_string(new_raw_key) <<
	" size " << value.size() << dendl;
      keys_moved++;
      bytes_in_batch += new_raw_key.size() * 2 + value.size();
      keys_in_batch++;
      bytes_per_iterator += new_raw_key.size() * 2 + value.size();
      keys_per_iterator++;

      //check if need to write batch
      if (bytes_in_batch >= ctrl.bytes_per_batch ||
	  keys_in_batch >= ctrl.keys_per_batch) {
	flush_batch(&bat);
	if (ctrl.unittest_fail_after_first_batch) {
	  return -1000;
	}
      }
    }
    // commit whatever remains below the batch thresholds
    if (bat.Count() > 0) {
      flush_batch(&bat);
    }
    return 0;
  };

  // Ensure the DB and all column handles opened by prepare_for_reshard are
  // released on every exit path, including the error returns below.
  auto close_column_handles = make_scope_guard([this] {
    cf_handles.clear();
    close();
  });
  columns_t to_process_columns;
  int r = prepare_for_reshard(new_sharding, to_process_columns);
  if (r != 0) {
    dout(1) << "failed to prepare db for reshard" << dendl;
    return r;
  }

  for (auto& [name, handle] : to_process_columns) {
    dout(5) << "Processing column=" << name
	    << " handle=" << handle.get() << dendl;
    if (name == rocksdb::kDefaultColumnFamilyName) {
      ceph_assert(handle.get() == default_cf);
      r = process_column(default_cf, std::string());
    } else {
      // shard columns are named "<prefix>-<idx>"; strip the index
      std::string fixed_prefix = name.substr(0, name.find('-'));
      dout(10) << "Prefix: " << fixed_prefix << dendl;
      r = process_column(handle.get(), fixed_prefix);
    }
    if (r != 0) {
      derr << "Error processing column " << name << dendl;
      return r;
    }
    if (ctrl.unittest_fail_after_processing_column) {
      return -1001;
    }
  }

  r = reshard_cleanup(to_process_columns);
  if (r != 0) {
    dout(5) << "failed to cleanup after reshard" << dendl;
    return r;
  }

  if (ctrl.unittest_fail_after_successful_processing) {
    return -1002;
  }
  // Persist the new sharding definition; this overwrite also removes the
  // resharding lock marker added by prepare_for_reshard.
  env->CreateDir(sharding_def_dir);
  if (auto status = rocksdb::WriteStringToFile(env, new_sharding,
					       sharding_def_file, true);
      !status.ok()) {
    derr << __func__ << " cannot write to " << sharding_def_file << dendl;
    return -EIO;
  }

  return r;
}
3470
3471 bool RocksDBStore::get_sharding(std::string& sharding) {
3472 rocksdb::Status status;
3473 std::string stored_sharding_text;
3474 bool result = false;
3475 sharding.clear();
3476
3477 status = env->FileExists(sharding_def_file);
3478 if (status.ok()) {
3479 status = rocksdb::ReadFileToString(env,
3480 sharding_def_file,
3481 &stored_sharding_text);
3482 if(status.ok()) {
3483 result = true;
3484 sharding = stored_sharding_text;
3485 }
3486 }
3487 return result;
3488 }