]> git.proxmox.com Git - ceph.git/blob - ceph/src/kv/RocksDBStore.h
import 15.2.2 octopus source
[ceph.git] / ceph / src / kv / RocksDBStore.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 #ifndef ROCKS_DB_STORE_H
4 #define ROCKS_DB_STORE_H
5
6 #include "include/types.h"
7 #include "include/buffer_fwd.h"
8 #include "KeyValueDB.h"
9 #include <set>
10 #include <map>
11 #include <string>
12 #include <memory>
13 #include <boost/scoped_ptr.hpp>
14 #include "rocksdb/write_batch.h"
15 #include "rocksdb/perf_context.h"
16 #include "rocksdb/iostats_context.h"
17 #include "rocksdb/statistics.h"
18 #include "rocksdb/table.h"
19 #include "kv/rocksdb_cache/BinnedLRUCache.h"
20 #include <errno.h>
21 #include "common/errno.h"
22 #include "common/dout.h"
23 #include "include/ceph_assert.h"
24 #include "include/common_fwd.h"
25 #include "common/Formatter.h"
26 #include "common/Cond.h"
27 #include "common/ceph_context.h"
28 #include "common/PriorityCache.h"
29
30
// Index range for RocksDBStore's counters (presumably registered on the
// PerfCounters `logger` member -- confirm in the .cc).  l_rocksdb_first and
// l_rocksdb_last are exclusive bounds; every entry in between takes the next
// consecutive value implicitly, so reordering entries renumbers them.
enum {
  l_rocksdb_first = 34300,

  // read / transaction counts
  l_rocksdb_gets,
  l_rocksdb_txns,
  l_rocksdb_txns_sync,

  // latencies
  l_rocksdb_get_latency,
  l_rocksdb_submit_latency,
  l_rocksdb_submit_sync_latency,

  // compaction activity
  l_rocksdb_compact,
  l_rocksdb_compact_range,
  l_rocksdb_compact_queue_merge,
  l_rocksdb_compact_queue_len,

  // write-path time breakdown
  l_rocksdb_write_wal_time,
  l_rocksdb_write_memtable_time,
  l_rocksdb_write_delay_time,
  l_rocksdb_write_pre_and_post_process_time,

  l_rocksdb_last,
};
49
// Forward declarations of the RocksDB types named by RocksDBStore, so this
// header only needs pointers/references to them.
namespace rocksdb {
  // option aggregates
  struct Options;
  struct DBOptions;
  struct ColumnFamilyOptions;
  struct BlockBasedTableOptions;

  // core objects
  class DB;
  class Env;
  class Cache;
  class Logger;
  class Slice;
  class Snapshot;
  class Iterator;
  class WriteBatch;
  class FilterPolicy;
  class ColumnFamilyHandle;
} // namespace rocksdb

