]> git.proxmox.com Git - ceph.git/blame - ceph/src/kv/RocksDBStore.cc
import new upstream nautilus stable release 14.2.8
[ceph.git] / ceph / src / kv / RocksDBStore.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#include <set>
5#include <map>
6#include <string>
7#include <memory>
8#include <errno.h>
9#include <unistd.h>
10#include <sys/types.h>
11#include <sys/stat.h>
12
13#include "rocksdb/db.h"
14#include "rocksdb/table.h"
15#include "rocksdb/env.h"
16#include "rocksdb/slice.h"
17#include "rocksdb/cache.h"
18#include "rocksdb/filter_policy.h"
19#include "rocksdb/utilities/convenience.h"
20#include "rocksdb/merge_operator.h"
91327a77 21
7c673cae
FG
22using std::string;
23#include "common/perf_counters.h"
91327a77 24#include "common/PriorityCache.h"
7c673cae
FG
25#include "include/str_list.h"
26#include "include/stringify.h"
27#include "include/str_map.h"
28#include "KeyValueDB.h"
29#include "RocksDBStore.h"
30
31#include "common/debug.h"
32
33#define dout_context cct
34#define dout_subsys ceph_subsys_rocksdb
35#undef dout_prefix
36#define dout_prefix *_dout << "rocksdb: "
37
11fdf7f2
TL
38static bufferlist to_bufferlist(rocksdb::Slice in) {
39 bufferlist bl;
40 bl.append(bufferptr(in.data(), in.size()));
41 return bl;
42}
43
c07f9fc5
FG
44static rocksdb::SliceParts prepare_sliceparts(const bufferlist &bl,
45 vector<rocksdb::Slice> *slices)
31f18b77
FG
46{
47 unsigned n = 0;
c07f9fc5
FG
48 for (auto& buf : bl.buffers()) {
49 (*slices)[n].data_ = buf.c_str();
50 (*slices)[n].size_ = buf.length();
51 n++;
31f18b77 52 }
c07f9fc5 53 return rocksdb::SliceParts(slices->data(), slices->size());
31f18b77
FG
54}
55
11fdf7f2 56
7c673cae 57//
11fdf7f2
TL
58// One of these for the default rocksdb column family, routing each prefix
59// to the appropriate MergeOperator.
7c673cae 60//
11fdf7f2
TL
61class RocksDBStore::MergeOperatorRouter
62 : public rocksdb::AssociativeMergeOperator
63{
7c673cae 64 RocksDBStore& store;
11fdf7f2 65public:
7c673cae
FG
66 const char *Name() const override {
67 // Construct a name that rocksDB will validate against. We want to
68 // do this in a way that doesn't constrain the ordering of calls
69 // to set_merge_operator, so sort the merge operators and then
70 // construct a name from all of those parts.
71 store.assoc_name.clear();
72 map<std::string,std::string> names;
11fdf7f2
TL
73
74 for (auto& p : store.merge_ops) {
75 names[p.first] = p.second->name();
76 }
77 for (auto& p : store.cf_handles) {
78 names.erase(p.first);
79 }
7c673cae
FG
80 for (auto& p : names) {
81 store.assoc_name += '.';
82 store.assoc_name += p.first;
83 store.assoc_name += ':';
84 store.assoc_name += p.second;
85 }
86 return store.assoc_name.c_str();
87 }
88
11fdf7f2 89 explicit MergeOperatorRouter(RocksDBStore &_store) : store(_store) {}
7c673cae
FG
90
91 bool Merge(const rocksdb::Slice& key,
11fdf7f2
TL
92 const rocksdb::Slice* existing_value,
93 const rocksdb::Slice& value,
94 std::string* new_value,
95 rocksdb::Logger* logger) const override {
96 // for default column family
97 // extract prefix from key and compare against each registered merge op;
98 // even though merge operator for explicit CF is included in merge_ops,
99 // it won't be picked up, since it won't match.
7c673cae
FG
100 for (auto& p : store.merge_ops) {
101 if (p.first.compare(0, p.first.length(),
102 key.data(), p.first.length()) == 0 &&
103 key.data()[p.first.length()] == 0) {
11fdf7f2
TL
104 if (existing_value) {
105 p.second->merge(existing_value->data(), existing_value->size(),
7c673cae
FG
106 value.data(), value.size(),
107 new_value);
11fdf7f2
TL
108 } else {
109 p.second->merge_nonexistent(value.data(), value.size(), new_value);
110 }
111 break;
7c673cae
FG
112 }
113 }
114 return true; // OK :)
115 }
11fdf7f2
TL
116};
117
118//
119// One of these per non-default column family, linked directly to the
120// merge operator for that CF/prefix (if any).
121//
122class RocksDBStore::MergeOperatorLinker
123 : public rocksdb::AssociativeMergeOperator
124{
125private:
126 std::shared_ptr<KeyValueDB::MergeOperator> mop;
127public:
128 explicit MergeOperatorLinker(const std::shared_ptr<KeyValueDB::MergeOperator> &o) : mop(o) {}
7c673cae 129
11fdf7f2
TL
130 const char *Name() const override {
131 return mop->name();
132 }
133
134 bool Merge(const rocksdb::Slice& key,
135 const rocksdb::Slice* existing_value,
136 const rocksdb::Slice& value,
137 std::string* new_value,
138 rocksdb::Logger* logger) const override {
139 if (existing_value) {
140 mop->merge(existing_value->data(), existing_value->size(),
141 value.data(), value.size(),
142 new_value);
143 } else {
144 mop->merge_nonexistent(value.data(), value.size(), new_value);
145 }
146 return true;
147 }
7c673cae
FG
148};
149
150int RocksDBStore::set_merge_operator(
151 const string& prefix,
152 std::shared_ptr<KeyValueDB::MergeOperator> mop)
153{
154 // If you fail here, it's because you can't do this on an open database
11fdf7f2 155 ceph_assert(db == nullptr);
7c673cae
FG
156 merge_ops.push_back(std::make_pair(prefix,mop));
157 return 0;
158}
159
160class CephRocksdbLogger : public rocksdb::Logger {
161 CephContext *cct;
162public:
163 explicit CephRocksdbLogger(CephContext *c) : cct(c) {
164 cct->get();
165 }
166 ~CephRocksdbLogger() override {
167 cct->put();
168 }
169
170 // Write an entry to the log file with the specified format.
171 void Logv(const char* format, va_list ap) override {
172 Logv(rocksdb::INFO_LEVEL, format, ap);
173 }
174
175 // Write an entry to the log file with the specified log level
176 // and format. Any log with level under the internal log level
177 // of *this (see @SetInfoLogLevel and @GetInfoLogLevel) will not be
178 // printed.
179 void Logv(const rocksdb::InfoLogLevel log_level, const char* format,
180 va_list ap) override {
181 int v = rocksdb::NUM_INFO_LOG_LEVELS - log_level - 1;
11fdf7f2 182 dout(ceph::dout::need_dynamic(v));
7c673cae
FG
183 char buf[65536];
184 vsnprintf(buf, sizeof(buf), format, ap);
185 *_dout << buf << dendl;
186 }
187};
188
189rocksdb::Logger *create_rocksdb_ceph_logger()
190{
191 return new CephRocksdbLogger(g_ceph_context);
192}
193
c07f9fc5 194static int string2bool(const string &val, bool &b_val)
7c673cae
FG
195{
196 if (strcasecmp(val.c_str(), "false") == 0) {
197 b_val = false;
198 return 0;
199 } else if (strcasecmp(val.c_str(), "true") == 0) {
200 b_val = true;
201 return 0;
202 } else {
203 std::string err;
204 int b = strict_strtol(val.c_str(), 10, &err);
205 if (!err.empty())
206 return -EINVAL;
207 b_val = !!b;
208 return 0;
209 }
210}
211
c07f9fc5 212int RocksDBStore::tryInterpret(const string &key, const string &val, rocksdb::Options &opt)
7c673cae
FG
213{
214 if (key == "compaction_threads") {
215 std::string err;
1adf2230 216 int f = strict_iecstrtoll(val.c_str(), &err);
7c673cae
FG
217 if (!err.empty())
218 return -EINVAL;
219 //Low priority threadpool is used for compaction
220 opt.env->SetBackgroundThreads(f, rocksdb::Env::Priority::LOW);
221 } else if (key == "flusher_threads") {
222 std::string err;
1adf2230 223 int f = strict_iecstrtoll(val.c_str(), &err);
7c673cae
FG
224 if (!err.empty())
225 return -EINVAL;
226 //High priority threadpool is used for flusher
227 opt.env->SetBackgroundThreads(f, rocksdb::Env::Priority::HIGH);
228 } else if (key == "compact_on_mount") {
229 int ret = string2bool(val, compact_on_mount);
230 if (ret != 0)
231 return ret;
232 } else if (key == "disableWAL") {
233 int ret = string2bool(val, disableWAL);
234 if (ret != 0)
235 return ret;
236 } else {
237 //unrecognize config options.
238 return -EINVAL;
239 }
240 return 0;
241}
242
c07f9fc5 243int RocksDBStore::ParseOptionsFromString(const string &opt_str, rocksdb::Options &opt)
7c673cae
FG
244{
245 map<string, string> str_map;
246 int r = get_str_map(opt_str, &str_map, ",\n;");
247 if (r < 0)
248 return r;
249 map<string, string>::iterator it;
250 for(it = str_map.begin(); it != str_map.end(); ++it) {
251 string this_opt = it->first + "=" + it->second;
252 rocksdb::Status status = rocksdb::GetOptionsFromString(opt, this_opt , &opt);
253 if (!status.ok()) {
254 //unrecognized by rocksdb, try to interpret by ourselves.
255 r = tryInterpret(it->first, it->second, opt);
256 if (r < 0) {
257 derr << status.ToString() << dendl;
258 return -EINVAL;
259 }
260 }
261 lgeneric_dout(cct, 0) << " set rocksdb option " << it->first
262 << " = " << it->second << dendl;
263 }
264 return 0;
265}
266
267int RocksDBStore::init(string _options_str)
268{
269 options_str = _options_str;
270 rocksdb::Options opt;
271 //try parse options
272 if (options_str.length()) {
273 int r = ParseOptionsFromString(options_str, opt);
274 if (r != 0) {
275 return -EINVAL;
276 }
277 }
278 return 0;
279}
280
11fdf7f2 281int RocksDBStore::create_db_dir()
7c673cae
FG
282{
283 if (env) {
284 unique_ptr<rocksdb::Directory> dir;
285 env->NewDirectory(path, &dir);
286 } else {
287 int r = ::mkdir(path.c_str(), 0755);
288 if (r < 0)
289 r = -errno;
290 if (r < 0 && r != -EEXIST) {
291 derr << __func__ << " failed to create " << path << ": " << cpp_strerror(r)
292 << dendl;
293 return r;
294 }
295 }
11fdf7f2
TL
296 return 0;
297}
298
299int RocksDBStore::install_cf_mergeop(
300 const string &cf_name,
301 rocksdb::ColumnFamilyOptions *cf_opt)
302{
303 ceph_assert(cf_opt != nullptr);
304 cf_opt->merge_operator.reset();
305 for (auto& i : merge_ops) {
306 if (i.first == cf_name) {
307 cf_opt->merge_operator.reset(new MergeOperatorLinker(i.second));
308 }
309 }
310 return 0;
7c673cae
FG
311}
312
11fdf7f2
TL
313int RocksDBStore::create_and_open(ostream &out,
314 const vector<ColumnFamily>& cfs)
315{
316 int r = create_db_dir();
317 if (r < 0)
318 return r;
319 if (cfs.empty()) {
320 return do_open(out, true, false, nullptr);
321 } else {
322 return do_open(out, true, false, &cfs);
323 }
324}
325
326int RocksDBStore::load_rocksdb_options(bool create_if_missing, rocksdb::Options& opt)
7c673cae 327{
7c673cae
FG
328 rocksdb::Status status;
329
330 if (options_str.length()) {
331 int r = ParseOptionsFromString(options_str, opt);
332 if (r != 0) {
333 return -EINVAL;
334 }
335 }
336
11fdf7f2 337 if (g_conf()->rocksdb_perf) {
7c673cae
FG
338 dbstats = rocksdb::CreateDBStatistics();
339 opt.statistics = dbstats;
340 }
341
342 opt.create_if_missing = create_if_missing;
11fdf7f2 343 if (kv_options.count("separate_wal_dir")) {
7c673cae
FG
344 opt.wal_dir = path + ".wal";
345 }
11fdf7f2
TL
346
347 // Since ceph::for_each_substr doesn't return a value and
348 // std::stoull does throw, we may as well just catch everything here.
349 try {
350 if (kv_options.count("db_paths")) {
351 list<string> paths;
352 get_str_list(kv_options["db_paths"], "; \t", paths);
353 for (auto& p : paths) {
354 size_t pos = p.find(',');
355 if (pos == std::string::npos) {
356 derr << __func__ << " invalid db path item " << p << " in "
357 << kv_options["db_paths"] << dendl;
358 return -EINVAL;
359 }
360 string path = p.substr(0, pos);
361 string size_str = p.substr(pos + 1);
362 uint64_t size = atoll(size_str.c_str());
363 if (!size) {
364 derr << __func__ << " invalid db path item " << p << " in "
365 << kv_options["db_paths"] << dendl;
366 return -EINVAL;
367 }
368 opt.db_paths.push_back(rocksdb::DbPath(path, size));
369 dout(10) << __func__ << " db_path " << path << " size " << size << dendl;
7c673cae 370 }
7c673cae 371 }
11fdf7f2
TL
372 } catch (const std::system_error& e) {
373 return -e.code().value();
7c673cae
FG
374 }
375
11fdf7f2 376 if (g_conf()->rocksdb_log_to_ceph_log) {
7c673cae
FG
377 opt.info_log.reset(new CephRocksdbLogger(g_ceph_context));
378 }
379
380 if (priv) {
381 dout(10) << __func__ << " using custom Env " << priv << dendl;
382 opt.env = static_cast<rocksdb::Env*>(priv);
383 }
384
eafe8130
TL
385 opt.env->SetAllowNonOwnerAccess(false);
386
31f18b77 387 // caches
224ce89b 388 if (!set_cache_flag) {
11fdf7f2 389 cache_size = g_conf()->rocksdb_cache_size;
31f18b77 390 }
11fdf7f2 391 uint64_t row_cache_size = cache_size * g_conf()->rocksdb_cache_row_ratio;
31f18b77 392 uint64_t block_cache_size = cache_size - row_cache_size;
224ce89b 393
11fdf7f2 394 if (g_conf()->rocksdb_cache_type == "binned_lru") {
91327a77 395 bbt_opts.block_cache = rocksdb_cache::NewBinnedLRUCache(
11fdf7f2 396 cct,
91327a77 397 block_cache_size,
11fdf7f2
TL
398 g_conf()->rocksdb_cache_shard_bits);
399 } else if (g_conf()->rocksdb_cache_type == "lru") {
91327a77
AA
400 bbt_opts.block_cache = rocksdb::NewLRUCache(
401 block_cache_size,
11fdf7f2
TL
402 g_conf()->rocksdb_cache_shard_bits);
403 } else if (g_conf()->rocksdb_cache_type == "clock") {
91327a77
AA
404 bbt_opts.block_cache = rocksdb::NewClockCache(
405 block_cache_size,
11fdf7f2 406 g_conf()->rocksdb_cache_shard_bits);
91327a77 407 if (!bbt_opts.block_cache) {
11fdf7f2 408 derr << "rocksdb_cache_type '" << g_conf()->rocksdb_cache_type
91327a77
AA
409 << "' chosen, but RocksDB not compiled with LibTBB. "
410 << dendl;
224ce89b
WB
411 return -EINVAL;
412 }
91327a77 413 } else {
11fdf7f2 414 derr << "unrecognized rocksdb_cache_type '" << g_conf()->rocksdb_cache_type
91327a77
AA
415 << "'" << dendl;
416 return -EINVAL;
31f18b77 417 }
11fdf7f2 418 bbt_opts.block_size = g_conf()->rocksdb_block_size;
31f18b77 419
224ce89b
WB
420 if (row_cache_size > 0)
421 opt.row_cache = rocksdb::NewLRUCache(row_cache_size,
11fdf7f2
TL
422 g_conf()->rocksdb_cache_shard_bits);
423 uint64_t bloom_bits = g_conf().get_val<uint64_t>("rocksdb_bloom_bits_per_key");
c07f9fc5 424 if (bloom_bits > 0) {
7c673cae 425 dout(10) << __func__ << " set bloom filter bits per key to "
c07f9fc5
FG
426 << bloom_bits << dendl;
427 bbt_opts.filter_policy.reset(rocksdb::NewBloomFilterPolicy(bloom_bits));
428 }
11fdf7f2
TL
429 using std::placeholders::_1;
430 if (g_conf().with_val<std::string>("rocksdb_index_type",
431 std::bind(std::equal_to<std::string>(), _1,
432 "binary_search")))
c07f9fc5 433 bbt_opts.index_type = rocksdb::BlockBasedTableOptions::IndexType::kBinarySearch;
11fdf7f2
TL
434 if (g_conf().with_val<std::string>("rocksdb_index_type",
435 std::bind(std::equal_to<std::string>(), _1,
436 "hash_search")))
c07f9fc5 437 bbt_opts.index_type = rocksdb::BlockBasedTableOptions::IndexType::kHashSearch;
11fdf7f2
TL
438 if (g_conf().with_val<std::string>("rocksdb_index_type",
439 std::bind(std::equal_to<std::string>(), _1,
440 "two_level")))
c07f9fc5 441 bbt_opts.index_type = rocksdb::BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
11fdf7f2
TL
442 if (!bbt_opts.no_block_cache) {
443 bbt_opts.cache_index_and_filter_blocks =
444 g_conf().get_val<bool>("rocksdb_cache_index_and_filter_blocks");
445 bbt_opts.cache_index_and_filter_blocks_with_high_priority =
446 g_conf().get_val<bool>("rocksdb_cache_index_and_filter_blocks_with_high_priority");
447 bbt_opts.pin_l0_filter_and_index_blocks_in_cache =
448 g_conf().get_val<bool>("rocksdb_pin_l0_filter_and_index_blocks_in_cache");
449 }
450 bbt_opts.partition_filters = g_conf().get_val<bool>("rocksdb_partition_filters");
451 if (g_conf().get_val<Option::size_t>("rocksdb_metadata_block_size") > 0)
452 bbt_opts.metadata_block_size = g_conf().get_val<Option::size_t>("rocksdb_metadata_block_size");
c07f9fc5 453
7c673cae 454 opt.table_factory.reset(rocksdb::NewBlockBasedTableFactory(bbt_opts));
11fdf7f2 455 dout(10) << __func__ << " block size " << g_conf()->rocksdb_block_size
1adf2230
AA
456 << ", block_cache size " << byte_u_t(block_cache_size)
457 << ", row_cache size " << byte_u_t(row_cache_size)
31f18b77 458 << "; shards "
11fdf7f2
TL
459 << (1 << g_conf()->rocksdb_cache_shard_bits)
460 << ", type " << g_conf()->rocksdb_cache_type
31f18b77 461 << dendl;
7c673cae
FG
462
463 opt.merge_operator.reset(new MergeOperatorRouter(*this));
11fdf7f2
TL
464
465 return 0;
466}
467
468int RocksDBStore::do_open(ostream &out,
469 bool create_if_missing,
470 bool open_readonly,
471 const vector<ColumnFamily>* cfs)
472{
473 ceph_assert(!(create_if_missing && open_readonly));
474 rocksdb::Options opt;
475 int r = load_rocksdb_options(create_if_missing, opt);
476 if (r) {
477 dout(1) << __func__ << " load rocksdb options failed" << dendl;
478 return r;
479 }
480 rocksdb::Status status;
481 if (create_if_missing) {
482 status = rocksdb::DB::Open(opt, path, &db);
483 if (!status.ok()) {
484 derr << status.ToString() << dendl;
485 return -EINVAL;
486 }
487 // create and open column families
488 if (cfs) {
489 for (auto& p : *cfs) {
490 // copy default CF settings, block cache, merge operators as
491 // the base for new CF
492 rocksdb::ColumnFamilyOptions cf_opt(opt);
493 // user input options will override the base options
494 status = rocksdb::GetColumnFamilyOptionsFromString(
495 cf_opt, p.option, &cf_opt);
496 if (!status.ok()) {
497 derr << __func__ << " invalid db column family option string for CF: "
498 << p.name << dendl;
499 return -EINVAL;
500 }
501 install_cf_mergeop(p.name, &cf_opt);
502 rocksdb::ColumnFamilyHandle *cf;
503 status = db->CreateColumnFamily(cf_opt, p.name, &cf);
504 if (!status.ok()) {
505 derr << __func__ << " Failed to create rocksdb column family: "
506 << p.name << dendl;
507 return -EINVAL;
508 }
509 // store the new CF handle
510 add_column_family(p.name, static_cast<void*>(cf));
511 }
512 }
513 default_cf = db->DefaultColumnFamily();
514 } else {
515 std::vector<string> existing_cfs;
516 status = rocksdb::DB::ListColumnFamilies(
517 rocksdb::DBOptions(opt),
518 path,
519 &existing_cfs);
520 dout(1) << __func__ << " column families: " << existing_cfs << dendl;
521 if (existing_cfs.empty()) {
522 // no column families
523 if (open_readonly) {
524 status = rocksdb::DB::Open(opt, path, &db);
525 } else {
526 status = rocksdb::DB::OpenForReadOnly(opt, path, &db);
527 }
528 if (!status.ok()) {
529 derr << status.ToString() << dendl;
530 return -EINVAL;
531 }
532 default_cf = db->DefaultColumnFamily();
533 } else {
534 // we cannot change column families for a created database. so, map
535 // what options we are given to whatever cf's already exist.
536 std::vector<rocksdb::ColumnFamilyDescriptor> column_families;
537 for (auto& n : existing_cfs) {
538 // copy default CF settings, block cache, merge operators as
539 // the base for new CF
540 rocksdb::ColumnFamilyOptions cf_opt(opt);
541 bool found = false;
542 if (cfs) {
543 for (auto& i : *cfs) {
544 if (i.name == n) {
545 found = true;
546 status = rocksdb::GetColumnFamilyOptionsFromString(
547 cf_opt, i.option, &cf_opt);
548 if (!status.ok()) {
549 derr << __func__ << " invalid db column family options for CF '"
550 << i.name << "': " << i.option << dendl;
551 return -EINVAL;
552 }
553 }
554 }
555 }
556 if (n != rocksdb::kDefaultColumnFamilyName) {
557 install_cf_mergeop(n, &cf_opt);
558 }
559 column_families.push_back(rocksdb::ColumnFamilyDescriptor(n, cf_opt));
560 if (!found && n != rocksdb::kDefaultColumnFamilyName) {
561 dout(1) << __func__ << " column family '" << n
562 << "' exists but not expected" << dendl;
563 }
564 }
565 std::vector<rocksdb::ColumnFamilyHandle*> handles;
566 if (open_readonly) {
567 status = rocksdb::DB::OpenForReadOnly(rocksdb::DBOptions(opt),
568 path, column_families,
569 &handles, &db);
570 } else {
571 status = rocksdb::DB::Open(rocksdb::DBOptions(opt),
572 path, column_families, &handles, &db);
573 }
574 if (!status.ok()) {
575 derr << status.ToString() << dendl;
576 return -EINVAL;
577 }
578 for (unsigned i = 0; i < existing_cfs.size(); ++i) {
579 if (existing_cfs[i] == rocksdb::kDefaultColumnFamilyName) {
580 default_cf = handles[i];
581 must_close_default_cf = true;
582 } else {
583 add_column_family(existing_cfs[i], static_cast<void*>(handles[i]));
584 }
585 }
586 }
7c673cae 587 }
11fdf7f2 588 ceph_assert(default_cf != nullptr);
7c673cae
FG
589
590 PerfCountersBuilder plb(g_ceph_context, "rocksdb", l_rocksdb_first, l_rocksdb_last);
591 plb.add_u64_counter(l_rocksdb_gets, "get", "Gets");
592 plb.add_u64_counter(l_rocksdb_txns, "submit_transaction", "Submit transactions");
593 plb.add_u64_counter(l_rocksdb_txns_sync, "submit_transaction_sync", "Submit transactions sync");
594 plb.add_time_avg(l_rocksdb_get_latency, "get_latency", "Get latency");
595 plb.add_time_avg(l_rocksdb_submit_latency, "submit_latency", "Submit Latency");
596 plb.add_time_avg(l_rocksdb_submit_sync_latency, "submit_sync_latency", "Submit Sync Latency");
597 plb.add_u64_counter(l_rocksdb_compact, "compact", "Compactions");
598 plb.add_u64_counter(l_rocksdb_compact_range, "compact_range", "Compactions by range");
599 plb.add_u64_counter(l_rocksdb_compact_queue_merge, "compact_queue_merge", "Mergings of ranges in compaction queue");
600 plb.add_u64(l_rocksdb_compact_queue_len, "compact_queue_len", "Length of compaction queue");
601 plb.add_time_avg(l_rocksdb_write_wal_time, "rocksdb_write_wal_time", "Rocksdb write wal time");
602 plb.add_time_avg(l_rocksdb_write_memtable_time, "rocksdb_write_memtable_time", "Rocksdb write memtable time");
603 plb.add_time_avg(l_rocksdb_write_delay_time, "rocksdb_write_delay_time", "Rocksdb write delay time");
604 plb.add_time_avg(l_rocksdb_write_pre_and_post_process_time,
605 "rocksdb_write_pre_and_post_time", "total time spent on writing a record, excluding write process");
606 logger = plb.create_perf_counters();
607 cct->get_perfcounters_collection()->add(logger);
608
609 if (compact_on_mount) {
610 derr << "Compacting rocksdb store..." << dendl;
611 compact();
612 derr << "Finished compacting rocksdb store" << dendl;
613 }
614 return 0;
615}
616
617int RocksDBStore::_test_init(const string& dir)
618{
619 rocksdb::Options options;
620 options.create_if_missing = true;
621 rocksdb::DB *db;
622 rocksdb::Status status = rocksdb::DB::Open(options, dir, &db);
623 delete db;
624 db = nullptr;
625 return status.ok() ? 0 : -EIO;
626}
627
628RocksDBStore::~RocksDBStore()
629{
630 close();
631 delete logger;
632
633 // Ensure db is destroyed before dependent db_cache and filterpolicy
11fdf7f2
TL
634 for (auto& p : cf_handles) {
635 db->DestroyColumnFamilyHandle(
636 static_cast<rocksdb::ColumnFamilyHandle*>(p.second));
637 p.second = nullptr;
638 }
639 if (must_close_default_cf) {
640 db->DestroyColumnFamilyHandle(default_cf);
641 must_close_default_cf = false;
642 }
643 default_cf = nullptr;
7c673cae
FG
644 delete db;
645 db = nullptr;
646
647 if (priv) {
648 delete static_cast<rocksdb::Env*>(priv);
649 }
650}
651
652void RocksDBStore::close()
653{
654 // stop compaction thread
655 compact_queue_lock.Lock();
656 if (compact_thread.is_started()) {
92f5a8d4 657 dout(1) << __func__ << " waiting for compaction thread to stop" << dendl;
7c673cae
FG
658 compact_queue_stop = true;
659 compact_queue_cond.Signal();
660 compact_queue_lock.Unlock();
661 compact_thread.join();
92f5a8d4 662 dout(1) << __func__ << " compaction thread to stopped" << dendl;
7c673cae
FG
663 } else {
664 compact_queue_lock.Unlock();
665 }
666
667 if (logger)
668 cct->get_perfcounters_collection()->remove(logger);
669}
670
11fdf7f2
TL
671int RocksDBStore::repair(std::ostream &out)
672{
673 rocksdb::Options opt;
674 int r = load_rocksdb_options(false, opt);
675 if (r) {
676 dout(1) << __func__ << " load rocksdb options failed" << dendl;
677 out << "load rocksdb options failed" << std::endl;
678 return r;
679 }
680 rocksdb::Status status = rocksdb::RepairDB(path, opt);
681 if (status.ok()) {
682 return 0;
683 } else {
684 out << "repair rocksdb failed : " << status.ToString() << std::endl;
685 return 1;
686 }
687}
688
7c673cae
FG
689void RocksDBStore::split_stats(const std::string &s, char delim, std::vector<std::string> &elems) {
690 std::stringstream ss;
691 ss.str(s);
692 std::string item;
693 while (std::getline(ss, item, delim)) {
694 elems.push_back(item);
695 }
696}
697
11fdf7f2
TL
698int64_t RocksDBStore::estimate_prefix_size(const string& prefix)
699{
700 auto cf = get_cf_handle(prefix);
701 uint64_t size = 0;
702 uint8_t flags =
703 //rocksdb::DB::INCLUDE_MEMTABLES | // do not include memtables...
704 rocksdb::DB::INCLUDE_FILES;
705 if (cf) {
706 string start(1, '\x00');
707 string limit("\xff\xff\xff\xff");
708 rocksdb::Range r(start, limit);
709 db->GetApproximateSizes(cf, &r, 1, &size, flags);
710 } else {
711 string limit = prefix + "\xff\xff\xff\xff";
712 rocksdb::Range r(prefix, limit);
713 db->GetApproximateSizes(default_cf,
714 &r, 1, &size, flags);
715 }
716 return size;
717}
718
7c673cae
FG
719void RocksDBStore::get_statistics(Formatter *f)
720{
11fdf7f2
TL
721 if (!g_conf()->rocksdb_perf) {
722 dout(20) << __func__ << " RocksDB perf is disabled, can't probe for stats"
7c673cae
FG
723 << dendl;
724 return;
725 }
726
11fdf7f2 727 if (g_conf()->rocksdb_collect_compaction_stats) {
7c673cae
FG
728 std::string stat_str;
729 bool status = db->GetProperty("rocksdb.stats", &stat_str);
730 if (status) {
731 f->open_object_section("rocksdb_statistics");
732 f->dump_string("rocksdb_compaction_statistics", "");
733 vector<string> stats;
734 split_stats(stat_str, '\n', stats);
735 for (auto st :stats) {
736 f->dump_string("", st);
737 }
738 f->close_section();
739 }
740 }
11fdf7f2 741 if (g_conf()->rocksdb_collect_extended_stats) {
7c673cae
FG
742 if (dbstats) {
743 f->open_object_section("rocksdb_extended_statistics");
744 string stat_str = dbstats->ToString();
745 vector<string> stats;
746 split_stats(stat_str, '\n', stats);
747 f->dump_string("rocksdb_extended_statistics", "");
748 for (auto st :stats) {
749 f->dump_string(".", st);
750 }
751 f->close_section();
752 }
753 f->open_object_section("rocksdbstore_perf_counters");
754 logger->dump_formatted(f,0);
755 f->close_section();
756 }
11fdf7f2 757 if (g_conf()->rocksdb_collect_memory_stats) {
7c673cae 758 f->open_object_section("rocksdb_memtable_statistics");
11fdf7f2
TL
759 std::string str;
760 if (!bbt_opts.no_block_cache) {
761 str.append(stringify(bbt_opts.block_cache->GetUsage()));
762 f->dump_string("block_cache_usage", str.data());
763 str.clear();
764 str.append(stringify(bbt_opts.block_cache->GetPinnedUsage()));
765 f->dump_string("block_cache_pinned_blocks_usage", str);
766 str.clear();
767 }
7c673cae
FG
768 db->GetProperty("rocksdb.cur-size-all-mem-tables", &str);
769 f->dump_string("rocksdb_memtable_usage", str);
11fdf7f2
TL
770 str.clear();
771 db->GetProperty("rocksdb.estimate-table-readers-mem", &str);
772 f->dump_string("rocksdb_index_filter_blocks_usage", str);
7c673cae
FG
773 f->close_section();
774 }
775}
776
11fdf7f2 777int RocksDBStore::submit_common(rocksdb::WriteOptions& woptions, KeyValueDB::Transaction t)
7c673cae 778{
7c673cae
FG
779 // enable rocksdb breakdown
780 // considering performance overhead, default is disabled
11fdf7f2 781 if (g_conf()->rocksdb_perf) {
7c673cae 782 rocksdb::SetPerfLevel(rocksdb::PerfLevel::kEnableTimeExceptForMutex);
11fdf7f2 783 rocksdb::get_perf_context()->Reset();
7c673cae
FG
784 }
785
786 RocksDBTransactionImpl * _t =
787 static_cast<RocksDBTransactionImpl *>(t.get());
7c673cae
FG
788 woptions.disableWAL = disableWAL;
789 lgeneric_subdout(cct, rocksdb, 30) << __func__;
790 RocksWBHandler bat_txc;
791 _t->bat.Iterate(&bat_txc);
792 *_dout << " Rocksdb transaction: " << bat_txc.seen << dendl;
793
794 rocksdb::Status s = db->Write(woptions, &_t->bat);
795 if (!s.ok()) {
796 RocksWBHandler rocks_txc;
797 _t->bat.Iterate(&rocks_txc);
798 derr << __func__ << " error: " << s.ToString() << " code = " << s.code()
799 << " Rocksdb transaction: " << rocks_txc.seen << dendl;
800 }
7c673cae 801
11fdf7f2 802 if (g_conf()->rocksdb_perf) {
7c673cae
FG
803 utime_t write_memtable_time;
804 utime_t write_delay_time;
805 utime_t write_wal_time;
806 utime_t write_pre_and_post_process_time;
807 write_wal_time.set_from_double(
11fdf7f2 808 static_cast<double>(rocksdb::get_perf_context()->write_wal_time)/1000000000);
7c673cae 809 write_memtable_time.set_from_double(
11fdf7f2 810 static_cast<double>(rocksdb::get_perf_context()->write_memtable_time)/1000000000);
7c673cae 811 write_delay_time.set_from_double(
11fdf7f2 812 static_cast<double>(rocksdb::get_perf_context()->write_delay_time)/1000000000);
7c673cae 813 write_pre_and_post_process_time.set_from_double(
11fdf7f2 814 static_cast<double>(rocksdb::get_perf_context()->write_pre_and_post_process_time)/1000000000);
7c673cae
FG
815 logger->tinc(l_rocksdb_write_memtable_time, write_memtable_time);
816 logger->tinc(l_rocksdb_write_delay_time, write_delay_time);
817 logger->tinc(l_rocksdb_write_wal_time, write_wal_time);
818 logger->tinc(l_rocksdb_write_pre_and_post_process_time, write_pre_and_post_process_time);
819 }
820
7c673cae
FG
821 return s.ok() ? 0 : -1;
822}
823
11fdf7f2 824int RocksDBStore::submit_transaction(KeyValueDB::Transaction t)
7c673cae
FG
825{
826 utime_t start = ceph_clock_now();
7c673cae 827 rocksdb::WriteOptions woptions;
11fdf7f2 828 woptions.sync = false;
7c673cae 829
11fdf7f2 830 int result = submit_common(woptions, t);
7c673cae 831
11fdf7f2
TL
832 utime_t lat = ceph_clock_now() - start;
833 logger->inc(l_rocksdb_txns);
834 logger->tinc(l_rocksdb_submit_latency, lat);
835
836 return result;
837}
7c673cae 838
11fdf7f2
TL
839int RocksDBStore::submit_transaction_sync(KeyValueDB::Transaction t)
840{
841 utime_t start = ceph_clock_now();
842 rocksdb::WriteOptions woptions;
843 // if disableWAL, sync can't set
844 woptions.sync = !disableWAL;
845
846 int result = submit_common(woptions, t);
847
848 utime_t lat = ceph_clock_now() - start;
7c673cae
FG
849 logger->inc(l_rocksdb_txns_sync);
850 logger->tinc(l_rocksdb_submit_sync_latency, lat);
851
11fdf7f2 852 return result;
7c673cae
FG
853}
854
855RocksDBStore::RocksDBTransactionImpl::RocksDBTransactionImpl(RocksDBStore *_db)
856{
857 db = _db;
858}
859
11fdf7f2
TL
860void RocksDBStore::RocksDBTransactionImpl::put_bat(
861 rocksdb::WriteBatch& bat,
862 rocksdb::ColumnFamilyHandle *cf,
863 const string &key,
7c673cae
FG
864 const bufferlist &to_set_bl)
865{
7c673cae
FG
866 // bufferlist::c_str() is non-constant, so we can't call c_str()
867 if (to_set_bl.is_contiguous() && to_set_bl.length() > 0) {
11fdf7f2
TL
868 bat.Put(cf,
869 rocksdb::Slice(key),
870 rocksdb::Slice(to_set_bl.buffers().front().c_str(),
871 to_set_bl.length()));
7c673cae 872 } else {
31f18b77 873 rocksdb::Slice key_slice(key);
c07f9fc5 874 vector<rocksdb::Slice> value_slices(to_set_bl.buffers().size());
11fdf7f2
TL
875 bat.Put(cf,
876 rocksdb::SliceParts(&key_slice, 1),
c07f9fc5 877 prepare_sliceparts(to_set_bl, &value_slices));
7c673cae
FG
878 }
879}
880
881void RocksDBStore::RocksDBTransactionImpl::set(
882 const string &prefix,
11fdf7f2 883 const string &k,
7c673cae
FG
884 const bufferlist &to_set_bl)
885{
11fdf7f2
TL
886 auto cf = db->get_cf_handle(prefix);
887 if (cf) {
888 put_bat(bat, cf, k, to_set_bl);
889 } else {
890 string key = combine_strings(prefix, k);
891 put_bat(bat, db->default_cf, key, to_set_bl);
892 }
893}
7c673cae 894
11fdf7f2
TL
895void RocksDBStore::RocksDBTransactionImpl::set(
896 const string &prefix,
897 const char *k, size_t keylen,
898 const bufferlist &to_set_bl)
899{
900 auto cf = db->get_cf_handle(prefix);
901 if (cf) {
902 string key(k, keylen); // fixme?
903 put_bat(bat, cf, key, to_set_bl);
7c673cae 904 } else {
11fdf7f2
TL
905 string key;
906 combine_strings(prefix, k, keylen, &key);
907 put_bat(bat, cf, key, to_set_bl);
7c673cae
FG
908 }
909}
910
911void RocksDBStore::RocksDBTransactionImpl::rmkey(const string &prefix,
912 const string &k)
913{
11fdf7f2
TL
914 auto cf = db->get_cf_handle(prefix);
915 if (cf) {
916 bat.Delete(cf, rocksdb::Slice(k));
917 } else {
918 bat.Delete(db->default_cf, combine_strings(prefix, k));
919 }
7c673cae
FG
920}
921
922void RocksDBStore::RocksDBTransactionImpl::rmkey(const string &prefix,
923 const char *k,
924 size_t keylen)
925{
11fdf7f2
TL
926 auto cf = db->get_cf_handle(prefix);
927 if (cf) {
928 bat.Delete(cf, rocksdb::Slice(k, keylen));
929 } else {
930 string key;
931 combine_strings(prefix, k, keylen, &key);
932 bat.Delete(db->default_cf, rocksdb::Slice(key));
933 }
7c673cae
FG
934}
935
936void RocksDBStore::RocksDBTransactionImpl::rm_single_key(const string &prefix,
937 const string &k)
938{
11fdf7f2
TL
939 auto cf = db->get_cf_handle(prefix);
940 if (cf) {
941 bat.SingleDelete(cf, k);
942 } else {
943 bat.SingleDelete(db->default_cf, combine_strings(prefix, k));
944 }
7c673cae
FG
945}
946
947void RocksDBStore::RocksDBTransactionImpl::rmkeys_by_prefix(const string &prefix)
948{
11fdf7f2
TL
949 auto cf = db->get_cf_handle(prefix);
950 if (cf) {
951 if (db->enable_rmrange) {
952 string endprefix("\xff\xff\xff\xff"); // FIXME: this is cheating...
494da23a
TL
953 if (db->max_items_rmrange) {
954 uint64_t cnt = db->max_items_rmrange;
955 bat.SetSavePoint();
956 auto it = db->get_iterator(prefix);
957 for (it->seek_to_first();
958 it->valid();
959 it->next()) {
960 if (!cnt) {
961 bat.RollbackToSavePoint();
962 bat.DeleteRange(cf, string(), endprefix);
963 return;
964 }
965 bat.Delete(cf, rocksdb::Slice(it->key()));
966 --cnt;
967 }
968 bat.PopSavePoint();
969 } else {
970 bat.DeleteRange(cf, string(), endprefix);
971 }
11fdf7f2
TL
972 } else {
973 auto it = db->get_iterator(prefix);
974 for (it->seek_to_first();
975 it->valid();
976 it->next()) {
977 bat.Delete(cf, rocksdb::Slice(it->key()));
978 }
979 }
31f18b77 980 } else {
11fdf7f2
TL
981 if (db->enable_rmrange) {
982 string endprefix = prefix;
983 endprefix.push_back('\x01');
494da23a
TL
984 if (db->max_items_rmrange) {
985 uint64_t cnt = db->max_items_rmrange;
986 bat.SetSavePoint();
987 auto it = db->get_iterator(prefix);
988 for (it->seek_to_first();
989 it->valid();
990 it->next()) {
991 if (!cnt) {
992 bat.RollbackToSavePoint();
993 bat.DeleteRange(db->default_cf,
994 combine_strings(prefix, string()),
995 combine_strings(endprefix, string()));
996 return;
997 }
998 bat.Delete(db->default_cf, combine_strings(prefix, it->key()));
999 --cnt;
1000 }
1001 bat.PopSavePoint();
1002 } else {
1003 bat.DeleteRange(db->default_cf,
1004 combine_strings(prefix, string()),
1005 combine_strings(endprefix, string()));
1006 }
11fdf7f2
TL
1007 } else {
1008 auto it = db->get_iterator(prefix);
1009 for (it->seek_to_first();
1010 it->valid();
1011 it->next()) {
1012 bat.Delete(db->default_cf, combine_strings(prefix, it->key()));
1013 }
31f18b77 1014 }
7c673cae
FG
1015 }
1016}
1017
1018void RocksDBStore::RocksDBTransactionImpl::rm_range_keys(const string &prefix,
1019 const string &start,
1020 const string &end)
1021{
11fdf7f2
TL
1022 auto cf = db->get_cf_handle(prefix);
1023 if (cf) {
1024 if (db->enable_rmrange) {
494da23a
TL
1025 if (db->max_items_rmrange) {
1026 uint64_t cnt = db->max_items_rmrange;
1027 auto it = db->get_iterator(prefix);
1028 bat.SetSavePoint();
1029 it->lower_bound(start);
1030 while (it->valid()) {
1031 if (it->key() >= end) {
1032 break;
1033 }
1034 if (!cnt) {
1035 bat.RollbackToSavePoint();
1036 bat.DeleteRange(cf, rocksdb::Slice(start), rocksdb::Slice(end));
1037 return;
1038 }
1039 bat.Delete(cf, rocksdb::Slice(it->key()));
1040 it->next();
1041 --cnt;
1042 }
1043 bat.PopSavePoint();
1044 } else {
1045 bat.DeleteRange(cf, rocksdb::Slice(start), rocksdb::Slice(end));
1046 }
11fdf7f2
TL
1047 } else {
1048 auto it = db->get_iterator(prefix);
1049 it->lower_bound(start);
1050 while (it->valid()) {
1051 if (it->key() >= end) {
1052 break;
1053 }
1054 bat.Delete(cf, rocksdb::Slice(it->key()));
1055 it->next();
1056 }
1057 }
7c673cae 1058 } else {
11fdf7f2 1059 if (db->enable_rmrange) {
494da23a
TL
1060 if (db->max_items_rmrange) {
1061 uint64_t cnt = db->max_items_rmrange;
1062 auto it = db->get_iterator(prefix);
1063 bat.SetSavePoint();
1064 it->lower_bound(start);
1065 while (it->valid()) {
1066 if (it->key() >= end) {
1067 break;
1068 }
1069 if (!cnt) {
1070 bat.RollbackToSavePoint();
1071 bat.DeleteRange(
1072 db->default_cf,
1073 rocksdb::Slice(combine_strings(prefix, start)),
1074 rocksdb::Slice(combine_strings(prefix, end)));
1075 return;
1076 }
1077 bat.Delete(db->default_cf,
1078 combine_strings(prefix, it->key()));
1079 it->next();
1080 --cnt;
1081 }
1082 bat.PopSavePoint();
1083 } else {
1084 bat.DeleteRange(
1085 db->default_cf,
1086 rocksdb::Slice(combine_strings(prefix, start)),
1087 rocksdb::Slice(combine_strings(prefix, end)));
1088 }
11fdf7f2
TL
1089 } else {
1090 auto it = db->get_iterator(prefix);
1091 it->lower_bound(start);
1092 while (it->valid()) {
1093 if (it->key() >= end) {
1094 break;
1095 }
1096 bat.Delete(db->default_cf,
1097 combine_strings(prefix, it->key()));
1098 it->next();
7c673cae 1099 }
7c673cae
FG
1100 }
1101 }
1102}
1103
1104void RocksDBStore::RocksDBTransactionImpl::merge(
1105 const string &prefix,
1106 const string &k,
1107 const bufferlist &to_set_bl)
1108{
11fdf7f2
TL
1109 auto cf = db->get_cf_handle(prefix);
1110 if (cf) {
1111 // bufferlist::c_str() is non-constant, so we can't call c_str()
1112 if (to_set_bl.is_contiguous() && to_set_bl.length() > 0) {
1113 bat.Merge(
1114 cf,
1115 rocksdb::Slice(k),
1116 rocksdb::Slice(to_set_bl.buffers().front().c_str(), to_set_bl.length()));
1117 } else {
1118 // make a copy
1119 rocksdb::Slice key_slice(k);
1120 vector<rocksdb::Slice> value_slices(to_set_bl.buffers().size());
1121 bat.Merge(cf, rocksdb::SliceParts(&key_slice, 1),
1122 prepare_sliceparts(to_set_bl, &value_slices));
1123 }
7c673cae 1124 } else {
11fdf7f2
TL
1125 string key = combine_strings(prefix, k);
1126 // bufferlist::c_str() is non-constant, so we can't call c_str()
1127 if (to_set_bl.is_contiguous() && to_set_bl.length() > 0) {
1128 bat.Merge(
1129 db->default_cf,
1130 rocksdb::Slice(key),
1131 rocksdb::Slice(to_set_bl.buffers().front().c_str(), to_set_bl.length()));
1132 } else {
1133 // make a copy
1134 rocksdb::Slice key_slice(key);
1135 vector<rocksdb::Slice> value_slices(to_set_bl.buffers().size());
1136 bat.Merge(
1137 db->default_cf,
1138 rocksdb::SliceParts(&key_slice, 1),
1139 prepare_sliceparts(to_set_bl, &value_slices));
1140 }
7c673cae
FG
1141 }
1142}
1143
7c673cae
FG
1144int RocksDBStore::get(
1145 const string &prefix,
1146 const std::set<string> &keys,
1147 std::map<string, bufferlist> *out)
1148{
1149 utime_t start = ceph_clock_now();
11fdf7f2
TL
1150 auto cf = get_cf_handle(prefix);
1151 if (cf) {
1152 for (auto& key : keys) {
1153 std::string value;
1154 auto status = db->Get(rocksdb::ReadOptions(),
1155 cf,
1156 rocksdb::Slice(key),
1157 &value);
1158 if (status.ok()) {
1159 (*out)[key].append(value);
1160 } else if (status.IsIOError()) {
1161 ceph_abort_msg(status.getState());
1162 }
1163 }
1164 } else {
1165 for (auto& key : keys) {
1166 std::string value;
1167 string k = combine_strings(prefix, key);
1168 auto status = db->Get(rocksdb::ReadOptions(),
1169 default_cf,
1170 rocksdb::Slice(k),
1171 &value);
1172 if (status.ok()) {
1173 (*out)[key].append(value);
1174 } else if (status.IsIOError()) {
1175 ceph_abort_msg(status.getState());
1176 }
224ce89b 1177 }
7c673cae
FG
1178 }
1179 utime_t lat = ceph_clock_now() - start;
1180 logger->inc(l_rocksdb_gets);
1181 logger->tinc(l_rocksdb_get_latency, lat);
1182 return 0;
1183}
1184
1185int RocksDBStore::get(
1186 const string &prefix,
1187 const string &key,
1188 bufferlist *out)
1189{
11fdf7f2 1190 ceph_assert(out && (out->length() == 0));
7c673cae
FG
1191 utime_t start = ceph_clock_now();
1192 int r = 0;
11fdf7f2 1193 string value;
7c673cae 1194 rocksdb::Status s;
11fdf7f2
TL
1195 auto cf = get_cf_handle(prefix);
1196 if (cf) {
1197 s = db->Get(rocksdb::ReadOptions(),
1198 cf,
1199 rocksdb::Slice(key),
1200 &value);
1201 } else {
1202 string k = combine_strings(prefix, key);
1203 s = db->Get(rocksdb::ReadOptions(),
1204 default_cf,
1205 rocksdb::Slice(k),
1206 &value);
1207 }
7c673cae
FG
1208 if (s.ok()) {
1209 out->append(value);
224ce89b 1210 } else if (s.IsNotFound()) {
7c673cae 1211 r = -ENOENT;
224ce89b 1212 } else {
11fdf7f2 1213 ceph_abort_msg(s.getState());
7c673cae
FG
1214 }
1215 utime_t lat = ceph_clock_now() - start;
1216 logger->inc(l_rocksdb_gets);
1217 logger->tinc(l_rocksdb_get_latency, lat);
1218 return r;
1219}
1220
1221int RocksDBStore::get(
1222 const string& prefix,
1223 const char *key,
1224 size_t keylen,
1225 bufferlist *out)
1226{
11fdf7f2 1227 ceph_assert(out && (out->length() == 0));
7c673cae
FG
1228 utime_t start = ceph_clock_now();
1229 int r = 0;
11fdf7f2 1230 string value;
7c673cae 1231 rocksdb::Status s;
11fdf7f2
TL
1232 auto cf = get_cf_handle(prefix);
1233 if (cf) {
1234 s = db->Get(rocksdb::ReadOptions(),
1235 cf,
1236 rocksdb::Slice(key, keylen),
1237 &value);
1238 } else {
1239 string k;
1240 combine_strings(prefix, key, keylen, &k);
1241 s = db->Get(rocksdb::ReadOptions(),
1242 default_cf,
1243 rocksdb::Slice(k),
1244 &value);
1245 }
7c673cae
FG
1246 if (s.ok()) {
1247 out->append(value);
224ce89b 1248 } else if (s.IsNotFound()) {
7c673cae 1249 r = -ENOENT;
224ce89b 1250 } else {
11fdf7f2 1251 ceph_abort_msg(s.getState());
7c673cae
FG
1252 }
1253 utime_t lat = ceph_clock_now() - start;
1254 logger->inc(l_rocksdb_gets);
1255 logger->tinc(l_rocksdb_get_latency, lat);
1256 return r;
1257}
1258
1259int RocksDBStore::split_key(rocksdb::Slice in, string *prefix, string *key)
1260{
1261 size_t prefix_len = 0;
1262
1263 // Find separator inside Slice
1264 char* separator = (char*) memchr(in.data(), 0, in.size());
1265 if (separator == NULL)
1266 return -EINVAL;
1267 prefix_len = size_t(separator - in.data());
1268 if (prefix_len >= in.size())
1269 return -EINVAL;
1270
1271 // Fetch prefix and/or key directly from Slice
1272 if (prefix)
1273 *prefix = string(in.data(), prefix_len);
1274 if (key)
1275 *key = string(separator+1, in.size()-prefix_len-1);
1276 return 0;
1277}
1278
1279void RocksDBStore::compact()
1280{
1281 logger->inc(l_rocksdb_compact);
1282 rocksdb::CompactRangeOptions options;
11fdf7f2
TL
1283 db->CompactRange(options, default_cf, nullptr, nullptr);
1284 for (auto cf : cf_handles) {
1285 db->CompactRange(
1286 options,
1287 static_cast<rocksdb::ColumnFamilyHandle*>(cf.second),
1288 nullptr, nullptr);
1289 }
7c673cae
FG
1290}
1291
1292
1293void RocksDBStore::compact_thread_entry()
1294{
1295 compact_queue_lock.Lock();
92f5a8d4 1296 dout(10) << __func__ << " enter" << dendl;
7c673cae 1297 while (!compact_queue_stop) {
92f5a8d4 1298 if (!compact_queue.empty()) {
7c673cae
FG
1299 pair<string,string> range = compact_queue.front();
1300 compact_queue.pop_front();
1301 logger->set(l_rocksdb_compact_queue_len, compact_queue.size());
1302 compact_queue_lock.Unlock();
1303 logger->inc(l_rocksdb_compact_range);
11fdf7f2
TL
1304 if (range.first.empty() && range.second.empty()) {
1305 compact();
1306 } else {
1307 compact_range(range.first, range.second);
1308 }
7c673cae
FG
1309 compact_queue_lock.Lock();
1310 continue;
1311 }
92f5a8d4 1312 dout(10) << __func__ << " waiting" << dendl;
7c673cae
FG
1313 compact_queue_cond.Wait(compact_queue_lock);
1314 }
1315 compact_queue_lock.Unlock();
1316}
1317
1318void RocksDBStore::compact_range_async(const string& start, const string& end)
1319{
11fdf7f2 1320 std::lock_guard l(compact_queue_lock);
7c673cae
FG
1321
1322 // try to merge adjacent ranges. this is O(n), but the queue should
1323 // be short. note that we do not cover all overlap cases and merge
1324 // opportunities here, but we capture the ones we currently need.
1325 list< pair<string,string> >::iterator p = compact_queue.begin();
1326 while (p != compact_queue.end()) {
1327 if (p->first == start && p->second == end) {
1328 // dup; no-op
1329 return;
1330 }
1331 if (p->first <= end && p->first > start) {
1332 // merge with existing range to the right
1333 compact_queue.push_back(make_pair(start, p->second));
1334 compact_queue.erase(p);
1335 logger->inc(l_rocksdb_compact_queue_merge);
1336 break;
1337 }
1338 if (p->second >= start && p->second < end) {
1339 // merge with existing range to the left
1340 compact_queue.push_back(make_pair(p->first, end));
1341 compact_queue.erase(p);
1342 logger->inc(l_rocksdb_compact_queue_merge);
1343 break;
1344 }
1345 ++p;
1346 }
1347 if (p == compact_queue.end()) {
1348 // no merge, new entry.
1349 compact_queue.push_back(make_pair(start, end));
1350 logger->set(l_rocksdb_compact_queue_len, compact_queue.size());
1351 }
1352 compact_queue_cond.Signal();
1353 if (!compact_thread.is_started()) {
1354 compact_thread.create("rstore_compact");
1355 }
1356}
1357bool RocksDBStore::check_omap_dir(string &omap_dir)
1358{
1359 rocksdb::Options options;
1360 options.create_if_missing = true;
1361 rocksdb::DB *db;
1362 rocksdb::Status status = rocksdb::DB::Open(options, omap_dir, &db);
1363 delete db;
1364 db = nullptr;
1365 return status.ok();
1366}
1367void RocksDBStore::compact_range(const string& start, const string& end)
1368{
1369 rocksdb::CompactRangeOptions options;
1370 rocksdb::Slice cstart(start);
1371 rocksdb::Slice cend(end);
1372 db->CompactRange(options, &cstart, &cend);
1373}
91327a77 1374
7c673cae
FG
1375RocksDBStore::RocksDBWholeSpaceIteratorImpl::~RocksDBWholeSpaceIteratorImpl()
1376{
1377 delete dbiter;
1378}
1379int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_first()
1380{
1381 dbiter->SeekToFirst();
11fdf7f2 1382 ceph_assert(!dbiter->status().IsIOError());
7c673cae
FG
1383 return dbiter->status().ok() ? 0 : -1;
1384}
1385int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_first(const string &prefix)
1386{
1387 rocksdb::Slice slice_prefix(prefix);
1388 dbiter->Seek(slice_prefix);
11fdf7f2 1389 ceph_assert(!dbiter->status().IsIOError());
7c673cae
FG
1390 return dbiter->status().ok() ? 0 : -1;
1391}
1392int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_last()
1393{
1394 dbiter->SeekToLast();
11fdf7f2 1395 ceph_assert(!dbiter->status().IsIOError());
7c673cae
FG
1396 return dbiter->status().ok() ? 0 : -1;
1397}
1398int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_last(const string &prefix)
1399{
1400 string limit = past_prefix(prefix);
1401 rocksdb::Slice slice_limit(limit);
1402 dbiter->Seek(slice_limit);
1403
1404 if (!dbiter->Valid()) {
1405 dbiter->SeekToLast();
1406 } else {
1407 dbiter->Prev();
1408 }
1409 return dbiter->status().ok() ? 0 : -1;
1410}
1411int RocksDBStore::RocksDBWholeSpaceIteratorImpl::upper_bound(const string &prefix, const string &after)
1412{
1413 lower_bound(prefix, after);
1414 if (valid()) {
1415 pair<string,string> key = raw_key();
1416 if (key.first == prefix && key.second == after)
1417 next();
1418 }
1419 return dbiter->status().ok() ? 0 : -1;
1420}
1421int RocksDBStore::RocksDBWholeSpaceIteratorImpl::lower_bound(const string &prefix, const string &to)
1422{
1423 string bound = combine_strings(prefix, to);
1424 rocksdb::Slice slice_bound(bound);
1425 dbiter->Seek(slice_bound);
1426 return dbiter->status().ok() ? 0 : -1;
1427}
1428bool RocksDBStore::RocksDBWholeSpaceIteratorImpl::valid()
1429{
1430 return dbiter->Valid();
1431}
1432int RocksDBStore::RocksDBWholeSpaceIteratorImpl::next()
1433{
1434 if (valid()) {
1435 dbiter->Next();
1436 }
11fdf7f2 1437 ceph_assert(!dbiter->status().IsIOError());
7c673cae
FG
1438 return dbiter->status().ok() ? 0 : -1;
1439}
1440int RocksDBStore::RocksDBWholeSpaceIteratorImpl::prev()
1441{
1442 if (valid()) {
1443 dbiter->Prev();
1444 }
11fdf7f2 1445 ceph_assert(!dbiter->status().IsIOError());
7c673cae
FG
1446 return dbiter->status().ok() ? 0 : -1;
1447}
1448string RocksDBStore::RocksDBWholeSpaceIteratorImpl::key()
1449{
1450 string out_key;
1451 split_key(dbiter->key(), 0, &out_key);
1452 return out_key;
1453}
1454pair<string,string> RocksDBStore::RocksDBWholeSpaceIteratorImpl::raw_key()
1455{
1456 string prefix, key;
1457 split_key(dbiter->key(), &prefix, &key);
1458 return make_pair(prefix, key);
1459}
1460
1461bool RocksDBStore::RocksDBWholeSpaceIteratorImpl::raw_key_is_prefixed(const string &prefix) {
1462 // Look for "prefix\0" right in rocksb::Slice
1463 rocksdb::Slice key = dbiter->key();
1464 if ((key.size() > prefix.length()) && (key[prefix.length()] == '\0')) {
1465 return memcmp(key.data(), prefix.c_str(), prefix.length()) == 0;
1466 } else {
1467 return false;
1468 }
1469}
1470
1471bufferlist RocksDBStore::RocksDBWholeSpaceIteratorImpl::value()
1472{
1473 return to_bufferlist(dbiter->value());
1474}
1475
1476size_t RocksDBStore::RocksDBWholeSpaceIteratorImpl::key_size()
1477{
1478 return dbiter->key().size();
1479}
1480
1481size_t RocksDBStore::RocksDBWholeSpaceIteratorImpl::value_size()
1482{
1483 return dbiter->value().size();
1484}
1485
1486bufferptr RocksDBStore::RocksDBWholeSpaceIteratorImpl::value_as_ptr()
1487{
1488 rocksdb::Slice val = dbiter->value();
1489 return bufferptr(val.data(), val.size());
1490}
1491
1492int RocksDBStore::RocksDBWholeSpaceIteratorImpl::status()
1493{
1494 return dbiter->status().ok() ? 0 : -1;
1495}
1496
1497string RocksDBStore::past_prefix(const string &prefix)
1498{
1499 string limit = prefix;
1500 limit.push_back(1);
1501 return limit;
1502}
1503
11fdf7f2 1504RocksDBStore::WholeSpaceIterator RocksDBStore::get_wholespace_iterator()
7c673cae
FG
1505{
1506 return std::make_shared<RocksDBWholeSpaceIteratorImpl>(
11fdf7f2 1507 db->NewIterator(rocksdb::ReadOptions(), default_cf));
7c673cae
FG
1508}
1509
11fdf7f2
TL
1510class CFIteratorImpl : public KeyValueDB::IteratorImpl {
1511protected:
1512 string prefix;
1513 rocksdb::Iterator *dbiter;
1514public:
1515 explicit CFIteratorImpl(const std::string& p,
1516 rocksdb::Iterator *iter)
1517 : prefix(p), dbiter(iter) { }
1518 ~CFIteratorImpl() {
1519 delete dbiter;
1520 }
1521
1522 int seek_to_first() override {
1523 dbiter->SeekToFirst();
1524 return dbiter->status().ok() ? 0 : -1;
1525 }
1526 int seek_to_last() override {
1527 dbiter->SeekToLast();
1528 return dbiter->status().ok() ? 0 : -1;
1529 }
1530 int upper_bound(const string &after) override {
1531 lower_bound(after);
1532 if (valid() && (key() == after)) {
1533 next();
1534 }
1535 return dbiter->status().ok() ? 0 : -1;
1536 }
1537 int lower_bound(const string &to) override {
1538 rocksdb::Slice slice_bound(to);
1539 dbiter->Seek(slice_bound);
1540 return dbiter->status().ok() ? 0 : -1;
1541 }
1542 int next() override {
1543 if (valid()) {
1544 dbiter->Next();
1545 }
1546 return dbiter->status().ok() ? 0 : -1;
1547 }
1548 int prev() override {
1549 if (valid()) {
1550 dbiter->Prev();
1551 }
1552 return dbiter->status().ok() ? 0 : -1;
1553 }
1554 bool valid() override {
1555 return dbiter->Valid();
1556 }
1557 string key() override {
1558 return dbiter->key().ToString();
1559 }
1560 std::pair<std::string, std::string> raw_key() override {
1561 return make_pair(prefix, key());
1562 }
1563 bufferlist value() override {
1564 return to_bufferlist(dbiter->value());
1565 }
1566 bufferptr value_as_ptr() override {
1567 rocksdb::Slice val = dbiter->value();
1568 return bufferptr(val.data(), val.size());
1569 }
1570 int status() override {
1571 return dbiter->status().ok() ? 0 : -1;
1572 }
1573};
1574
1575KeyValueDB::Iterator RocksDBStore::get_iterator(const std::string& prefix)
1576{
1577 rocksdb::ColumnFamilyHandle *cf_handle =
1578 static_cast<rocksdb::ColumnFamilyHandle*>(get_cf_handle(prefix));
1579 if (cf_handle) {
1580 return std::make_shared<CFIteratorImpl>(
1581 prefix,
1582 db->NewIterator(rocksdb::ReadOptions(), cf_handle));
1583 } else {
1584 return KeyValueDB::get_iterator(prefix);
1585 }
1586}