]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/kv/RocksDBStore.h
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / kv / RocksDBStore.h
index daceb4b08cb9db773bd6a865925ed3c6112651bb..0735e115c36489d1acd0d0f1dbc60bb501774126 100644 (file)
@@ -16,6 +16,7 @@
 #include "rocksdb/iostats_context.h"
 #include "rocksdb/statistics.h"
 #include "rocksdb/table.h"
+#include "rocksdb/db.h"
 #include "kv/rocksdb_cache/BinnedLRUCache.h"
 #include <errno.h>
 #include "common/errno.h"
 #include "common/Cond.h"
 #include "common/ceph_context.h"
 #include "common/PriorityCache.h"
-
+#include "common/pretty_binary.h"
 
 enum {
   l_rocksdb_first = 34300,
   l_rocksdb_gets,
-  l_rocksdb_txns,
-  l_rocksdb_txns_sync,
   l_rocksdb_get_latency,
   l_rocksdb_submit_latency,
   l_rocksdb_submit_sync_latency,
@@ -72,33 +71,94 @@ extern rocksdb::Logger *create_rocksdb_ceph_logger();
 class RocksDBStore : public KeyValueDB {
   CephContext *cct;
   PerfCounters *logger;
-  string path;
-  map<string,string> kv_options;
+  std::string path;
+  std::map<std::string,std::string> kv_options;
   void *priv;
   rocksdb::DB *db;
   rocksdb::Env *env;
+  const rocksdb::Comparator* comparator;
   std::shared_ptr<rocksdb::Statistics> dbstats;
   rocksdb::BlockBasedTableOptions bbt_opts;
-  string options_str;
+  std::string options_str;
 
   uint64_t cache_size = 0;
   bool set_cache_flag = false;
+  friend class ShardMergeIteratorImpl;
+  friend class WholeMergeIteratorImpl;
+  /*
+   *  See RocksDB's definition of a column family(CF) and how to use it.
+   *  The interfaces of KeyValueDB is extended, when a column family is created.
+   *  Prefix will be the name of column family to use.
+   */
+public:
+  struct ColumnFamily {
+    string name;      //< name of this individual column family
+    size_t shard_cnt; //< count of shards
+    string options;   //< configure option string for this CF
+    uint32_t hash_l;  //< first character to take for hash calc.
+    uint32_t hash_h;  //< last character to take for hash calc.
+    ColumnFamily(const string &name, size_t shard_cnt, const string &options,
+                uint32_t hash_l, uint32_t hash_h)
+      : name(name), shard_cnt(shard_cnt), options(options), hash_l(hash_l), hash_h(hash_h) {}
+  };
+private:
+  friend std::ostream& operator<<(std::ostream& out, const ColumnFamily& cf);
 
   bool must_close_default_cf = false;
   rocksdb::ColumnFamilyHandle *default_cf = nullptr;
 
+  /// column families in use, name->handles
+  struct prefix_shards {
+    uint32_t hash_l;  //< first character to take for hash calc.
+    uint32_t hash_h;  //< last character to take for hash calc.
+    std::vector<rocksdb::ColumnFamilyHandle *> handles;
+  };
+  std::unordered_map<std::string, prefix_shards> cf_handles;
+  std::unordered_map<uint32_t, std::string> cf_ids_to_prefix;
+  std::unordered_map<std::string, rocksdb::BlockBasedTableOptions> cf_bbt_opts;
+  
+  void add_column_family(const std::string& cf_name, uint32_t hash_l, uint32_t hash_h,
+                        size_t shard_idx, rocksdb::ColumnFamilyHandle *handle);
+  bool is_column_family(const std::string& prefix);
+  rocksdb::ColumnFamilyHandle *get_cf_handle(const std::string& prefix, const std::string& key);
+  rocksdb::ColumnFamilyHandle *get_cf_handle(const std::string& prefix, const char* key, size_t keylen);
+
   int submit_common(rocksdb::WriteOptions& woptions, KeyValueDB::Transaction t);
-  int install_cf_mergeop(const string &cf_name, rocksdb::ColumnFamilyOptions *cf_opt);
+  int install_cf_mergeop(const std::string &cf_name, rocksdb::ColumnFamilyOptions *cf_opt);
   int create_db_dir();
-  int do_open(ostream &out, bool create_if_missing, bool open_readonly,
-             const vector<ColumnFamily>* cfs = nullptr);
+  int do_open(std::ostream &out, bool create_if_missing, bool open_readonly,
+             const std::string& cfs="");
   int load_rocksdb_options(bool create_if_missing, rocksdb::Options& opt);
+public:
+  static bool parse_sharding_def(const std::string_view text_def,
+                               std::vector<ColumnFamily>& sharding_def,
+                               char const* *error_position = nullptr,
+                               std::string *error_msg = nullptr);
+  const rocksdb::Comparator* get_comparator() const {
+    return comparator;
+  }
 
+private:
+  static void sharding_def_to_columns(const std::vector<ColumnFamily>& sharding_def,
+                                     std::vector<std::string>& columns);
+  int create_shards(const rocksdb::Options& opt,
+                   const vector<ColumnFamily>& sharding_def);
+  int apply_sharding(const rocksdb::Options& opt,
+                    const std::string& sharding_text);
+  int verify_sharding(const rocksdb::Options& opt,
+                     std::vector<rocksdb::ColumnFamilyDescriptor>& existing_cfs,
+                     std::vector<std::pair<size_t, RocksDBStore::ColumnFamily> >& existing_cfs_shard,
+                     std::vector<rocksdb::ColumnFamilyDescriptor>& missing_cfs,
+                     std::vector<std::pair<size_t, RocksDBStore::ColumnFamily> >& missing_cfs_shard);
+  std::shared_ptr<rocksdb::Cache> create_block_cache(const std::string& cache_type, size_t cache_size, double cache_prio_high = 0.0);
+  int extract_block_cache_options(const std::string& opts_str,
+                                 std::unordered_map<std::string, std::string>* column_opts_map,
+                                 std::string* block_cache_opt);
   // manage async compactions
   ceph::mutex compact_queue_lock =
     ceph::make_mutex("RocksDBStore::compact_thread_lock");
   ceph::condition_variable compact_queue_cond;
-  list< pair<string,string> > compact_queue;
+  std::list<std::pair<std::string,std::string>> compact_queue;
   bool compact_queue_stop;
   class CompactThread : public Thread {
     RocksDBStore *db;
@@ -113,9 +173,10 @@ class RocksDBStore : public KeyValueDB {
 
   void compact_thread_entry();
 
-  void compact_range(const string& start, const string& end);
-  void compact_range_async(const string& start, const string& end);
-  int tryInterpret(const string& key, const string& val, rocksdb::Options& opt);
+  void compact_range(const std::string& start, const std::string& end);
+  void compact_range_async(const std::string& start, const std::string& end);
+  int tryInterpret(const std::string& key, const std::string& val,
+                  rocksdb::Options& opt);
 
 public:
   /// compact the underlying rocksdb store
@@ -125,33 +186,35 @@ public:
   void compact() override;
 
   void compact_async() override {
-    compact_range_async(string(), string());
+    compact_range_async({}, {});
   }
 
-  int ParseOptionsFromString(const string& opt_str, rocksdb::Options& opt);
+  int ParseOptionsFromString(const std::string& opt_str, rocksdb::Options& opt);
   static int ParseOptionsFromStringStatic(
     CephContext* cct,
-    const string& opt_str,
+    const std::string& opt_str,
     rocksdb::Options &opt,
-    function<int(const string&, const string&, rocksdb::Options&)> interp);
-  static int _test_init(const string& dir);
-  int init(string options_str) override;
+    std::function<int(const std::string&, const std::string&, rocksdb::Options&)> interp);
+  static int _test_init(const std::string& dir);
+  int init(std::string options_str) override;
   /// compact rocksdb for all keys with a given prefix
-  void compact_prefix(const string& prefix) override {
+  void compact_prefix(const std::string& prefix) override {
     compact_range(prefix, past_prefix(prefix));
   }
-  void compact_prefix_async(const string& prefix) override {
+  void compact_prefix_async(const std::string& prefix) override {
     compact_range_async(prefix, past_prefix(prefix));
   }
 
-  void compact_range(const string& prefix, const string& start, const string& end) override {
+  void compact_range(const std::string& prefix, const std::string& start,
+                    const std::string& end) override {
     compact_range(combine_strings(prefix, start), combine_strings(prefix, end));
   }
-  void compact_range_async(const string& prefix, const string& start, const string& end) override {
+  void compact_range_async(const std::string& prefix, const std::string& start,
+                          const std::string& end) override {
     compact_range_async(combine_strings(prefix, start), combine_strings(prefix, end));
   }
 
-  RocksDBStore(CephContext *c, const string &path, map<string,string> opt, void *p) :
+  RocksDBStore(CephContext *c, const std::string &path, std::map<std::string,std::string> opt, void *p) :
     cct(c),
     logger(NULL),
     path(path),
@@ -159,6 +222,7 @@ public:
     priv(p),
     db(NULL),
     env(static_cast<rocksdb::Env*>(p)),
+    comparator(nullptr),
     dbstats(NULL),
     compact_queue_stop(false),
     compact_thread(this),
@@ -169,31 +233,24 @@ public:
 
   ~RocksDBStore() override;
 
-  static bool check_omap_dir(string &omap_dir);
+  static bool check_omap_dir(std::string &omap_dir);
   /// Opens underlying db
-  int open(ostream &out, const vector<ColumnFamily>& cfs = {}) override {
-    return do_open(out, false, false, &cfs);
+  int open(std::ostream &out, const std::string& cfs="") override {
+    return do_open(out, false, false, cfs);
   }
   /// Creates underlying db if missing and opens it
-  int create_and_open(ostream &out,
-                     const vector<ColumnFamily>& cfs = {}) override;
+  int create_and_open(std::ostream &out,
+                     const std::string& cfs="") override;
 
-  int open_read_only(ostream &out, const vector<ColumnFamily>& cfs = {}) override {
-    return do_open(out, false, true, &cfs);
+  int open_read_only(std::ostream &out, const std::string& cfs="") override {
+    return do_open(out, false, true, cfs);
   }
 
   void close() override;
 
-  rocksdb::ColumnFamilyHandle *get_cf_handle(const std::string& cf_name) {
-    auto iter = cf_handles.find(cf_name);
-    if (iter == cf_handles.end())
-      return nullptr;
-    else
-      return static_cast<rocksdb::ColumnFamilyHandle*>(iter->second);
-  }
   int repair(std::ostream &out) override;
   void split_stats(const std::string &s, char delim, std::vector<std::string> &elems);
-  void get_statistics(Formatter *f) override;
+  void get_statistics(ceph::Formatter *f) override;
 
   PerfCounters *get_perf_counters() override
   {
@@ -204,100 +261,9 @@ public:
     const std::string &property,
     uint64_t *out) final;
 
-  int64_t estimate_prefix_size(const string& prefix,
-                              const string& key_prefix) override;
-
-  struct  RocksWBHandler: public rocksdb::WriteBatch::Handler {
-    std::string seen ;
-    int num_seen = 0;
-    static string pretty_binary_string(const string& in) {
-      char buf[10];
-      string out;
-      out.reserve(in.length() * 3);
-      enum { NONE, HEX, STRING } mode = NONE;
-      unsigned from = 0, i;
-      for (i=0; i < in.length(); ++i) {
-        if ((in[i] < 32 || (unsigned char)in[i] > 126) ||
-          (mode == HEX && in.length() - i >= 4 &&
-          ((in[i] < 32 || (unsigned char)in[i] > 126) ||
-          (in[i+1] < 32 || (unsigned char)in[i+1] > 126) ||
-          (in[i+2] < 32 || (unsigned char)in[i+2] > 126) ||
-          (in[i+3] < 32 || (unsigned char)in[i+3] > 126)))) {
-
-          if (mode == STRING) {
-            out.append(in.substr(from, i - from));
-            out.push_back('\'');
-          }
-          if (mode != HEX) {
-            out.append("0x");
-            mode = HEX;
-          }
-          if (in.length() - i >= 4) {
-            // print a whole u32 at once
-            snprintf(buf, sizeof(buf), "%08x",
-                  (uint32_t)(((unsigned char)in[i] << 24) |
-                            ((unsigned char)in[i+1] << 16) |
-                            ((unsigned char)in[i+2] << 8) |
-                            ((unsigned char)in[i+3] << 0)));
-            i += 3;
-          } else {
-            snprintf(buf, sizeof(buf), "%02x", (int)(unsigned char)in[i]);
-          }
-          out.append(buf);
-        } else {
-          if (mode != STRING) {
-            out.push_back('\'');
-            mode = STRING;
-            from = i;
-          }
-        }
-      }
-      if (mode == STRING) {
-        out.append(in.substr(from, i - from));
-        out.push_back('\'');
-      }
-      return out;
-    }
-    void Put(const rocksdb::Slice& key,
-                    const rocksdb::Slice& value) override {
-      string prefix ((key.ToString()).substr(0,1));
-      string key_to_decode ((key.ToString()).substr(2,string::npos));
-      uint64_t size = (value.ToString()).size();
-      seen += "\nPut( Prefix = " + prefix + " key = " 
-            + pretty_binary_string(key_to_decode) 
-            + " Value size = " + std::to_string(size) + ")";
-      num_seen++;
-    }
-    void SingleDelete(const rocksdb::Slice& key) override {
-      string prefix ((key.ToString()).substr(0,1));
-      string key_to_decode ((key.ToString()).substr(2,string::npos));
-      seen += "\nSingleDelete(Prefix = "+ prefix + " Key = " 
-            + pretty_binary_string(key_to_decode) + ")";
-      num_seen++;
-    }
-    void Delete(const rocksdb::Slice& key) override {
-      string prefix ((key.ToString()).substr(0,1));
-      string key_to_decode ((key.ToString()).substr(2,string::npos));
-      seen += "\nDelete( Prefix = " + prefix + " key = " 
-            + pretty_binary_string(key_to_decode) + ")";
-
-      num_seen++;
-    }
-    void Merge(const rocksdb::Slice& key,
-                      const rocksdb::Slice& value) override {
-      string prefix ((key.ToString()).substr(0,1));
-      string key_to_decode ((key.ToString()).substr(2,string::npos));
-      uint64_t size = (value.ToString()).size();
-      seen += "\nMerge( Prefix = " + prefix + " key = " 
-            + pretty_binary_string(key_to_decode) + " Value size = " 
-            + std::to_string(size) + ")";
-
-      num_seen++;
-    }
-    bool Continue() override { return num_seen < 50; }
-
-  };
-
+  int64_t estimate_prefix_size(const std::string& prefix,
+                              const std::string& key_prefix) override;
+  struct RocksWBHandler;
   class RocksDBTransactionImpl : public KeyValueDB::TransactionImpl {
   public:
     rocksdb::WriteBatch bat;
@@ -308,39 +274,39 @@ public:
     void put_bat(
       rocksdb::WriteBatch& bat,
       rocksdb::ColumnFamilyHandle *cf,
-      const string &k,
-      const bufferlist &to_set_bl);
+      const std::string &k,
+      const ceph::bufferlist &to_set_bl);
   public:
     void set(
-      const string &prefix,
-      const string &k,
-      const bufferlist &bl) override;
+      const std::string &prefix,
+      const std::string &k,
+      const ceph::bufferlist &bl) override;
     void set(
-      const string &prefix,
+      const std::string &prefix,
       const char *k,
       size_t keylen,
-      const bufferlist &bl) override;
+      const ceph::bufferlist &bl) override;
     void rmkey(
-      const string &prefix,
-      const string &k) override;
+      const std::string &prefix,
+      const std::string &k) override;
     void rmkey(
-      const string &prefix,
+      const std::string &prefix,
       const char *k,
       size_t keylen) override;
     void rm_single_key(
-      const string &prefix,
-      const string &k) override;
+      const std::string &prefix,
+      const std::string &k) override;
     void rmkeys_by_prefix(
-      const string &prefix
+      const std::string &prefix
       ) override;
     void rm_range_keys(
-      const string &prefix,
-      const string &start,
-      const string &end) override;
+      const std::string &prefix,
+      const std::string &start,
+      const std::string &end) override;
     void merge(
-      const string& prefix,
-      const string& k,
-      const bufferlist &bl) override;
+      const std::string& prefix,
+      const std::string& k,
+      const ceph::bufferlist &bl) override;
   };
 
   KeyValueDB::Transaction get_transaction() override {
@@ -350,20 +316,20 @@ public:
   int submit_transaction(KeyValueDB::Transaction t) override;
   int submit_transaction_sync(KeyValueDB::Transaction t) override;
   int get(
-    const string &prefix,
-    const std::set<string> &key,
-    std::map<string, bufferlist> *out
+    const std::string &prefix,
+    const std::set<std::string> &key,
+    std::map<std::string, ceph::bufferlist> *out
     ) override;
   int get(
-    const string &prefix,
-    const string &key,
-    bufferlist *out
+    const std::string &prefix,
+    const std::string &key,
+    ceph::bufferlist *out
     ) override;
   int get(
-    const string &prefix,
+    const std::string &prefix,
     const char *key,
     size_t keylen,
-    bufferlist *out) override;
+    ceph::bufferlist *out) override;
 
 
   class RocksDBWholeSpaceIteratorImpl :
@@ -377,45 +343,48 @@ public:
     ~RocksDBWholeSpaceIteratorImpl() override;
 
     int seek_to_first() override;
-    int seek_to_first(const string &prefix) override;
+    int seek_to_first(const std::string &prefix) override;
     int seek_to_last() override;
-    int seek_to_last(const string &prefix) override;
-    int upper_bound(const string &prefix, const string &after) override;
-    int lower_bound(const string &prefix, const string &to) override;
+    int seek_to_last(const std::string &prefix) override;
+    int upper_bound(const std::string &prefix, const std::string &after) override;
+    int lower_bound(const std::string &prefix, const std::string &to) override;
     bool valid() override;
     int next() override;
     int prev() override;
-    string key() override;
-    pair<string,string> raw_key() override;
-    bool raw_key_is_prefixed(const string &prefix) override;
-    bufferlist value() override;
-    bufferptr value_as_ptr() override;
+    std::string key() override;
+    std::pair<std::string,std::string> raw_key() override;
+    bool raw_key_is_prefixed(const std::string &prefix) override;
+    ceph::bufferlist value() override;
+    ceph::bufferptr value_as_ptr() override;
     int status() override;
     size_t key_size() override;
     size_t value_size() override;
   };
 
-  Iterator get_iterator(const std::string& prefix) override;
-
+  Iterator get_iterator(const std::string& prefix, IteratorOpts opts = 0) override;
+private:
+  /// this iterator spans single cf
+  rocksdb::Iterator* new_shard_iterator(rocksdb::ColumnFamilyHandle* cf);
+public:
   /// Utility
-  static string combine_strings(const string &prefix, const string &value) {
-    string out = prefix;
+  static std::string combine_strings(const std::string &prefix, const std::string &value) {
+    std::string out = prefix;
     out.push_back(0);
     out.append(value);
     return out;
   }
-  static void combine_strings(const string &prefix,
+  static void combine_strings(const std::string &prefix,
                              const char *key, size_t keylen,
-                             string *out) {
+                             std::string *out) {
     out->reserve(prefix.size() + 1 + keylen);
     *out = prefix;
     out->push_back(0);
     out->append(key, keylen);
   }
 
-  static int split_key(rocksdb::Slice in, string *prefix, string *key);
+  static int split_key(rocksdb::Slice in, std::string *prefix, std::string *key);
 
-  static string past_prefix(const string &prefix);
+  static std::string past_prefix(const std::string &prefix);
 
   class MergeOperatorRouter;
   class MergeOperatorLinker;
@@ -423,9 +392,9 @@ public:
   int set_merge_operator(
     const std::string& prefix,
     std::shared_ptr<KeyValueDB::MergeOperator> mop) override;
-  string assoc_name; ///< Name of associative operator
+  std::string assoc_name; ///< Name of associative operator
 
-  uint64_t get_estimated_size(map<string,uint64_t> &extra) override {
+  uint64_t get_estimated_size(std::map<std::string,uint64_t> &extra) override {
     DIR *store_dir = opendir(path.c_str());
     if (!store_dir) {
       lderr(cct) << __func__ << " something happened opening the store: "
@@ -440,12 +409,12 @@ public:
 
     struct dirent *entry = NULL;
     while ((entry = readdir(store_dir)) != NULL) {
-      string n(entry->d_name);
+      std::string n(entry->d_name);
 
       if (n == "." || n == "..")
         continue;
 
-      string fpath = path + '/' + n;
+      std::string fpath = path + '/' + n;
       struct stat s;
       int err = stat(fpath.c_str(), &s);
       if (err < 0)
@@ -464,12 +433,12 @@ public:
       }
 
       size_t pos = n.find_last_of('.');
-      if (pos == string::npos) {
+      if (pos == std::string::npos) {
         misc_size += s.st_size;
         continue;
       }
 
-      string ext = n.substr(pos+1);
+      std::string ext = n.substr(pos+1);
       if (ext == "sst") {
         sst_size += s.st_size;
       } else if (ext == "log") {
@@ -495,21 +464,60 @@ err:
     return static_cast<int64_t>(bbt_opts.block_cache->GetUsage());
   }
 
+  virtual int64_t get_cache_usage(string prefix) const override {
+    auto it = cf_bbt_opts.find(prefix);
+    if (it != cf_bbt_opts.end() && it->second.block_cache) {
+      return static_cast<int64_t>(it->second.block_cache->GetUsage());
+    }
+    return -EINVAL;
+  }
+
   int set_cache_size(uint64_t s) override {
     cache_size = s;
     set_cache_flag = true;
     return 0;
   }
 
-  virtual std::shared_ptr<PriorityCache::PriCache> get_priority_cache() 
-      const override {
+  virtual std::shared_ptr<PriorityCache::PriCache>
+      get_priority_cache() const override {
     return dynamic_pointer_cast<PriorityCache::PriCache>(
         bbt_opts.block_cache);
   }
 
-  WholeSpaceIterator get_wholespace_iterator() override;
-};
+  virtual std::shared_ptr<PriorityCache::PriCache>
+      get_priority_cache(string prefix) const override {
+    auto it = cf_bbt_opts.find(prefix);
+    if (it != cf_bbt_opts.end()) {
+      return dynamic_pointer_cast<PriorityCache::PriCache>(
+          it->second.block_cache);
+    }
+    return nullptr;
+  }
 
+  WholeSpaceIterator get_wholespace_iterator(IteratorOpts opts = 0) override;
+private:
+  WholeSpaceIterator get_default_cf_iterator();
+
+  using cf_deleter_t = std::function<void(rocksdb::ColumnFamilyHandle*)>;
+  using columns_t = std::map<std::string,
+                            std::unique_ptr<rocksdb::ColumnFamilyHandle,
+                                            cf_deleter_t>>;
+  int prepare_for_reshard(const std::string& new_sharding,
+                         columns_t& to_process_columns);
+  int reshard_cleanup(const columns_t& current_columns);
+public:
+  struct resharding_ctrl {
+    size_t bytes_per_iterator = 10000000; /// amount of data to process before refreshing iterator
+    size_t keys_per_iterator =  10000;
+    size_t bytes_per_batch =    1000000;  /// amount of data before submitting batch
+    size_t keys_per_batch =     1000;
+    bool   unittest_fail_after_first_batch = false;
+    bool   unittest_fail_after_processing_column = false;
+    bool   unittest_fail_after_successful_processing = false;
+  };
+  int reshard(const std::string& new_sharding, const resharding_ctrl* ctrl = nullptr);
+  bool get_sharding(std::string& sharding);
 
+};
 
 #endif