#include "rocksdb/iostats_context.h"
#include "rocksdb/statistics.h"
#include "rocksdb/table.h"
+#include "rocksdb/db.h"
#include "kv/rocksdb_cache/BinnedLRUCache.h"
#include <errno.h>
#include "common/errno.h"
#include "common/Cond.h"
#include "common/ceph_context.h"
#include "common/PriorityCache.h"
-
+#include "common/pretty_binary.h"
enum {
l_rocksdb_first = 34300,
l_rocksdb_gets,
- l_rocksdb_txns,
- l_rocksdb_txns_sync,
l_rocksdb_get_latency,
l_rocksdb_submit_latency,
l_rocksdb_submit_sync_latency,
class RocksDBStore : public KeyValueDB {
CephContext *cct;
PerfCounters *logger;
- string path;
- map<string,string> kv_options;
+ std::string path;
+ std::map<std::string,std::string> kv_options;
void *priv;
rocksdb::DB *db;
rocksdb::Env *env;
+ const rocksdb::Comparator* comparator;
std::shared_ptr<rocksdb::Statistics> dbstats;
rocksdb::BlockBasedTableOptions bbt_opts;
- string options_str;
+ std::string options_str;
uint64_t cache_size = 0;
bool set_cache_flag = false;
+ friend class ShardMergeIteratorImpl;
+ friend class WholeMergeIteratorImpl;
+ /*
+ * See RocksDB's definition of a column family(CF) and how to use it.
+ * The interfaces of KeyValueDB is extended, when a column family is created.
+ * Prefix will be the name of column family to use.
+ */
+public:
+ struct ColumnFamily {
+ string name; //< name of this individual column family
+ size_t shard_cnt; //< count of shards
+ string options; //< configure option string for this CF
+ uint32_t hash_l; //< first character to take for hash calc.
+ uint32_t hash_h; //< last character to take for hash calc.
+ ColumnFamily(const string &name, size_t shard_cnt, const string &options,
+ uint32_t hash_l, uint32_t hash_h)
+ : name(name), shard_cnt(shard_cnt), options(options), hash_l(hash_l), hash_h(hash_h) {}
+ };
+private:
+ friend std::ostream& operator<<(std::ostream& out, const ColumnFamily& cf);
bool must_close_default_cf = false;
rocksdb::ColumnFamilyHandle *default_cf = nullptr;
+ /// column families in use, name->handles
+ struct prefix_shards {
+ uint32_t hash_l; //< first character to take for hash calc.
+ uint32_t hash_h; //< last character to take for hash calc.
+ std::vector<rocksdb::ColumnFamilyHandle *> handles;
+ };
+ std::unordered_map<std::string, prefix_shards> cf_handles;
+ std::unordered_map<uint32_t, std::string> cf_ids_to_prefix;
+ std::unordered_map<std::string, rocksdb::BlockBasedTableOptions> cf_bbt_opts;
+
+ void add_column_family(const std::string& cf_name, uint32_t hash_l, uint32_t hash_h,
+ size_t shard_idx, rocksdb::ColumnFamilyHandle *handle);
+ bool is_column_family(const std::string& prefix);
+ rocksdb::ColumnFamilyHandle *get_cf_handle(const std::string& prefix, const std::string& key);
+ rocksdb::ColumnFamilyHandle *get_cf_handle(const std::string& prefix, const char* key, size_t keylen);
+
int submit_common(rocksdb::WriteOptions& woptions, KeyValueDB::Transaction t);
- int install_cf_mergeop(const string &cf_name, rocksdb::ColumnFamilyOptions *cf_opt);
+ int install_cf_mergeop(const std::string &cf_name, rocksdb::ColumnFamilyOptions *cf_opt);
int create_db_dir();
- int do_open(ostream &out, bool create_if_missing, bool open_readonly,
- const vector<ColumnFamily>* cfs = nullptr);
+ int do_open(std::ostream &out, bool create_if_missing, bool open_readonly,
+ const std::string& cfs="");
int load_rocksdb_options(bool create_if_missing, rocksdb::Options& opt);
+public:
+ static bool parse_sharding_def(const std::string_view text_def,
+ std::vector<ColumnFamily>& sharding_def,
+ char const* *error_position = nullptr,
+ std::string *error_msg = nullptr);
+ const rocksdb::Comparator* get_comparator() const {
+ return comparator;
+ }
+private:
+ static void sharding_def_to_columns(const std::vector<ColumnFamily>& sharding_def,
+ std::vector<std::string>& columns);
+ int create_shards(const rocksdb::Options& opt,
+ const vector<ColumnFamily>& sharding_def);
+ int apply_sharding(const rocksdb::Options& opt,
+ const std::string& sharding_text);
+ int verify_sharding(const rocksdb::Options& opt,
+ std::vector<rocksdb::ColumnFamilyDescriptor>& existing_cfs,
+ std::vector<std::pair<size_t, RocksDBStore::ColumnFamily> >& existing_cfs_shard,
+ std::vector<rocksdb::ColumnFamilyDescriptor>& missing_cfs,
+ std::vector<std::pair<size_t, RocksDBStore::ColumnFamily> >& missing_cfs_shard);
+ std::shared_ptr<rocksdb::Cache> create_block_cache(const std::string& cache_type, size_t cache_size, double cache_prio_high = 0.0);
+ int extract_block_cache_options(const std::string& opts_str,
+ std::unordered_map<std::string, std::string>* column_opts_map,
+ std::string* block_cache_opt);
// manage async compactions
ceph::mutex compact_queue_lock =
ceph::make_mutex("RocksDBStore::compact_thread_lock");
ceph::condition_variable compact_queue_cond;
- list< pair<string,string> > compact_queue;
+ std::list<std::pair<std::string,std::string>> compact_queue;
bool compact_queue_stop;
class CompactThread : public Thread {
RocksDBStore *db;
void compact_thread_entry();
- void compact_range(const string& start, const string& end);
- void compact_range_async(const string& start, const string& end);
- int tryInterpret(const string& key, const string& val, rocksdb::Options& opt);
+ void compact_range(const std::string& start, const std::string& end);
+ void compact_range_async(const std::string& start, const std::string& end);
+ int tryInterpret(const std::string& key, const std::string& val,
+ rocksdb::Options& opt);
public:
/// compact the underlying rocksdb store
void compact() override;
void compact_async() override {
- compact_range_async(string(), string());
+ compact_range_async({}, {});
}
- int ParseOptionsFromString(const string& opt_str, rocksdb::Options& opt);
+ int ParseOptionsFromString(const std::string& opt_str, rocksdb::Options& opt);
static int ParseOptionsFromStringStatic(
CephContext* cct,
- const string& opt_str,
+ const std::string& opt_str,
rocksdb::Options &opt,
- function<int(const string&, const string&, rocksdb::Options&)> interp);
- static int _test_init(const string& dir);
- int init(string options_str) override;
+ std::function<int(const std::string&, const std::string&, rocksdb::Options&)> interp);
+ static int _test_init(const std::string& dir);
+ int init(std::string options_str) override;
/// compact rocksdb for all keys with a given prefix
- void compact_prefix(const string& prefix) override {
+ void compact_prefix(const std::string& prefix) override {
compact_range(prefix, past_prefix(prefix));
}
- void compact_prefix_async(const string& prefix) override {
+ void compact_prefix_async(const std::string& prefix) override {
compact_range_async(prefix, past_prefix(prefix));
}
- void compact_range(const string& prefix, const string& start, const string& end) override {
+ void compact_range(const std::string& prefix, const std::string& start,
+ const std::string& end) override {
compact_range(combine_strings(prefix, start), combine_strings(prefix, end));
}
- void compact_range_async(const string& prefix, const string& start, const string& end) override {
+ void compact_range_async(const std::string& prefix, const std::string& start,
+ const std::string& end) override {
compact_range_async(combine_strings(prefix, start), combine_strings(prefix, end));
}
- RocksDBStore(CephContext *c, const string &path, map<string,string> opt, void *p) :
+ RocksDBStore(CephContext *c, const std::string &path, std::map<std::string,std::string> opt, void *p) :
cct(c),
logger(NULL),
path(path),
priv(p),
db(NULL),
env(static_cast<rocksdb::Env*>(p)),
+ comparator(nullptr),
dbstats(NULL),
compact_queue_stop(false),
compact_thread(this),
~RocksDBStore() override;
- static bool check_omap_dir(string &omap_dir);
+ static bool check_omap_dir(std::string &omap_dir);
/// Opens underlying db
- int open(ostream &out, const vector<ColumnFamily>& cfs = {}) override {
- return do_open(out, false, false, &cfs);
+ int open(std::ostream &out, const std::string& cfs="") override {
+ return do_open(out, false, false, cfs);
}
/// Creates underlying db if missing and opens it
- int create_and_open(ostream &out,
- const vector<ColumnFamily>& cfs = {}) override;
+ int create_and_open(std::ostream &out,
+ const std::string& cfs="") override;
- int open_read_only(ostream &out, const vector<ColumnFamily>& cfs = {}) override {
- return do_open(out, false, true, &cfs);
+ int open_read_only(std::ostream &out, const std::string& cfs="") override {
+ return do_open(out, false, true, cfs);
}
void close() override;
- rocksdb::ColumnFamilyHandle *get_cf_handle(const std::string& cf_name) {
- auto iter = cf_handles.find(cf_name);
- if (iter == cf_handles.end())
- return nullptr;
- else
- return static_cast<rocksdb::ColumnFamilyHandle*>(iter->second);
- }
int repair(std::ostream &out) override;
void split_stats(const std::string &s, char delim, std::vector<std::string> &elems);
- void get_statistics(Formatter *f) override;
+ void get_statistics(ceph::Formatter *f) override;
PerfCounters *get_perf_counters() override
{
const std::string &property,
uint64_t *out) final;
- int64_t estimate_prefix_size(const string& prefix,
- const string& key_prefix) override;
-
- struct RocksWBHandler: public rocksdb::WriteBatch::Handler {
- std::string seen ;
- int num_seen = 0;
- static string pretty_binary_string(const string& in) {
- char buf[10];
- string out;
- out.reserve(in.length() * 3);
- enum { NONE, HEX, STRING } mode = NONE;
- unsigned from = 0, i;
- for (i=0; i < in.length(); ++i) {
- if ((in[i] < 32 || (unsigned char)in[i] > 126) ||
- (mode == HEX && in.length() - i >= 4 &&
- ((in[i] < 32 || (unsigned char)in[i] > 126) ||
- (in[i+1] < 32 || (unsigned char)in[i+1] > 126) ||
- (in[i+2] < 32 || (unsigned char)in[i+2] > 126) ||
- (in[i+3] < 32 || (unsigned char)in[i+3] > 126)))) {
-
- if (mode == STRING) {
- out.append(in.substr(from, i - from));
- out.push_back('\'');
- }
- if (mode != HEX) {
- out.append("0x");
- mode = HEX;
- }
- if (in.length() - i >= 4) {
- // print a whole u32 at once
- snprintf(buf, sizeof(buf), "%08x",
- (uint32_t)(((unsigned char)in[i] << 24) |
- ((unsigned char)in[i+1] << 16) |
- ((unsigned char)in[i+2] << 8) |
- ((unsigned char)in[i+3] << 0)));
- i += 3;
- } else {
- snprintf(buf, sizeof(buf), "%02x", (int)(unsigned char)in[i]);
- }
- out.append(buf);
- } else {
- if (mode != STRING) {
- out.push_back('\'');
- mode = STRING;
- from = i;
- }
- }
- }
- if (mode == STRING) {
- out.append(in.substr(from, i - from));
- out.push_back('\'');
- }
- return out;
- }
- void Put(const rocksdb::Slice& key,
- const rocksdb::Slice& value) override {
- string prefix ((key.ToString()).substr(0,1));
- string key_to_decode ((key.ToString()).substr(2,string::npos));
- uint64_t size = (value.ToString()).size();
- seen += "\nPut( Prefix = " + prefix + " key = "
- + pretty_binary_string(key_to_decode)
- + " Value size = " + std::to_string(size) + ")";
- num_seen++;
- }
- void SingleDelete(const rocksdb::Slice& key) override {
- string prefix ((key.ToString()).substr(0,1));
- string key_to_decode ((key.ToString()).substr(2,string::npos));
- seen += "\nSingleDelete(Prefix = "+ prefix + " Key = "
- + pretty_binary_string(key_to_decode) + ")";
- num_seen++;
- }
- void Delete(const rocksdb::Slice& key) override {
- string prefix ((key.ToString()).substr(0,1));
- string key_to_decode ((key.ToString()).substr(2,string::npos));
- seen += "\nDelete( Prefix = " + prefix + " key = "
- + pretty_binary_string(key_to_decode) + ")";
-
- num_seen++;
- }
- void Merge(const rocksdb::Slice& key,
- const rocksdb::Slice& value) override {
- string prefix ((key.ToString()).substr(0,1));
- string key_to_decode ((key.ToString()).substr(2,string::npos));
- uint64_t size = (value.ToString()).size();
- seen += "\nMerge( Prefix = " + prefix + " key = "
- + pretty_binary_string(key_to_decode) + " Value size = "
- + std::to_string(size) + ")";
-
- num_seen++;
- }
- bool Continue() override { return num_seen < 50; }
-
- };
-
+ int64_t estimate_prefix_size(const std::string& prefix,
+ const std::string& key_prefix) override;
+ struct RocksWBHandler;
class RocksDBTransactionImpl : public KeyValueDB::TransactionImpl {
public:
rocksdb::WriteBatch bat;
void put_bat(
rocksdb::WriteBatch& bat,
rocksdb::ColumnFamilyHandle *cf,
- const string &k,
- const bufferlist &to_set_bl);
+ const std::string &k,
+ const ceph::bufferlist &to_set_bl);
public:
void set(
- const string &prefix,
- const string &k,
- const bufferlist &bl) override;
+ const std::string &prefix,
+ const std::string &k,
+ const ceph::bufferlist &bl) override;
void set(
- const string &prefix,
+ const std::string &prefix,
const char *k,
size_t keylen,
- const bufferlist &bl) override;
+ const ceph::bufferlist &bl) override;
void rmkey(
- const string &prefix,
- const string &k) override;
+ const std::string &prefix,
+ const std::string &k) override;
void rmkey(
- const string &prefix,
+ const std::string &prefix,
const char *k,
size_t keylen) override;
void rm_single_key(
- const string &prefix,
- const string &k) override;
+ const std::string &prefix,
+ const std::string &k) override;
void rmkeys_by_prefix(
- const string &prefix
+ const std::string &prefix
) override;
void rm_range_keys(
- const string &prefix,
- const string &start,
- const string &end) override;
+ const std::string &prefix,
+ const std::string &start,
+ const std::string &end) override;
void merge(
- const string& prefix,
- const string& k,
- const bufferlist &bl) override;
+ const std::string& prefix,
+ const std::string& k,
+ const ceph::bufferlist &bl) override;
};
KeyValueDB::Transaction get_transaction() override {
int submit_transaction(KeyValueDB::Transaction t) override;
int submit_transaction_sync(KeyValueDB::Transaction t) override;
int get(
- const string &prefix,
- const std::set<string> &key,
- std::map<string, bufferlist> *out
+ const std::string &prefix,
+ const std::set<std::string> &key,
+ std::map<std::string, ceph::bufferlist> *out
) override;
int get(
- const string &prefix,
- const string &key,
- bufferlist *out
+ const std::string &prefix,
+ const std::string &key,
+ ceph::bufferlist *out
) override;
int get(
- const string &prefix,
+ const std::string &prefix,
const char *key,
size_t keylen,
- bufferlist *out) override;
+ ceph::bufferlist *out) override;
class RocksDBWholeSpaceIteratorImpl :
~RocksDBWholeSpaceIteratorImpl() override;
int seek_to_first() override;
- int seek_to_first(const string &prefix) override;
+ int seek_to_first(const std::string &prefix) override;
int seek_to_last() override;
- int seek_to_last(const string &prefix) override;
- int upper_bound(const string &prefix, const string &after) override;
- int lower_bound(const string &prefix, const string &to) override;
+ int seek_to_last(const std::string &prefix) override;
+ int upper_bound(const std::string &prefix, const std::string &after) override;
+ int lower_bound(const std::string &prefix, const std::string &to) override;
bool valid() override;
int next() override;
int prev() override;
- string key() override;
- pair<string,string> raw_key() override;
- bool raw_key_is_prefixed(const string &prefix) override;
- bufferlist value() override;
- bufferptr value_as_ptr() override;
+ std::string key() override;
+ std::pair<std::string,std::string> raw_key() override;
+ bool raw_key_is_prefixed(const std::string &prefix) override;
+ ceph::bufferlist value() override;
+ ceph::bufferptr value_as_ptr() override;
int status() override;
size_t key_size() override;
size_t value_size() override;
};
- Iterator get_iterator(const std::string& prefix) override;
-
+ Iterator get_iterator(const std::string& prefix, IteratorOpts opts = 0) override;
+private:
+ /// this iterator spans single cf
+ rocksdb::Iterator* new_shard_iterator(rocksdb::ColumnFamilyHandle* cf);
+public:
/// Utility
- static string combine_strings(const string &prefix, const string &value) {
- string out = prefix;
+ static std::string combine_strings(const std::string &prefix, const std::string &value) {
+ std::string out = prefix;
out.push_back(0);
out.append(value);
return out;
}
- static void combine_strings(const string &prefix,
+ static void combine_strings(const std::string &prefix,
const char *key, size_t keylen,
- string *out) {
+ std::string *out) {
out->reserve(prefix.size() + 1 + keylen);
*out = prefix;
out->push_back(0);
out->append(key, keylen);
}
- static int split_key(rocksdb::Slice in, string *prefix, string *key);
+ static int split_key(rocksdb::Slice in, std::string *prefix, std::string *key);
- static string past_prefix(const string &prefix);
+ static std::string past_prefix(const std::string &prefix);
class MergeOperatorRouter;
class MergeOperatorLinker;
int set_merge_operator(
const std::string& prefix,
std::shared_ptr<KeyValueDB::MergeOperator> mop) override;
- string assoc_name; ///< Name of associative operator
+ std::string assoc_name; ///< Name of associative operator
- uint64_t get_estimated_size(map<string,uint64_t> &extra) override {
+ uint64_t get_estimated_size(std::map<std::string,uint64_t> &extra) override {
DIR *store_dir = opendir(path.c_str());
if (!store_dir) {
lderr(cct) << __func__ << " something happened opening the store: "
struct dirent *entry = NULL;
while ((entry = readdir(store_dir)) != NULL) {
- string n(entry->d_name);
+ std::string n(entry->d_name);
if (n == "." || n == "..")
continue;
- string fpath = path + '/' + n;
+ std::string fpath = path + '/' + n;
struct stat s;
int err = stat(fpath.c_str(), &s);
if (err < 0)
}
size_t pos = n.find_last_of('.');
- if (pos == string::npos) {
+ if (pos == std::string::npos) {
misc_size += s.st_size;
continue;
}
- string ext = n.substr(pos+1);
+ std::string ext = n.substr(pos+1);
if (ext == "sst") {
sst_size += s.st_size;
} else if (ext == "log") {
return static_cast<int64_t>(bbt_opts.block_cache->GetUsage());
}
+ virtual int64_t get_cache_usage(string prefix) const override {
+ auto it = cf_bbt_opts.find(prefix);
+ if (it != cf_bbt_opts.end() && it->second.block_cache) {
+ return static_cast<int64_t>(it->second.block_cache->GetUsage());
+ }
+ return -EINVAL;
+ }
+
int set_cache_size(uint64_t s) override {
cache_size = s;
set_cache_flag = true;
return 0;
}
- virtual std::shared_ptr<PriorityCache::PriCache> get_priority_cache()
- const override {
+ virtual std::shared_ptr<PriorityCache::PriCache>
+ get_priority_cache() const override {
return dynamic_pointer_cast<PriorityCache::PriCache>(
bbt_opts.block_cache);
}
- WholeSpaceIterator get_wholespace_iterator() override;
-};
+ virtual std::shared_ptr<PriorityCache::PriCache>
+ get_priority_cache(string prefix) const override {
+ auto it = cf_bbt_opts.find(prefix);
+ if (it != cf_bbt_opts.end()) {
+ return dynamic_pointer_cast<PriorityCache::PriCache>(
+ it->second.block_cache);
+ }
+ return nullptr;
+ }
+ WholeSpaceIterator get_wholespace_iterator(IteratorOpts opts = 0) override;
+private:
+ WholeSpaceIterator get_default_cf_iterator();
+
+ using cf_deleter_t = std::function<void(rocksdb::ColumnFamilyHandle*)>;
+ using columns_t = std::map<std::string,
+ std::unique_ptr<rocksdb::ColumnFamilyHandle,
+ cf_deleter_t>>;
+ int prepare_for_reshard(const std::string& new_sharding,
+ columns_t& to_process_columns);
+ int reshard_cleanup(const columns_t& current_columns);
+public:
+ struct resharding_ctrl {
+ size_t bytes_per_iterator = 10000000; /// amount of data to process before refreshing iterator
+ size_t keys_per_iterator = 10000;
+ size_t bytes_per_batch = 1000000; /// amount of data before submitting batch
+ size_t keys_per_batch = 1000;
+ bool unittest_fail_after_first_batch = false;
+ bool unittest_fail_after_processing_column = false;
+ bool unittest_fail_after_successful_processing = false;
+ };
+ int reshard(const std::string& new_sharding, const resharding_ctrl* ctrl = nullptr);
+ bool get_sharding(std::string& sharding);
+};
#endif