]> git.proxmox.com Git - ceph.git/blame - ceph/src/kv/RocksDBStore.h
import ceph 16.2.6
[ceph.git] / ceph / src / kv / RocksDBStore.h
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3#ifndef ROCKS_DB_STORE_H
4#define ROCKS_DB_STORE_H
5
#include "include/types.h"
#include "include/buffer_fwd.h"
#include "KeyValueDB.h"
#include <functional>
#include <list>
#include <set>
#include <map>
#include <string>
#include <string_view>
#include <memory>
#include <unordered_map>
#include <utility>
#include <vector>
#include <boost/scoped_ptr.hpp>
#include "rocksdb/write_batch.h"
#include "rocksdb/perf_context.h"
#include "rocksdb/iostats_context.h"
#include "rocksdb/statistics.h"
#include "rocksdb/table.h"
#include "rocksdb/db.h"
#include "kv/rocksdb_cache/BinnedLRUCache.h"

#include <dirent.h>
#include <errno.h>
#include <sys/stat.h>
#include "common/errno.h"
#include "common/dout.h"
#include "include/ceph_assert.h"
#include "include/common_fwd.h"

#include "common/Formatter.h"
#include "common/Cond.h"
#include "common/ceph_context.h"
#include "common/PriorityCache.h"
#include "common/pretty_binary.h"
7c673cae
FG
31
// Perf counter indices for the RocksDB key/value backend.
// l_rocksdb_first and l_rocksdb_last bracket the range: the real counters are
// the consecutive values strictly between them (l_rocksdb_first + 1 onward).
enum {
  l_rocksdb_first = 34300,

  // get path
  l_rocksdb_gets,
  l_rocksdb_get_latency,

  // submit path
  l_rocksdb_submit_latency,
  l_rocksdb_submit_sync_latency,

  // compaction
  l_rocksdb_compact,
  l_rocksdb_compact_range,
  l_rocksdb_compact_queue_merge,
  l_rocksdb_compact_queue_len,

  // write-path timings
  l_rocksdb_write_wal_time,
  l_rocksdb_write_memtable_time,
  l_rocksdb_write_delay_time,
  l_rocksdb_write_pre_and_post_process_time,

  l_rocksdb_last,
};
48
// Forward declarations of RocksDB types, so the declarations below can name
// them through pointers/references.
namespace rocksdb {
  // classes
  class Cache;
  class ColumnFamilyHandle;
  class DB;
  class Env;
  class FilterPolicy;
  class Iterator;
  class Logger;
  class Slice;
  class Snapshot;
  class WriteBatch;

  // option structs
  struct BlockBasedTableOptions;
  struct ColumnFamilyOptions;
  struct DBOptions;
  struct Options;
} // namespace rocksdb

