]> git.proxmox.com Git - ceph.git/blob - ceph/src/kv/RocksDBStore.cc
import 14.2.4 nautilus point release
[ceph.git] / ceph / src / kv / RocksDBStore.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include <set>
5 #include <map>
6 #include <string>
7 #include <memory>
8 #include <errno.h>
9 #include <unistd.h>
10 #include <sys/types.h>
11 #include <sys/stat.h>
12
13 #include "rocksdb/db.h"
14 #include "rocksdb/table.h"
15 #include "rocksdb/env.h"
16 #include "rocksdb/slice.h"
17 #include "rocksdb/cache.h"
18 #include "rocksdb/filter_policy.h"
19 #include "rocksdb/utilities/convenience.h"
20 #include "rocksdb/merge_operator.h"
21
22 using std::string;
23 #include "common/perf_counters.h"
24 #include "common/PriorityCache.h"
25 #include "include/str_list.h"
26 #include "include/stringify.h"
27 #include "include/str_map.h"
28 #include "KeyValueDB.h"
29 #include "RocksDBStore.h"
30
31 #include "common/debug.h"
32
33 #define dout_context cct
34 #define dout_subsys ceph_subsys_rocksdb
35 #undef dout_prefix
36 #define dout_prefix *_dout << "rocksdb: "
37
38 static bufferlist to_bufferlist(rocksdb::Slice in) {
39 bufferlist bl;
40 bl.append(bufferptr(in.data(), in.size()));
41 return bl;
42 }
43
44 static rocksdb::SliceParts prepare_sliceparts(const bufferlist &bl,
45 vector<rocksdb::Slice> *slices)
46 {
47 unsigned n = 0;
48 for (auto& buf : bl.buffers()) {
49 (*slices)[n].data_ = buf.c_str();
50 (*slices)[n].size_ = buf.length();
51 n++;
52 }
53 return rocksdb::SliceParts(slices->data(), slices->size());
54 }
55
56
57 //
58 // One of these for the default rocksdb column family, routing each prefix
59 // to the appropriate MergeOperator.
60 //
61 class RocksDBStore::MergeOperatorRouter
62 : public rocksdb::AssociativeMergeOperator
63 {
64 RocksDBStore& store;
65 public:
66 const char *Name() const override {
67 // Construct a name that rocksDB will validate against. We want to
68 // do this in a way that doesn't constrain the ordering of calls
69 // to set_merge_operator, so sort the merge operators and then
70 // construct a name from all of those parts.
71 store.assoc_name.clear();
72 map<std::string,std::string> names;
73
74 for (auto& p : store.merge_ops) {
75 names[p.first] = p.second->name();
76 }
77 for (auto& p : store.cf_handles) {
78 names.erase(p.first);
79 }
80 for (auto& p : names) {
81 store.assoc_name += '.';
82 store.assoc_name += p.first;
83 store.assoc_name += ':';
84 store.assoc_name += p.second;
85 }
86 return store.assoc_name.c_str();
87 }
88
89 explicit MergeOperatorRouter(RocksDBStore &_store) : store(_store) {}
90
91 bool Merge(const rocksdb::Slice& key,
92 const rocksdb::Slice* existing_value,
93 const rocksdb::Slice& value,
94 std::string* new_value,
95 rocksdb::Logger* logger) const override {
96 // for default column family
97 // extract prefix from key and compare against each registered merge op;
98 // even though merge operator for explicit CF is included in merge_ops,
99 // it won't be picked up, since it won't match.
100 for (auto& p : store.merge_ops) {
101 if (p.first.compare(0, p.first.length(),
102 key.data(), p.first.length()) == 0 &&
103 key.data()[p.first.length()] == 0) {
104 if (existing_value) {
105 p.second->merge(existing_value->data(), existing_value->size(),
106 value.data(), value.size(),
107 new_value);
108 } else {
109 p.second->merge_nonexistent(value.data(), value.size(), new_value);
110 }
111 break;
112 }
113 }
114 return true; // OK :)
115 }
116 };
117
118 //
119 // One of these per non-default column family, linked directly to the
120 // merge operator for that CF/prefix (if any).
121 //
122 class RocksDBStore::MergeOperatorLinker
123 : public rocksdb::AssociativeMergeOperator
124 {
125 private:
126 std::shared_ptr<KeyValueDB::MergeOperator> mop;
127 public:
128 explicit MergeOperatorLinker(const std::shared_ptr<KeyValueDB::MergeOperator> &o) : mop(o) {}
129
130 const char *Name() const override {
131 return mop->name();
132 }
133
134 bool Merge(const rocksdb::Slice& key,
135 const rocksdb::Slice* existing_value,
136 const rocksdb::Slice& value,
137 std::string* new_value,
138 rocksdb::Logger* logger) const override {
139 if (existing_value) {
140 mop->merge(existing_value->data(), existing_value->size(),
141 value.data(), value.size(),
142 new_value);
143 } else {
144 mop->merge_nonexistent(value.data(), value.size(), new_value);
145 }
146 return true;
147 }
148 };
149
150 int RocksDBStore::set_merge_operator(
151 const string& prefix,
152 std::shared_ptr<KeyValueDB::MergeOperator> mop)
153 {
154 // If you fail here, it's because you can't do this on an open database
155 ceph_assert(db == nullptr);
156 merge_ops.push_back(std::make_pair(prefix,mop));
157 return 0;
158 }
159
// Adapter that routes rocksdb's internal log messages into ceph's
// dout/derr logging machinery.
class CephRocksdbLogger : public rocksdb::Logger {
  CephContext *cct;
public:
  explicit CephRocksdbLogger(CephContext *c) : cct(c) {
    cct->get();  // hold a ref to the context for the logger's lifetime
  }
  ~CephRocksdbLogger() override {
    cct->put();  // release the ref taken in the constructor
  }

  // Write an entry to the log file with the specified format.
  void Logv(const char* format, va_list ap) override {
    Logv(rocksdb::INFO_LEVEL, format, ap);
  }

