]> git.proxmox.com Git - ceph.git/blob - ceph/src/kv/RocksDBStore.h
25a2045ffafe91836457ce5d0671892e76a2923c
[ceph.git] / ceph / src / kv / RocksDBStore.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 #ifndef ROCKS_DB_STORE_H
4 #define ROCKS_DB_STORE_H
5
6 #include "include/types.h"
7 #include "include/buffer_fwd.h"
8 #include "KeyValueDB.h"
9 #include <set>
10 #include <map>
11 #include <string>
12 #include <memory>
13 #include <boost/scoped_ptr.hpp>
14 #include "rocksdb/write_batch.h"
15 #include "rocksdb/perf_context.h"
16 #include "rocksdb/iostats_context.h"
17 #include "rocksdb/statistics.h"
18 #include "rocksdb/table.h"
19 #include "rocksdb/db.h"
20 #include "kv/rocksdb_cache/BinnedLRUCache.h"
21 #include <errno.h>
22 #include "common/errno.h"
23 #include "common/dout.h"
24 #include "include/ceph_assert.h"
25 #include "include/common_fwd.h"
26 #include "common/Formatter.h"
27 #include "common/Cond.h"
28 #include "common/ceph_context.h"
29 #include "common/PriorityCache.h"
30 #include "common/pretty_binary.h"
31
// Identifiers for the RocksDB store's counters. Only the first enumerator has
// an explicit value (34300); each following one is the previous value plus one,
// and l_rocksdb_first / l_rocksdb_last are the first and last enumerators.
// NOTE(review): presumably these index the PerfCounters object returned by
// RocksDBStore::get_perf_counters() — confirm against the implementation file.
32 enum {
33 l_rocksdb_first = 34300,
34 l_rocksdb_get_latency,
35 l_rocksdb_submit_latency,
36 l_rocksdb_submit_sync_latency,
37 l_rocksdb_compact,
38 l_rocksdb_compact_range,
39 l_rocksdb_compact_queue_merge,
40 l_rocksdb_compact_queue_len,
41 l_rocksdb_write_wal_time,
42 l_rocksdb_write_memtable_time,
43 l_rocksdb_write_delay_time,
44 l_rocksdb_write_pre_and_post_process_time,
45 l_rocksdb_last,
46 };
47
// Forward declarations of the RocksDB types named in this header.
// NOTE(review): several rocksdb headers are already included above, so some of
// these declarations may be redundant — confirm before removing any.
48 namespace rocksdb{
49 class DB;
50 class Env;
51 class Cache;
52 class FilterPolicy;
53 class Snapshot;
54 class Slice;
55 class WriteBatch;
56 class Iterator;
57 class Logger;
58 class ColumnFamilyHandle;
59 struct Options;
60 struct BlockBasedTableOptions;
61 struct DBOptions;
62 struct ColumnFamilyOptions;
63 }
64
// Declared here, defined elsewhere: returns a pointer to a rocksdb::Logger.
// NOTE(review): ownership of the returned pointer is not visible from this
// header — confirm at the definition.
65 extern rocksdb::Logger *create_rocksdb_ceph_logger();
66
67 inline rocksdb::Slice make_slice(const std::optional<std::string>& bound) {
68 if (bound) {
69 return {*bound};
70 } else {
71 return {};
72 }
73 }
74
75 /**
76 * Uses RocksDB to implement the KeyValueDB interface
77 */
78 class RocksDBStore : public KeyValueDB {
// Context supplied to the constructor; in this header it is used for logging
// and for reading the "rocksdb_delete_range_threshold" config value.
79 CephContext *cct;
// Returned as-is by get_perf_counters(); the constructor sets it to NULL.
80 PerfCounters *logger;
// Store location; get_estimated_size() opens it as a directory.
81 std::string path;
// Copy of the constructor's `opt` map.
82 std::map<std::string,std::string> kv_options;
// Opaque pointer taken from the constructor's `p` argument; `env` below is
// initialised from the same pointer cast to rocksdb::Env*.
83 void *priv;
// Underlying database handle; NULL after construction.
84 rocksdb::DB *db;
85 rocksdb::Env *env;
// Exposed through get_comparator(); nullptr after construction.
86 const rocksdb::Comparator* comparator;
87 std::shared_ptr<rocksdb::Statistics> dbstats;
// Table options whose block_cache backs the argument-less get_cache_usage()
// and get_priority_cache().
88 rocksdb::BlockBasedTableOptions bbt_opts;
89 std::string options_str;
90
// Both written by set_cache_size(): the requested size and a flag recording
// that a size was requested.
91 uint64_t cache_size = 0;
92 bool set_cache_flag = false;
// NOTE(review): these iterator classes are defined outside this header;
// friendship presumably lets them reach private state such as `db` — confirm.
93 friend class ShardMergeIteratorImpl;
94 friend class CFIteratorImpl;
95 friend class WholeMergeIteratorImpl;
96 /*
97 * See RocksDB's definition of a column family (CF) and how to use it.
98 * The KeyValueDB interface is extended when a column family is created.
99 * The prefix will be the name of the column family to use.
100 */
101 public:
/// Description of one column family: its name, shard count, option string and
/// the character range used for hash calculation.
struct ColumnFamily {
  std::string name;     ///< name of this individual column family
  size_t shard_cnt;     ///< count of shards
  std::string options;  ///< configure option string for this CF
  uint32_t hash_l;      ///< first character to take for hash calc.
  uint32_t hash_h;      ///< last character to take for hash calc.

  /// Member-wise constructor; every argument is copied into the field of the
  /// same name.
  ColumnFamily(const std::string &name, size_t shard_cnt, const std::string &options,
               uint32_t hash_l, uint32_t hash_h)
    : name{name},
      shard_cnt{shard_cnt},
      options{options},
      hash_l{hash_l},
      hash_h{hash_h}
  {}
};
112 private:
// Stream-insertion operator for ColumnFamily; declared here, defined elsewhere.
113 friend std::ostream& operator<<(std::ostream& out, const ColumnFamily& cf);
114
// Handle of the default column family plus a flag that starts out false.
// NOTE(review): the flag presumably records whether this object must release
// default_cf itself — confirm in close().
115 bool must_close_default_cf = false;
116 rocksdb::ColumnFamilyHandle *default_cf = nullptr;
117
118 /// column families in use, name->handles
119 struct prefix_shards {
120 uint32_t hash_l; //< first character to take for hash calc.
121 uint32_t hash_h; //< last character to take for hash calc.
// One handle per shard of the prefix.
// NOTE(review): presumably indexed by shard number — confirm in add_column_family().
122 std::vector<rocksdb::ColumnFamilyHandle *> handles;
123 };
// prefix -> shard description.
124 std::unordered_map<std::string, prefix_shards> cf_handles;
// NOTE(review): presumably rocksdb column-family id -> owning prefix — confirm.
125 std::unordered_map<uint32_t, std::string> cf_ids_to_prefix;
// prefix -> table options; the per-prefix get_cache_usage() and
// get_priority_cache() overloads read block_cache from here.
126 std::unordered_map<std::string, rocksdb::BlockBasedTableOptions> cf_bbt_opts;
127
// Column-family bookkeeping and lookup helpers; all defined out of line.
128 void add_column_family(const std::string& cf_name, uint32_t hash_l, uint32_t hash_h,
129 size_t shard_idx, rocksdb::ColumnFamilyHandle *handle);
130 bool is_column_family(const std::string& prefix);
131 std::string_view get_key_hash_view(const prefix_shards& shards, const char* key, const size_t keylen);
132 rocksdb::ColumnFamilyHandle *get_key_cf(const prefix_shards& shards, const char* key, const size_t keylen);
133 rocksdb::ColumnFamilyHandle *get_cf_handle(const std::string& prefix, const std::string& key);
134 rocksdb::ColumnFamilyHandle *get_cf_handle(const std::string& prefix, const char* key, size_t keylen);
135 rocksdb::ColumnFamilyHandle *get_cf_handle(const std::string& prefix, const IteratorBounds& bounds);
136
// Open/submit plumbing; all defined out of line.
137 int submit_common(rocksdb::WriteOptions& woptions, KeyValueDB::Transaction t);
138 int install_cf_mergeop(const std::string &cf_name, rocksdb::ColumnFamilyOptions *cf_opt);
139 int create_db_dir();
// Shared implementation behind open() and open_read_only(), which pass
// (create_if_missing=false, open_readonly=false) and (false, true) respectively.
140 int do_open(std::ostream &out, bool create_if_missing, bool open_readonly,
141 const std::string& cfs="");
142 int load_rocksdb_options(bool create_if_missing, rocksdb::Options& opt);
143 public:
// Parses a textual sharding definition into ColumnFamily entries (defined out
// of line). Both error out-parameters are optional and default to nullptr.
// NOTE(review): the meaning of the bool result and of *error_position is not
// visible here — confirm at the definition.
144 static bool parse_sharding_def(const std::string_view text_def,
145 std::vector<ColumnFamily>& sharding_def,
146 char const* *error_position = nullptr,
147 std::string *error_msg = nullptr);
// Returns the stored comparator pointer; this is nullptr until something
// outside this header assigns it.
148 const rocksdb::Comparator* get_comparator() const {
149 return comparator;
150 }
151
152 private:
// Sharding helpers; all defined out of line.
// NOTE(review): return-code conventions (presumably 0 / negative errno) are not
// visible in this header — confirm at the definitions.
153 static void sharding_def_to_columns(const std::vector<ColumnFamily>& sharding_def,
154 std::vector<std::string>& columns);
155 int create_shards(const rocksdb::Options& opt,
156 const std::vector<ColumnFamily>& sharding_def);
157 int apply_sharding(const rocksdb::Options& opt,
158 const std::string& sharding_text);
// Fills four output vectors: descriptors and (index, ColumnFamily) pairs for
// the existing and for the missing column families.
159 int verify_sharding(const rocksdb::Options& opt,
160 std::vector<rocksdb::ColumnFamilyDescriptor>& existing_cfs,
161 std::vector<std::pair<size_t, RocksDBStore::ColumnFamily> >& existing_cfs_shard,
162 std::vector<rocksdb::ColumnFamilyDescriptor>& missing_cfs,
163 std::vector<std::pair<size_t, RocksDBStore::ColumnFamily> >& missing_cfs_shard);
// Block-cache and per-column-family option helpers; all defined out of line.
164 std::shared_ptr<rocksdb::Cache> create_block_cache(const std::string& cache_type, size_t cache_size, double cache_prio_high = 0.0);
165 int split_column_family_options(const std::string& opts_str,
166 std::unordered_map<std::string, std::string>* column_opts_map,
167 std::string* block_cache_opt);
168 int apply_block_cache_options(const std::string& column_name,
169 const std::string& block_cache_opt,
170 rocksdb::ColumnFamilyOptions* cf_opt);
171 int update_column_family_options(const std::string& base_name,
172 const std::string& more_options,
173 rocksdb::ColumnFamilyOptions* cf_opt);
174 // manage async compactions
// Note: the mutex is created under the name "RocksDBStore::compact_thread_lock",
// which differs from the variable name compact_queue_lock.
// NOTE(review): presumably this lock guards compact_queue and
// compact_queue_stop — confirm in compact_thread_entry().
175 ceph::mutex compact_queue_lock =
176 ceph::make_mutex("RocksDBStore::compact_thread_lock");
177 ceph::condition_variable compact_queue_cond;
// Pending compaction requests as pairs of strings.
// NOTE(review): presumably (start, end) key ranges, matching
// compact_range_async(start, end) — confirm.
178 std::list<std::pair<std::string,std::string>> compact_queue;
// Initialised to false by the constructor.
179 bool compact_queue_stop;
180 class CompactThread : public Thread {
181 RocksDBStore *db;
182 public:
183 explicit CompactThread(RocksDBStore *d) : db(d) {}
184 void *entry() override {
185 db->compact_thread_entry();
186 return NULL;
187 }
188 friend class RocksDBStore;
189 } compact_thread;
190
// Body of the compaction thread: CompactThread::entry() calls this.
191 void compact_thread_entry();
192
// Two-argument range compaction on already-combined keys; the public
// prefix-based overloads build their arguments with combine_strings() or
// past_prefix() and forward here. Defined out of line.
193 void compact_range(const std::string& start, const std::string& end);
194 void compact_range_async(const std::string& start, const std::string& end);
// NOTE(review): presumably interprets a single key/value option pair that
// rocksdb itself does not recognise — confirm at the definition.
195 int tryInterpret(const std::string& key, const std::string& val,
196 rocksdb::Options& opt);
197
198 public:
199 /// compact the underlying rocksdb store
// Public flags; the constructor initialises both to false.
200 bool compact_on_mount;
201 bool disableWAL;
// Read once in the constructor from the "rocksdb_delete_range_threshold"
// config value; const thereafter.
202 const uint64_t delete_range_threshold;
203 void compact() override;
204
// Requests an asynchronous compaction with two empty strings as the range.
// NOTE(review): empty bounds presumably mean "the whole key space" — confirm
// in compact_range_async().
205 void compact_async() override {
206 compact_range_async({}, {});
207 }
208
// Option-string parsing; defined out of line. The static variant takes the
// fallback interpreter as a callable instead of using a member function.
209 int ParseOptionsFromString(const std::string& opt_str, rocksdb::Options& opt);
210 static int ParseOptionsFromStringStatic(
211 CephContext* cct,
212 const std::string& opt_str,
213 rocksdb::Options &opt,
214 std::function<int(const std::string&, const std::string&, rocksdb::Options&)> interp);
215 static int _test_init(const std::string& dir);
216 int init(std::string options_str) override;
217 /// compact rocksdb for all keys with a given prefix
218 void compact_prefix(const std::string& prefix) override {
219 compact_range(prefix, past_prefix(prefix));
220 }
221 void compact_prefix_async(const std::string& prefix) override {
222 compact_range_async(prefix, past_prefix(prefix));
223 }
224
225 void compact_range(const std::string& prefix, const std::string& start,
226 const std::string& end) override {
227 compact_range(combine_strings(prefix, start), combine_strings(prefix, end));
228 }
229 void compact_range_async(const std::string& prefix, const std::string& start,
230 const std::string& end) override {
231 compact_range_async(combine_strings(prefix, start), combine_strings(prefix, end));
232 }
233
234 RocksDBStore(CephContext *c, const std::string &path, std::map<std::string,std::string> opt, void *p) :
235 cct(c),
236 logger(NULL),
237 path(path),
238 kv_options(opt),
239 priv(p),
240 db(NULL),
241 env(static_cast<rocksdb::Env*>(p)),
242 comparator(nullptr),
243 dbstats(NULL),
244 compact_queue_stop(false),
245 compact_thread(this),
246 compact_on_mount(false),
247 disableWAL(false),
248 delete_range_threshold(cct->_conf.get_val<uint64_t>("rocksdb_delete_range_threshold"))
249 {}
250
251 ~RocksDBStore() override;
252
253 static bool check_omap_dir(std::string &omap_dir);
254 /// Opens underlying db
// Forwards to do_open() with create_if_missing=false and open_readonly=false.
255 int open(std::ostream &out, const std::string& cfs="") override {
256 return do_open(out, false, false, cfs);
257 }
258 /// Creates underlying db if missing and opens it
259 int create_and_open(std::ostream &out,
260 const std::string& cfs="") override;
261
// Forwards to do_open() with create_if_missing=false and open_readonly=true.
262 int open_read_only(std::ostream &out, const std::string& cfs="") override {
263 return do_open(out, false, true, cfs);
264 }
265
266 void close() override;
267
268 int repair(std::ostream &out) override;
// Statistics helpers; defined out of line.
269 void split_stats(const std::string &s, char delim, std::vector<std::string> &elems);
270 void get_statistics(ceph::Formatter *f) override;
271
// Returns the `logger` member as-is; that is NULL right after construction.
272 PerfCounters *get_perf_counters() override
273 {
274 return logger;
275 }
276
// Marked final, so subclasses cannot override it.
// NOTE(review): the bool result presumably reports whether the property was
// found — confirm at the definition.
277 bool get_property(
278 const std::string &property,
279 uint64_t *out) final;
280
281 int64_t estimate_prefix_size(const std::string& prefix,
282 const std::string& key_prefix) override;
// Forward declaration only; the definition is not in this header.
283 struct RocksWBHandler;
// Transaction object handed out by RocksDBStore::get_transaction(). It holds a
// rocksdb::WriteBatch plus a pointer to the owning store; every operation
// below is defined out of line.
284 class RocksDBTransactionImpl : public KeyValueDB::TransactionImpl {
285 public:
// Public members: the batch and the store passed to the constructor.
286 rocksdb::WriteBatch bat;
287 RocksDBStore *db;
288
289 explicit RocksDBTransactionImpl(RocksDBStore *_db);
290 private:
// Helper that takes the target batch and column-family handle explicitly.
// NOTE(review): presumably shared by the set() overloads — confirm.
291 void put_bat(
292 rocksdb::WriteBatch& bat,
293 rocksdb::ColumnFamilyHandle *cf,
294 const std::string &k,
295 const ceph::bufferlist &to_set_bl);
296 public:
// Key mutation operations. Overloads taking (const char*, size_t) accept a key
// given as pointer plus length instead of a std::string.
297 void set(
298 const std::string &prefix,
299 const std::string &k,
300 const ceph::bufferlist &bl) override;
301 void set(
302 const std::string &prefix,
303 const char *k,
304 size_t keylen,
305 const ceph::bufferlist &bl) override;
306 void rmkey(
307 const std::string &prefix,
308 const std::string &k) override;
309 void rmkey(
310 const std::string &prefix,
311 const char *k,
312 size_t keylen) override;
313 void rm_single_key(
314 const std::string &prefix,
315 const std::string &k) override;
316 void rmkeys_by_prefix(
317 const std::string &prefix
318 ) override;
319 void rm_range_keys(
320 const std::string &prefix,
321 const std::string &start,
322 const std::string &end) override;
323 void merge(
324 const std::string& prefix,
325 const std::string& k,
326 const ceph::bufferlist &bl) override;
327 };
328
// Creates a new RocksDBTransactionImpl bound to this store, held in a
// shared pointer.
329 KeyValueDB::Transaction get_transaction() override {
330 return std::make_shared<RocksDBTransactionImpl>(this);
331 }
332
// Transaction submission; defined out of line.
// NOTE(review): both presumably go through submit_common() with different
// write options — confirm.
333 int submit_transaction(KeyValueDB::Transaction t) override;
334 int submit_transaction_sync(KeyValueDB::Transaction t) override;
// Lookups: a set of keys into a map, a single std::string key, and a single
// key given as pointer plus length. Results are written through `out`.
335 int get(
336 const std::string &prefix,
337 const std::set<std::string> &key,
338 std::map<std::string, ceph::bufferlist> *out
339 ) override;
340 int get(
341 const std::string &prefix,
342 const std::string &key,
343 ceph::bufferlist *out
344 ) override;
345 int get(
346 const std::string &prefix,
347 const char *key,
348 size_t keylen,
349 ceph::bufferlist *out) override;
350
351
352 class RocksDBWholeSpaceIteratorImpl :
353 public KeyValueDB::WholeSpaceIteratorImpl {
354 protected:
355 rocksdb::Iterator *dbiter;
356 public:
357 explicit RocksDBWholeSpaceIteratorImpl(const RocksDBStore* db,
358 rocksdb::ColumnFamilyHandle* cf,
359 const KeyValueDB::IteratorOpts opts)
360 {
361 rocksdb::ReadOptions options = rocksdb::ReadOptions();
362 if (opts & ITERATOR_NOCACHE)
363 options.fill_cache=false;
364 dbiter = db->db->NewIterator(options, cf);
365 }
366 ~RocksDBWholeSpaceIteratorImpl() override;
367
368 int seek_to_first() override;
369 int seek_to_first(const std::string &prefix) override;
370 int seek_to_last() override;
371 int seek_to_last(const std::string &prefix) override;
372 int upper_bound(const std::string &prefix, const std::string &after) override;
373 int lower_bound(const std::string &prefix, const std::string &to) override;
374 bool valid() override;
375 int next() override;
376 int prev() override;
377 std::string key() override;
378 std::pair<std::string,std::string> raw_key() override;
379 bool raw_key_is_prefixed(const std::string &prefix) override;
380 ceph::bufferlist value() override;
381 ceph::bufferptr value_as_ptr() override;
382 int status() override;
383 size_t key_size() override;
384 size_t value_size() override;
385 };
386
// Iterator over one prefix. `opts` defaults to 0 and the bounds default to a
// default-constructed IteratorBounds. Defined out of line.
387 Iterator get_iterator(const std::string& prefix, IteratorOpts opts = 0, IteratorBounds = IteratorBounds()) override;
388 private:
389 /// this iterator spans single cf
390 rocksdb::Iterator* new_shard_iterator(rocksdb::ColumnFamilyHandle* cf);
391 public:
392 /// Utility
/**
 * Build a combined key: @p prefix, then a single NUL byte, then @p value.
 *
 * The exact result size is reserved up front (as the pointer/length overload
 * already does), so the string is allocated at most once instead of growing
 * step by step.
 *
 * @param prefix  key prefix; may be empty
 * @param value   key within the prefix; may be empty and may contain NUL bytes
 * @return prefix + '\0' + value, of length prefix.size() + 1 + value.size()
 */
static std::string combine_strings(const std::string &prefix, const std::string &value) {
  std::string out;
  out.reserve(prefix.size() + 1 + value.size());
  out.append(prefix);
  out.push_back('\0');  // separator between prefix and key
  out.append(value);
  return out;
}
/**
 * Build a combined key in place: @p prefix, a single NUL byte, then the first
 * @p keylen bytes of @p key. Any previous content of *out is replaced.
 *
 * @param prefix  key prefix
 * @param key     pointer to the key bytes
 * @param keylen  number of bytes to take from @p key
 * @param out     destination string; capacity is reserved for the full result
 *                before anything is written
 */
static void combine_strings(const std::string &prefix,
                            const char *key, size_t keylen,
                            std::string *out) {
  const size_t combined_len = prefix.size() + 1 + keylen;
  out->reserve(combined_len);
  out->assign(prefix);
  *out += '\0';
  out->append(key, keylen);
}
407
// NOTE(review): presumably the inverse of combine_strings(), which joins
// prefix and key with a single NUL byte — confirm at the definition.
408 static int split_key(rocksdb::Slice in, std::string *prefix, std::string *key);
409
// Used by compact_prefix() / compact_prefix_async() as the end of the range
// that starts at `prefix`. Defined out of line.
410 static std::string past_prefix(const std::string &prefix);
411
// Merge-operator support; both classes are only forward-declared here.
412 class MergeOperatorRouter;
413 class MergeOperatorLinker;
414 friend class MergeOperatorRouter;
415 int set_merge_operator(
416 const std::string& prefix,
417 std::shared_ptr<KeyValueDB::MergeOperator> mop) override;
418 std::string assoc_name; ///< Name of associative operator
419
420 uint64_t get_estimated_size(std::map<std::string,uint64_t> &extra) override {
421 DIR *store_dir = opendir(path.c_str());
422 if (!store_dir) {
423 lderr(cct) << __func__ << " something happened opening the store: "
424 << cpp_strerror(errno) << dendl;
425 return 0;
426 }
427
428 uint64_t total_size = 0;
429 uint64_t sst_size = 0;
430 uint64_t log_size = 0;
431 uint64_t misc_size = 0;
432
433 struct dirent *entry = NULL;
434 while ((entry = readdir(store_dir)) != NULL) {
435 std::string n(entry->d_name);
436
437 if (n == "." || n == "..")
438 continue;
439
440 std::string fpath = path + '/' + n;
441 struct stat s;
442 int err = stat(fpath.c_str(), &s);
443 if (err < 0)
444 err = -errno;
445 // we may race against rocksdb while reading files; this should only
446 // happen when those files are being updated, data is being shuffled
447 // and files get removed, in which case there's not much of a problem
448 // as we'll get to them next time around.
449 if (err == -ENOENT) {
450 continue;
451 }
452 if (err < 0) {
453 lderr(cct) << __func__ << " error obtaining stats for " << fpath
454 << ": " << cpp_strerror(err) << dendl;
455 goto err;
456 }
457
458 size_t pos = n.find_last_of('.');
459 if (pos == std::string::npos) {
460 misc_size += s.st_size;
461 continue;
462 }
463
464 std::string ext = n.substr(pos+1);
465 if (ext == "sst") {
466 sst_size += s.st_size;
467 } else if (ext == "log") {
468 log_size += s.st_size;
469 } else {
470 misc_size += s.st_size;
471 }
472 }
473
474 total_size = sst_size + log_size + misc_size;
475
476 extra["sst"] = sst_size;
477 extra["log"] = log_size;
478 extra["misc"] = misc_size;
479 extra["total"] = total_size;
480
481 err:
482 closedir(store_dir);
483 return total_size;
484 }
485
486 virtual int64_t get_cache_usage() const override {
487 return static_cast<int64_t>(bbt_opts.block_cache->GetUsage());
488 }
489
490 virtual int64_t get_cache_usage(std::string prefix) const override {
491 auto it = cf_bbt_opts.find(prefix);
492 if (it != cf_bbt_opts.end() && it->second.block_cache) {
493 return static_cast<int64_t>(it->second.block_cache->GetUsage());
494 }
495 return -EINVAL;
496 }
497
498 int set_cache_size(uint64_t s) override {
499 cache_size = s;
500 set_cache_flag = true;
501 return 0;
502 }
503
504 virtual std::shared_ptr<PriorityCache::PriCache>
505 get_priority_cache() const override {
506 return std::dynamic_pointer_cast<PriorityCache::PriCache>(
507 bbt_opts.block_cache);
508 }
509
510 virtual std::shared_ptr<PriorityCache::PriCache>
511 get_priority_cache(std::string prefix) const override {
512 auto it = cf_bbt_opts.find(prefix);
513 if (it != cf_bbt_opts.end()) {
514 return std::dynamic_pointer_cast<PriorityCache::PriCache>(
515 it->second.block_cache);
516 }
517 return nullptr;
518 }
519
// Iterator over the whole key space; `opts` defaults to 0. Defined out of line.
520 WholeSpaceIterator get_wholespace_iterator(IteratorOpts opts = 0) override;
521 private:
522 WholeSpaceIterator get_default_cf_iterator();
523
// Resharding support types: a map from name to a column-family handle owned
// by a unique_ptr with a type-erased deleter.
// NOTE(review): the map key is presumably the column-family name — confirm in
// prepare_for_reshard().
524 using cf_deleter_t = std::function<void(rocksdb::ColumnFamilyHandle*)>;
525 using columns_t = std::map<std::string,
526 std::unique_ptr<rocksdb::ColumnFamilyHandle,
527 cf_deleter_t>>;
// Resharding steps; defined out of line.
528 int prepare_for_reshard(const std::string& new_sharding,
529 columns_t& to_process_columns);
530 int reshard_cleanup(const columns_t& current_columns);
531 public:
/// Tunables for reshard(); every field has a default, so a default-constructed
/// object is a complete configuration.
struct resharding_ctrl {
  /// amount of data to process before refreshing iterator
  size_t bytes_per_iterator{10'000'000};
  // NOTE(review): presumably the key-count counterpart of bytes_per_iterator — confirm.
  size_t keys_per_iterator{10'000};
  /// amount of data before submitting batch
  size_t bytes_per_batch{1'000'000};
  // NOTE(review): presumably the key-count counterpart of bytes_per_batch — confirm.
  size_t keys_per_batch{1'000};

  // Failure-injection switches, all off by default.
  // NOTE(review): judging by the names these are for unit tests only — confirm.
  bool unittest_fail_after_first_batch{false};
  bool unittest_fail_after_processing_column{false};
  bool unittest_fail_after_successful_processing{false};
};
// Re-shard the store according to `new_sharding`; `ctrl` is optional and
// defaults to nullptr. Defined out of line.
// NOTE(review): a null ctrl presumably selects the resharding_ctrl defaults —
// confirm at the definition.
541 int reshard(const std::string& new_sharding, const resharding_ctrl* ctrl = nullptr);
// Writes the sharding definition into `sharding`.
// NOTE(review): the bool result presumably tells whether a definition exists — confirm.
542 bool get_sharding(std::string& sharding);
543
544 };
545
546 #endif