// Returns a rocksdb::Logger; defined out of line.
// NOTE(review): presumably forwards RocksDB log output to Ceph's logging —
// confirm against the definition.
rocksdb::Logger *create_rocksdb_ceph_logger();
67
68/**
69 * Uses RocksDB to implement the KeyValueDB interface
70 */
71class RocksDBStore : public KeyValueDB {
72 CephContext *cct;
73 PerfCounters *logger;
f67539c2
TL
74 std::string path;
75 std::map<std::string,std::string> kv_options;
7c673cae
FG
76 void *priv;
77 rocksdb::DB *db;
78 rocksdb::Env *env;
f67539c2 79 const rocksdb::Comparator* comparator;
7c673cae
FG
80 std::shared_ptr<rocksdb::Statistics> dbstats;
81 rocksdb::BlockBasedTableOptions bbt_opts;
f67539c2 82 std::string options_str;
7c673cae 83
31f18b77 84 uint64_t cache_size = 0;
224ce89b 85 bool set_cache_flag = false;
f67539c2
TL
86 friend class ShardMergeIteratorImpl;
87 friend class WholeMergeIteratorImpl;
88 /*
89 * See RocksDB's definition of a column family(CF) and how to use it.
90 * The interfaces of KeyValueDB is extended, when a column family is created.
91 * Prefix will be the name of column family to use.
92 */
93public:
94 struct ColumnFamily {
95 string name; //< name of this individual column family
96 size_t shard_cnt; //< count of shards
97 string options; //< configure option string for this CF
98 uint32_t hash_l; //< first character to take for hash calc.
99 uint32_t hash_h; //< last character to take for hash calc.
100 ColumnFamily(const string &name, size_t shard_cnt, const string &options,
101 uint32_t hash_l, uint32_t hash_h)
102 : name(name), shard_cnt(shard_cnt), options(options), hash_l(hash_l), hash_h(hash_h) {}
103 };
104private:
105 friend std::ostream& operator<<(std::ostream& out, const ColumnFamily& cf);
31f18b77 106
11fdf7f2
TL
107 bool must_close_default_cf = false;
108 rocksdb::ColumnFamilyHandle *default_cf = nullptr;
109
f67539c2
TL
110 /// column families in use, name->handles
111 struct prefix_shards {
112 uint32_t hash_l; //< first character to take for hash calc.
113 uint32_t hash_h; //< last character to take for hash calc.
114 std::vector<rocksdb::ColumnFamilyHandle *> handles;
115 };
116 std::unordered_map<std::string, prefix_shards> cf_handles;
117 std::unordered_map<uint32_t, std::string> cf_ids_to_prefix;
118 std::unordered_map<std::string, rocksdb::BlockBasedTableOptions> cf_bbt_opts;
119
120 void add_column_family(const std::string& cf_name, uint32_t hash_l, uint32_t hash_h,
121 size_t shard_idx, rocksdb::ColumnFamilyHandle *handle);
122 bool is_column_family(const std::string& prefix);
123 rocksdb::ColumnFamilyHandle *get_cf_handle(const std::string& prefix, const std::string& key);
124 rocksdb::ColumnFamilyHandle *get_cf_handle(const std::string& prefix, const char* key, size_t keylen);
125
11fdf7f2 126 int submit_common(rocksdb::WriteOptions& woptions, KeyValueDB::Transaction t);
f67539c2 127 int install_cf_mergeop(const std::string &cf_name, rocksdb::ColumnFamilyOptions *cf_opt);
11fdf7f2 128 int create_db_dir();
f67539c2
TL
129 int do_open(std::ostream &out, bool create_if_missing, bool open_readonly,
130 const std::string& cfs="");
11fdf7f2 131 int load_rocksdb_options(bool create_if_missing, rocksdb::Options& opt);
f67539c2
TL
132public:
133 static bool parse_sharding_def(const std::string_view text_def,
134 std::vector<ColumnFamily>& sharding_def,
135 char const* *error_position = nullptr,
136 std::string *error_msg = nullptr);
137 const rocksdb::Comparator* get_comparator() const {
138 return comparator;
139 }
7c673cae 140
f67539c2
TL
141private:
142 static void sharding_def_to_columns(const std::vector<ColumnFamily>& sharding_def,
143 std::vector<std::string>& columns);
144 int create_shards(const rocksdb::Options& opt,
145 const vector<ColumnFamily>& sharding_def);
146 int apply_sharding(const rocksdb::Options& opt,
147 const std::string& sharding_text);
148 int verify_sharding(const rocksdb::Options& opt,
149 std::vector<rocksdb::ColumnFamilyDescriptor>& existing_cfs,
150 std::vector<std::pair<size_t, RocksDBStore::ColumnFamily> >& existing_cfs_shard,
151 std::vector<rocksdb::ColumnFamilyDescriptor>& missing_cfs,
152 std::vector<std::pair<size_t, RocksDBStore::ColumnFamily> >& missing_cfs_shard);
153 std::shared_ptr<rocksdb::Cache> create_block_cache(const std::string& cache_type, size_t cache_size, double cache_prio_high = 0.0);
522d829b 154 int split_column_family_options(const std::string& opts_str,
f67539c2
TL
155 std::unordered_map<std::string, std::string>* column_opts_map,
156 std::string* block_cache_opt);
522d829b
TL
157 int apply_block_cache_options(const std::string& column_name,
158 const std::string& block_cache_opt,
159 rocksdb::ColumnFamilyOptions* cf_opt);
160 int update_column_family_options(const std::string& base_name,
161 const std::string& more_options,
162 rocksdb::ColumnFamilyOptions* cf_opt);
7c673cae 163 // manage async compactions
9f95a23c
TL
164 ceph::mutex compact_queue_lock =
165 ceph::make_mutex("RocksDBStore::compact_thread_lock");
166 ceph::condition_variable compact_queue_cond;
f67539c2 167 std::list<std::pair<std::string,std::string>> compact_queue;
7c673cae
FG
168 bool compact_queue_stop;
169 class CompactThread : public Thread {
170 RocksDBStore *db;
171 public:
172 explicit CompactThread(RocksDBStore *d) : db(d) {}
173 void *entry() override {
174 db->compact_thread_entry();
175 return NULL;
176 }
177 friend class RocksDBStore;
178 } compact_thread;
179
180 void compact_thread_entry();
181
f67539c2
TL
182 void compact_range(const std::string& start, const std::string& end);
183 void compact_range_async(const std::string& start, const std::string& end);
184 int tryInterpret(const std::string& key, const std::string& val,
185 rocksdb::Options& opt);
7c673cae
FG
186
187public:
188 /// compact the underlying rocksdb store
189 bool compact_on_mount;
190 bool disableWAL;
9f95a23c 191 const uint64_t delete_range_threshold;
7c673cae 192 void compact() override;
11fdf7f2
TL
193
194 void compact_async() override {
f67539c2 195 compact_range_async({}, {});
11fdf7f2 196 }
7c673cae 197
f67539c2 198 int ParseOptionsFromString(const std::string& opt_str, rocksdb::Options& opt);
9f95a23c
TL
199 static int ParseOptionsFromStringStatic(
200 CephContext* cct,
f67539c2 201 const std::string& opt_str,
9f95a23c 202 rocksdb::Options &opt,
f67539c2
TL
203 std::function<int(const std::string&, const std::string&, rocksdb::Options&)> interp);
204 static int _test_init(const std::string& dir);
205 int init(std::string options_str) override;
7c673cae 206 /// compact rocksdb for all keys with a given prefix
f67539c2 207 void compact_prefix(const std::string& prefix) override {
7c673cae
FG
208 compact_range(prefix, past_prefix(prefix));
209 }
f67539c2 210 void compact_prefix_async(const std::string& prefix) override {
7c673cae
FG
211 compact_range_async(prefix, past_prefix(prefix));
212 }
213
f67539c2
TL
214 void compact_range(const std::string& prefix, const std::string& start,
215 const std::string& end) override {
7c673cae
FG
216 compact_range(combine_strings(prefix, start), combine_strings(prefix, end));
217 }
f67539c2
TL
218 void compact_range_async(const std::string& prefix, const std::string& start,
219 const std::string& end) override {
7c673cae
FG
220 compact_range_async(combine_strings(prefix, start), combine_strings(prefix, end));
221 }
222
f67539c2 223 RocksDBStore(CephContext *c, const std::string &path, std::map<std::string,std::string> opt, void *p) :
7c673cae
FG
224 cct(c),
225 logger(NULL),
226 path(path),
11fdf7f2 227 kv_options(opt),
7c673cae
FG
228 priv(p),
229 db(NULL),
230 env(static_cast<rocksdb::Env*>(p)),
f67539c2 231 comparator(nullptr),
7c673cae 232 dbstats(NULL),
7c673cae
FG
233 compact_queue_stop(false),
234 compact_thread(this),
235 compact_on_mount(false),
236 disableWAL(false),
9f95a23c 237 delete_range_threshold(cct->_conf.get_val<uint64_t>("rocksdb_delete_range_threshold"))
7c673cae
FG
238 {}
239
240 ~RocksDBStore() override;
241
f67539c2 242 static bool check_omap_dir(std::string &omap_dir);
7c673cae 243 /// Opens underlying db
f67539c2
TL
244 int open(std::ostream &out, const std::string& cfs="") override {
245 return do_open(out, false, false, cfs);
7c673cae
FG
246 }
247 /// Creates underlying db if missing and opens it
f67539c2
TL
248 int create_and_open(std::ostream &out,
249 const std::string& cfs="") override;
11fdf7f2 250
f67539c2
TL
251 int open_read_only(std::ostream &out, const std::string& cfs="") override {
252 return do_open(out, false, true, cfs);
11fdf7f2 253 }
7c673cae
FG
254
255 void close() override;
256
11fdf7f2 257 int repair(std::ostream &out) override;
7c673cae 258 void split_stats(const std::string &s, char delim, std::vector<std::string> &elems);
f67539c2 259 void get_statistics(ceph::Formatter *f) override;
7c673cae 260
3efd9988
FG
261 PerfCounters *get_perf_counters() override
262 {
263 return logger;
264 }
265
9f95a23c
TL
266 bool get_property(
267 const std::string &property,
268 uint64_t *out) final;
269
f67539c2
TL
270 int64_t estimate_prefix_size(const std::string& prefix,
271 const std::string& key_prefix) override;
272 struct RocksWBHandler;
7c673cae
FG
273 class RocksDBTransactionImpl : public KeyValueDB::TransactionImpl {
274 public:
275 rocksdb::WriteBatch bat;
276 RocksDBStore *db;
277
278 explicit RocksDBTransactionImpl(RocksDBStore *_db);
11fdf7f2
TL
279 private:
280 void put_bat(
281 rocksdb::WriteBatch& bat,
282 rocksdb::ColumnFamilyHandle *cf,
f67539c2
TL
283 const std::string &k,
284 const ceph::bufferlist &to_set_bl);
11fdf7f2 285 public:
7c673cae 286 void set(
f67539c2
TL
287 const std::string &prefix,
288 const std::string &k,
289 const ceph::bufferlist &bl) override;
7c673cae 290 void set(
f67539c2 291 const std::string &prefix,
7c673cae
FG
292 const char *k,
293 size_t keylen,
f67539c2 294 const ceph::bufferlist &bl) override;
7c673cae 295 void rmkey(
f67539c2
TL
296 const std::string &prefix,
297 const std::string &k) override;
7c673cae 298 void rmkey(
f67539c2 299 const std::string &prefix,
7c673cae
FG
300 const char *k,
301 size_t keylen) override;
302 void rm_single_key(
f67539c2
TL
303 const std::string &prefix,
304 const std::string &k) override;
7c673cae 305 void rmkeys_by_prefix(
f67539c2 306 const std::string &prefix
7c673cae
FG
307 ) override;
308 void rm_range_keys(
f67539c2
TL
309 const std::string &prefix,
310 const std::string &start,
311 const std::string &end) override;
7c673cae 312 void merge(
f67539c2
TL
313 const std::string& prefix,
314 const std::string& k,
315 const ceph::bufferlist &bl) override;
7c673cae
FG
316 };
317
318 KeyValueDB::Transaction get_transaction() override {
319 return std::make_shared<RocksDBTransactionImpl>(this);
320 }
321
322 int submit_transaction(KeyValueDB::Transaction t) override;
323 int submit_transaction_sync(KeyValueDB::Transaction t) override;
324 int get(
f67539c2
TL
325 const std::string &prefix,
326 const std::set<std::string> &key,
327 std::map<std::string, ceph::bufferlist> *out
7c673cae
FG
328 ) override;
329 int get(
f67539c2
TL
330 const std::string &prefix,
331 const std::string &key,
332 ceph::bufferlist *out
7c673cae
FG
333 ) override;
334 int get(
f67539c2 335 const std::string &prefix,
7c673cae
FG
336 const char *key,
337 size_t keylen,
f67539c2 338 ceph::bufferlist *out) override;
7c673cae
FG
339
340
341 class RocksDBWholeSpaceIteratorImpl :
342 public KeyValueDB::WholeSpaceIteratorImpl {
343 protected:
344 rocksdb::Iterator *dbiter;
345 public:
346 explicit RocksDBWholeSpaceIteratorImpl(rocksdb::Iterator *iter) :
347 dbiter(iter) { }
348 //virtual ~RocksDBWholeSpaceIteratorImpl() { }
349 ~RocksDBWholeSpaceIteratorImpl() override;
350
351 int seek_to_first() override;
f67539c2 352 int seek_to_first(const std::string &prefix) override;
7c673cae 353 int seek_to_last() override;
f67539c2
TL
354 int seek_to_last(const std::string &prefix) override;
355 int upper_bound(const std::string &prefix, const std::string &after) override;
356 int lower_bound(const std::string &prefix, const std::string &to) override;
7c673cae
FG
357 bool valid() override;
358 int next() override;
359 int prev() override;
f67539c2
TL
360 std::string key() override;
361 std::pair<std::string,std::string> raw_key() override;
362 bool raw_key_is_prefixed(const std::string &prefix) override;
363 ceph::bufferlist value() override;
364 ceph::bufferptr value_as_ptr() override;
7c673cae
FG
365 int status() override;
366 size_t key_size() override;
367 size_t value_size() override;
368 };
369
f67539c2
TL
370 Iterator get_iterator(const std::string& prefix, IteratorOpts opts = 0) override;
371private:
372 /// this iterator spans single cf
373 rocksdb::Iterator* new_shard_iterator(rocksdb::ColumnFamilyHandle* cf);
374public:
7c673cae 375 /// Utility
f67539c2
TL
376 static std::string combine_strings(const std::string &prefix, const std::string &value) {
377 std::string out = prefix;
7c673cae
FG
378 out.push_back(0);
379 out.append(value);
380 return out;
381 }
f67539c2 382 static void combine_strings(const std::string &prefix,
7c673cae 383 const char *key, size_t keylen,
f67539c2 384 std::string *out) {
7c673cae
FG
385 out->reserve(prefix.size() + 1 + keylen);
386 *out = prefix;
387 out->push_back(0);
388 out->append(key, keylen);
389 }
390
f67539c2 391 static int split_key(rocksdb::Slice in, std::string *prefix, std::string *key);
7c673cae 392
f67539c2 393 static std::string past_prefix(const std::string &prefix);
7c673cae
FG
394
395 class MergeOperatorRouter;
11fdf7f2 396 class MergeOperatorLinker;
7c673cae 397 friend class MergeOperatorRouter;
11fdf7f2
TL
398 int set_merge_operator(
399 const std::string& prefix,
400 std::shared_ptr<KeyValueDB::MergeOperator> mop) override;
f67539c2 401 std::string assoc_name; ///< Name of associative operator
7c673cae 402
f67539c2 403 uint64_t get_estimated_size(std::map<std::string,uint64_t> &extra) override {
7c673cae
FG
404 DIR *store_dir = opendir(path.c_str());
405 if (!store_dir) {
406 lderr(cct) << __func__ << " something happened opening the store: "
407 << cpp_strerror(errno) << dendl;
408 return 0;
409 }
410
411 uint64_t total_size = 0;
412 uint64_t sst_size = 0;
413 uint64_t log_size = 0;
414 uint64_t misc_size = 0;
415
416 struct dirent *entry = NULL;
417 while ((entry = readdir(store_dir)) != NULL) {
f67539c2 418 std::string n(entry->d_name);
7c673cae
FG
419
420 if (n == "." || n == "..")
421 continue;
422
f67539c2 423 std::string fpath = path + '/' + n;
7c673cae
FG
424 struct stat s;
425 int err = stat(fpath.c_str(), &s);
426 if (err < 0)
427 err = -errno;
428 // we may race against rocksdb while reading files; this should only
429 // happen when those files are being updated, data is being shuffled
430 // and files get removed, in which case there's not much of a problem
431 // as we'll get to them next time around.
432 if (err == -ENOENT) {
433 continue;
434 }
435 if (err < 0) {
436 lderr(cct) << __func__ << " error obtaining stats for " << fpath
437 << ": " << cpp_strerror(err) << dendl;
438 goto err;
439 }
440
441 size_t pos = n.find_last_of('.');
f67539c2 442 if (pos == std::string::npos) {
7c673cae
FG
443 misc_size += s.st_size;
444 continue;
445 }
446
f67539c2 447 std::string ext = n.substr(pos+1);
7c673cae
FG
448 if (ext == "sst") {
449 sst_size += s.st_size;
450 } else if (ext == "log") {
451 log_size += s.st_size;
452 } else {
453 misc_size += s.st_size;
454 }
455 }
456
457 total_size = sst_size + log_size + misc_size;
458
459 extra["sst"] = sst_size;
460 extra["log"] = log_size;
461 extra["misc"] = misc_size;
462 extra["total"] = total_size;
463
464err:
465 closedir(store_dir);
466 return total_size;
467 }
468
11fdf7f2
TL
469 virtual int64_t get_cache_usage() const override {
470 return static_cast<int64_t>(bbt_opts.block_cache->GetUsage());
91327a77 471 }
91327a77 472
f67539c2
TL
473 virtual int64_t get_cache_usage(string prefix) const override {
474 auto it = cf_bbt_opts.find(prefix);
475 if (it != cf_bbt_opts.end() && it->second.block_cache) {
476 return static_cast<int64_t>(it->second.block_cache->GetUsage());
477 }
478 return -EINVAL;
479 }
480
31f18b77
FG
481 int set_cache_size(uint64_t s) override {
482 cache_size = s;
224ce89b 483 set_cache_flag = true;
31f18b77
FG
484 return 0;
485 }
7c673cae 486
f67539c2
TL
487 virtual std::shared_ptr<PriorityCache::PriCache>
488 get_priority_cache() const override {
11fdf7f2
TL
489 return dynamic_pointer_cast<PriorityCache::PriCache>(
490 bbt_opts.block_cache);
491 }
492
f67539c2
TL
493 virtual std::shared_ptr<PriorityCache::PriCache>
494 get_priority_cache(string prefix) const override {
495 auto it = cf_bbt_opts.find(prefix);
496 if (it != cf_bbt_opts.end()) {
497 return dynamic_pointer_cast<PriorityCache::PriCache>(
498 it->second.block_cache);
499 }
500 return nullptr;
501 }
7c673cae 502
f67539c2
TL
503 WholeSpaceIterator get_wholespace_iterator(IteratorOpts opts = 0) override;
504private:
505 WholeSpaceIterator get_default_cf_iterator();
506
507 using cf_deleter_t = std::function<void(rocksdb::ColumnFamilyHandle*)>;
508 using columns_t = std::map<std::string,
509 std::unique_ptr<rocksdb::ColumnFamilyHandle,
510 cf_deleter_t>>;
511 int prepare_for_reshard(const std::string& new_sharding,
512 columns_t& to_process_columns);
513 int reshard_cleanup(const columns_t& current_columns);
514public:
515 struct resharding_ctrl {
516 size_t bytes_per_iterator = 10000000; /// amount of data to process before refreshing iterator
517 size_t keys_per_iterator = 10000;
518 size_t bytes_per_batch = 1000000; /// amount of data before submitting batch
519 size_t keys_per_batch = 1000;
520 bool unittest_fail_after_first_batch = false;
521 bool unittest_fail_after_processing_column = false;
522 bool unittest_fail_after_successful_processing = false;
523 };
524 int reshard(const std::string& new_sharding, const resharding_ctrl* ctrl = nullptr);
525 bool get_sharding(std::string& sharding);
7c673cae 526
f67539c2 527};
7c673cae
FG
528
529#endif