  // Write an entry to the log file with the specified log level
  // and format. Any log with level under the internal log level
  // of *this (see @SetInfoLogLevel and @GetInfoLogLevel) will not be
  // printed.
  void Logv(const rocksdb::InfoLogLevel log_level, const char* format,
	    va_list ap) override {
    // Map rocksdb severity onto a ceph debug level: more severe
    // rocksdb messages get a lower (more likely to be shown) level.
    int v = rocksdb::NUM_INFO_LOG_LEVELS - log_level - 1;
    dout(ceph::dout::need_dynamic(v));
    // format into a fixed buffer, then emit via the dout stream
    char buf[65536];
    vsnprintf(buf, sizeof(buf), format, ap);
    *_dout << buf << dendl;
  }
};
188
189 rocksdb::Logger *create_rocksdb_ceph_logger()
190 {
191 return new CephRocksdbLogger(g_ceph_context);
192 }
193
194 static int string2bool(const string &val, bool &b_val)
195 {
196 if (strcasecmp(val.c_str(), "false") == 0) {
197 b_val = false;
198 return 0;
199 } else if (strcasecmp(val.c_str(), "true") == 0) {
200 b_val = true;
201 return 0;
202 } else {
203 std::string err;
204 int b = strict_strtol(val.c_str(), 10, &err);
205 if (!err.empty())
206 return -EINVAL;
207 b_val = !!b;
208 return 0;
209 }
210 }
211
212 int RocksDBStore::tryInterpret(const string &key, const string &val, rocksdb::Options &opt)
213 {
214 if (key == "compaction_threads") {
215 std::string err;
216 int f = strict_iecstrtoll(val.c_str(), &err);
217 if (!err.empty())
218 return -EINVAL;
219 //Low priority threadpool is used for compaction
220 opt.env->SetBackgroundThreads(f, rocksdb::Env::Priority::LOW);
221 } else if (key == "flusher_threads") {
222 std::string err;
223 int f = strict_iecstrtoll(val.c_str(), &err);
224 if (!err.empty())
225 return -EINVAL;
226 //High priority threadpool is used for flusher
227 opt.env->SetBackgroundThreads(f, rocksdb::Env::Priority::HIGH);
228 } else if (key == "compact_on_mount") {
229 int ret = string2bool(val, compact_on_mount);
230 if (ret != 0)
231 return ret;
232 } else if (key == "disableWAL") {
233 int ret = string2bool(val, disableWAL);
234 if (ret != 0)
235 return ret;
236 } else {
237 //unrecognize config options.
238 return -EINVAL;
239 }
240 return 0;
241 }
242
243 int RocksDBStore::ParseOptionsFromString(const string &opt_str, rocksdb::Options &opt)
244 {
245 map<string, string> str_map;
246 int r = get_str_map(opt_str, &str_map, ",\n;");
247 if (r < 0)
248 return r;
249 map<string, string>::iterator it;
250 for(it = str_map.begin(); it != str_map.end(); ++it) {
251 string this_opt = it->first + "=" + it->second;
252 rocksdb::Status status = rocksdb::GetOptionsFromString(opt, this_opt , &opt);
253 if (!status.ok()) {
254 //unrecognized by rocksdb, try to interpret by ourselves.
255 r = tryInterpret(it->first, it->second, opt);
256 if (r < 0) {
257 derr << status.ToString() << dendl;
258 return -EINVAL;
259 }
260 }
261 lgeneric_dout(cct, 0) << " set rocksdb option " << it->first
262 << " = " << it->second << dendl;
263 }
264 return 0;
265 }
266
267 int RocksDBStore::init(string _options_str)
268 {
269 options_str = _options_str;
270 rocksdb::Options opt;
271 //try parse options
272 if (options_str.length()) {
273 int r = ParseOptionsFromString(options_str, opt);
274 if (r != 0) {
275 return -EINVAL;
276 }
277 }
278 return 0;
279 }
280
281 int RocksDBStore::create_db_dir()
282 {
283 if (env) {
284 unique_ptr<rocksdb::Directory> dir;
285 env->NewDirectory(path, &dir);
286 } else {
287 int r = ::mkdir(path.c_str(), 0755);
288 if (r < 0)
289 r = -errno;
290 if (r < 0 && r != -EEXIST) {
291 derr << __func__ << " failed to create " << path << ": " << cpp_strerror(r)
292 << dendl;
293 return r;
294 }
295 }
296 return 0;
297 }
298
299 int RocksDBStore::install_cf_mergeop(
300 const string &cf_name,
301 rocksdb::ColumnFamilyOptions *cf_opt)
302 {
303 ceph_assert(cf_opt != nullptr);
304 cf_opt->merge_operator.reset();
305 for (auto& i : merge_ops) {
306 if (i.first == cf_name) {
307 cf_opt->merge_operator.reset(new MergeOperatorLinker(i.second));
308 }
309 }
310 return 0;
311 }
312
313 int RocksDBStore::create_and_open(ostream &out,
314 const vector<ColumnFamily>& cfs)
315 {
316 int r = create_db_dir();
317 if (r < 0)
318 return r;
319 if (cfs.empty()) {
320 return do_open(out, true, false, nullptr);
321 } else {
322 return do_open(out, true, false, &cfs);
323 }
324 }
325
// Build the rocksdb::Options used for open/repair from the configured
// option string, kv_options, and ceph configuration: statistics,
// WAL dir, db_paths, logging, custom Env, block/row caches, bloom
// filters, index type, and the merge-operator router.
// Returns 0 on success or a negative errno.
int RocksDBStore::load_rocksdb_options(bool create_if_missing, rocksdb::Options& opt)
{
  rocksdb::Status status;

  if (options_str.length()) {
    int r = ParseOptionsFromString(options_str, opt);
    if (r != 0) {
      return -EINVAL;
    }
  }

  if (g_conf()->rocksdb_perf) {
    dbstats = rocksdb::CreateDBStatistics();
    opt.statistics = dbstats;
  }

  opt.create_if_missing = create_if_missing;
  if (kv_options.count("separate_wal_dir")) {
    opt.wal_dir = path + ".wal";
  }

  // Since ceph::for_each_substr doesn't return a value and
  // std::stoull does throw, we may as well just catch everything here.
  try {
    if (kv_options.count("db_paths")) {
      // each item is "<path>,<size>"; items separated by ';', ' ' or tab
      list<string> paths;
      get_str_list(kv_options["db_paths"], "; \t", paths);
      for (auto& p : paths) {
	size_t pos = p.find(',');
	if (pos == std::string::npos) {
	  derr << __func__ << " invalid db path item " << p << " in "
	       << kv_options["db_paths"] << dendl;
	  return -EINVAL;
	}
	string path = p.substr(0, pos);
	string size_str = p.substr(pos + 1);
	// NOTE(review): atoll accepts only a plain decimal number here,
	// despite the stoull mention above — IEC suffixes like "500G"
	// would parse as 500; confirm callers pass raw byte counts.
	uint64_t size = atoll(size_str.c_str());
	if (!size) {
	  derr << __func__ << " invalid db path item " << p << " in "
	       << kv_options["db_paths"] << dendl;
	  return -EINVAL;
	}
	opt.db_paths.push_back(rocksdb::DbPath(path, size));
	dout(10) << __func__ << " db_path " << path << " size " << size << dendl;
      }
    }
  } catch (const std::system_error& e) {
    return -e.code().value();
  }

  if (g_conf()->rocksdb_log_to_ceph_log) {
    opt.info_log.reset(new CephRocksdbLogger(g_ceph_context));
  }

  if (priv) {
    dout(10) << __func__ << " using custom Env " << priv << dendl;
    opt.env = static_cast<rocksdb::Env*>(priv);
  }

  // caches: split the configured budget between the row cache and the
  // block cache according to rocksdb_cache_row_ratio
  if (!set_cache_flag) {
    cache_size = g_conf()->rocksdb_cache_size;
  }
  uint64_t row_cache_size = cache_size * g_conf()->rocksdb_cache_row_ratio;
  uint64_t block_cache_size = cache_size - row_cache_size;

  if (g_conf()->rocksdb_cache_type == "binned_lru") {
    bbt_opts.block_cache = rocksdb_cache::NewBinnedLRUCache(
      cct,
      block_cache_size,
      g_conf()->rocksdb_cache_shard_bits);
  } else if (g_conf()->rocksdb_cache_type == "lru") {
    bbt_opts.block_cache = rocksdb::NewLRUCache(
      block_cache_size,
      g_conf()->rocksdb_cache_shard_bits);
  } else if (g_conf()->rocksdb_cache_type == "clock") {
    // NewClockCache returns null when rocksdb was built without TBB
    bbt_opts.block_cache = rocksdb::NewClockCache(
      block_cache_size,
      g_conf()->rocksdb_cache_shard_bits);
    if (!bbt_opts.block_cache) {
      derr << "rocksdb_cache_type '" << g_conf()->rocksdb_cache_type
	   << "' chosen, but RocksDB not compiled with LibTBB. "
	   << dendl;
      return -EINVAL;
    }
  } else {
    derr << "unrecognized rocksdb_cache_type '" << g_conf()->rocksdb_cache_type
	 << "'" << dendl;
    return -EINVAL;
  }
  bbt_opts.block_size = g_conf()->rocksdb_block_size;

  if (row_cache_size > 0)
    opt.row_cache = rocksdb::NewLRUCache(row_cache_size,
					 g_conf()->rocksdb_cache_shard_bits);
  uint64_t bloom_bits = g_conf().get_val<uint64_t>("rocksdb_bloom_bits_per_key");
  if (bloom_bits > 0) {
    dout(10) << __func__ << " set bloom filter bits per key to "
	     << bloom_bits << dendl;
    bbt_opts.filter_policy.reset(rocksdb::NewBloomFilterPolicy(bloom_bits));
  }
  // map the rocksdb_index_type config value onto rocksdb's enum
  using std::placeholders::_1;
  if (g_conf().with_val<std::string>("rocksdb_index_type",
				    std::bind(std::equal_to<std::string>(), _1,
					      "binary_search")))
    bbt_opts.index_type = rocksdb::BlockBasedTableOptions::IndexType::kBinarySearch;
  if (g_conf().with_val<std::string>("rocksdb_index_type",
				    std::bind(std::equal_to<std::string>(), _1,
					      "hash_search")))
    bbt_opts.index_type = rocksdb::BlockBasedTableOptions::IndexType::kHashSearch;
  if (g_conf().with_val<std::string>("rocksdb_index_type",
				    std::bind(std::equal_to<std::string>(), _1,
					      "two_level")))
    bbt_opts.index_type = rocksdb::BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
  if (!bbt_opts.no_block_cache) {
    bbt_opts.cache_index_and_filter_blocks =
        g_conf().get_val<bool>("rocksdb_cache_index_and_filter_blocks");
    bbt_opts.cache_index_and_filter_blocks_with_high_priority =
        g_conf().get_val<bool>("rocksdb_cache_index_and_filter_blocks_with_high_priority");
    bbt_opts.pin_l0_filter_and_index_blocks_in_cache =
      g_conf().get_val<bool>("rocksdb_pin_l0_filter_and_index_blocks_in_cache");
  }
  bbt_opts.partition_filters = g_conf().get_val<bool>("rocksdb_partition_filters");
  if (g_conf().get_val<Option::size_t>("rocksdb_metadata_block_size") > 0)
    bbt_opts.metadata_block_size = g_conf().get_val<Option::size_t>("rocksdb_metadata_block_size");

  opt.table_factory.reset(rocksdb::NewBlockBasedTableFactory(bbt_opts));
  dout(10) << __func__ << " block size " << g_conf()->rocksdb_block_size
	   << ", block_cache size " << byte_u_t(block_cache_size)
	   << ", row_cache size " << byte_u_t(row_cache_size)
	   << "; shards "
	   << (1 << g_conf()->rocksdb_cache_shard_bits)
	   << ", type " << g_conf()->rocksdb_cache_type
	   << dendl;

  // route default-CF merges by key prefix
  opt.merge_operator.reset(new MergeOperatorRouter(*this));

  return 0;
}
465
466 int RocksDBStore::do_open(ostream &out,
467 bool create_if_missing,
468 bool open_readonly,
469 const vector<ColumnFamily>* cfs)
470 {
471 ceph_assert(!(create_if_missing && open_readonly));
472 rocksdb::Options opt;
473 int r = load_rocksdb_options(create_if_missing, opt);
474 if (r) {
475 dout(1) << __func__ << " load rocksdb options failed" << dendl;
476 return r;
477 }
478 rocksdb::Status status;
479 if (create_if_missing) {
480 status = rocksdb::DB::Open(opt, path, &db);
481 if (!status.ok()) {
482 derr << status.ToString() << dendl;
483 return -EINVAL;
484 }
485 // create and open column families
486 if (cfs) {
487 for (auto& p : *cfs) {
488 // copy default CF settings, block cache, merge operators as
489 // the base for new CF
490 rocksdb::ColumnFamilyOptions cf_opt(opt);
491 // user input options will override the base options
492 status = rocksdb::GetColumnFamilyOptionsFromString(
493 cf_opt, p.option, &cf_opt);
494 if (!status.ok()) {
495 derr << __func__ << " invalid db column family option string for CF: "
496 << p.name << dendl;
497 return -EINVAL;
498 }
499 install_cf_mergeop(p.name, &cf_opt);
500 rocksdb::ColumnFamilyHandle *cf;
501 status = db->CreateColumnFamily(cf_opt, p.name, &cf);
502 if (!status.ok()) {
503 derr << __func__ << " Failed to create rocksdb column family: "
504 << p.name << dendl;
505 return -EINVAL;
506 }
507 // store the new CF handle
508 add_column_family(p.name, static_cast<void*>(cf));
509 }
510 }
511 default_cf = db->DefaultColumnFamily();
512 } else {
513 std::vector<string> existing_cfs;
514 status = rocksdb::DB::ListColumnFamilies(
515 rocksdb::DBOptions(opt),
516 path,
517 &existing_cfs);
518 dout(1) << __func__ << " column families: " << existing_cfs << dendl;
519 if (existing_cfs.empty()) {
520 // no column families
521 if (open_readonly) {
522 status = rocksdb::DB::Open(opt, path, &db);
523 } else {
524 status = rocksdb::DB::OpenForReadOnly(opt, path, &db);
525 }
526 if (!status.ok()) {
527 derr << status.ToString() << dendl;
528 return -EINVAL;
529 }
530 default_cf = db->DefaultColumnFamily();
531 } else {
532 // we cannot change column families for a created database. so, map
533 // what options we are given to whatever cf's already exist.
534 std::vector<rocksdb::ColumnFamilyDescriptor> column_families;
535 for (auto& n : existing_cfs) {
536 // copy default CF settings, block cache, merge operators as
537 // the base for new CF
538 rocksdb::ColumnFamilyOptions cf_opt(opt);
539 bool found = false;
540 if (cfs) {
541 for (auto& i : *cfs) {
542 if (i.name == n) {
543 found = true;
544 status = rocksdb::GetColumnFamilyOptionsFromString(
545 cf_opt, i.option, &cf_opt);
546 if (!status.ok()) {
547 derr << __func__ << " invalid db column family options for CF '"
548 << i.name << "': " << i.option << dendl;
549 return -EINVAL;
550 }
551 }
552 }
553 }
554 if (n != rocksdb::kDefaultColumnFamilyName) {
555 install_cf_mergeop(n, &cf_opt);
556 }
557 column_families.push_back(rocksdb::ColumnFamilyDescriptor(n, cf_opt));
558 if (!found && n != rocksdb::kDefaultColumnFamilyName) {
559 dout(1) << __func__ << " column family '" << n
560 << "' exists but not expected" << dendl;
561 }
562 }
563 std::vector<rocksdb::ColumnFamilyHandle*> handles;
564 if (open_readonly) {
565 status = rocksdb::DB::OpenForReadOnly(rocksdb::DBOptions(opt),
566 path, column_families,
567 &handles, &db);
568 } else {
569 status = rocksdb::DB::Open(rocksdb::DBOptions(opt),
570 path, column_families, &handles, &db);
571 }
572 if (!status.ok()) {
573 derr << status.ToString() << dendl;
574 return -EINVAL;
575 }
576 for (unsigned i = 0; i < existing_cfs.size(); ++i) {
577 if (existing_cfs[i] == rocksdb::kDefaultColumnFamilyName) {
578 default_cf = handles[i];
579 must_close_default_cf = true;
580 } else {
581 add_column_family(existing_cfs[i], static_cast<void*>(handles[i]));
582 }
583 }
584 }
585 }
586 ceph_assert(default_cf != nullptr);
587
588 PerfCountersBuilder plb(g_ceph_context, "rocksdb", l_rocksdb_first, l_rocksdb_last);
589 plb.add_u64_counter(l_rocksdb_gets, "get", "Gets");
590 plb.add_u64_counter(l_rocksdb_txns, "submit_transaction", "Submit transactions");
591 plb.add_u64_counter(l_rocksdb_txns_sync, "submit_transaction_sync", "Submit transactions sync");
592 plb.add_time_avg(l_rocksdb_get_latency, "get_latency", "Get latency");
593 plb.add_time_avg(l_rocksdb_submit_latency, "submit_latency", "Submit Latency");
594 plb.add_time_avg(l_rocksdb_submit_sync_latency, "submit_sync_latency", "Submit Sync Latency");
595 plb.add_u64_counter(l_rocksdb_compact, "compact", "Compactions");
596 plb.add_u64_counter(l_rocksdb_compact_range, "compact_range", "Compactions by range");
597 plb.add_u64_counter(l_rocksdb_compact_queue_merge, "compact_queue_merge", "Mergings of ranges in compaction queue");
598 plb.add_u64(l_rocksdb_compact_queue_len, "compact_queue_len", "Length of compaction queue");
599 plb.add_time_avg(l_rocksdb_write_wal_time, "rocksdb_write_wal_time", "Rocksdb write wal time");
600 plb.add_time_avg(l_rocksdb_write_memtable_time, "rocksdb_write_memtable_time", "Rocksdb write memtable time");
601 plb.add_time_avg(l_rocksdb_write_delay_time, "rocksdb_write_delay_time", "Rocksdb write delay time");
602 plb.add_time_avg(l_rocksdb_write_pre_and_post_process_time,
603 "rocksdb_write_pre_and_post_time", "total time spent on writing a record, excluding write process");
604 logger = plb.create_perf_counters();
605 cct->get_perfcounters_collection()->add(logger);
606
607 if (compact_on_mount) {
608 derr << "Compacting rocksdb store..." << dendl;
609 compact();
610 derr << "Finished compacting rocksdb store" << dendl;
611 }
612 return 0;
613 }
614
615 int RocksDBStore::_test_init(const string& dir)
616 {
617 rocksdb::Options options;
618 options.create_if_missing = true;
619 rocksdb::DB *db;
620 rocksdb::Status status = rocksdb::DB::Open(options, dir, &db);
621 delete db;
622 db = nullptr;
623 return status.ok() ? 0 : -EIO;
624 }
625
// Tear down the store: stop the compaction thread, release column
// family handles, and destroy the db before its dependent caches.
RocksDBStore::~RocksDBStore()
{
  close();
  delete logger;

  // Ensure db is destroyed before dependent db_cache and filterpolicy
  // NOTE(review): this assumes cf_handles / must_close_default_cf are
  // only ever populated while db is open, so db is non-null whenever
  // these handles need destroying — confirm against do_open().
  for (auto& p : cf_handles) {
    db->DestroyColumnFamilyHandle(
      static_cast<rocksdb::ColumnFamilyHandle*>(p.second));
    p.second = nullptr;
  }
  if (must_close_default_cf) {
    // the default CF handle was obtained from Open() with explicit
    // descriptors, so we own it and must destroy it ourselves
    db->DestroyColumnFamilyHandle(default_cf);
    must_close_default_cf = false;
  }
  default_cf = nullptr;
  delete db;
  db = nullptr;

  // a custom Env (see do_open/load_rocksdb_options) is owned by us
  if (priv) {
    delete static_cast<rocksdb::Env*>(priv);
  }
}
649
650 void RocksDBStore::close()
651 {
652 // stop compaction thread
653 compact_queue_lock.Lock();
654 if (compact_thread.is_started()) {
655 compact_queue_stop = true;
656 compact_queue_cond.Signal();
657 compact_queue_lock.Unlock();
658 compact_thread.join();
659 } else {
660 compact_queue_lock.Unlock();
661 }
662
663 if (logger)
664 cct->get_perfcounters_collection()->remove(logger);
665 }
666
667 int RocksDBStore::repair(std::ostream &out)
668 {
669 rocksdb::Options opt;
670 int r = load_rocksdb_options(false, opt);
671 if (r) {
672 dout(1) << __func__ << " load rocksdb options failed" << dendl;
673 out << "load rocksdb options failed" << std::endl;
674 return r;
675 }
676 rocksdb::Status status = rocksdb::RepairDB(path, opt);
677 if (status.ok()) {
678 return 0;
679 } else {
680 out << "repair rocksdb failed : " << status.ToString() << std::endl;
681 return 1;
682 }
683 }
684
685 void RocksDBStore::split_stats(const std::string &s, char delim, std::vector<std::string> &elems) {
686 std::stringstream ss;
687 ss.str(s);
688 std::string item;
689 while (std::getline(ss, item, delim)) {
690 elems.push_back(item);
691 }
692 }
693
// Estimate the on-disk size of all keys under 'prefix' using rocksdb's
// approximate-size API (SST files only, memtables excluded).
// NOTE(review): both ranges are approximations — the CF branch skips
// keys below "\x00" / above "\xff\xff\xff\xff", and the default-CF
// branch assumes no key component sorts above four 0xff bytes.
int64_t RocksDBStore::estimate_prefix_size(const string& prefix)
{
  auto cf = get_cf_handle(prefix);
  uint64_t size = 0;
  uint8_t flags =
    //rocksdb::DB::INCLUDE_MEMTABLES |  // do not include memtables...
    rocksdb::DB::INCLUDE_FILES;
  if (cf) {
    // dedicated CF: keys are stored bare, so span (almost) everything
    string start(1, '\x00');
    string limit("\xff\xff\xff\xff");
    rocksdb::Range r(start, limit);
    db->GetApproximateSizes(cf, &r, 1, &size, flags);
  } else {
    // default CF: keys are "prefix\0..."; bound the range by prefix
    string limit = prefix + "\xff\xff\xff\xff";
    rocksdb::Range r(prefix, limit);
    db->GetApproximateSizes(default_cf,
			    &r, 1, &size, flags);
  }
  return size;
}
714
715 void RocksDBStore::get_statistics(Formatter *f)
716 {
717 if (!g_conf()->rocksdb_perf) {
718 dout(20) << __func__ << " RocksDB perf is disabled, can't probe for stats"
719 << dendl;
720 return;
721 }
722
723 if (g_conf()->rocksdb_collect_compaction_stats) {
724 std::string stat_str;
725 bool status = db->GetProperty("rocksdb.stats", &stat_str);
726 if (status) {
727 f->open_object_section("rocksdb_statistics");
728 f->dump_string("rocksdb_compaction_statistics", "");
729 vector<string> stats;
730 split_stats(stat_str, '\n', stats);
731 for (auto st :stats) {
732 f->dump_string("", st);
733 }
734 f->close_section();
735 }
736 }
737 if (g_conf()->rocksdb_collect_extended_stats) {
738 if (dbstats) {
739 f->open_object_section("rocksdb_extended_statistics");
740 string stat_str = dbstats->ToString();
741 vector<string> stats;
742 split_stats(stat_str, '\n', stats);
743 f->dump_string("rocksdb_extended_statistics", "");
744 for (auto st :stats) {
745 f->dump_string(".", st);
746 }
747 f->close_section();
748 }
749 f->open_object_section("rocksdbstore_perf_counters");
750 logger->dump_formatted(f,0);
751 f->close_section();
752 }
753 if (g_conf()->rocksdb_collect_memory_stats) {
754 f->open_object_section("rocksdb_memtable_statistics");
755 std::string str;
756 if (!bbt_opts.no_block_cache) {
757 str.append(stringify(bbt_opts.block_cache->GetUsage()));
758 f->dump_string("block_cache_usage", str.data());
759 str.clear();
760 str.append(stringify(bbt_opts.block_cache->GetPinnedUsage()));
761 f->dump_string("block_cache_pinned_blocks_usage", str);
762 str.clear();
763 }
764 db->GetProperty("rocksdb.cur-size-all-mem-tables", &str);
765 f->dump_string("rocksdb_memtable_usage", str);
766 str.clear();
767 db->GetProperty("rocksdb.estimate-table-readers-mem", &str);
768 f->dump_string("rocksdb_index_filter_blocks_usage", str);
769 f->close_section();
770 }
771 }
772
// Shared body of submit_transaction{,_sync}: write the batched
// transaction to rocksdb and, if rocksdb_perf is enabled, harvest the
// per-write timing breakdown into our perf counters.
// Returns 0 on success, -1 on write failure.
int RocksDBStore::submit_common(rocksdb::WriteOptions& woptions, KeyValueDB::Transaction t)
{
  // enable rocksdb breakdown
  // considering performance overhead, default is disabled
  if (g_conf()->rocksdb_perf) {
    rocksdb::SetPerfLevel(rocksdb::PerfLevel::kEnableTimeExceptForMutex);
    rocksdb::get_perf_context()->Reset();
  }

  RocksDBTransactionImpl * _t =
    static_cast<RocksDBTransactionImpl *>(t.get());
  woptions.disableWAL = disableWAL;
  // trace the batch contents at debug level 30 (walks the whole batch)
  lgeneric_subdout(cct, rocksdb, 30) << __func__;
  RocksWBHandler bat_txc;
  _t->bat.Iterate(&bat_txc);
  *_dout << " Rocksdb transaction: " << bat_txc.seen << dendl;

  rocksdb::Status s = db->Write(woptions, &_t->bat);
  if (!s.ok()) {
    // re-walk the batch so the error log shows what we tried to write
    RocksWBHandler rocks_txc;
    _t->bat.Iterate(&rocks_txc);
    derr << __func__ << " error: " << s.ToString() << " code = " << s.code()
         << " Rocksdb transaction: " << rocks_txc.seen << dendl;
  }

  if (g_conf()->rocksdb_perf) {
    // perf context times are in nanoseconds; convert to seconds
    utime_t write_memtable_time;
    utime_t write_delay_time;
    utime_t write_wal_time;
    utime_t write_pre_and_post_process_time;
    write_wal_time.set_from_double(
	static_cast<double>(rocksdb::get_perf_context()->write_wal_time)/1000000000);
    write_memtable_time.set_from_double(
	static_cast<double>(rocksdb::get_perf_context()->write_memtable_time)/1000000000);
    write_delay_time.set_from_double(
	static_cast<double>(rocksdb::get_perf_context()->write_delay_time)/1000000000);
    write_pre_and_post_process_time.set_from_double(
	static_cast<double>(rocksdb::get_perf_context()->write_pre_and_post_process_time)/1000000000);
    logger->tinc(l_rocksdb_write_memtable_time, write_memtable_time);
    logger->tinc(l_rocksdb_write_delay_time, write_delay_time);
    logger->tinc(l_rocksdb_write_wal_time, write_wal_time);
    logger->tinc(l_rocksdb_write_pre_and_post_process_time, write_pre_and_post_process_time);
  }

  return s.ok() ? 0 : -1;
}
819
820 int RocksDBStore::submit_transaction(KeyValueDB::Transaction t)
821 {
822 utime_t start = ceph_clock_now();
823 rocksdb::WriteOptions woptions;
824 woptions.sync = false;
825
826 int result = submit_common(woptions, t);
827
828 utime_t lat = ceph_clock_now() - start;
829 logger->inc(l_rocksdb_txns);
830 logger->tinc(l_rocksdb_submit_latency, lat);
831
832 return result;
833 }
834
835 int RocksDBStore::submit_transaction_sync(KeyValueDB::Transaction t)
836 {
837 utime_t start = ceph_clock_now();
838 rocksdb::WriteOptions woptions;
839 // if disableWAL, sync can't set
840 woptions.sync = !disableWAL;
841
842 int result = submit_common(woptions, t);
843
844 utime_t lat = ceph_clock_now() - start;
845 logger->inc(l_rocksdb_txns_sync);
846 logger->tinc(l_rocksdb_submit_sync_latency, lat);
847
848 return result;
849 }
850
851 RocksDBStore::RocksDBTransactionImpl::RocksDBTransactionImpl(RocksDBStore *_db)
852 {
853 db = _db;
854 }
855
856 void RocksDBStore::RocksDBTransactionImpl::put_bat(
857 rocksdb::WriteBatch& bat,
858 rocksdb::ColumnFamilyHandle *cf,
859 const string &key,
860 const bufferlist &to_set_bl)
861 {
862 // bufferlist::c_str() is non-constant, so we can't call c_str()
863 if (to_set_bl.is_contiguous() && to_set_bl.length() > 0) {
864 bat.Put(cf,
865 rocksdb::Slice(key),
866 rocksdb::Slice(to_set_bl.buffers().front().c_str(),
867 to_set_bl.length()));
868 } else {
869 rocksdb::Slice key_slice(key);
870 vector<rocksdb::Slice> value_slices(to_set_bl.buffers().size());
871 bat.Put(cf,
872 rocksdb::SliceParts(&key_slice, 1),
873 prepare_sliceparts(to_set_bl, &value_slices));
874 }
875 }
876
877 void RocksDBStore::RocksDBTransactionImpl::set(
878 const string &prefix,
879 const string &k,
880 const bufferlist &to_set_bl)
881 {
882 auto cf = db->get_cf_handle(prefix);
883 if (cf) {
884 put_bat(bat, cf, k, to_set_bl);
885 } else {
886 string key = combine_strings(prefix, k);
887 put_bat(bat, db->default_cf, key, to_set_bl);
888 }
889 }
890
891 void RocksDBStore::RocksDBTransactionImpl::set(
892 const string &prefix,
893 const char *k, size_t keylen,
894 const bufferlist &to_set_bl)
895 {
896 auto cf = db->get_cf_handle(prefix);
897 if (cf) {
898 string key(k, keylen); // fixme?
899 put_bat(bat, cf, key, to_set_bl);
900 } else {
901 string key;
902 combine_strings(prefix, k, keylen, &key);
903 put_bat(bat, cf, key, to_set_bl);
904 }
905 }
906
907 void RocksDBStore::RocksDBTransactionImpl::rmkey(const string &prefix,
908 const string &k)
909 {
910 auto cf = db->get_cf_handle(prefix);
911 if (cf) {
912 bat.Delete(cf, rocksdb::Slice(k));
913 } else {
914 bat.Delete(db->default_cf, combine_strings(prefix, k));
915 }
916 }
917
918 void RocksDBStore::RocksDBTransactionImpl::rmkey(const string &prefix,
919 const char *k,
920 size_t keylen)
921 {
922 auto cf = db->get_cf_handle(prefix);
923 if (cf) {
924 bat.Delete(cf, rocksdb::Slice(k, keylen));
925 } else {
926 string key;
927 combine_strings(prefix, k, keylen, &key);
928 bat.Delete(db->default_cf, rocksdb::Slice(key));
929 }
930 }
931
932 void RocksDBStore::RocksDBTransactionImpl::rm_single_key(const string &prefix,
933 const string &k)
934 {
935 auto cf = db->get_cf_handle(prefix);
936 if (cf) {
937 bat.SingleDelete(cf, k);
938 } else {
939 bat.SingleDelete(db->default_cf, combine_strings(prefix, k));
940 }
941 }
942
// Stage deletion of every key under 'prefix'. Strategy depends on
// config: with enable_rmrange, small prefixes (up to max_items_rmrange
// keys) are deleted key-by-key under a savepoint, falling back to a
// single DeleteRange when the count is exceeded; otherwise every key
// is deleted individually.
void RocksDBStore::RocksDBTransactionImpl::rmkeys_by_prefix(const string &prefix)
{
  auto cf = db->get_cf_handle(prefix);
  if (cf) {
    if (db->enable_rmrange) {
      // dedicated CF holds only this prefix, so "everything" is the
      // range [ "", 0xffffffff )
      string endprefix("\xff\xff\xff\xff"); // FIXME: this is cheating...
      if (db->max_items_rmrange) {
	uint64_t cnt = db->max_items_rmrange;
	bat.SetSavePoint();
	auto it = db->get_iterator(prefix);
	for (it->seek_to_first();
	     it->valid();
	     it->next()) {
	  if (!cnt) {
	    // too many keys: undo the per-key deletes and issue one
	    // DeleteRange instead
	    bat.RollbackToSavePoint();
	    bat.DeleteRange(cf, string(), endprefix);
	    return;
	  }
	  bat.Delete(cf, rocksdb::Slice(it->key()));
	  --cnt;
	}
	// stayed under the limit: keep the per-key deletes
	bat.PopSavePoint();
      } else {
	bat.DeleteRange(cf, string(), endprefix);
      }
    } else {
      auto it = db->get_iterator(prefix);
      for (it->seek_to_first();
	   it->valid();
	   it->next()) {
	bat.Delete(cf, rocksdb::Slice(it->key()));
      }
    }
  } else {
    if (db->enable_rmrange) {
      // default CF stores "prefix\0key"; the next prefix upper-bounds
      // the range
      string endprefix = prefix;
      endprefix.push_back('\x01');
      if (db->max_items_rmrange) {
	uint64_t cnt = db->max_items_rmrange;
	bat.SetSavePoint();
	auto it = db->get_iterator(prefix);
	for (it->seek_to_first();
	     it->valid();
	     it->next()) {
	  if (!cnt) {
	    bat.RollbackToSavePoint();
	    bat.DeleteRange(db->default_cf,
			    combine_strings(prefix, string()),
			    combine_strings(endprefix, string()));
	    return;
	  }
	  bat.Delete(db->default_cf, combine_strings(prefix, it->key()));
	  --cnt;
	}
	bat.PopSavePoint();
      } else {
	bat.DeleteRange(db->default_cf,
			combine_strings(prefix, string()),
			combine_strings(endprefix, string()));
      }
    } else {
      auto it = db->get_iterator(prefix);
      for (it->seek_to_first();
	   it->valid();
	   it->next()) {
	bat.Delete(db->default_cf, combine_strings(prefix, it->key()));
      }
    }
  }
}
1013
// Queue deletion of every key in [start, end) under 'prefix'.
// Strategy selection mirrors rmkeys_by_prefix():
//  - dedicated CF (bare keys) vs default CF (prefix folded into raw key),
//  - one DeleteRange tombstone vs per-key deletes vs the hybrid capped by
//    max_items_rmrange (point deletes, rolled back to a DeleteRange once
//    the cap is hit).
void RocksDBStore::RocksDBTransactionImpl::rm_range_keys(const string &prefix,
                                                         const string &start,
                                                         const string &end)
{
  auto cf = db->get_cf_handle(prefix);
  if (cf) {
    if (db->enable_rmrange) {
      if (db->max_items_rmrange) {
        // Hybrid: point-delete up to max_items_rmrange keys in the range;
        // if more remain, roll back the batched deletes and use one
        // DeleteRange tombstone instead.
        uint64_t cnt = db->max_items_rmrange;
        auto it = db->get_iterator(prefix);
        bat.SetSavePoint();
        it->lower_bound(start);
        while (it->valid()) {
          if (it->key() >= end) {
            // Walked past the half-open upper bound.
            break;
          }
          if (!cnt) {
            bat.RollbackToSavePoint();
            bat.DeleteRange(cf, rocksdb::Slice(start), rocksdb::Slice(end));
            return;
          }
          bat.Delete(cf, rocksdb::Slice(it->key()));
          it->next();
          --cnt;
        }
        // Stayed under the cap: keep the point deletes.
        bat.PopSavePoint();
      } else {
        bat.DeleteRange(cf, rocksdb::Slice(start), rocksdb::Slice(end));
      }
    } else {
      // Range deletes disabled: iterate [start, end) and delete singly.
      auto it = db->get_iterator(prefix);
      it->lower_bound(start);
      while (it->valid()) {
        if (it->key() >= end) {
          break;
        }
        bat.Delete(cf, rocksdb::Slice(it->key()));
        it->next();
      }
    }
  } else {
    // Default column family: operate on "prefix\0key" raw keys; the
    // iterator yields logical keys, so bounds compare against it->key().
    if (db->enable_rmrange) {
      if (db->max_items_rmrange) {
        uint64_t cnt = db->max_items_rmrange;
        auto it = db->get_iterator(prefix);
        bat.SetSavePoint();
        it->lower_bound(start);
        while (it->valid()) {
          if (it->key() >= end) {
            break;
          }
          if (!cnt) {
            bat.RollbackToSavePoint();
            bat.DeleteRange(
              db->default_cf,
              rocksdb::Slice(combine_strings(prefix, start)),
              rocksdb::Slice(combine_strings(prefix, end)));
            return;
          }
          bat.Delete(db->default_cf,
                     combine_strings(prefix, it->key()));
          it->next();
          --cnt;
        }
        bat.PopSavePoint();
      } else {
        bat.DeleteRange(
          db->default_cf,
          rocksdb::Slice(combine_strings(prefix, start)),
          rocksdb::Slice(combine_strings(prefix, end)));
      }
    } else {
      auto it = db->get_iterator(prefix);
      it->lower_bound(start);
      while (it->valid()) {
        if (it->key() >= end) {
          break;
        }
        bat.Delete(db->default_cf,
                   combine_strings(prefix, it->key()));
        it->next();
      }
    }
  }
}
1099
1100 void RocksDBStore::RocksDBTransactionImpl::merge(
1101 const string &prefix,
1102 const string &k,
1103 const bufferlist &to_set_bl)
1104 {
1105 auto cf = db->get_cf_handle(prefix);
1106 if (cf) {
1107 // bufferlist::c_str() is non-constant, so we can't call c_str()
1108 if (to_set_bl.is_contiguous() && to_set_bl.length() > 0) {
1109 bat.Merge(
1110 cf,
1111 rocksdb::Slice(k),
1112 rocksdb::Slice(to_set_bl.buffers().front().c_str(), to_set_bl.length()));
1113 } else {
1114 // make a copy
1115 rocksdb::Slice key_slice(k);
1116 vector<rocksdb::Slice> value_slices(to_set_bl.buffers().size());
1117 bat.Merge(cf, rocksdb::SliceParts(&key_slice, 1),
1118 prepare_sliceparts(to_set_bl, &value_slices));
1119 }
1120 } else {
1121 string key = combine_strings(prefix, k);
1122 // bufferlist::c_str() is non-constant, so we can't call c_str()
1123 if (to_set_bl.is_contiguous() && to_set_bl.length() > 0) {
1124 bat.Merge(
1125 db->default_cf,
1126 rocksdb::Slice(key),
1127 rocksdb::Slice(to_set_bl.buffers().front().c_str(), to_set_bl.length()));
1128 } else {
1129 // make a copy
1130 rocksdb::Slice key_slice(key);
1131 vector<rocksdb::Slice> value_slices(to_set_bl.buffers().size());
1132 bat.Merge(
1133 db->default_cf,
1134 rocksdb::SliceParts(&key_slice, 1),
1135 prepare_sliceparts(to_set_bl, &value_slices));
1136 }
1137 }
1138 }
1139
1140 int RocksDBStore::get(
1141 const string &prefix,
1142 const std::set<string> &keys,
1143 std::map<string, bufferlist> *out)
1144 {
1145 utime_t start = ceph_clock_now();
1146 auto cf = get_cf_handle(prefix);
1147 if (cf) {
1148 for (auto& key : keys) {
1149 std::string value;
1150 auto status = db->Get(rocksdb::ReadOptions(),
1151 cf,
1152 rocksdb::Slice(key),
1153 &value);
1154 if (status.ok()) {
1155 (*out)[key].append(value);
1156 } else if (status.IsIOError()) {
1157 ceph_abort_msg(status.getState());
1158 }
1159 }
1160 } else {
1161 for (auto& key : keys) {
1162 std::string value;
1163 string k = combine_strings(prefix, key);
1164 auto status = db->Get(rocksdb::ReadOptions(),
1165 default_cf,
1166 rocksdb::Slice(k),
1167 &value);
1168 if (status.ok()) {
1169 (*out)[key].append(value);
1170 } else if (status.IsIOError()) {
1171 ceph_abort_msg(status.getState());
1172 }
1173 }
1174 }
1175 utime_t lat = ceph_clock_now() - start;
1176 logger->inc(l_rocksdb_gets);
1177 logger->tinc(l_rocksdb_get_latency, lat);
1178 return 0;
1179 }
1180
1181 int RocksDBStore::get(
1182 const string &prefix,
1183 const string &key,
1184 bufferlist *out)
1185 {
1186 ceph_assert(out && (out->length() == 0));
1187 utime_t start = ceph_clock_now();
1188 int r = 0;
1189 string value;
1190 rocksdb::Status s;
1191 auto cf = get_cf_handle(prefix);
1192 if (cf) {
1193 s = db->Get(rocksdb::ReadOptions(),
1194 cf,
1195 rocksdb::Slice(key),
1196 &value);
1197 } else {
1198 string k = combine_strings(prefix, key);
1199 s = db->Get(rocksdb::ReadOptions(),
1200 default_cf,
1201 rocksdb::Slice(k),
1202 &value);
1203 }
1204 if (s.ok()) {
1205 out->append(value);
1206 } else if (s.IsNotFound()) {
1207 r = -ENOENT;
1208 } else {
1209 ceph_abort_msg(s.getState());
1210 }
1211 utime_t lat = ceph_clock_now() - start;
1212 logger->inc(l_rocksdb_gets);
1213 logger->tinc(l_rocksdb_get_latency, lat);
1214 return r;
1215 }
1216
1217 int RocksDBStore::get(
1218 const string& prefix,
1219 const char *key,
1220 size_t keylen,
1221 bufferlist *out)
1222 {
1223 ceph_assert(out && (out->length() == 0));
1224 utime_t start = ceph_clock_now();
1225 int r = 0;
1226 string value;
1227 rocksdb::Status s;
1228 auto cf = get_cf_handle(prefix);
1229 if (cf) {
1230 s = db->Get(rocksdb::ReadOptions(),
1231 cf,
1232 rocksdb::Slice(key, keylen),
1233 &value);
1234 } else {
1235 string k;
1236 combine_strings(prefix, key, keylen, &k);
1237 s = db->Get(rocksdb::ReadOptions(),
1238 default_cf,
1239 rocksdb::Slice(k),
1240 &value);
1241 }
1242 if (s.ok()) {
1243 out->append(value);
1244 } else if (s.IsNotFound()) {
1245 r = -ENOENT;
1246 } else {
1247 ceph_abort_msg(s.getState());
1248 }
1249 utime_t lat = ceph_clock_now() - start;
1250 logger->inc(l_rocksdb_gets);
1251 logger->tinc(l_rocksdb_get_latency, lat);
1252 return r;
1253 }
1254
1255 int RocksDBStore::split_key(rocksdb::Slice in, string *prefix, string *key)
1256 {
1257 size_t prefix_len = 0;
1258
1259 // Find separator inside Slice
1260 char* separator = (char*) memchr(in.data(), 0, in.size());
1261 if (separator == NULL)
1262 return -EINVAL;
1263 prefix_len = size_t(separator - in.data());
1264 if (prefix_len >= in.size())
1265 return -EINVAL;
1266
1267 // Fetch prefix and/or key directly from Slice
1268 if (prefix)
1269 *prefix = string(in.data(), prefix_len);
1270 if (key)
1271 *key = string(separator+1, in.size()-prefix_len-1);
1272 return 0;
1273 }
1274
1275 void RocksDBStore::compact()
1276 {
1277 logger->inc(l_rocksdb_compact);
1278 rocksdb::CompactRangeOptions options;
1279 db->CompactRange(options, default_cf, nullptr, nullptr);
1280 for (auto cf : cf_handles) {
1281 db->CompactRange(
1282 options,
1283 static_cast<rocksdb::ColumnFamilyHandle*>(cf.second),
1284 nullptr, nullptr);
1285 }
1286 }
1287
1288
// Background compaction worker: drains (start,end) ranges queued by
// compact_range_async() until compact_queue_stop is set.  The queue
// lock is held while inspecting/popping the queue and dropped for the
// duration of each (potentially long) compaction.
void RocksDBStore::compact_thread_entry()
{
  compact_queue_lock.Lock();
  while (!compact_queue_stop) {
    while (!compact_queue.empty()) {
      // Take a copy of the front range before releasing the lock.
      pair<string,string> range = compact_queue.front();
      compact_queue.pop_front();
      logger->set(l_rocksdb_compact_queue_len, compact_queue.size());
      // Unlock so producers can keep queueing while we compact.
      compact_queue_lock.Unlock();
      logger->inc(l_rocksdb_compact_range);
      if (range.first.empty() && range.second.empty()) {
        // Empty/empty pair is the sentinel for "compact everything".
        compact();
      } else {
        compact_range(range.first, range.second);
      }
      // Re-acquire before re-testing the queue/stop flag.
      compact_queue_lock.Lock();
      continue;
    }
    // Queue drained: sleep until compact_range_async() (or shutdown)
    // signals the condition variable.
    compact_queue_cond.Wait(compact_queue_lock);
  }
  compact_queue_lock.Unlock();
}
1311
1312 void RocksDBStore::compact_range_async(const string& start, const string& end)
1313 {
1314 std::lock_guard l(compact_queue_lock);
1315
1316 // try to merge adjacent ranges. this is O(n), but the queue should
1317 // be short. note that we do not cover all overlap cases and merge
1318 // opportunities here, but we capture the ones we currently need.
1319 list< pair<string,string> >::iterator p = compact_queue.begin();
1320 while (p != compact_queue.end()) {
1321 if (p->first == start && p->second == end) {
1322 // dup; no-op
1323 return;
1324 }
1325 if (p->first <= end && p->first > start) {
1326 // merge with existing range to the right
1327 compact_queue.push_back(make_pair(start, p->second));
1328 compact_queue.erase(p);
1329 logger->inc(l_rocksdb_compact_queue_merge);
1330 break;
1331 }
1332 if (p->second >= start && p->second < end) {
1333 // merge with existing range to the left
1334 compact_queue.push_back(make_pair(p->first, end));
1335 compact_queue.erase(p);
1336 logger->inc(l_rocksdb_compact_queue_merge);
1337 break;
1338 }
1339 ++p;
1340 }
1341 if (p == compact_queue.end()) {
1342 // no merge, new entry.
1343 compact_queue.push_back(make_pair(start, end));
1344 logger->set(l_rocksdb_compact_queue_len, compact_queue.size());
1345 }
1346 compact_queue_cond.Signal();
1347 if (!compact_thread.is_started()) {
1348 compact_thread.create("rstore_compact");
1349 }
1350 }
1351 bool RocksDBStore::check_omap_dir(string &omap_dir)
1352 {
1353 rocksdb::Options options;
1354 options.create_if_missing = true;
1355 rocksdb::DB *db;
1356 rocksdb::Status status = rocksdb::DB::Open(options, omap_dir, &db);
1357 delete db;
1358 db = nullptr;
1359 return status.ok();
1360 }
1361 void RocksDBStore::compact_range(const string& start, const string& end)
1362 {
1363 rocksdb::CompactRangeOptions options;
1364 rocksdb::Slice cstart(start);
1365 rocksdb::Slice cend(end);
1366 db->CompactRange(options, &cstart, &cend);
1367 }
1368
1369 RocksDBStore::RocksDBWholeSpaceIteratorImpl::~RocksDBWholeSpaceIteratorImpl()
1370 {
1371 delete dbiter;
1372 }
1373 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_first()
1374 {
1375 dbiter->SeekToFirst();
1376 ceph_assert(!dbiter->status().IsIOError());
1377 return dbiter->status().ok() ? 0 : -1;
1378 }
1379 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_first(const string &prefix)
1380 {
1381 rocksdb::Slice slice_prefix(prefix);
1382 dbiter->Seek(slice_prefix);
1383 ceph_assert(!dbiter->status().IsIOError());
1384 return dbiter->status().ok() ? 0 : -1;
1385 }
1386 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_last()
1387 {
1388 dbiter->SeekToLast();
1389 ceph_assert(!dbiter->status().IsIOError());
1390 return dbiter->status().ok() ? 0 : -1;
1391 }
1392 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_last(const string &prefix)
1393 {
1394 string limit = past_prefix(prefix);
1395 rocksdb::Slice slice_limit(limit);
1396 dbiter->Seek(slice_limit);
1397
1398 if (!dbiter->Valid()) {
1399 dbiter->SeekToLast();
1400 } else {
1401 dbiter->Prev();
1402 }
1403 return dbiter->status().ok() ? 0 : -1;
1404 }
1405 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::upper_bound(const string &prefix, const string &after)
1406 {
1407 lower_bound(prefix, after);
1408 if (valid()) {
1409 pair<string,string> key = raw_key();
1410 if (key.first == prefix && key.second == after)
1411 next();
1412 }
1413 return dbiter->status().ok() ? 0 : -1;
1414 }
1415 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::lower_bound(const string &prefix, const string &to)
1416 {
1417 string bound = combine_strings(prefix, to);
1418 rocksdb::Slice slice_bound(bound);
1419 dbiter->Seek(slice_bound);
1420 return dbiter->status().ok() ? 0 : -1;
1421 }
// True when the iterator is positioned on an entry (rocksdb's Valid()).
bool RocksDBStore::RocksDBWholeSpaceIteratorImpl::valid()
{
  return dbiter->Valid();
}
1426 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::next()
1427 {
1428 if (valid()) {
1429 dbiter->Next();
1430 }
1431 ceph_assert(!dbiter->status().IsIOError());
1432 return dbiter->status().ok() ? 0 : -1;
1433 }
1434 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::prev()
1435 {
1436 if (valid()) {
1437 dbiter->Prev();
1438 }
1439 ceph_assert(!dbiter->status().IsIOError());
1440 return dbiter->status().ok() ? 0 : -1;
1441 }
1442 string RocksDBStore::RocksDBWholeSpaceIteratorImpl::key()
1443 {
1444 string out_key;
1445 split_key(dbiter->key(), 0, &out_key);
1446 return out_key;
1447 }
1448 pair<string,string> RocksDBStore::RocksDBWholeSpaceIteratorImpl::raw_key()
1449 {
1450 string prefix, key;
1451 split_key(dbiter->key(), &prefix, &key);
1452 return make_pair(prefix, key);
1453 }
1454
1455 bool RocksDBStore::RocksDBWholeSpaceIteratorImpl::raw_key_is_prefixed(const string &prefix) {
1456 // Look for "prefix\0" right in rocksb::Slice
1457 rocksdb::Slice key = dbiter->key();
1458 if ((key.size() > prefix.length()) && (key[prefix.length()] == '\0')) {
1459 return memcmp(key.data(), prefix.c_str(), prefix.length()) == 0;
1460 } else {
1461 return false;
1462 }
1463 }
1464
// Copy of the current value as a bufferlist (the underlying slice only
// stays valid until the iterator moves, so a copy is required).
bufferlist RocksDBStore::RocksDBWholeSpaceIteratorImpl::value()
{
  return to_bufferlist(dbiter->value());
}
1469
// Size in bytes of the current raw key (including the prefix header).
size_t RocksDBStore::RocksDBWholeSpaceIteratorImpl::key_size()
{
  return dbiter->key().size();
}
1474
// Size in bytes of the current value.
size_t RocksDBStore::RocksDBWholeSpaceIteratorImpl::value_size()
{
  return dbiter->value().size();
}
1479
1480 bufferptr RocksDBStore::RocksDBWholeSpaceIteratorImpl::value_as_ptr()
1481 {
1482 rocksdb::Slice val = dbiter->value();
1483 return bufferptr(val.data(), val.size());
1484 }
1485
1486 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::status()
1487 {
1488 return dbiter->status().ok() ? 0 : -1;
1489 }
1490
1491 string RocksDBStore::past_prefix(const string &prefix)
1492 {
1493 string limit = prefix;
1494 limit.push_back(1);
1495 return limit;
1496 }
1497
1498 RocksDBStore::WholeSpaceIterator RocksDBStore::get_wholespace_iterator()
1499 {
1500 return std::make_shared<RocksDBWholeSpaceIteratorImpl>(
1501 db->NewIterator(rocksdb::ReadOptions(), default_cf));
1502 }
1503
1504 class CFIteratorImpl : public KeyValueDB::IteratorImpl {
1505 protected:
1506 string prefix;
1507 rocksdb::Iterator *dbiter;
1508 public:
1509 explicit CFIteratorImpl(const std::string& p,
1510 rocksdb::Iterator *iter)
1511 : prefix(p), dbiter(iter) { }
1512 ~CFIteratorImpl() {
1513 delete dbiter;
1514 }
1515
1516 int seek_to_first() override {
1517 dbiter->SeekToFirst();
1518 return dbiter->status().ok() ? 0 : -1;
1519 }
1520 int seek_to_last() override {
1521 dbiter->SeekToLast();
1522 return dbiter->status().ok() ? 0 : -1;
1523 }
1524 int upper_bound(const string &after) override {
1525 lower_bound(after);
1526 if (valid() && (key() == after)) {
1527 next();
1528 }
1529 return dbiter->status().ok() ? 0 : -1;
1530 }
1531 int lower_bound(const string &to) override {
1532 rocksdb::Slice slice_bound(to);
1533 dbiter->Seek(slice_bound);
1534 return dbiter->status().ok() ? 0 : -1;
1535 }
1536 int next() override {
1537 if (valid()) {
1538 dbiter->Next();
1539 }
1540 return dbiter->status().ok() ? 0 : -1;
1541 }
1542 int prev() override {
1543 if (valid()) {
1544 dbiter->Prev();
1545 }
1546 return dbiter->status().ok() ? 0 : -1;
1547 }
1548 bool valid() override {
1549 return dbiter->Valid();
1550 }
1551 string key() override {
1552 return dbiter->key().ToString();
1553 }
1554 std::pair<std::string, std::string> raw_key() override {
1555 return make_pair(prefix, key());
1556 }
1557 bufferlist value() override {
1558 return to_bufferlist(dbiter->value());
1559 }
1560 bufferptr value_as_ptr() override {
1561 rocksdb::Slice val = dbiter->value();
1562 return bufferptr(val.data(), val.size());
1563 }
1564 int status() override {
1565 return dbiter->status().ok() ? 0 : -1;
1566 }
1567 };
1568
1569 KeyValueDB::Iterator RocksDBStore::get_iterator(const std::string& prefix)
1570 {
1571 rocksdb::ColumnFamilyHandle *cf_handle =
1572 static_cast<rocksdb::ColumnFamilyHandle*>(get_cf_handle(prefix));
1573 if (cf_handle) {
1574 return std::make_shared<CFIteratorImpl>(
1575 prefix,
1576 db->NewIterator(rocksdb::ReadOptions(), cf_handle));
1577 } else {
1578 return KeyValueDB::get_iterator(prefix);
1579 }
1580 }