/// Returns a rocksdb::Logger instance; the definition lives outside this
/// header (ownership of the returned pointer is not stated here).
rocksdb::Logger *create_rocksdb_ceph_logger();
68
69 /**
70 * Uses RocksDB to implement the KeyValueDB interface
71 */
72 class RocksDBStore : public KeyValueDB {
73 CephContext *cct;
74 PerfCounters *logger;
75 string path;
76 map<string,string> kv_options;
77 void *priv;
78 rocksdb::DB *db;
79 rocksdb::Env *env;
80 std::shared_ptr<rocksdb::Statistics> dbstats;
81 rocksdb::BlockBasedTableOptions bbt_opts;
82 string options_str;
83
84 uint64_t cache_size = 0;
85 bool set_cache_flag = false;
86
87 bool must_close_default_cf = false;
88 rocksdb::ColumnFamilyHandle *default_cf = nullptr;
89
90 int submit_common(rocksdb::WriteOptions& woptions, KeyValueDB::Transaction t);
91 int install_cf_mergeop(const string &cf_name, rocksdb::ColumnFamilyOptions *cf_opt);
92 int create_db_dir();
93 int do_open(ostream &out, bool create_if_missing, bool open_readonly,
94 const vector<ColumnFamily>* cfs = nullptr);
95 int load_rocksdb_options(bool create_if_missing, rocksdb::Options& opt);
96
97 // manage async compactions
98 ceph::mutex compact_queue_lock =
99 ceph::make_mutex("RocksDBStore::compact_thread_lock");
100 ceph::condition_variable compact_queue_cond;
101 list< pair<string,string> > compact_queue;
102 bool compact_queue_stop;
103 class CompactThread : public Thread {
104 RocksDBStore *db;
105 public:
106 explicit CompactThread(RocksDBStore *d) : db(d) {}
107 void *entry() override {
108 db->compact_thread_entry();
109 return NULL;
110 }
111 friend class RocksDBStore;
112 } compact_thread;
113
114 void compact_thread_entry();
115
116 void compact_range(const string& start, const string& end);
117 void compact_range_async(const string& start, const string& end);
118 int tryInterpret(const string& key, const string& val, rocksdb::Options& opt);
119
120 public:
121 /// compact the underlying rocksdb store
122 bool compact_on_mount;
123 bool disableWAL;
124 const uint64_t delete_range_threshold;
125 void compact() override;
126
127 void compact_async() override {
128 compact_range_async(string(), string());
129 }
130
131 int ParseOptionsFromString(const string& opt_str, rocksdb::Options& opt);
132 static int ParseOptionsFromStringStatic(
133 CephContext* cct,
134 const string& opt_str,
135 rocksdb::Options &opt,
136 function<int(const string&, const string&, rocksdb::Options&)> interp);
137 static int _test_init(const string& dir);
138 int init(string options_str) override;
139 /// compact rocksdb for all keys with a given prefix
140 void compact_prefix(const string& prefix) override {
141 compact_range(prefix, past_prefix(prefix));
142 }
143 void compact_prefix_async(const string& prefix) override {
144 compact_range_async(prefix, past_prefix(prefix));
145 }
146
147 void compact_range(const string& prefix, const string& start, const string& end) override {
148 compact_range(combine_strings(prefix, start), combine_strings(prefix, end));
149 }
150 void compact_range_async(const string& prefix, const string& start, const string& end) override {
151 compact_range_async(combine_strings(prefix, start), combine_strings(prefix, end));
152 }
153
154 RocksDBStore(CephContext *c, const string &path, map<string,string> opt, void *p) :
155 cct(c),
156 logger(NULL),
157 path(path),
158 kv_options(opt),
159 priv(p),
160 db(NULL),
161 env(static_cast<rocksdb::Env*>(p)),
162 dbstats(NULL),
163 compact_queue_stop(false),
164 compact_thread(this),
165 compact_on_mount(false),
166 disableWAL(false),
167 delete_range_threshold(cct->_conf.get_val<uint64_t>("rocksdb_delete_range_threshold"))
168 {}
169
170 ~RocksDBStore() override;
171
172 static bool check_omap_dir(string &omap_dir);
173 /// Opens underlying db
174 int open(ostream &out, const vector<ColumnFamily>& cfs = {}) override {
175 return do_open(out, false, false, &cfs);
176 }
177 /// Creates underlying db if missing and opens it
178 int create_and_open(ostream &out,
179 const vector<ColumnFamily>& cfs = {}) override;
180
181 int open_read_only(ostream &out, const vector<ColumnFamily>& cfs = {}) override {
182 return do_open(out, false, true, &cfs);
183 }
184
185 void close() override;
186
187 rocksdb::ColumnFamilyHandle *get_cf_handle(const std::string& cf_name) {
188 auto iter = cf_handles.find(cf_name);
189 if (iter == cf_handles.end())
190 return nullptr;
191 else
192 return static_cast<rocksdb::ColumnFamilyHandle*>(iter->second);
193 }
194 int repair(std::ostream &out) override;
195 void split_stats(const std::string &s, char delim, std::vector<std::string> &elems);
196 void get_statistics(Formatter *f) override;
197
198 PerfCounters *get_perf_counters() override
199 {
200 return logger;
201 }
202
203 bool get_property(
204 const std::string &property,
205 uint64_t *out) final;
206
207 int64_t estimate_prefix_size(const string& prefix,
208 const string& key_prefix) override;
209
210 struct RocksWBHandler: public rocksdb::WriteBatch::Handler {
211 std::string seen ;
212 int num_seen = 0;
213 static string pretty_binary_string(const string& in) {
214 char buf[10];
215 string out;
216 out.reserve(in.length() * 3);
217 enum { NONE, HEX, STRING } mode = NONE;
218 unsigned from = 0, i;
219 for (i=0; i < in.length(); ++i) {
220 if ((in[i] < 32 || (unsigned char)in[i] > 126) ||
221 (mode == HEX && in.length() - i >= 4 &&
222 ((in[i] < 32 || (unsigned char)in[i] > 126) ||
223 (in[i+1] < 32 || (unsigned char)in[i+1] > 126) ||
224 (in[i+2] < 32 || (unsigned char)in[i+2] > 126) ||
225 (in[i+3] < 32 || (unsigned char)in[i+3] > 126)))) {
226
227 if (mode == STRING) {
228 out.append(in.substr(from, i - from));
229 out.push_back('\'');
230 }
231 if (mode != HEX) {
232 out.append("0x");
233 mode = HEX;
234 }
235 if (in.length() - i >= 4) {
236 // print a whole u32 at once
237 snprintf(buf, sizeof(buf), "%08x",
238 (uint32_t)(((unsigned char)in[i] << 24) |
239 ((unsigned char)in[i+1] << 16) |
240 ((unsigned char)in[i+2] << 8) |
241 ((unsigned char)in[i+3] << 0)));
242 i += 3;
243 } else {
244 snprintf(buf, sizeof(buf), "%02x", (int)(unsigned char)in[i]);
245 }
246 out.append(buf);
247 } else {
248 if (mode != STRING) {
249 out.push_back('\'');
250 mode = STRING;
251 from = i;
252 }
253 }
254 }
255 if (mode == STRING) {
256 out.append(in.substr(from, i - from));
257 out.push_back('\'');
258 }
259 return out;
260 }
261 void Put(const rocksdb::Slice& key,
262 const rocksdb::Slice& value) override {
263 string prefix ((key.ToString()).substr(0,1));
264 string key_to_decode ((key.ToString()).substr(2,string::npos));
265 uint64_t size = (value.ToString()).size();
266 seen += "\nPut( Prefix = " + prefix + " key = "
267 + pretty_binary_string(key_to_decode)
268 + " Value size = " + std::to_string(size) + ")";
269 num_seen++;
270 }
271 void SingleDelete(const rocksdb::Slice& key) override {
272 string prefix ((key.ToString()).substr(0,1));
273 string key_to_decode ((key.ToString()).substr(2,string::npos));
274 seen += "\nSingleDelete(Prefix = "+ prefix + " Key = "
275 + pretty_binary_string(key_to_decode) + ")";
276 num_seen++;
277 }
278 void Delete(const rocksdb::Slice& key) override {
279 string prefix ((key.ToString()).substr(0,1));
280 string key_to_decode ((key.ToString()).substr(2,string::npos));
281 seen += "\nDelete( Prefix = " + prefix + " key = "
282 + pretty_binary_string(key_to_decode) + ")";
283
284 num_seen++;
285 }
286 void Merge(const rocksdb::Slice& key,
287 const rocksdb::Slice& value) override {
288 string prefix ((key.ToString()).substr(0,1));
289 string key_to_decode ((key.ToString()).substr(2,string::npos));
290 uint64_t size = (value.ToString()).size();
291 seen += "\nMerge( Prefix = " + prefix + " key = "
292 + pretty_binary_string(key_to_decode) + " Value size = "
293 + std::to_string(size) + ")";
294
295 num_seen++;
296 }
297 bool Continue() override { return num_seen < 50; }
298
299 };
300
301 class RocksDBTransactionImpl : public KeyValueDB::TransactionImpl {
302 public:
303 rocksdb::WriteBatch bat;
304 RocksDBStore *db;
305
306 explicit RocksDBTransactionImpl(RocksDBStore *_db);
307 private:
308 void put_bat(
309 rocksdb::WriteBatch& bat,
310 rocksdb::ColumnFamilyHandle *cf,
311 const string &k,
312 const bufferlist &to_set_bl);
313 public:
314 void set(
315 const string &prefix,
316 const string &k,
317 const bufferlist &bl) override;
318 void set(
319 const string &prefix,
320 const char *k,
321 size_t keylen,
322 const bufferlist &bl) override;
323 void rmkey(
324 const string &prefix,
325 const string &k) override;
326 void rmkey(
327 const string &prefix,
328 const char *k,
329 size_t keylen) override;
330 void rm_single_key(
331 const string &prefix,
332 const string &k) override;
333 void rmkeys_by_prefix(
334 const string &prefix
335 ) override;
336 void rm_range_keys(
337 const string &prefix,
338 const string &start,
339 const string &end) override;
340 void merge(
341 const string& prefix,
342 const string& k,
343 const bufferlist &bl) override;
344 };
345
346 KeyValueDB::Transaction get_transaction() override {
347 return std::make_shared<RocksDBTransactionImpl>(this);
348 }
349
350 int submit_transaction(KeyValueDB::Transaction t) override;
351 int submit_transaction_sync(KeyValueDB::Transaction t) override;
352 int get(
353 const string &prefix,
354 const std::set<string> &key,
355 std::map<string, bufferlist> *out
356 ) override;
357 int get(
358 const string &prefix,
359 const string &key,
360 bufferlist *out
361 ) override;
362 int get(
363 const string &prefix,
364 const char *key,
365 size_t keylen,
366 bufferlist *out) override;
367
368
369 class RocksDBWholeSpaceIteratorImpl :
370 public KeyValueDB::WholeSpaceIteratorImpl {
371 protected:
372 rocksdb::Iterator *dbiter;
373 public:
374 explicit RocksDBWholeSpaceIteratorImpl(rocksdb::Iterator *iter) :
375 dbiter(iter) { }
376 //virtual ~RocksDBWholeSpaceIteratorImpl() { }
377 ~RocksDBWholeSpaceIteratorImpl() override;
378
379 int seek_to_first() override;
380 int seek_to_first(const string &prefix) override;
381 int seek_to_last() override;
382 int seek_to_last(const string &prefix) override;
383 int upper_bound(const string &prefix, const string &after) override;
384 int lower_bound(const string &prefix, const string &to) override;
385 bool valid() override;
386 int next() override;
387 int prev() override;
388 string key() override;
389 pair<string,string> raw_key() override;
390 bool raw_key_is_prefixed(const string &prefix) override;
391 bufferlist value() override;
392 bufferptr value_as_ptr() override;
393 int status() override;
394 size_t key_size() override;
395 size_t value_size() override;
396 };
397
398 Iterator get_iterator(const std::string& prefix) override;
399
400 /// Utility
401 static string combine_strings(const string &prefix, const string &value) {
402 string out = prefix;
403 out.push_back(0);
404 out.append(value);
405 return out;
406 }
407 static void combine_strings(const string &prefix,
408 const char *key, size_t keylen,
409 string *out) {
410 out->reserve(prefix.size() + 1 + keylen);
411 *out = prefix;
412 out->push_back(0);
413 out->append(key, keylen);
414 }
415
416 static int split_key(rocksdb::Slice in, string *prefix, string *key);
417
418 static string past_prefix(const string &prefix);
419
420 class MergeOperatorRouter;
421 class MergeOperatorLinker;
422 friend class MergeOperatorRouter;
423 int set_merge_operator(
424 const std::string& prefix,
425 std::shared_ptr<KeyValueDB::MergeOperator> mop) override;
426 string assoc_name; ///< Name of associative operator
427
428 uint64_t get_estimated_size(map<string,uint64_t> &extra) override {
429 DIR *store_dir = opendir(path.c_str());
430 if (!store_dir) {
431 lderr(cct) << __func__ << " something happened opening the store: "
432 << cpp_strerror(errno) << dendl;
433 return 0;
434 }
435
436 uint64_t total_size = 0;
437 uint64_t sst_size = 0;
438 uint64_t log_size = 0;
439 uint64_t misc_size = 0;
440
441 struct dirent *entry = NULL;
442 while ((entry = readdir(store_dir)) != NULL) {
443 string n(entry->d_name);
444
445 if (n == "." || n == "..")
446 continue;
447
448 string fpath = path + '/' + n;
449 struct stat s;
450 int err = stat(fpath.c_str(), &s);
451 if (err < 0)
452 err = -errno;
453 // we may race against rocksdb while reading files; this should only
454 // happen when those files are being updated, data is being shuffled
455 // and files get removed, in which case there's not much of a problem
456 // as we'll get to them next time around.
457 if (err == -ENOENT) {
458 continue;
459 }
460 if (err < 0) {
461 lderr(cct) << __func__ << " error obtaining stats for " << fpath
462 << ": " << cpp_strerror(err) << dendl;
463 goto err;
464 }
465
466 size_t pos = n.find_last_of('.');
467 if (pos == string::npos) {
468 misc_size += s.st_size;
469 continue;
470 }
471
472 string ext = n.substr(pos+1);
473 if (ext == "sst") {
474 sst_size += s.st_size;
475 } else if (ext == "log") {
476 log_size += s.st_size;
477 } else {
478 misc_size += s.st_size;
479 }
480 }
481
482 total_size = sst_size + log_size + misc_size;
483
484 extra["sst"] = sst_size;
485 extra["log"] = log_size;
486 extra["misc"] = misc_size;
487 extra["total"] = total_size;
488
489 err:
490 closedir(store_dir);
491 return total_size;
492 }
493
494 virtual int64_t get_cache_usage() const override {
495 return static_cast<int64_t>(bbt_opts.block_cache->GetUsage());
496 }
497
498 int set_cache_size(uint64_t s) override {
499 cache_size = s;
500 set_cache_flag = true;
501 return 0;
502 }
503
504 virtual std::shared_ptr<PriorityCache::PriCache> get_priority_cache()
505 const override {
506 return dynamic_pointer_cast<PriorityCache::PriCache>(
507 bbt_opts.block_cache);
508 }
509
510 WholeSpaceIterator get_wholespace_iterator() override;
511 };
512
513
514
515 #endif