#include "rocksdb/filter_policy.h"
#include "rocksdb/utilities/convenience.h"
#include "rocksdb/merge_operator.h"
+
using std::string;
#include "common/perf_counters.h"
-#include "common/debug.h"
+#include "common/PriorityCache.h"
#include "include/str_list.h"
#include "include/stringify.h"
#include "include/str_map.h"
#undef dout_prefix
#define dout_prefix *_dout << "rocksdb: "
+static bufferlist to_bufferlist(rocksdb::Slice in) {
+ bufferlist bl;
+ bl.append(bufferptr(in.data(), in.size()));
+ return bl;
+}
+
+static rocksdb::SliceParts prepare_sliceparts(const bufferlist &bl,
+ vector<rocksdb::Slice> *slices)
+{
+ unsigned n = 0;
+ for (auto& buf : bl.buffers()) {
+ (*slices)[n].data_ = buf.c_str();
+ (*slices)[n].size_ = buf.length();
+ n++;
+ }
+ return rocksdb::SliceParts(slices->data(), slices->size());
+}
+
+
//
-// One of these per rocksdb instance, implements the merge operator prefix stuff
+// One of these for the default rocksdb column family, routing each prefix
+// to the appropriate MergeOperator.
//
-class RocksDBStore::MergeOperatorRouter : public rocksdb::AssociativeMergeOperator {
+class RocksDBStore::MergeOperatorRouter
+ : public rocksdb::AssociativeMergeOperator
+{
RocksDBStore& store;
- public:
+public:
const char *Name() const override {
// Construct a name that rocksDB will validate against. We want to
// do this in a way that doesn't constrain the ordering of calls
// construct a name from all of those parts.
store.assoc_name.clear();
map<std::string,std::string> names;
- for (auto& p : store.merge_ops) names[p.first] = p.second->name();
+
+ for (auto& p : store.merge_ops) {
+ names[p.first] = p.second->name();
+ }
+ for (auto& p : store.cf_handles) {
+ names.erase(p.first);
+ }
for (auto& p : names) {
store.assoc_name += '.';
store.assoc_name += p.first;
return store.assoc_name.c_str();
}
- MergeOperatorRouter(RocksDBStore &_store) : store(_store) {}
+ explicit MergeOperatorRouter(RocksDBStore &_store) : store(_store) {}
bool Merge(const rocksdb::Slice& key,
- const rocksdb::Slice* existing_value,
- const rocksdb::Slice& value,
- std::string* new_value,
- rocksdb::Logger* logger) const override {
- // Check each prefix
+ const rocksdb::Slice* existing_value,
+ const rocksdb::Slice& value,
+ std::string* new_value,
+ rocksdb::Logger* logger) const override {
+ // for default column family
+ // extract prefix from key and compare against each registered merge op;
+ // even though merge operator for explicit CF is included in merge_ops,
+ // it won't be picked up, since it won't match.
for (auto& p : store.merge_ops) {
if (p.first.compare(0, p.first.length(),
key.data(), p.first.length()) == 0 &&
key.data()[p.first.length()] == 0) {
- if (existing_value) {
- p.second->merge(existing_value->data(), existing_value->size(),
+ if (existing_value) {
+ p.second->merge(existing_value->data(), existing_value->size(),
value.data(), value.size(),
new_value);
- } else {
- p.second->merge_nonexistent(value.data(), value.size(), new_value);
- }
- break;
+ } else {
+ p.second->merge_nonexistent(value.data(), value.size(), new_value);
+ }
+ break;
}
}
return true; // OK :)
}
+};
+
+//
+// One of these per non-default column family, linked directly to the
+// merge operator for that CF/prefix (if any).
+//
+class RocksDBStore::MergeOperatorLinker
+ : public rocksdb::AssociativeMergeOperator
+{
+private:
+ std::shared_ptr<KeyValueDB::MergeOperator> mop;
+public:
+ explicit MergeOperatorLinker(const std::shared_ptr<KeyValueDB::MergeOperator> &o) : mop(o) {}
+ const char *Name() const override {
+ return mop->name();
+ }
+
+ bool Merge(const rocksdb::Slice& key,
+ const rocksdb::Slice* existing_value,
+ const rocksdb::Slice& value,
+ std::string* new_value,
+ rocksdb::Logger* logger) const override {
+ if (existing_value) {
+ mop->merge(existing_value->data(), existing_value->size(),
+ value.data(), value.size(),
+ new_value);
+ } else {
+ mop->merge_nonexistent(value.data(), value.size(), new_value);
+ }
+ return true;
+ }
};
int RocksDBStore::set_merge_operator(
std::shared_ptr<KeyValueDB::MergeOperator> mop)
{
// If you fail here, it's because you can't do this on an open database
- assert(db == nullptr);
+ ceph_assert(db == nullptr);
merge_ops.push_back(std::make_pair(prefix,mop));
return 0;
}
void Logv(const rocksdb::InfoLogLevel log_level, const char* format,
va_list ap) override {
int v = rocksdb::NUM_INFO_LOG_LEVELS - log_level - 1;
- dout(v);
+ dout(ceph::dout::need_dynamic(v));
char buf[65536];
vsnprintf(buf, sizeof(buf), format, ap);
*_dout << buf << dendl;
return new CephRocksdbLogger(g_ceph_context);
}
-int string2bool(string val, bool &b_val)
+static int string2bool(const string &val, bool &b_val)
{
if (strcasecmp(val.c_str(), "false") == 0) {
b_val = false;
}
}
-int RocksDBStore::tryInterpret(const string key, const string val, rocksdb::Options &opt)
+int RocksDBStore::tryInterpret(const string &key, const string &val, rocksdb::Options &opt)
{
if (key == "compaction_threads") {
std::string err;
- int f = strict_sistrtoll(val.c_str(), &err);
+ int f = strict_iecstrtoll(val.c_str(), &err);
if (!err.empty())
return -EINVAL;
//Low priority threadpool is used for compaction
opt.env->SetBackgroundThreads(f, rocksdb::Env::Priority::LOW);
} else if (key == "flusher_threads") {
std::string err;
- int f = strict_sistrtoll(val.c_str(), &err);
+ int f = strict_iecstrtoll(val.c_str(), &err);
if (!err.empty())
return -EINVAL;
//High priority threadpool is used for flusher
return 0;
}
-int RocksDBStore::ParseOptionsFromString(const string opt_str, rocksdb::Options &opt)
+int RocksDBStore::ParseOptionsFromString(const string &opt_str, rocksdb::Options &opt)
{
map<string, string> str_map;
int r = get_str_map(opt_str, &str_map, ",\n;");
return 0;
}
-int RocksDBStore::create_and_open(ostream &out)
+int RocksDBStore::create_db_dir()
{
if (env) {
unique_ptr<rocksdb::Directory> dir;
return r;
}
}
- return do_open(out, true);
+ return 0;
}
-int RocksDBStore::do_open(ostream &out, bool create_if_missing)
+int RocksDBStore::install_cf_mergeop(
+ const string &cf_name,
+ rocksdb::ColumnFamilyOptions *cf_opt)
+{
+ ceph_assert(cf_opt != nullptr);
+ cf_opt->merge_operator.reset();
+ for (auto& i : merge_ops) {
+ if (i.first == cf_name) {
+ cf_opt->merge_operator.reset(new MergeOperatorLinker(i.second));
+ }
+ }
+ return 0;
+}
+
+int RocksDBStore::create_and_open(ostream &out,
+ const vector<ColumnFamily>& cfs)
+{
+ int r = create_db_dir();
+ if (r < 0)
+ return r;
+ if (cfs.empty()) {
+ return do_open(out, true, false, nullptr);
+ } else {
+ return do_open(out, true, false, &cfs);
+ }
+}
+
+int RocksDBStore::load_rocksdb_options(bool create_if_missing, rocksdb::Options& opt)
{
- rocksdb::Options opt;
rocksdb::Status status;
if (options_str.length()) {
}
}
- if (g_conf->rocksdb_perf) {
+ if (g_conf()->rocksdb_perf) {
dbstats = rocksdb::CreateDBStatistics();
opt.statistics = dbstats;
}
opt.create_if_missing = create_if_missing;
- if (g_conf->rocksdb_separate_wal_dir) {
+ if (kv_options.count("separate_wal_dir")) {
opt.wal_dir = path + ".wal";
}
- if (g_conf->get_val<std::string>("rocksdb_db_paths").length()) {
- list<string> paths;
- get_str_list(g_conf->get_val<std::string>("rocksdb_db_paths"), "; \t", paths);
- for (auto& p : paths) {
- size_t pos = p.find(',');
- if (pos == std::string::npos) {
- derr << __func__ << " invalid db path item " << p << " in "
- << g_conf->get_val<std::string>("rocksdb_db_paths") << dendl;
- return -EINVAL;
- }
- string path = p.substr(0, pos);
- string size_str = p.substr(pos + 1);
- uint64_t size = atoll(size_str.c_str());
- if (!size) {
- derr << __func__ << " invalid db path item " << p << " in "
- << g_conf->get_val<std::string>("rocksdb_db_paths") << dendl;
- return -EINVAL;
+
+ // Since ceph::for_each_substr doesn't return a value and
+ // std::stoull does throw, we may as well just catch everything here.
+ try {
+ if (kv_options.count("db_paths")) {
+ list<string> paths;
+ get_str_list(kv_options["db_paths"], "; \t", paths);
+ for (auto& p : paths) {
+ size_t pos = p.find(',');
+ if (pos == std::string::npos) {
+ derr << __func__ << " invalid db path item " << p << " in "
+ << kv_options["db_paths"] << dendl;
+ return -EINVAL;
+ }
+ string path = p.substr(0, pos);
+ string size_str = p.substr(pos + 1);
+ uint64_t size = atoll(size_str.c_str());
+ if (!size) {
+ derr << __func__ << " invalid db path item " << p << " in "
+ << kv_options["db_paths"] << dendl;
+ return -EINVAL;
+ }
+ opt.db_paths.push_back(rocksdb::DbPath(path, size));
+ dout(10) << __func__ << " db_path " << path << " size " << size << dendl;
}
- opt.db_paths.push_back(rocksdb::DbPath(path, size));
- dout(10) << __func__ << " db_path " << path << " size " << size << dendl;
}
+ } catch (const std::system_error& e) {
+ return -e.code().value();
}
- if (g_conf->rocksdb_log_to_ceph_log) {
+ if (g_conf()->rocksdb_log_to_ceph_log) {
opt.info_log.reset(new CephRocksdbLogger(g_ceph_context));
}
opt.env = static_cast<rocksdb::Env*>(priv);
}
- auto cache = rocksdb::NewLRUCache(g_conf->rocksdb_cache_size, g_conf->rocksdb_cache_shard_bits);
- bbt_opts.block_size = g_conf->rocksdb_block_size;
- bbt_opts.block_cache = cache;
- if (g_conf->kstore_rocksdb_bloom_bits_per_key > 0) {
+ // caches
+ if (!set_cache_flag) {
+ cache_size = g_conf()->rocksdb_cache_size;
+ }
+ uint64_t row_cache_size = cache_size * g_conf()->rocksdb_cache_row_ratio;
+ uint64_t block_cache_size = cache_size - row_cache_size;
+
+ if (g_conf()->rocksdb_cache_type == "binned_lru") {
+ bbt_opts.block_cache = rocksdb_cache::NewBinnedLRUCache(
+ cct,
+ block_cache_size,
+ g_conf()->rocksdb_cache_shard_bits);
+ } else if (g_conf()->rocksdb_cache_type == "lru") {
+ bbt_opts.block_cache = rocksdb::NewLRUCache(
+ block_cache_size,
+ g_conf()->rocksdb_cache_shard_bits);
+ } else if (g_conf()->rocksdb_cache_type == "clock") {
+ bbt_opts.block_cache = rocksdb::NewClockCache(
+ block_cache_size,
+ g_conf()->rocksdb_cache_shard_bits);
+ if (!bbt_opts.block_cache) {
+ derr << "rocksdb_cache_type '" << g_conf()->rocksdb_cache_type
+ << "' chosen, but RocksDB not compiled with LibTBB. "
+ << dendl;
+ return -EINVAL;
+ }
+ } else {
+ derr << "unrecognized rocksdb_cache_type '" << g_conf()->rocksdb_cache_type
+ << "'" << dendl;
+ return -EINVAL;
+ }
+ bbt_opts.block_size = g_conf()->rocksdb_block_size;
+
+ if (row_cache_size > 0)
+ opt.row_cache = rocksdb::NewLRUCache(row_cache_size,
+ g_conf()->rocksdb_cache_shard_bits);
+ uint64_t bloom_bits = g_conf().get_val<uint64_t>("rocksdb_bloom_bits_per_key");
+ if (bloom_bits > 0) {
dout(10) << __func__ << " set bloom filter bits per key to "
- << g_conf->kstore_rocksdb_bloom_bits_per_key << dendl;
- bbt_opts.filter_policy.reset(rocksdb::NewBloomFilterPolicy(g_conf->kstore_rocksdb_bloom_bits_per_key));
+ << bloom_bits << dendl;
+ bbt_opts.filter_policy.reset(rocksdb::NewBloomFilterPolicy(bloom_bits));
+ }
+ using std::placeholders::_1;
+ if (g_conf().with_val<std::string>("rocksdb_index_type",
+ std::bind(std::equal_to<std::string>(), _1,
+ "binary_search")))
+ bbt_opts.index_type = rocksdb::BlockBasedTableOptions::IndexType::kBinarySearch;
+ if (g_conf().with_val<std::string>("rocksdb_index_type",
+ std::bind(std::equal_to<std::string>(), _1,
+ "hash_search")))
+ bbt_opts.index_type = rocksdb::BlockBasedTableOptions::IndexType::kHashSearch;
+ if (g_conf().with_val<std::string>("rocksdb_index_type",
+ std::bind(std::equal_to<std::string>(), _1,
+ "two_level")))
+ bbt_opts.index_type = rocksdb::BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
+ if (!bbt_opts.no_block_cache) {
+ bbt_opts.cache_index_and_filter_blocks =
+ g_conf().get_val<bool>("rocksdb_cache_index_and_filter_blocks");
+ bbt_opts.cache_index_and_filter_blocks_with_high_priority =
+ g_conf().get_val<bool>("rocksdb_cache_index_and_filter_blocks_with_high_priority");
+ bbt_opts.pin_l0_filter_and_index_blocks_in_cache =
+ g_conf().get_val<bool>("rocksdb_pin_l0_filter_and_index_blocks_in_cache");
}
+ bbt_opts.partition_filters = g_conf().get_val<bool>("rocksdb_partition_filters");
+ if (g_conf().get_val<Option::size_t>("rocksdb_metadata_block_size") > 0)
+ bbt_opts.metadata_block_size = g_conf().get_val<Option::size_t>("rocksdb_metadata_block_size");
+
opt.table_factory.reset(rocksdb::NewBlockBasedTableFactory(bbt_opts));
- dout(10) << __func__ << " set block size to " << g_conf->rocksdb_block_size
- << " cache size to " << g_conf->rocksdb_cache_size
- << " num of cache shards to " << (1 << g_conf->rocksdb_cache_shard_bits) << dendl;
+ dout(10) << __func__ << " block size " << g_conf()->rocksdb_block_size
+ << ", block_cache size " << byte_u_t(block_cache_size)
+ << ", row_cache size " << byte_u_t(row_cache_size)
+ << "; shards "
+ << (1 << g_conf()->rocksdb_cache_shard_bits)
+ << ", type " << g_conf()->rocksdb_cache_type
+ << dendl;
opt.merge_operator.reset(new MergeOperatorRouter(*this));
- status = rocksdb::DB::Open(opt, path, &db);
- if (!status.ok()) {
- derr << status.ToString() << dendl;
- return -EINVAL;
+
+ return 0;
+}
+
+int RocksDBStore::do_open(ostream &out,
+ bool create_if_missing,
+ bool open_readonly,
+ const vector<ColumnFamily>* cfs)
+{
+ ceph_assert(!(create_if_missing && open_readonly));
+ rocksdb::Options opt;
+ int r = load_rocksdb_options(create_if_missing, opt);
+ if (r) {
+ dout(1) << __func__ << " load rocksdb options failed" << dendl;
+ return r;
+ }
+ rocksdb::Status status;
+ if (create_if_missing) {
+ status = rocksdb::DB::Open(opt, path, &db);
+ if (!status.ok()) {
+ derr << status.ToString() << dendl;
+ return -EINVAL;
+ }
+ // create and open column families
+ if (cfs) {
+ for (auto& p : *cfs) {
+ // copy default CF settings, block cache, merge operators as
+ // the base for new CF
+ rocksdb::ColumnFamilyOptions cf_opt(opt);
+ // user input options will override the base options
+ status = rocksdb::GetColumnFamilyOptionsFromString(
+ cf_opt, p.option, &cf_opt);
+ if (!status.ok()) {
+ derr << __func__ << " invalid db column family option string for CF: "
+ << p.name << dendl;
+ return -EINVAL;
+ }
+ install_cf_mergeop(p.name, &cf_opt);
+ rocksdb::ColumnFamilyHandle *cf;
+ status = db->CreateColumnFamily(cf_opt, p.name, &cf);
+ if (!status.ok()) {
+ derr << __func__ << " Failed to create rocksdb column family: "
+ << p.name << dendl;
+ return -EINVAL;
+ }
+ // store the new CF handle
+ add_column_family(p.name, static_cast<void*>(cf));
+ }
+ }
+ default_cf = db->DefaultColumnFamily();
+ } else {
+ std::vector<string> existing_cfs;
+ status = rocksdb::DB::ListColumnFamilies(
+ rocksdb::DBOptions(opt),
+ path,
+ &existing_cfs);
+ dout(1) << __func__ << " column families: " << existing_cfs << dendl;
+ if (existing_cfs.empty()) {
+ // no column families
+ if (open_readonly) {
+ status = rocksdb::DB::Open(opt, path, &db);
+ } else {
+ status = rocksdb::DB::OpenForReadOnly(opt, path, &db);
+ }
+ if (!status.ok()) {
+ derr << status.ToString() << dendl;
+ return -EINVAL;
+ }
+ default_cf = db->DefaultColumnFamily();
+ } else {
+ // we cannot change column families for a created database. so, map
+ // what options we are given to whatever cf's already exist.
+ std::vector<rocksdb::ColumnFamilyDescriptor> column_families;
+ for (auto& n : existing_cfs) {
+ // copy default CF settings, block cache, merge operators as
+ // the base for new CF
+ rocksdb::ColumnFamilyOptions cf_opt(opt);
+ bool found = false;
+ if (cfs) {
+ for (auto& i : *cfs) {
+ if (i.name == n) {
+ found = true;
+ status = rocksdb::GetColumnFamilyOptionsFromString(
+ cf_opt, i.option, &cf_opt);
+ if (!status.ok()) {
+ derr << __func__ << " invalid db column family options for CF '"
+ << i.name << "': " << i.option << dendl;
+ return -EINVAL;
+ }
+ }
+ }
+ }
+ if (n != rocksdb::kDefaultColumnFamilyName) {
+ install_cf_mergeop(n, &cf_opt);
+ }
+ column_families.push_back(rocksdb::ColumnFamilyDescriptor(n, cf_opt));
+ if (!found && n != rocksdb::kDefaultColumnFamilyName) {
+ dout(1) << __func__ << " column family '" << n
+ << "' exists but not expected" << dendl;
+ }
+ }
+ std::vector<rocksdb::ColumnFamilyHandle*> handles;
+ if (open_readonly) {
+ status = rocksdb::DB::OpenForReadOnly(rocksdb::DBOptions(opt),
+ path, column_families,
+ &handles, &db);
+ } else {
+ status = rocksdb::DB::Open(rocksdb::DBOptions(opt),
+ path, column_families, &handles, &db);
+ }
+ if (!status.ok()) {
+ derr << status.ToString() << dendl;
+ return -EINVAL;
+ }
+ for (unsigned i = 0; i < existing_cfs.size(); ++i) {
+ if (existing_cfs[i] == rocksdb::kDefaultColumnFamilyName) {
+ default_cf = handles[i];
+ must_close_default_cf = true;
+ } else {
+ add_column_family(existing_cfs[i], static_cast<void*>(handles[i]));
+ }
+ }
+ }
}
+ ceph_assert(default_cf != nullptr);
PerfCountersBuilder plb(g_ceph_context, "rocksdb", l_rocksdb_first, l_rocksdb_last);
plb.add_u64_counter(l_rocksdb_gets, "get", "Gets");
delete logger;
// Ensure db is destroyed before dependent db_cache and filterpolicy
+ for (auto& p : cf_handles) {
+ db->DestroyColumnFamilyHandle(
+ static_cast<rocksdb::ColumnFamilyHandle*>(p.second));
+ p.second = nullptr;
+ }
+ if (must_close_default_cf) {
+ db->DestroyColumnFamilyHandle(default_cf);
+ must_close_default_cf = false;
+ }
+ default_cf = nullptr;
delete db;
db = nullptr;
cct->get_perfcounters_collection()->remove(logger);
}
+int RocksDBStore::repair(std::ostream &out)
+{
+ rocksdb::Options opt;
+ int r = load_rocksdb_options(false, opt);
+ if (r) {
+ dout(1) << __func__ << " load rocksdb options failed" << dendl;
+ out << "load rocksdb options failed" << std::endl;
+ return r;
+ }
+ rocksdb::Status status = rocksdb::RepairDB(path, opt);
+ if (status.ok()) {
+ return 0;
+ } else {
+ out << "repair rocksdb failed : " << status.ToString() << std::endl;
+ return 1;
+ }
+}
+
void RocksDBStore::split_stats(const std::string &s, char delim, std::vector<std::string> &elems) {
std::stringstream ss;
ss.str(s);
}
}
+int64_t RocksDBStore::estimate_prefix_size(const string& prefix)
+{
+ auto cf = get_cf_handle(prefix);
+ uint64_t size = 0;
+ uint8_t flags =
+ //rocksdb::DB::INCLUDE_MEMTABLES | // do not include memtables...
+ rocksdb::DB::INCLUDE_FILES;
+ if (cf) {
+ string start(1, '\x00');
+ string limit("\xff\xff\xff\xff");
+ rocksdb::Range r(start, limit);
+ db->GetApproximateSizes(cf, &r, 1, &size, flags);
+ } else {
+ string limit = prefix + "\xff\xff\xff\xff";
+ rocksdb::Range r(prefix, limit);
+ db->GetApproximateSizes(default_cf,
+ &r, 1, &size, flags);
+ }
+ return size;
+}
+
void RocksDBStore::get_statistics(Formatter *f)
{
- if (!g_conf->rocksdb_perf) {
- dout(20) << __func__ << "RocksDB perf is disabled, can't probe for stats"
+ if (!g_conf()->rocksdb_perf) {
+ dout(20) << __func__ << " RocksDB perf is disabled, can't probe for stats"
<< dendl;
return;
}
- if (g_conf->rocksdb_collect_compaction_stats) {
+ if (g_conf()->rocksdb_collect_compaction_stats) {
std::string stat_str;
bool status = db->GetProperty("rocksdb.stats", &stat_str);
if (status) {
f->close_section();
}
}
- if (g_conf->rocksdb_collect_extended_stats) {
+ if (g_conf()->rocksdb_collect_extended_stats) {
if (dbstats) {
f->open_object_section("rocksdb_extended_statistics");
string stat_str = dbstats->ToString();
logger->dump_formatted(f,0);
f->close_section();
}
- if (g_conf->rocksdb_collect_memory_stats) {
+ if (g_conf()->rocksdb_collect_memory_stats) {
f->open_object_section("rocksdb_memtable_statistics");
- std::string str(stringify(bbt_opts.block_cache->GetUsage()));
- f->dump_string("block_cache_usage", str.data());
- str.clear();
- str.append(stringify(bbt_opts.block_cache->GetPinnedUsage()));
- f->dump_string("block_cache_pinned_blocks_usage", str);
- str.clear();
+ std::string str;
+ if (!bbt_opts.no_block_cache) {
+ str.append(stringify(bbt_opts.block_cache->GetUsage()));
+ f->dump_string("block_cache_usage", str.data());
+ str.clear();
+ str.append(stringify(bbt_opts.block_cache->GetPinnedUsage()));
+ f->dump_string("block_cache_pinned_blocks_usage", str);
+ str.clear();
+ }
db->GetProperty("rocksdb.cur-size-all-mem-tables", &str);
f->dump_string("rocksdb_memtable_usage", str);
+ str.clear();
+ db->GetProperty("rocksdb.estimate-table-readers-mem", &str);
+ f->dump_string("rocksdb_index_filter_blocks_usage", str);
f->close_section();
}
}
-int RocksDBStore::submit_transaction(KeyValueDB::Transaction t)
+int RocksDBStore::submit_common(rocksdb::WriteOptions& woptions, KeyValueDB::Transaction t)
{
- utime_t start = ceph_clock_now();
// enable rocksdb breakdown
// considering performance overhead, default is disabled
- if (g_conf->rocksdb_perf) {
+ if (g_conf()->rocksdb_perf) {
rocksdb::SetPerfLevel(rocksdb::PerfLevel::kEnableTimeExceptForMutex);
- rocksdb::perf_context.Reset();
+ rocksdb::get_perf_context()->Reset();
}
RocksDBTransactionImpl * _t =
static_cast<RocksDBTransactionImpl *>(t.get());
- rocksdb::WriteOptions woptions;
woptions.disableWAL = disableWAL;
lgeneric_subdout(cct, rocksdb, 30) << __func__;
RocksWBHandler bat_txc;
derr << __func__ << " error: " << s.ToString() << " code = " << s.code()
<< " Rocksdb transaction: " << rocks_txc.seen << dendl;
}
- utime_t lat = ceph_clock_now() - start;
- if (g_conf->rocksdb_perf) {
+ if (g_conf()->rocksdb_perf) {
utime_t write_memtable_time;
utime_t write_delay_time;
utime_t write_wal_time;
utime_t write_pre_and_post_process_time;
write_wal_time.set_from_double(
- static_cast<double>(rocksdb::perf_context.write_wal_time)/1000000000);
+ static_cast<double>(rocksdb::get_perf_context()->write_wal_time)/1000000000);
write_memtable_time.set_from_double(
- static_cast<double>(rocksdb::perf_context.write_memtable_time)/1000000000);
+ static_cast<double>(rocksdb::get_perf_context()->write_memtable_time)/1000000000);
write_delay_time.set_from_double(
- static_cast<double>(rocksdb::perf_context.write_delay_time)/1000000000);
+ static_cast<double>(rocksdb::get_perf_context()->write_delay_time)/1000000000);
write_pre_and_post_process_time.set_from_double(
- static_cast<double>(rocksdb::perf_context.write_pre_and_post_process_time)/1000000000);
+ static_cast<double>(rocksdb::get_perf_context()->write_pre_and_post_process_time)/1000000000);
logger->tinc(l_rocksdb_write_memtable_time, write_memtable_time);
logger->tinc(l_rocksdb_write_delay_time, write_delay_time);
logger->tinc(l_rocksdb_write_wal_time, write_wal_time);
logger->tinc(l_rocksdb_write_pre_and_post_process_time, write_pre_and_post_process_time);
}
- logger->inc(l_rocksdb_txns);
- logger->tinc(l_rocksdb_submit_latency, lat);
-
return s.ok() ? 0 : -1;
}
-int RocksDBStore::submit_transaction_sync(KeyValueDB::Transaction t)
+int RocksDBStore::submit_transaction(KeyValueDB::Transaction t)
{
utime_t start = ceph_clock_now();
- // enable rocksdb breakdown
- // considering performance overhead, default is disabled
- if (g_conf->rocksdb_perf) {
- rocksdb::SetPerfLevel(rocksdb::PerfLevel::kEnableTimeExceptForMutex);
- rocksdb::perf_context.Reset();
- }
-
- RocksDBTransactionImpl * _t =
- static_cast<RocksDBTransactionImpl *>(t.get());
rocksdb::WriteOptions woptions;
- woptions.sync = true;
- woptions.disableWAL = disableWAL;
- lgeneric_subdout(cct, rocksdb, 30) << __func__;
- RocksWBHandler bat_txc;
- _t->bat.Iterate(&bat_txc);
- *_dout << " Rocksdb transaction: " << bat_txc.seen << dendl;
+ woptions.sync = false;
- rocksdb::Status s = db->Write(woptions, &_t->bat);
- if (!s.ok()) {
- RocksWBHandler rocks_txc;
- _t->bat.Iterate(&rocks_txc);
- derr << __func__ << " error: " << s.ToString() << " code = " << s.code()
- << " Rocksdb transaction: " << rocks_txc.seen << dendl;
- }
- utime_t lat = ceph_clock_now() - start;
+ int result = submit_common(woptions, t);
- if (g_conf->rocksdb_perf) {
- utime_t write_memtable_time;
- utime_t write_delay_time;
- utime_t write_wal_time;
- utime_t write_pre_and_post_process_time;
- write_wal_time.set_from_double(
- static_cast<double>(rocksdb::perf_context.write_wal_time)/1000000000);
- write_memtable_time.set_from_double(
- static_cast<double>(rocksdb::perf_context.write_memtable_time)/1000000000);
- write_delay_time.set_from_double(
- static_cast<double>(rocksdb::perf_context.write_delay_time)/1000000000);
- write_pre_and_post_process_time.set_from_double(
- static_cast<double>(rocksdb::perf_context.write_pre_and_post_process_time)/1000000000);
- logger->tinc(l_rocksdb_write_memtable_time, write_memtable_time);
- logger->tinc(l_rocksdb_write_delay_time, write_delay_time);
- logger->tinc(l_rocksdb_write_wal_time, write_wal_time);
- logger->tinc(l_rocksdb_write_pre_and_post_process_time, write_pre_and_post_process_time);
- }
+ utime_t lat = ceph_clock_now() - start;
+ logger->inc(l_rocksdb_txns);
+ logger->tinc(l_rocksdb_submit_latency, lat);
+
+ return result;
+}
+int RocksDBStore::submit_transaction_sync(KeyValueDB::Transaction t)
+{
+ utime_t start = ceph_clock_now();
+ rocksdb::WriteOptions woptions;
+ // if disableWAL, sync can't set
+ woptions.sync = !disableWAL;
+
+ int result = submit_common(woptions, t);
+
+ utime_t lat = ceph_clock_now() - start;
logger->inc(l_rocksdb_txns_sync);
logger->tinc(l_rocksdb_submit_sync_latency, lat);
- return s.ok() ? 0 : -1;
+ return result;
}
RocksDBStore::RocksDBTransactionImpl::RocksDBTransactionImpl(RocksDBStore *_db)
db = _db;
}
+void RocksDBStore::RocksDBTransactionImpl::put_bat(
+ rocksdb::WriteBatch& bat,
+ rocksdb::ColumnFamilyHandle *cf,
+ const string &key,
+ const bufferlist &to_set_bl)
+{
+ // bufferlist::c_str() is non-constant, so we can't call c_str()
+ if (to_set_bl.is_contiguous() && to_set_bl.length() > 0) {
+ bat.Put(cf,
+ rocksdb::Slice(key),
+ rocksdb::Slice(to_set_bl.buffers().front().c_str(),
+ to_set_bl.length()));
+ } else {
+ rocksdb::Slice key_slice(key);
+ vector<rocksdb::Slice> value_slices(to_set_bl.buffers().size());
+ bat.Put(cf,
+ rocksdb::SliceParts(&key_slice, 1),
+ prepare_sliceparts(to_set_bl, &value_slices));
+ }
+}
+
void RocksDBStore::RocksDBTransactionImpl::set(
const string &prefix,
const string &k,
const bufferlist &to_set_bl)
{
- string key = combine_strings(prefix, k);
-
- // bufferlist::c_str() is non-constant, so we can't call c_str()
- if (to_set_bl.is_contiguous() && to_set_bl.length() > 0) {
- bat.Put(rocksdb::Slice(key),
- rocksdb::Slice(to_set_bl.buffers().front().c_str(),
- to_set_bl.length()));
+ auto cf = db->get_cf_handle(prefix);
+ if (cf) {
+ put_bat(bat, cf, k, to_set_bl);
} else {
- // make a copy
- bufferlist val = to_set_bl;
- bat.Put(rocksdb::Slice(key),
- rocksdb::Slice(val.c_str(), val.length()));
+ string key = combine_strings(prefix, k);
+ put_bat(bat, db->default_cf, key, to_set_bl);
}
}
const char *k, size_t keylen,
const bufferlist &to_set_bl)
{
- string key;
- combine_strings(prefix, k, keylen, &key);
-
- // bufferlist::c_str() is non-constant, so we can't call c_str()
- if (to_set_bl.is_contiguous() && to_set_bl.length() > 0) {
- bat.Put(rocksdb::Slice(key),
- rocksdb::Slice(to_set_bl.buffers().front().c_str(),
- to_set_bl.length()));
+ auto cf = db->get_cf_handle(prefix);
+ if (cf) {
+ string key(k, keylen); // fixme?
+ put_bat(bat, cf, key, to_set_bl);
} else {
- // make a copy
- bufferlist val = to_set_bl;
- bat.Put(rocksdb::Slice(key),
- rocksdb::Slice(val.c_str(), val.length()));
+ string key;
+ combine_strings(prefix, k, keylen, &key);
+ put_bat(bat, cf, key, to_set_bl);
}
}
void RocksDBStore::RocksDBTransactionImpl::rmkey(const string &prefix,
const string &k)
{
- bat.Delete(combine_strings(prefix, k));
+ auto cf = db->get_cf_handle(prefix);
+ if (cf) {
+ bat.Delete(cf, rocksdb::Slice(k));
+ } else {
+ bat.Delete(db->default_cf, combine_strings(prefix, k));
+ }
}
void RocksDBStore::RocksDBTransactionImpl::rmkey(const string &prefix,
const char *k,
size_t keylen)
{
- string key;
- combine_strings(prefix, k, keylen, &key);
- bat.Delete(key);
+ auto cf = db->get_cf_handle(prefix);
+ if (cf) {
+ bat.Delete(cf, rocksdb::Slice(k, keylen));
+ } else {
+ string key;
+ combine_strings(prefix, k, keylen, &key);
+ bat.Delete(db->default_cf, rocksdb::Slice(key));
+ }
}
void RocksDBStore::RocksDBTransactionImpl::rm_single_key(const string &prefix,
const string &k)
{
- bat.SingleDelete(combine_strings(prefix, k));
+ auto cf = db->get_cf_handle(prefix);
+ if (cf) {
+ bat.SingleDelete(cf, k);
+ } else {
+ bat.SingleDelete(db->default_cf, combine_strings(prefix, k));
+ }
}
void RocksDBStore::RocksDBTransactionImpl::rmkeys_by_prefix(const string &prefix)
{
- KeyValueDB::Iterator it = db->get_iterator(prefix);
- for (it->seek_to_first();
- it->valid();
- it->next()) {
- bat.Delete(combine_strings(prefix, it->key()));
+ auto cf = db->get_cf_handle(prefix);
+ if (cf) {
+ if (db->enable_rmrange) {
+ string endprefix("\xff\xff\xff\xff"); // FIXME: this is cheating...
+ if (db->max_items_rmrange) {
+ uint64_t cnt = db->max_items_rmrange;
+ bat.SetSavePoint();
+ auto it = db->get_iterator(prefix);
+ for (it->seek_to_first();
+ it->valid();
+ it->next()) {
+ if (!cnt) {
+ bat.RollbackToSavePoint();
+ bat.DeleteRange(cf, string(), endprefix);
+ return;
+ }
+ bat.Delete(cf, rocksdb::Slice(it->key()));
+ --cnt;
+ }
+ bat.PopSavePoint();
+ } else {
+ bat.DeleteRange(cf, string(), endprefix);
+ }
+ } else {
+ auto it = db->get_iterator(prefix);
+ for (it->seek_to_first();
+ it->valid();
+ it->next()) {
+ bat.Delete(cf, rocksdb::Slice(it->key()));
+ }
+ }
+ } else {
+ if (db->enable_rmrange) {
+ string endprefix = prefix;
+ endprefix.push_back('\x01');
+ if (db->max_items_rmrange) {
+ uint64_t cnt = db->max_items_rmrange;
+ bat.SetSavePoint();
+ auto it = db->get_iterator(prefix);
+ for (it->seek_to_first();
+ it->valid();
+ it->next()) {
+ if (!cnt) {
+ bat.RollbackToSavePoint();
+ bat.DeleteRange(db->default_cf,
+ combine_strings(prefix, string()),
+ combine_strings(endprefix, string()));
+ return;
+ }
+ bat.Delete(db->default_cf, combine_strings(prefix, it->key()));
+ --cnt;
+ }
+ bat.PopSavePoint();
+ } else {
+ bat.DeleteRange(db->default_cf,
+ combine_strings(prefix, string()),
+ combine_strings(endprefix, string()));
+ }
+ } else {
+ auto it = db->get_iterator(prefix);
+ for (it->seek_to_first();
+ it->valid();
+ it->next()) {
+ bat.Delete(db->default_cf, combine_strings(prefix, it->key()));
+ }
+ }
}
}
const string &start,
const string &end)
{
- if (db->enable_rmrange) {
- bat.DeleteRange(combine_strings(prefix, start), combine_strings(prefix, end));
+ auto cf = db->get_cf_handle(prefix);
+ if (cf) {
+ if (db->enable_rmrange) {
+ if (db->max_items_rmrange) {
+ uint64_t cnt = db->max_items_rmrange;
+ auto it = db->get_iterator(prefix);
+ bat.SetSavePoint();
+ it->lower_bound(start);
+ while (it->valid()) {
+ if (it->key() >= end) {
+ break;
+ }
+ if (!cnt) {
+ bat.RollbackToSavePoint();
+ bat.DeleteRange(cf, rocksdb::Slice(start), rocksdb::Slice(end));
+ return;
+ }
+ bat.Delete(cf, rocksdb::Slice(it->key()));
+ it->next();
+ --cnt;
+ }
+ bat.PopSavePoint();
+ } else {
+ bat.DeleteRange(cf, rocksdb::Slice(start), rocksdb::Slice(end));
+ }
+ } else {
+ auto it = db->get_iterator(prefix);
+ it->lower_bound(start);
+ while (it->valid()) {
+ if (it->key() >= end) {
+ break;
+ }
+ bat.Delete(cf, rocksdb::Slice(it->key()));
+ it->next();
+ }
+ }
} else {
- auto it = db->get_iterator(prefix);
- it->lower_bound(start);
- while (it->valid()) {
- if (it->key() >= end) {
- break;
+ if (db->enable_rmrange) {
+ if (db->max_items_rmrange) {
+ uint64_t cnt = db->max_items_rmrange;
+ auto it = db->get_iterator(prefix);
+ bat.SetSavePoint();
+ it->lower_bound(start);
+ while (it->valid()) {
+ if (it->key() >= end) {
+ break;
+ }
+ if (!cnt) {
+ bat.RollbackToSavePoint();
+ bat.DeleteRange(
+ db->default_cf,
+ rocksdb::Slice(combine_strings(prefix, start)),
+ rocksdb::Slice(combine_strings(prefix, end)));
+ return;
+ }
+ bat.Delete(db->default_cf,
+ combine_strings(prefix, it->key()));
+ it->next();
+ --cnt;
+ }
+ bat.PopSavePoint();
+ } else {
+ bat.DeleteRange(
+ db->default_cf,
+ rocksdb::Slice(combine_strings(prefix, start)),
+ rocksdb::Slice(combine_strings(prefix, end)));
+ }
+ } else {
+ auto it = db->get_iterator(prefix);
+ it->lower_bound(start);
+ while (it->valid()) {
+ if (it->key() >= end) {
+ break;
+ }
+ bat.Delete(db->default_cf,
+ combine_strings(prefix, it->key()));
+ it->next();
}
- bat.Delete(combine_strings(prefix, it->key()));
- it->next();
}
}
}
const string &k,
const bufferlist &to_set_bl)
{
- string key = combine_strings(prefix, k);
-
- // bufferlist::c_str() is non-constant, so we can't call c_str()
- if (to_set_bl.is_contiguous() && to_set_bl.length() > 0) {
- bat.Merge(rocksdb::Slice(key),
- rocksdb::Slice(to_set_bl.buffers().front().c_str(),
- to_set_bl.length()));
+ auto cf = db->get_cf_handle(prefix);
+ if (cf) {
+ // bufferlist::c_str() is non-constant, so we can't call c_str()
+ if (to_set_bl.is_contiguous() && to_set_bl.length() > 0) {
+ bat.Merge(
+ cf,
+ rocksdb::Slice(k),
+ rocksdb::Slice(to_set_bl.buffers().front().c_str(), to_set_bl.length()));
+ } else {
+ // make a copy
+ rocksdb::Slice key_slice(k);
+ vector<rocksdb::Slice> value_slices(to_set_bl.buffers().size());
+ bat.Merge(cf, rocksdb::SliceParts(&key_slice, 1),
+ prepare_sliceparts(to_set_bl, &value_slices));
+ }
} else {
- // make a copy
- bufferlist val = to_set_bl;
- bat.Merge(rocksdb::Slice(key),
- rocksdb::Slice(val.c_str(), val.length()));
+ string key = combine_strings(prefix, k);
+ // bufferlist::c_str() is non-constant, so we can't call c_str()
+ if (to_set_bl.is_contiguous() && to_set_bl.length() > 0) {
+ bat.Merge(
+ db->default_cf,
+ rocksdb::Slice(key),
+ rocksdb::Slice(to_set_bl.buffers().front().c_str(), to_set_bl.length()));
+ } else {
+ // make a copy
+ rocksdb::Slice key_slice(key);
+ vector<rocksdb::Slice> value_slices(to_set_bl.buffers().size());
+ bat.Merge(
+ db->default_cf,
+ rocksdb::SliceParts(&key_slice, 1),
+ prepare_sliceparts(to_set_bl, &value_slices));
+ }
}
}
-//gets will bypass RocksDB row cache, since it uses iterator
int RocksDBStore::get(
const string &prefix,
const std::set<string> &keys,
std::map<string, bufferlist> *out)
{
utime_t start = ceph_clock_now();
- for (std::set<string>::const_iterator i = keys.begin();
- i != keys.end(); ++i) {
- std::string value;
- std::string bound = combine_strings(prefix, *i);
- auto status = db->Get(rocksdb::ReadOptions(), rocksdb::Slice(bound), &value);
- if (status.ok())
- (*out)[*i].append(value);
+ auto cf = get_cf_handle(prefix);
+ if (cf) {
+ for (auto& key : keys) {
+ std::string value;
+ auto status = db->Get(rocksdb::ReadOptions(),
+ cf,
+ rocksdb::Slice(key),
+ &value);
+ if (status.ok()) {
+ (*out)[key].append(value);
+ } else if (status.IsIOError()) {
+ ceph_abort_msg(status.getState());
+ }
+ }
+ } else {
+ for (auto& key : keys) {
+ std::string value;
+ string k = combine_strings(prefix, key);
+ auto status = db->Get(rocksdb::ReadOptions(),
+ default_cf,
+ rocksdb::Slice(k),
+ &value);
+ if (status.ok()) {
+ (*out)[key].append(value);
+ } else if (status.IsIOError()) {
+ ceph_abort_msg(status.getState());
+ }
+ }
}
utime_t lat = ceph_clock_now() - start;
logger->inc(l_rocksdb_gets);
const string &key,
bufferlist *out)
{
- assert(out && (out->length() == 0));
+ ceph_assert(out && (out->length() == 0));
utime_t start = ceph_clock_now();
int r = 0;
- string value, k;
+ string value;
rocksdb::Status s;
- k = combine_strings(prefix, key);
- s = db->Get(rocksdb::ReadOptions(), rocksdb::Slice(k), &value);
+ auto cf = get_cf_handle(prefix);
+ if (cf) {
+ s = db->Get(rocksdb::ReadOptions(),
+ cf,
+ rocksdb::Slice(key),
+ &value);
+ } else {
+ string k = combine_strings(prefix, key);
+ s = db->Get(rocksdb::ReadOptions(),
+ default_cf,
+ rocksdb::Slice(k),
+ &value);
+ }
if (s.ok()) {
out->append(value);
- } else {
+ } else if (s.IsNotFound()) {
r = -ENOENT;
+ } else {
+ ceph_abort_msg(s.getState());
}
utime_t lat = ceph_clock_now() - start;
logger->inc(l_rocksdb_gets);
size_t keylen,
bufferlist *out)
{
- assert(out && (out->length() == 0));
+ ceph_assert(out && (out->length() == 0));
utime_t start = ceph_clock_now();
int r = 0;
- string value, k;
- combine_strings(prefix, key, keylen, &k);
+ string value;
rocksdb::Status s;
- s = db->Get(rocksdb::ReadOptions(), rocksdb::Slice(k), &value);
+ auto cf = get_cf_handle(prefix);
+ if (cf) {
+ s = db->Get(rocksdb::ReadOptions(),
+ cf,
+ rocksdb::Slice(key, keylen),
+ &value);
+ } else {
+ string k;
+ combine_strings(prefix, key, keylen, &k);
+ s = db->Get(rocksdb::ReadOptions(),
+ default_cf,
+ rocksdb::Slice(k),
+ &value);
+ }
if (s.ok()) {
out->append(value);
- } else {
+ } else if (s.IsNotFound()) {
r = -ENOENT;
+ } else {
+ ceph_abort_msg(s.getState());
}
utime_t lat = ceph_clock_now() - start;
logger->inc(l_rocksdb_gets);
{
logger->inc(l_rocksdb_compact);
rocksdb::CompactRangeOptions options;
- db->CompactRange(options, nullptr, nullptr);
+ db->CompactRange(options, default_cf, nullptr, nullptr);
+ for (auto cf : cf_handles) {
+ db->CompactRange(
+ options,
+ static_cast<rocksdb::ColumnFamilyHandle*>(cf.second),
+ nullptr, nullptr);
+ }
}
logger->set(l_rocksdb_compact_queue_len, compact_queue.size());
compact_queue_lock.Unlock();
logger->inc(l_rocksdb_compact_range);
- compact_range(range.first, range.second);
+ if (range.first.empty() && range.second.empty()) {
+ compact();
+ } else {
+ compact_range(range.first, range.second);
+ }
compact_queue_lock.Lock();
continue;
}
void RocksDBStore::compact_range_async(const string& start, const string& end)
{
- Mutex::Locker l(compact_queue_lock);
+ std::lock_guard l(compact_queue_lock);
// try to merge adjacent ranges. this is O(n), but the queue should
// be short. note that we do not cover all overlap cases and merge
rocksdb::Slice cend(end);
db->CompactRange(options, &cstart, &cend);
}
+
RocksDBStore::RocksDBWholeSpaceIteratorImpl::~RocksDBWholeSpaceIteratorImpl()
{
delete dbiter;
int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_first()
{
dbiter->SeekToFirst();
+ ceph_assert(!dbiter->status().IsIOError());
return dbiter->status().ok() ? 0 : -1;
}
int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_first(const string &prefix)
{
rocksdb::Slice slice_prefix(prefix);
dbiter->Seek(slice_prefix);
+ ceph_assert(!dbiter->status().IsIOError());
return dbiter->status().ok() ? 0 : -1;
}
int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_last()
{
dbiter->SeekToLast();
+ ceph_assert(!dbiter->status().IsIOError());
return dbiter->status().ok() ? 0 : -1;
}
int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_last(const string &prefix)
if (valid()) {
dbiter->Next();
}
+ ceph_assert(!dbiter->status().IsIOError());
return dbiter->status().ok() ? 0 : -1;
}
int RocksDBStore::RocksDBWholeSpaceIteratorImpl::prev()
if (valid()) {
dbiter->Prev();
}
+ ceph_assert(!dbiter->status().IsIOError());
return dbiter->status().ok() ? 0 : -1;
}
string RocksDBStore::RocksDBWholeSpaceIteratorImpl::key()
return limit;
}
-RocksDBStore::WholeSpaceIterator RocksDBStore::_get_iterator()
+RocksDBStore::WholeSpaceIterator RocksDBStore::get_wholespace_iterator()
{
return std::make_shared<RocksDBWholeSpaceIteratorImpl>(
- db->NewIterator(rocksdb::ReadOptions()));
+ db->NewIterator(rocksdb::ReadOptions(), default_cf));
}
+class CFIteratorImpl : public KeyValueDB::IteratorImpl {
+protected:
+ string prefix;
+ rocksdb::Iterator *dbiter;
+public:
+ explicit CFIteratorImpl(const std::string& p,
+ rocksdb::Iterator *iter)
+ : prefix(p), dbiter(iter) { }
+ ~CFIteratorImpl() {
+ delete dbiter;
+ }
+
+ int seek_to_first() override {
+ dbiter->SeekToFirst();
+ return dbiter->status().ok() ? 0 : -1;
+ }
+ int seek_to_last() override {
+ dbiter->SeekToLast();
+ return dbiter->status().ok() ? 0 : -1;
+ }
+ int upper_bound(const string &after) override {
+ lower_bound(after);
+ if (valid() && (key() == after)) {
+ next();
+ }
+ return dbiter->status().ok() ? 0 : -1;
+ }
+ int lower_bound(const string &to) override {
+ rocksdb::Slice slice_bound(to);
+ dbiter->Seek(slice_bound);
+ return dbiter->status().ok() ? 0 : -1;
+ }
+ int next() override {
+ if (valid()) {
+ dbiter->Next();
+ }
+ return dbiter->status().ok() ? 0 : -1;
+ }
+ int prev() override {
+ if (valid()) {
+ dbiter->Prev();
+ }
+ return dbiter->status().ok() ? 0 : -1;
+ }
+ bool valid() override {
+ return dbiter->Valid();
+ }
+ string key() override {
+ return dbiter->key().ToString();
+ }
+ std::pair<std::string, std::string> raw_key() override {
+ return make_pair(prefix, key());
+ }
+ bufferlist value() override {
+ return to_bufferlist(dbiter->value());
+ }
+ bufferptr value_as_ptr() override {
+ rocksdb::Slice val = dbiter->value();
+ return bufferptr(val.data(), val.size());
+ }
+ int status() override {
+ return dbiter->status().ok() ? 0 : -1;
+ }
+};
+
+KeyValueDB::Iterator RocksDBStore::get_iterator(const std::string& prefix)
+{
+ rocksdb::ColumnFamilyHandle *cf_handle =
+ static_cast<rocksdb::ColumnFamilyHandle*>(get_cf_handle(prefix));
+ if (cf_handle) {
+ return std::make_shared<CFIteratorImpl>(
+ prefix,
+ db->NewIterator(rocksdb::ReadOptions(), cf_handle));
+ } else {
+ return KeyValueDB::get_iterator(prefix);
+ }
+}