]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/kv/RocksDBStore.cc
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / kv / RocksDBStore.cc
index 08bb6408735b8a21cad456d6d4530bb4a450b85b..33b8fd60ec5b5736c461812ad71e69ca233d20f8 100644 (file)
@@ -18,7 +18,6 @@
 #include "rocksdb/filter_policy.h"
 #include "rocksdb/utilities/convenience.h"
 #include "rocksdb/merge_operator.h"
-#include "kv/rocksdb_cache/BinnedLRUCache.h"
 
 using std::string;
 #include "common/perf_counters.h"
@@ -36,6 +35,12 @@ using std::string;
 #undef dout_prefix
 #define dout_prefix *_dout << "rocksdb: "
 
+static bufferlist to_bufferlist(rocksdb::Slice in) {
+  bufferlist bl;
+  bl.append(bufferptr(in.data(), in.size()));
+  return bl;
+}
+
 static rocksdb::SliceParts prepare_sliceparts(const bufferlist &bl,
                                              vector<rocksdb::Slice> *slices)
 {
@@ -48,12 +53,16 @@ static rocksdb::SliceParts prepare_sliceparts(const bufferlist &bl,
   return rocksdb::SliceParts(slices->data(), slices->size());
 }
 
+
 //
-// One of these per rocksdb instance, implements the merge operator prefix stuff
+// One of these for the default rocksdb column family, routing each prefix
+// to the appropriate MergeOperator.
 //
-class RocksDBStore::MergeOperatorRouter : public rocksdb::AssociativeMergeOperator {
+class RocksDBStore::MergeOperatorRouter
+  : public rocksdb::AssociativeMergeOperator
+{
   RocksDBStore& store;
-  public:
+public:
   const char *Name() const override {
     // Construct a name that rocksDB will validate against. We want to
     // do this in a way that doesn't constrain the ordering of calls
@@ -61,7 +70,13 @@ class RocksDBStore::MergeOperatorRouter : public rocksdb::AssociativeMergeOperat
     // construct a name from all of those parts.
     store.assoc_name.clear();
     map<std::string,std::string> names;
-    for (auto& p : store.merge_ops) names[p.first] = p.second->name();
+
+    for (auto& p : store.merge_ops) {
+      names[p.first] = p.second->name();
+    }
+    for (auto& p : store.cf_handles) {
+      names.erase(p.first);
+    }
     for (auto& p : names) {
       store.assoc_name += '.';
       store.assoc_name += p.first;
@@ -71,31 +86,65 @@ class RocksDBStore::MergeOperatorRouter : public rocksdb::AssociativeMergeOperat
     return store.assoc_name.c_str();
   }
 
-  MergeOperatorRouter(RocksDBStore &_store) : store(_store) {}
+  explicit MergeOperatorRouter(RocksDBStore &_store) : store(_store) {}
 
   bool Merge(const rocksdb::Slice& key,
-                     const rocksdb::Slice* existing_value,
-                     const rocksdb::Slice& value,
-                     std::string* new_value,
-                     rocksdb::Logger* logger) const override {
-    // Check each prefix
+            const rocksdb::Slice* existing_value,
+            const rocksdb::Slice& value,
+            std::string* new_value,
+            rocksdb::Logger* logger) const override {
+    // for default column family
+    // extract prefix from key and compare against each registered merge op;
+    // even though merge operator for explicit CF is included in merge_ops,
+    // it won't be picked up, since it won't match.
     for (auto& p : store.merge_ops) {
       if (p.first.compare(0, p.first.length(),
                          key.data(), p.first.length()) == 0 &&
          key.data()[p.first.length()] == 0) {
-        if (existing_value) {
-          p.second->merge(existing_value->data(), existing_value->size(),
+       if (existing_value) {
+         p.second->merge(existing_value->data(), existing_value->size(),
                          value.data(), value.size(),
                          new_value);
-        } else {
-          p.second->merge_nonexistent(value.data(), value.size(), new_value);
-        }
-        break;
+       } else {
+         p.second->merge_nonexistent(value.data(), value.size(), new_value);
+       }
+       break;
       }
     }
     return true; // OK :)
   }
+};
+
+//
+// One of these per non-default column family, linked directly to the
+// merge operator for that CF/prefix (if any).
+//
+class RocksDBStore::MergeOperatorLinker
+  : public rocksdb::AssociativeMergeOperator
+{
+private:
+  std::shared_ptr<KeyValueDB::MergeOperator> mop;
+public:
+  explicit MergeOperatorLinker(const std::shared_ptr<KeyValueDB::MergeOperator> &o) : mop(o) {}
 
+  const char *Name() const override {
+    return mop->name();
+  }
+
+  bool Merge(const rocksdb::Slice& key,
+            const rocksdb::Slice* existing_value,
+            const rocksdb::Slice& value,
+            std::string* new_value,
+            rocksdb::Logger* logger) const override {
+    if (existing_value) {
+      mop->merge(existing_value->data(), existing_value->size(),
+                value.data(), value.size(),
+                new_value);
+    } else {
+      mop->merge_nonexistent(value.data(), value.size(), new_value);
+    }
+    return true;
+  }
 };
 
 int RocksDBStore::set_merge_operator(
@@ -103,7 +152,7 @@ int RocksDBStore::set_merge_operator(
   std::shared_ptr<KeyValueDB::MergeOperator> mop)
 {
   // If you fail here, it's because you can't do this on an open database
-  assert(db == nullptr);
+  ceph_assert(db == nullptr);
   merge_ops.push_back(std::make_pair(prefix,mop));
   return 0;
 }
@@ -130,7 +179,7 @@ public:
   void Logv(const rocksdb::InfoLogLevel log_level, const char* format,
            va_list ap) override {
     int v = rocksdb::NUM_INFO_LOG_LEVELS - log_level - 1;
-    dout(v);
+    dout(ceph::dout::need_dynamic(v));
     char buf[65536];
     vsnprintf(buf, sizeof(buf), format, ap);
     *_dout << buf << dendl;
@@ -229,7 +278,7 @@ int RocksDBStore::init(string _options_str)
   return 0;
 }
 
-int RocksDBStore::create_and_open(ostream &out)
+int RocksDBStore::create_db_dir()
 {
   if (env) {
     unique_ptr<rocksdb::Directory> dir;
@@ -244,12 +293,38 @@ int RocksDBStore::create_and_open(ostream &out)
       return r;
     }
   }
-  return do_open(out, true);
+  return 0;
+}
+
+int RocksDBStore::install_cf_mergeop(
+  const string &cf_name,
+  rocksdb::ColumnFamilyOptions *cf_opt)
+{
+  ceph_assert(cf_opt != nullptr);
+  cf_opt->merge_operator.reset();
+  for (auto& i : merge_ops) {
+    if (i.first == cf_name) {
+      cf_opt->merge_operator.reset(new MergeOperatorLinker(i.second));
+    }
+  }
+  return 0;
 }
 
-int RocksDBStore::do_open(ostream &out, bool create_if_missing)
+int RocksDBStore::create_and_open(ostream &out,
+                                 const vector<ColumnFamily>& cfs)
+{
+  int r = create_db_dir();
+  if (r < 0)
+    return r;
+  if (cfs.empty()) {
+    return do_open(out, true, false, nullptr);
+  } else {
+    return do_open(out, true, false, &cfs);
+  }
+}
+
+int RocksDBStore::load_rocksdb_options(bool create_if_missing, rocksdb::Options& opt)
 {
-  rocksdb::Options opt;
   rocksdb::Status status;
 
   if (options_str.length()) {
@@ -259,39 +334,46 @@ int RocksDBStore::do_open(ostream &out, bool create_if_missing)
     }
   }
 
-  if (g_conf->rocksdb_perf)  {
+  if (g_conf()->rocksdb_perf)  {
     dbstats = rocksdb::CreateDBStatistics();
     opt.statistics = dbstats;
   }
 
   opt.create_if_missing = create_if_missing;
-  if (g_conf->rocksdb_separate_wal_dir) {
+  if (kv_options.count("separate_wal_dir")) {
     opt.wal_dir = path + ".wal";
   }
-  if (g_conf->get_val<std::string>("rocksdb_db_paths").length()) {
-    list<string> paths;
-    get_str_list(g_conf->get_val<std::string>("rocksdb_db_paths"), "; \t", paths);
-    for (auto& p : paths) {
-      size_t pos = p.find(',');
-      if (pos == std::string::npos) {
-       derr << __func__ << " invalid db path item " << p << " in "
-            << g_conf->get_val<std::string>("rocksdb_db_paths") << dendl;
-       return -EINVAL;
-      }
-      string path = p.substr(0, pos);
-      string size_str = p.substr(pos + 1);
-      uint64_t size = atoll(size_str.c_str());
-      if (!size) {
-       derr << __func__ << " invalid db path item " << p << " in "
-            << g_conf->get_val<std::string>("rocksdb_db_paths") << dendl;
-       return -EINVAL;
+
+  // Since ceph::for_each_substr doesn't return a value and
+  // std::stoull does throw, we may as well just catch everything here.
+  try {
+    if (kv_options.count("db_paths")) {
+      list<string> paths;
+      get_str_list(kv_options["db_paths"], "; \t", paths);
+      for (auto& p : paths) {
+       size_t pos = p.find(',');
+       if (pos == std::string::npos) {
+         derr << __func__ << " invalid db path item " << p << " in "
+              << kv_options["db_paths"] << dendl;
+         return -EINVAL;
+       }
+       string path = p.substr(0, pos);
+       string size_str = p.substr(pos + 1);
+       uint64_t size = atoll(size_str.c_str());
+       if (!size) {
+         derr << __func__ << " invalid db path item " << p << " in "
+              << kv_options["db_paths"] << dendl;
+         return -EINVAL;
+       }
+       opt.db_paths.push_back(rocksdb::DbPath(path, size));
+       dout(10) << __func__ << " db_path " << path << " size " << size << dendl;
       }
-      opt.db_paths.push_back(rocksdb::DbPath(path, size));
-      dout(10) << __func__ << " db_path " << path << " size " << size << dendl;
     }
+  } catch (const std::system_error& e) {
+    return -e.code().value();
   }
 
-  if (g_conf->rocksdb_log_to_ceph_log) {
+  if (g_conf()->rocksdb_log_to_ceph_log) {
     opt.info_log.reset(new CephRocksdbLogger(g_ceph_context));
   }
 
@@ -302,76 +384,206 @@ int RocksDBStore::do_open(ostream &out, bool create_if_missing)
 
   // caches
   if (!set_cache_flag) {
-    cache_size = g_conf->rocksdb_cache_size;
+    cache_size = g_conf()->rocksdb_cache_size;
   }
-  uint64_t row_cache_size = cache_size * g_conf->rocksdb_cache_row_ratio;
+  uint64_t row_cache_size = cache_size * g_conf()->rocksdb_cache_row_ratio;
   uint64_t block_cache_size = cache_size - row_cache_size;
 
-  if (g_conf->rocksdb_cache_type == "binned_lru") {
+  if (g_conf()->rocksdb_cache_type == "binned_lru") {
     bbt_opts.block_cache = rocksdb_cache::NewBinnedLRUCache(
+      cct,
       block_cache_size,
-      g_conf->rocksdb_cache_shard_bits);
-  } else if (g_conf->rocksdb_cache_type == "lru") {
+      g_conf()->rocksdb_cache_shard_bits);
+  } else if (g_conf()->rocksdb_cache_type == "lru") {
     bbt_opts.block_cache = rocksdb::NewLRUCache(
       block_cache_size,
-      g_conf->rocksdb_cache_shard_bits);
-  } else if (g_conf->rocksdb_cache_type == "clock") {
+      g_conf()->rocksdb_cache_shard_bits);
+  } else if (g_conf()->rocksdb_cache_type == "clock") {
     bbt_opts.block_cache = rocksdb::NewClockCache(
       block_cache_size,
-      g_conf->rocksdb_cache_shard_bits);
+      g_conf()->rocksdb_cache_shard_bits);
     if (!bbt_opts.block_cache) {
-      derr << "rocksdb_cache_type '" << g_conf->rocksdb_cache_type
+      derr << "rocksdb_cache_type '" << g_conf()->rocksdb_cache_type
            << "' chosen, but RocksDB not compiled with LibTBB. "
            << dendl;
       return -EINVAL;
     }
   } else {
-    derr << "unrecognized rocksdb_cache_type '" << g_conf->rocksdb_cache_type
+    derr << "unrecognized rocksdb_cache_type '" << g_conf()->rocksdb_cache_type
       << "'" << dendl;
     return -EINVAL;
   }
-  bbt_opts.block_size = g_conf->rocksdb_block_size;
+  bbt_opts.block_size = g_conf()->rocksdb_block_size;
 
   if (row_cache_size > 0)
     opt.row_cache = rocksdb::NewLRUCache(row_cache_size,
-                                    g_conf->rocksdb_cache_shard_bits);
-  uint64_t bloom_bits = g_conf->get_val<uint64_t>("rocksdb_bloom_bits_per_key");
+                                    g_conf()->rocksdb_cache_shard_bits);
+  uint64_t bloom_bits = g_conf().get_val<uint64_t>("rocksdb_bloom_bits_per_key");
   if (bloom_bits > 0) {
     dout(10) << __func__ << " set bloom filter bits per key to "
             << bloom_bits << dendl;
     bbt_opts.filter_policy.reset(rocksdb::NewBloomFilterPolicy(bloom_bits));
   }
-  if (g_conf->get_val<std::string>("rocksdb_index_type") == "binary_search")
+  using std::placeholders::_1;
+  if (g_conf().with_val<std::string>("rocksdb_index_type",
+                                   std::bind(std::equal_to<std::string>(), _1,
+                                             "binary_search")))
     bbt_opts.index_type = rocksdb::BlockBasedTableOptions::IndexType::kBinarySearch;
-  if (g_conf->get_val<std::string>("rocksdb_index_type") == "hash_search")
+  if (g_conf().with_val<std::string>("rocksdb_index_type",
+                                   std::bind(std::equal_to<std::string>(), _1,
+                                             "hash_search")))
     bbt_opts.index_type = rocksdb::BlockBasedTableOptions::IndexType::kHashSearch;
-  if (g_conf->get_val<std::string>("rocksdb_index_type") == "two_level")
+  if (g_conf().with_val<std::string>("rocksdb_index_type",
+                                   std::bind(std::equal_to<std::string>(), _1,
+                                             "two_level")))
     bbt_opts.index_type = rocksdb::BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
-  bbt_opts.cache_index_and_filter_blocks = 
-      g_conf->get_val<bool>("rocksdb_cache_index_and_filter_blocks");
-  bbt_opts.cache_index_and_filter_blocks_with_high_priority = 
-      g_conf->get_val<bool>("rocksdb_cache_index_and_filter_blocks_with_high_priority");
-  bbt_opts.partition_filters = g_conf->get_val<bool>("rocksdb_partition_filters");
-  if (g_conf->get_val<uint64_t>("rocksdb_metadata_block_size") > 0)
-    bbt_opts.metadata_block_size = g_conf->get_val<uint64_t>("rocksdb_metadata_block_size");
-  bbt_opts.pin_l0_filter_and_index_blocks_in_cache = 
-      g_conf->get_val<bool>("rocksdb_pin_l0_filter_and_index_blocks_in_cache");
+  if (!bbt_opts.no_block_cache) {
+    bbt_opts.cache_index_and_filter_blocks =
+        g_conf().get_val<bool>("rocksdb_cache_index_and_filter_blocks");
+    bbt_opts.cache_index_and_filter_blocks_with_high_priority =
+        g_conf().get_val<bool>("rocksdb_cache_index_and_filter_blocks_with_high_priority");
+    bbt_opts.pin_l0_filter_and_index_blocks_in_cache =
+      g_conf().get_val<bool>("rocksdb_pin_l0_filter_and_index_blocks_in_cache");
+  }
+  bbt_opts.partition_filters = g_conf().get_val<bool>("rocksdb_partition_filters");
+  if (g_conf().get_val<Option::size_t>("rocksdb_metadata_block_size") > 0)
+    bbt_opts.metadata_block_size = g_conf().get_val<Option::size_t>("rocksdb_metadata_block_size");
 
   opt.table_factory.reset(rocksdb::NewBlockBasedTableFactory(bbt_opts));
-  dout(10) << __func__ << " block size " << g_conf->rocksdb_block_size
+  dout(10) << __func__ << " block size " << g_conf()->rocksdb_block_size
            << ", block_cache size " << byte_u_t(block_cache_size)
           << ", row_cache size " << byte_u_t(row_cache_size)
           << "; shards "
-          << (1 << g_conf->rocksdb_cache_shard_bits)
-          << ", type " << g_conf->rocksdb_cache_type
+          << (1 << g_conf()->rocksdb_cache_shard_bits)
+          << ", type " << g_conf()->rocksdb_cache_type
           << dendl;
 
   opt.merge_operator.reset(new MergeOperatorRouter(*this));
-  status = rocksdb::DB::Open(opt, path, &db);
-  if (!status.ok()) {
-    derr << status.ToString() << dendl;
-    return -EINVAL;
+
+  return 0;
+}
+
+int RocksDBStore::do_open(ostream &out,
+                         bool create_if_missing,
+                         bool open_readonly,
+                         const vector<ColumnFamily>* cfs)
+{
+  ceph_assert(!(create_if_missing && open_readonly));
+  rocksdb::Options opt;
+  int r = load_rocksdb_options(create_if_missing, opt);
+  if (r) {
+    dout(1) << __func__ << " load rocksdb options failed" << dendl;
+    return r;
+  }
+  rocksdb::Status status;
+  if (create_if_missing) {
+    status = rocksdb::DB::Open(opt, path, &db);
+    if (!status.ok()) {
+      derr << status.ToString() << dendl;
+      return -EINVAL;
+    }
+    // create and open column families
+    if (cfs) {
+      for (auto& p : *cfs) {
+       // copy default CF settings, block cache, merge operators as
+       // the base for new CF
+       rocksdb::ColumnFamilyOptions cf_opt(opt);
+       // user input options will override the base options
+       status = rocksdb::GetColumnFamilyOptionsFromString(
+         cf_opt, p.option, &cf_opt);
+       if (!status.ok()) {
+         derr << __func__ << " invalid db column family option string for CF: "
+              << p.name << dendl;
+         return -EINVAL;
+       }
+       install_cf_mergeop(p.name, &cf_opt);
+       rocksdb::ColumnFamilyHandle *cf;
+       status = db->CreateColumnFamily(cf_opt, p.name, &cf);
+       if (!status.ok()) {
+         derr << __func__ << " Failed to create rocksdb column family: "
+              << p.name << dendl;
+         return -EINVAL;
+       }
+       // store the new CF handle
+       add_column_family(p.name, static_cast<void*>(cf));
+      }
+    }
+    default_cf = db->DefaultColumnFamily();
+  } else {
+    std::vector<string> existing_cfs;
+    status = rocksdb::DB::ListColumnFamilies(
+      rocksdb::DBOptions(opt),
+      path,
+      &existing_cfs);
+    dout(1) << __func__ << " column families: " << existing_cfs << dendl;
+    if (existing_cfs.empty()) {
+      // no column families
+      if (open_readonly) {
+       status = rocksdb::DB::Open(opt, path, &db);
+      } else {
+       status = rocksdb::DB::OpenForReadOnly(opt, path, &db);
+      }
+      if (!status.ok()) {
+       derr << status.ToString() << dendl;
+       return -EINVAL;
+      }
+      default_cf = db->DefaultColumnFamily();
+    } else {
+      // we cannot change column families for a created database.  so, map
+      // what options we are given to whatever cf's already exist.
+      std::vector<rocksdb::ColumnFamilyDescriptor> column_families;
+      for (auto& n : existing_cfs) {
+       // copy default CF settings, block cache, merge operators as
+       // the base for new CF
+       rocksdb::ColumnFamilyOptions cf_opt(opt);
+       bool found = false;
+       if (cfs) {
+         for (auto& i : *cfs) {
+           if (i.name == n) {
+             found = true;
+             status = rocksdb::GetColumnFamilyOptionsFromString(
+               cf_opt, i.option, &cf_opt);
+             if (!status.ok()) {
+               derr << __func__ << " invalid db column family options for CF '"
+                    << i.name << "': " << i.option << dendl;
+               return -EINVAL;
+             }
+           }
+         }
+       }
+       if (n != rocksdb::kDefaultColumnFamilyName) {
+         install_cf_mergeop(n, &cf_opt);
+       }
+       column_families.push_back(rocksdb::ColumnFamilyDescriptor(n, cf_opt));
+       if (!found && n != rocksdb::kDefaultColumnFamilyName) {
+         dout(1) << __func__ << " column family '" << n
+                 << "' exists but not expected" << dendl;
+       }
+      }
+      std::vector<rocksdb::ColumnFamilyHandle*> handles;
+      if (open_readonly) {
+        status = rocksdb::DB::OpenForReadOnly(rocksdb::DBOptions(opt),
+                                             path, column_families,
+                                             &handles, &db);
+      } else {
+        status = rocksdb::DB::Open(rocksdb::DBOptions(opt),
+                                  path, column_families, &handles, &db);
+      }
+      if (!status.ok()) {
+       derr << status.ToString() << dendl;
+       return -EINVAL;
+      }
+      for (unsigned i = 0; i < existing_cfs.size(); ++i) {
+       if (existing_cfs[i] == rocksdb::kDefaultColumnFamilyName) {
+         default_cf = handles[i];
+         must_close_default_cf = true;
+       } else {
+         add_column_family(existing_cfs[i], static_cast<void*>(handles[i]));
+       }
+      }
+    }
   }
+  ceph_assert(default_cf != nullptr);
   
   PerfCountersBuilder plb(g_ceph_context, "rocksdb", l_rocksdb_first, l_rocksdb_last);
   plb.add_u64_counter(l_rocksdb_gets, "get", "Gets");
@@ -417,6 +629,16 @@ RocksDBStore::~RocksDBStore()
   delete logger;
 
   // Ensure db is destroyed before dependent db_cache and filterpolicy
+  for (auto& p : cf_handles) {
+    db->DestroyColumnFamilyHandle(
+      static_cast<rocksdb::ColumnFamilyHandle*>(p.second));
+    p.second = nullptr;
+  }
+  if (must_close_default_cf) {
+    db->DestroyColumnFamilyHandle(default_cf);
+    must_close_default_cf = false;
+  }
+  default_cf = nullptr;
   delete db;
   db = nullptr;
 
@@ -442,6 +664,24 @@ void RocksDBStore::close()
     cct->get_perfcounters_collection()->remove(logger);
 }
 
+int RocksDBStore::repair(std::ostream &out)
+{
+  rocksdb::Options opt;
+  int r = load_rocksdb_options(false, opt);
+  if (r) {
+    dout(1) << __func__ << " load rocksdb options failed" << dendl;
+    out << "load rocksdb options failed" << std::endl;
+    return r;
+  }
+  rocksdb::Status status = rocksdb::RepairDB(path, opt);
+  if (status.ok()) {
+    return 0;
+  } else {
+    out << "repair rocksdb failed : " << status.ToString() << std::endl;
+    return 1;
+  }
+}
+
 void RocksDBStore::split_stats(const std::string &s, char delim, std::vector<std::string> &elems) {
     std::stringstream ss;
     ss.str(s);
@@ -451,15 +691,36 @@ void RocksDBStore::split_stats(const std::string &s, char delim, std::vector<std
     }
 }
 
+int64_t RocksDBStore::estimate_prefix_size(const string& prefix)
+{
+  auto cf = get_cf_handle(prefix);
+  uint64_t size = 0;
+  uint8_t flags =
+    //rocksdb::DB::INCLUDE_MEMTABLES |  // do not include memtables...
+    rocksdb::DB::INCLUDE_FILES;
+  if (cf) {
+    string start(1, '\x00');
+    string limit("\xff\xff\xff\xff");
+    rocksdb::Range r(start, limit);
+    db->GetApproximateSizes(cf, &r, 1, &size, flags);
+  } else {
+    string limit = prefix + "\xff\xff\xff\xff";
+    rocksdb::Range r(prefix, limit);
+    db->GetApproximateSizes(default_cf,
+                           &r, 1, &size, flags);
+  }
+  return size;
+}
+
 void RocksDBStore::get_statistics(Formatter *f)
 {
-  if (!g_conf->rocksdb_perf)  {
-    dout(20) << __func__ << "RocksDB perf is disabled, can't probe for stats"
+  if (!g_conf()->rocksdb_perf)  {
+    dout(20) << __func__ << " RocksDB perf is disabled, can't probe for stats"
             << dendl;
     return;
   }
 
-  if (g_conf->rocksdb_collect_compaction_stats) {
+  if (g_conf()->rocksdb_collect_compaction_stats) {
     std::string stat_str;
     bool status = db->GetProperty("rocksdb.stats", &stat_str);
     if (status) {
@@ -473,7 +734,7 @@ void RocksDBStore::get_statistics(Formatter *f)
       f->close_section();
     }
   }
-  if (g_conf->rocksdb_collect_extended_stats) {
+  if (g_conf()->rocksdb_collect_extended_stats) {
     if (dbstats) {
       f->open_object_section("rocksdb_extended_statistics");
       string stat_str = dbstats->ToString();
@@ -489,33 +750,37 @@ void RocksDBStore::get_statistics(Formatter *f)
     logger->dump_formatted(f,0);
     f->close_section();
   }
-  if (g_conf->rocksdb_collect_memory_stats) {
+  if (g_conf()->rocksdb_collect_memory_stats) {
     f->open_object_section("rocksdb_memtable_statistics");
-    std::string str(stringify(bbt_opts.block_cache->GetUsage()));
-    f->dump_string("block_cache_usage", str.data());
-    str.clear();
-    str.append(stringify(bbt_opts.block_cache->GetPinnedUsage()));
-    f->dump_string("block_cache_pinned_blocks_usage", str);
-    str.clear();
+    std::string str;
+    if (!bbt_opts.no_block_cache) {
+      str.append(stringify(bbt_opts.block_cache->GetUsage()));
+      f->dump_string("block_cache_usage", str.data());
+      str.clear();
+      str.append(stringify(bbt_opts.block_cache->GetPinnedUsage()));
+      f->dump_string("block_cache_pinned_blocks_usage", str);
+      str.clear();
+    }
     db->GetProperty("rocksdb.cur-size-all-mem-tables", &str);
     f->dump_string("rocksdb_memtable_usage", str);
+    str.clear();
+    db->GetProperty("rocksdb.estimate-table-readers-mem", &str);
+    f->dump_string("rocksdb_index_filter_blocks_usage", str);
     f->close_section();
   }
 }
 
-int RocksDBStore::submit_transaction(KeyValueDB::Transaction t)
+int RocksDBStore::submit_common(rocksdb::WriteOptions& woptions, KeyValueDB::Transaction t) 
 {
-  utime_t start = ceph_clock_now();
   // enable rocksdb breakdown
   // considering performance overhead, default is disabled
-  if (g_conf->rocksdb_perf) {
+  if (g_conf()->rocksdb_perf) {
     rocksdb::SetPerfLevel(rocksdb::PerfLevel::kEnableTimeExceptForMutex);
-    rocksdb::perf_context.Reset();
+    rocksdb::get_perf_context()->Reset();
   }
 
   RocksDBTransactionImpl * _t =
     static_cast<RocksDBTransactionImpl *>(t.get());
-  rocksdb::WriteOptions woptions;
   woptions.disableWAL = disableWAL;
   lgeneric_subdout(cct, rocksdb, 30) << __func__;
   RocksWBHandler bat_txc;
@@ -529,85 +794,58 @@ int RocksDBStore::submit_transaction(KeyValueDB::Transaction t)
     derr << __func__ << " error: " << s.ToString() << " code = " << s.code()
          << " Rocksdb transaction: " << rocks_txc.seen << dendl;
   }
-  utime_t lat = ceph_clock_now() - start;
 
-  if (g_conf->rocksdb_perf) {
+  if (g_conf()->rocksdb_perf) {
     utime_t write_memtable_time;
     utime_t write_delay_time;
     utime_t write_wal_time;
     utime_t write_pre_and_post_process_time;
     write_wal_time.set_from_double(
-       static_cast<double>(rocksdb::perf_context.write_wal_time)/1000000000);
+       static_cast<double>(rocksdb::get_perf_context()->write_wal_time)/1000000000);
     write_memtable_time.set_from_double(
-       static_cast<double>(rocksdb::perf_context.write_memtable_time)/1000000000);
+       static_cast<double>(rocksdb::get_perf_context()->write_memtable_time)/1000000000);
     write_delay_time.set_from_double(
-       static_cast<double>(rocksdb::perf_context.write_delay_time)/1000000000);
+       static_cast<double>(rocksdb::get_perf_context()->write_delay_time)/1000000000);
     write_pre_and_post_process_time.set_from_double(
-       static_cast<double>(rocksdb::perf_context.write_pre_and_post_process_time)/1000000000);
+       static_cast<double>(rocksdb::get_perf_context()->write_pre_and_post_process_time)/1000000000);
     logger->tinc(l_rocksdb_write_memtable_time, write_memtable_time);
     logger->tinc(l_rocksdb_write_delay_time, write_delay_time);
     logger->tinc(l_rocksdb_write_wal_time, write_wal_time);
     logger->tinc(l_rocksdb_write_pre_and_post_process_time, write_pre_and_post_process_time);
   }
 
-  logger->inc(l_rocksdb_txns);
-  logger->tinc(l_rocksdb_submit_latency, lat);
-
   return s.ok() ? 0 : -1;
 }
 
-int RocksDBStore::submit_transaction_sync(KeyValueDB::Transaction t)
+int RocksDBStore::submit_transaction(KeyValueDB::Transaction t) 
 {
   utime_t start = ceph_clock_now();
-  // enable rocksdb breakdown
-  // considering performance overhead, default is disabled
-  if (g_conf->rocksdb_perf) {
-    rocksdb::SetPerfLevel(rocksdb::PerfLevel::kEnableTimeExceptForMutex);
-    rocksdb::perf_context.Reset();
-  }
-
-  RocksDBTransactionImpl * _t =
-    static_cast<RocksDBTransactionImpl *>(t.get());
   rocksdb::WriteOptions woptions;
-  woptions.sync = true;
-  woptions.disableWAL = disableWAL;
-  lgeneric_subdout(cct, rocksdb, 30) << __func__;
-  RocksWBHandler bat_txc;
-  _t->bat.Iterate(&bat_txc);
-  *_dout << " Rocksdb transaction: " << bat_txc.seen << dendl;
+  woptions.sync = false;
 
-  rocksdb::Status s = db->Write(woptions, &_t->bat);
-  if (!s.ok()) {
-    RocksWBHandler rocks_txc;
-    _t->bat.Iterate(&rocks_txc);
-    derr << __func__ << " error: " << s.ToString() << " code = " << s.code()
-         << " Rocksdb transaction: " << rocks_txc.seen << dendl;
-  }
-  utime_t lat = ceph_clock_now() - start;
+  int result = submit_common(woptions, t);
 
-  if (g_conf->rocksdb_perf) {
-    utime_t write_memtable_time;
-    utime_t write_delay_time;
-    utime_t write_wal_time;
-    utime_t write_pre_and_post_process_time;
-    write_wal_time.set_from_double(
-       static_cast<double>(rocksdb::perf_context.write_wal_time)/1000000000);
-    write_memtable_time.set_from_double(
-       static_cast<double>(rocksdb::perf_context.write_memtable_time)/1000000000);
-    write_delay_time.set_from_double(
-       static_cast<double>(rocksdb::perf_context.write_delay_time)/1000000000);
-    write_pre_and_post_process_time.set_from_double(
-       static_cast<double>(rocksdb::perf_context.write_pre_and_post_process_time)/1000000000);
-    logger->tinc(l_rocksdb_write_memtable_time, write_memtable_time);
-    logger->tinc(l_rocksdb_write_delay_time, write_delay_time);
-    logger->tinc(l_rocksdb_write_wal_time, write_wal_time);
-    logger->tinc(l_rocksdb_write_pre_and_post_process_time, write_pre_and_post_process_time);
-  }
+  utime_t lat = ceph_clock_now() - start;
+  logger->inc(l_rocksdb_txns);
+  logger->tinc(l_rocksdb_submit_latency, lat);
+  
+  return result;
+}
 
+int RocksDBStore::submit_transaction_sync(KeyValueDB::Transaction t)
+{
+  utime_t start = ceph_clock_now();
+  rocksdb::WriteOptions woptions;
+  // if disableWAL, sync can't set
+  woptions.sync = !disableWAL;
+  
+  int result = submit_common(woptions, t);
+  
+  utime_t lat = ceph_clock_now() - start;
   logger->inc(l_rocksdb_txns_sync);
   logger->tinc(l_rocksdb_submit_sync_latency, lat);
 
-  return s.ok() ? 0 : -1;
+  return result;
 }
 
 RocksDBStore::RocksDBTransactionImpl::RocksDBTransactionImpl(RocksDBStore *_db)
@@ -615,81 +853,122 @@ RocksDBStore::RocksDBTransactionImpl::RocksDBTransactionImpl(RocksDBStore *_db)
   db = _db;
 }
 
-void RocksDBStore::RocksDBTransactionImpl::set(
-  const string &prefix,
-  const string &k,
+void RocksDBStore::RocksDBTransactionImpl::put_bat(
+  rocksdb::WriteBatch& bat,
+  rocksdb::ColumnFamilyHandle *cf,
+  const string &key,
   const bufferlist &to_set_bl)
 {
-  string key = combine_strings(prefix, k);
-
   // bufferlist::c_str() is non-constant, so we can't call c_str()
   if (to_set_bl.is_contiguous() && to_set_bl.length() > 0) {
-    bat.Put(rocksdb::Slice(key),
-            rocksdb::Slice(to_set_bl.buffers().front().c_str(),
-                           to_set_bl.length()));
+    bat.Put(cf,
+           rocksdb::Slice(key),
+           rocksdb::Slice(to_set_bl.buffers().front().c_str(),
+                          to_set_bl.length()));
   } else {
     rocksdb::Slice key_slice(key);
     vector<rocksdb::Slice> value_slices(to_set_bl.buffers().size());
-    bat.Put(nullptr, rocksdb::SliceParts(&key_slice, 1),
+    bat.Put(cf,
+           rocksdb::SliceParts(&key_slice, 1),
             prepare_sliceparts(to_set_bl, &value_slices));
   }
 }
 
 void RocksDBStore::RocksDBTransactionImpl::set(
   const string &prefix,
-  const char *k, size_t keylen,
+  const string &k,
   const bufferlist &to_set_bl)
 {
-  string key;
-  combine_strings(prefix, k, keylen, &key);
+  auto cf = db->get_cf_handle(prefix);
+  if (cf) {
+    put_bat(bat, cf, k, to_set_bl);
+  } else {
+    string key = combine_strings(prefix, k);
+    put_bat(bat, db->default_cf, key, to_set_bl);
+  }
+}
 
-  // bufferlist::c_str() is non-constant, so we can't call c_str()
-  if (to_set_bl.is_contiguous() && to_set_bl.length() > 0) {
-    bat.Put(rocksdb::Slice(key),
-            rocksdb::Slice(to_set_bl.buffers().front().c_str(),
-                           to_set_bl.length()));
+void RocksDBStore::RocksDBTransactionImpl::set(
+  const string &prefix,
+  const char *k, size_t keylen,
+  const bufferlist &to_set_bl)
+{
+  auto cf = db->get_cf_handle(prefix);
+  if (cf) {
+    string key(k, keylen);  // fixme?
+    put_bat(bat, cf, key, to_set_bl);
   } else {
-    rocksdb::Slice key_slice(key);
-    vector<rocksdb::Slice> value_slices(to_set_bl.buffers().size());
-    bat.Put(nullptr, rocksdb::SliceParts(&key_slice, 1),
-            prepare_sliceparts(to_set_bl, &value_slices));
+    string key;
+    combine_strings(prefix, k, keylen, &key);
+    put_bat(bat, cf, key, to_set_bl);
   }
 }
 
 void RocksDBStore::RocksDBTransactionImpl::rmkey(const string &prefix,
                                                 const string &k)
 {
-  bat.Delete(combine_strings(prefix, k));
+  auto cf = db->get_cf_handle(prefix);
+  if (cf) {
+    bat.Delete(cf, rocksdb::Slice(k));
+  } else {
+    bat.Delete(db->default_cf, combine_strings(prefix, k));
+  }
 }
 
 void RocksDBStore::RocksDBTransactionImpl::rmkey(const string &prefix,
                                                 const char *k,
                                                 size_t keylen)
 {
-  string key;
-  combine_strings(prefix, k, keylen, &key);
-  bat.Delete(key);
+  auto cf = db->get_cf_handle(prefix);
+  if (cf) {
+    bat.Delete(cf, rocksdb::Slice(k, keylen));
+  } else {
+    string key;
+    combine_strings(prefix, k, keylen, &key);
+    bat.Delete(db->default_cf, rocksdb::Slice(key));
+  }
 }
 
 void RocksDBStore::RocksDBTransactionImpl::rm_single_key(const string &prefix,
                                                         const string &k)
 {
-  bat.SingleDelete(combine_strings(prefix, k));
+  auto cf = db->get_cf_handle(prefix);
+  if (cf) {
+    bat.SingleDelete(cf, k);
+  } else {
+    bat.SingleDelete(db->default_cf, combine_strings(prefix, k));
+  }
 }
 
 void RocksDBStore::RocksDBTransactionImpl::rmkeys_by_prefix(const string &prefix)
 {
-  if (db->enable_rmrange) {
-    string endprefix = prefix;
-    endprefix.push_back('\x01');
-    bat.DeleteRange(combine_strings(prefix, string()),
-                   combine_strings(endprefix, string()));
+  auto cf = db->get_cf_handle(prefix);
+  if (cf) {
+    if (db->enable_rmrange) {
+      string endprefix("\xff\xff\xff\xff");  // FIXME: this is cheating...
+      bat.DeleteRange(cf, string(), endprefix);
+    } else {
+      auto it = db->get_iterator(prefix);
+      for (it->seek_to_first();
+          it->valid();
+          it->next()) {
+       bat.Delete(cf, rocksdb::Slice(it->key()));
+      }
+    }
   } else {
-    KeyValueDB::Iterator it = db->get_iterator(prefix);
-    for (it->seek_to_first();
-        it->valid();
-        it->next()) {
-      bat.Delete(combine_strings(prefix, it->key()));
+    if (db->enable_rmrange) {
+      string endprefix = prefix;
+      endprefix.push_back('\x01');
+      bat.DeleteRange(db->default_cf,
+                     combine_strings(prefix, string()),
+                     combine_strings(endprefix, string()));
+    } else {
+      auto it = db->get_iterator(prefix);
+      for (it->seek_to_first();
+          it->valid();
+          it->next()) {
+       bat.Delete(db->default_cf, combine_strings(prefix, it->key()));
+      }
     }
   }
 }
@@ -698,17 +977,38 @@ void RocksDBStore::RocksDBTransactionImpl::rm_range_keys(const string &prefix,
                                                          const string &start,
                                                          const string &end)
 {
-  if (db->enable_rmrange) {
-    bat.DeleteRange(combine_strings(prefix, start), combine_strings(prefix, end));
+  auto cf = db->get_cf_handle(prefix);
+  if (cf) {
+    if (db->enable_rmrange) {
+      bat.DeleteRange(cf, rocksdb::Slice(start), rocksdb::Slice(end));
+    } else {
+      auto it = db->get_iterator(prefix);
+      it->lower_bound(start);
+      while (it->valid()) {
+       if (it->key() >= end) {
+         break;
+       }
+       bat.Delete(cf, rocksdb::Slice(it->key()));
+       it->next();
+      }
+    }
   } else {
-    auto it = db->get_iterator(prefix);
-    it->lower_bound(start);
-    while (it->valid()) {
-      if (it->key() >= end) {
-        break;
+    if (db->enable_rmrange) {
+      bat.DeleteRange(
+       db->default_cf,
+       rocksdb::Slice(combine_strings(prefix, start)),
+       rocksdb::Slice(combine_strings(prefix, end)));
+    } else {
+      auto it = db->get_iterator(prefix);
+      it->lower_bound(start);
+      while (it->valid()) {
+       if (it->key() >= end) {
+         break;
+       }
+       bat.Delete(db->default_cf,
+                  combine_strings(prefix, it->key()));
+       it->next();
       }
-      bat.Delete(combine_strings(prefix, it->key()));
-      it->next();
     }
   }
 }
@@ -718,40 +1018,75 @@ void RocksDBStore::RocksDBTransactionImpl::merge(
   const string &k,
   const bufferlist &to_set_bl)
 {
-  string key = combine_strings(prefix, k);
-
-  // bufferlist::c_str() is non-constant, so we can't call c_str()
-  if (to_set_bl.is_contiguous() && to_set_bl.length() > 0) {
-    bat.Merge(rocksdb::Slice(key),
-              rocksdb::Slice(to_set_bl.buffers().front().c_str(),
-                           to_set_bl.length()));
+  auto cf = db->get_cf_handle(prefix);
+  if (cf) {
+    // bufferlist::c_str() is non-constant, so we can't call c_str()
+    if (to_set_bl.is_contiguous() && to_set_bl.length() > 0) {
+      bat.Merge(
+       cf,
+       rocksdb::Slice(k),
+       rocksdb::Slice(to_set_bl.buffers().front().c_str(), to_set_bl.length()));
+    } else {
+      // make a copy
+      rocksdb::Slice key_slice(k);
+      vector<rocksdb::Slice> value_slices(to_set_bl.buffers().size());
+      bat.Merge(cf, rocksdb::SliceParts(&key_slice, 1),
+               prepare_sliceparts(to_set_bl, &value_slices));
+    }
   } else {
-    // make a copy
-    rocksdb::Slice key_slice(key);
-    vector<rocksdb::Slice> value_slices(to_set_bl.buffers().size());
-    bat.Merge(nullptr, rocksdb::SliceParts(&key_slice, 1),
-              prepare_sliceparts(to_set_bl, &value_slices));
+    string key = combine_strings(prefix, k);
+    // bufferlist::c_str() is non-constant, so we can't call c_str()
+    if (to_set_bl.is_contiguous() && to_set_bl.length() > 0) {
+      bat.Merge(
+       db->default_cf,
+       rocksdb::Slice(key),
+       rocksdb::Slice(to_set_bl.buffers().front().c_str(), to_set_bl.length()));
+    } else {
+      // make a copy
+      rocksdb::Slice key_slice(key);
+      vector<rocksdb::Slice> value_slices(to_set_bl.buffers().size());
+      bat.Merge(
+       db->default_cf,
+       rocksdb::SliceParts(&key_slice, 1),
+       prepare_sliceparts(to_set_bl, &value_slices));
+    }
   }
 }
 
-//gets will bypass RocksDB row cache, since it uses iterator
 int RocksDBStore::get(
     const string &prefix,
     const std::set<string> &keys,
     std::map<string, bufferlist> *out)
 {
   utime_t start = ceph_clock_now();
-  for (std::set<string>::const_iterator i = keys.begin();
-       i != keys.end(); ++i) {
-    std::string value;
-    std::string bound = combine_strings(prefix, *i);
-    auto status = db->Get(rocksdb::ReadOptions(), rocksdb::Slice(bound), &value);
-    if (status.ok()) {
-      (*out)[*i].append(value);
-    } else if (status.IsIOError()) {
-      ceph_abort_msg(cct, status.ToString());
+  auto cf = get_cf_handle(prefix);
+  if (cf) {
+    for (auto& key : keys) {
+      std::string value;
+      auto status = db->Get(rocksdb::ReadOptions(),
+                           cf,
+                           rocksdb::Slice(key),
+                           &value);
+      if (status.ok()) {
+       (*out)[key].append(value);
+      } else if (status.IsIOError()) {
+       ceph_abort_msg(status.getState());
+      }
+    }
+  } else {
+    for (auto& key : keys) {
+      std::string value;
+      string k = combine_strings(prefix, key);
+      auto status = db->Get(rocksdb::ReadOptions(),
+                           default_cf,
+                           rocksdb::Slice(k),
+                           &value);
+      if (status.ok()) {
+       (*out)[key].append(value);
+      } else if (status.IsIOError()) {
+       ceph_abort_msg(status.getState());
+      }
     }
-
   }
   utime_t lat = ceph_clock_now() - start;
   logger->inc(l_rocksdb_gets);
@@ -764,19 +1099,30 @@ int RocksDBStore::get(
     const string &key,
     bufferlist *out)
 {
-  assert(out && (out->length() == 0));
+  ceph_assert(out && (out->length() == 0));
   utime_t start = ceph_clock_now();
   int r = 0;
-  string value, k;
+  string value;
   rocksdb::Status s;
-  k = combine_strings(prefix, key);
-  s = db->Get(rocksdb::ReadOptions(), rocksdb::Slice(k), &value);
+  auto cf = get_cf_handle(prefix);
+  if (cf) {
+    s = db->Get(rocksdb::ReadOptions(),
+               cf,
+               rocksdb::Slice(key),
+               &value);
+  } else {
+    string k = combine_strings(prefix, key);
+    s = db->Get(rocksdb::ReadOptions(),
+               default_cf,
+               rocksdb::Slice(k),
+               &value);
+  }
   if (s.ok()) {
     out->append(value);
   } else if (s.IsNotFound()) {
     r = -ENOENT;
   } else {
-    ceph_abort_msg(cct, s.ToString());
+    ceph_abort_msg(s.getState());
   }
   utime_t lat = ceph_clock_now() - start;
   logger->inc(l_rocksdb_gets);
@@ -790,19 +1136,31 @@ int RocksDBStore::get(
   size_t keylen,
   bufferlist *out)
 {
-  assert(out && (out->length() == 0));
+  ceph_assert(out && (out->length() == 0));
   utime_t start = ceph_clock_now();
   int r = 0;
-  string value, k;
-  combine_strings(prefix, key, keylen, &k);
+  string value;
   rocksdb::Status s;
-  s = db->Get(rocksdb::ReadOptions(), rocksdb::Slice(k), &value);
+  auto cf = get_cf_handle(prefix);
+  if (cf) {
+    s = db->Get(rocksdb::ReadOptions(),
+               cf,
+               rocksdb::Slice(key, keylen),
+               &value);
+  } else {
+    string k;
+    combine_strings(prefix, key, keylen, &k);
+    s = db->Get(rocksdb::ReadOptions(),
+               default_cf,
+               rocksdb::Slice(k),
+               &value);
+  }
   if (s.ok()) {
     out->append(value);
   } else if (s.IsNotFound()) {
     r = -ENOENT;
   } else {
-    ceph_abort_msg(cct, s.ToString());
+    ceph_abort_msg(s.getState());
   }
   utime_t lat = ceph_clock_now() - start;
   logger->inc(l_rocksdb_gets);
@@ -834,7 +1192,13 @@ void RocksDBStore::compact()
 {
   logger->inc(l_rocksdb_compact);
   rocksdb::CompactRangeOptions options;
-  db->CompactRange(options, nullptr, nullptr);
+  db->CompactRange(options, default_cf, nullptr, nullptr);
+  for (auto cf : cf_handles) {
+    db->CompactRange(
+      options,
+      static_cast<rocksdb::ColumnFamilyHandle*>(cf.second),
+      nullptr, nullptr);
+  }
 }
 
 
@@ -848,7 +1212,11 @@ void RocksDBStore::compact_thread_entry()
       logger->set(l_rocksdb_compact_queue_len, compact_queue.size());
       compact_queue_lock.Unlock();
       logger->inc(l_rocksdb_compact_range);
-      compact_range(range.first, range.second);
+      if (range.first.empty() && range.second.empty()) {
+        compact();
+      } else {
+        compact_range(range.first, range.second);
+      }
       compact_queue_lock.Lock();
       continue;
     }
@@ -859,7 +1227,7 @@ void RocksDBStore::compact_thread_entry()
 
 void RocksDBStore::compact_range_async(const string& start, const string& end)
 {
-  Mutex::Locker l(compact_queue_lock);
+  std::lock_guard l(compact_queue_lock);
 
   // try to merge adjacent ranges.  this is O(n), but the queue should
   // be short.  note that we do not cover all overlap cases and merge
@@ -914,75 +1282,6 @@ void RocksDBStore::compact_range(const string& start, const string& end)
   db->CompactRange(options, &cstart, &cend);
 }
 
-int64_t RocksDBStore::request_cache_bytes(PriorityCache::Priority pri, uint64_t chunk_bytes) const
-{
-  auto cache = bbt_opts.block_cache;
-
-  int64_t assigned = get_cache_bytes(pri);
-  int64_t usage = 0;
-  int64_t request = 0;
-  switch (pri) {
-  // PRI0 is for rocksdb's high priority items (indexes/filters)
-  case PriorityCache::Priority::PRI0:
-    {
-      usage += cache->GetPinnedUsage();
-      if (g_conf->rocksdb_cache_type == "binned_lru") {
-        auto binned_cache =
-            std::static_pointer_cast<rocksdb_cache::BinnedLRUCache>(cache);
-        usage += binned_cache->GetHighPriPoolUsage();
-      }
-      break;
-    }
-  // All other cache items are currently shoved into the LAST priority. 
-  case PriorityCache::Priority::LAST:
-    { 
-      usage = get_cache_usage() - cache->GetPinnedUsage(); 
-      if (g_conf->rocksdb_cache_type == "binned_lru") {
-        auto binned_cache =
-            std::static_pointer_cast<rocksdb_cache::BinnedLRUCache>(cache);
-        usage -= binned_cache->GetHighPriPoolUsage();
-      }
-      break;
-    }
-  default:
-    break;
-  }
-  request = PriorityCache::get_chunk(usage, chunk_bytes);
-  request = (request > assigned) ? request - assigned : 0;
-  dout(10) << __func__ << " Priority: " << static_cast<uint32_t>(pri) 
-           << " Usage: " << usage << " Request: " << request << dendl;
-  return request;
-}
-
-int64_t RocksDBStore::get_cache_usage() const
-{
-  return static_cast<int64_t>(bbt_opts.block_cache->GetUsage());
-}
-
-int64_t RocksDBStore::commit_cache_size()
-{
-  size_t old_bytes = bbt_opts.block_cache->GetCapacity();
-  int64_t total_bytes = get_cache_bytes();
-  dout(10) << __func__ << " old: " << old_bytes
-           << " new: " << total_bytes << dendl;
-  bbt_opts.block_cache->SetCapacity((size_t) total_bytes);
-
-  // Set the high priority pool ratio is this is the binned LRU cache.
-  if (g_conf->rocksdb_cache_type == "binned_lru") {
-    auto binned_cache =
-        std::static_pointer_cast<rocksdb_cache::BinnedLRUCache>(bbt_opts.block_cache);
-    int64_t high_pri_bytes = get_cache_bytes(PriorityCache::Priority::PRI0);
-    double ratio = (double) high_pri_bytes / total_bytes;
-    dout(10) << __func__ << " High Pri Pool Ratio set to " << ratio << dendl;
-    binned_cache->SetHighPriPoolRatio(ratio);
-  }
-  return total_bytes;
-}
-
-int64_t RocksDBStore::get_cache_capacity() {
-  return bbt_opts.block_cache->GetCapacity();
-}
-
 RocksDBStore::RocksDBWholeSpaceIteratorImpl::~RocksDBWholeSpaceIteratorImpl()
 {
   delete dbiter;
@@ -990,20 +1289,20 @@ RocksDBStore::RocksDBWholeSpaceIteratorImpl::~RocksDBWholeSpaceIteratorImpl()
 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_first()
 {
   dbiter->SeekToFirst();
-  assert(!dbiter->status().IsIOError());
+  ceph_assert(!dbiter->status().IsIOError());
   return dbiter->status().ok() ? 0 : -1;
 }
 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_first(const string &prefix)
 {
   rocksdb::Slice slice_prefix(prefix);
   dbiter->Seek(slice_prefix);
-  assert(!dbiter->status().IsIOError());
+  ceph_assert(!dbiter->status().IsIOError());
   return dbiter->status().ok() ? 0 : -1;
 }
 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_last()
 {
   dbiter->SeekToLast();
-  assert(!dbiter->status().IsIOError());
+  ceph_assert(!dbiter->status().IsIOError());
   return dbiter->status().ok() ? 0 : -1;
 }
 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_last(const string &prefix)
@@ -1045,7 +1344,7 @@ int RocksDBStore::RocksDBWholeSpaceIteratorImpl::next()
   if (valid()) {
     dbiter->Next();
   }
-  assert(!dbiter->status().IsIOError());
+  ceph_assert(!dbiter->status().IsIOError());
   return dbiter->status().ok() ? 0 : -1;
 }
 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::prev()
@@ -1053,7 +1352,7 @@ int RocksDBStore::RocksDBWholeSpaceIteratorImpl::prev()
   if (valid()) {
     dbiter->Prev();
   }
-  assert(!dbiter->status().IsIOError());
+  ceph_assert(!dbiter->status().IsIOError());
   return dbiter->status().ok() ? 0 : -1;
 }
 string RocksDBStore::RocksDBWholeSpaceIteratorImpl::key()
@@ -1112,9 +1411,86 @@ string RocksDBStore::past_prefix(const string &prefix)
   return limit;
 }
 
-RocksDBStore::WholeSpaceIterator RocksDBStore::_get_iterator()
+RocksDBStore::WholeSpaceIterator RocksDBStore::get_wholespace_iterator()
 {
   return std::make_shared<RocksDBWholeSpaceIteratorImpl>(
-        db->NewIterator(rocksdb::ReadOptions()));
+    db->NewIterator(rocksdb::ReadOptions(), default_cf));
 }
 
+class CFIteratorImpl : public KeyValueDB::IteratorImpl {
+protected:
+  string prefix;
+  rocksdb::Iterator *dbiter;
+public:
+  explicit CFIteratorImpl(const std::string& p,
+                                rocksdb::Iterator *iter)
+    : prefix(p), dbiter(iter) { }
+  ~CFIteratorImpl() {
+    delete dbiter;
+  }
+
+  int seek_to_first() override {
+    dbiter->SeekToFirst();
+    return dbiter->status().ok() ? 0 : -1;
+  }
+  int seek_to_last() override {
+    dbiter->SeekToLast();
+    return dbiter->status().ok() ? 0 : -1;
+  }
+  int upper_bound(const string &after) override {
+    lower_bound(after);
+    if (valid() && (key() == after)) {
+      next();
+    }
+    return dbiter->status().ok() ? 0 : -1;
+  }
+  int lower_bound(const string &to) override {
+    rocksdb::Slice slice_bound(to);
+    dbiter->Seek(slice_bound);
+    return dbiter->status().ok() ? 0 : -1;
+  }
+  int next() override {
+    if (valid()) {
+      dbiter->Next();
+    }
+    return dbiter->status().ok() ? 0 : -1;
+  }
+  int prev() override {
+    if (valid()) {
+      dbiter->Prev();
+    }
+    return dbiter->status().ok() ? 0 : -1;
+  }
+  bool valid() override {
+    return dbiter->Valid();
+  }
+  string key() override {
+    return dbiter->key().ToString();
+  }
+  std::pair<std::string, std::string> raw_key() override {
+    return make_pair(prefix, key());
+  }
+  bufferlist value() override {
+    return to_bufferlist(dbiter->value());
+  }
+  bufferptr value_as_ptr() override {
+    rocksdb::Slice val = dbiter->value();
+    return bufferptr(val.data(), val.size());
+  }
+  int status() override {
+    return dbiter->status().ok() ? 0 : -1;
+  }
+};
+
+KeyValueDB::Iterator RocksDBStore::get_iterator(const std::string& prefix)
+{
+  rocksdb::ColumnFamilyHandle *cf_handle =
+    static_cast<rocksdb::ColumnFamilyHandle*>(get_cf_handle(prefix));
+  if (cf_handle) {
+    return std::make_shared<CFIteratorImpl>(
+      prefix,
+      db->NewIterator(rocksdb::ReadOptions(), cf_handle));
+  } else {
+    return KeyValueDB::get_iterator(prefix);
+  }
+}