]> git.proxmox.com Git - ceph.git/blob - ceph/src/kv/RocksDBStore.h
update sources to 12.2.10
[ceph.git] / ceph / src / kv / RocksDBStore.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 #ifndef ROCKS_DB_STORE_H
4 #define ROCKS_DB_STORE_H
5
6 #include "include/types.h"
7 #include "include/buffer_fwd.h"
8 #include "KeyValueDB.h"
9 #include <set>
10 #include <map>
11 #include <string>
12 #include <memory>
13 #include <boost/scoped_ptr.hpp>
14 #include "rocksdb/write_batch.h"
15 #include "rocksdb/perf_context.h"
16 #include "rocksdb/iostats_context.h"
17 #include "rocksdb/statistics.h"
18 #include "rocksdb/table.h"
19 #include <errno.h>
20 #include "common/errno.h"
21 #include "common/dout.h"
22 #include "include/assert.h"
23 #include "common/Formatter.h"
24 #include "common/Cond.h"
25 #include "common/ceph_context.h"
26 #include "common/PriorityCache.h"
27
28 class PerfCounters;
29
// Counter indices for RocksDBStore, presumably registered on the
// PerfCounters instance the store exposes via get_perf_counters().
// l_rocksdb_first / l_rocksdb_last bracket the range; every entry between
// them is numbered consecutively from l_rocksdb_first + 1.
enum {
  l_rocksdb_first = 34300,

  // reads / transaction submission
  l_rocksdb_gets,
  l_rocksdb_txns,
  l_rocksdb_txns_sync,
  l_rocksdb_get_latency,
  l_rocksdb_submit_latency,
  l_rocksdb_submit_sync_latency,

  // compaction
  l_rocksdb_compact,
  l_rocksdb_compact_range,
  l_rocksdb_compact_queue_merge,
  l_rocksdb_compact_queue_len,

  // write-path timings
  l_rocksdb_write_wal_time,
  l_rocksdb_write_memtable_time,
  l_rocksdb_write_delay_time,
  l_rocksdb_write_pre_and_post_process_time,

  l_rocksdb_last
};
48
// Forward declarations of the RocksDB types referenced in this header.
namespace rocksdb {
class Cache;
class DB;
class Env;
class FilterPolicy;
class Iterator;
class Logger;
class Slice;
class Snapshot;
class WriteBatch;
struct BlockBasedTableOptions;
struct Options;
}  // namespace rocksdb

// Factory for the rocksdb::Logger used by the store; defined elsewhere.
// NOTE(review): ownership of the returned pointer is not visible here --
// confirm against the definition.
rocksdb::Logger *create_rocksdb_ceph_logger();
64
65 /**
66 * Uses RocksDB to implement the KeyValueDB interface
67 */
68 class RocksDBStore : public KeyValueDB {
69 CephContext *cct;
70 PerfCounters *logger;
71 string path;
72 void *priv;
73 rocksdb::DB *db;
74 rocksdb::Env *env;
75 std::shared_ptr<rocksdb::Statistics> dbstats;
76 rocksdb::BlockBasedTableOptions bbt_opts;
77 string options_str;
78
79 uint64_t cache_size = 0;
80 bool set_cache_flag = false;
81
82 int do_open(ostream &out, bool create_if_missing);
83
84 // manage async compactions
85 Mutex compact_queue_lock;
86 Cond compact_queue_cond;
87 list< pair<string,string> > compact_queue;
88 bool compact_queue_stop;
89 class CompactThread : public Thread {
90 RocksDBStore *db;
91 public:
92 explicit CompactThread(RocksDBStore *d) : db(d) {}
93 void *entry() override {
94 db->compact_thread_entry();
95 return NULL;
96 }
97 friend class RocksDBStore;
98 } compact_thread;
99
100 void compact_thread_entry();
101
102 void compact_range(const string& start, const string& end);
103 void compact_range_async(const string& start, const string& end);
104
105 public:
106 /// compact the underlying rocksdb store
107 bool compact_on_mount;
108 bool disableWAL;
109 bool enable_rmrange;
110 void compact() override;
111 int64_t high_pri_watermark;
112
113 int tryInterpret(const string& key, const string& val, rocksdb::Options &opt);
114 int ParseOptionsFromString(const string& opt_str, rocksdb::Options &opt);
115 static int _test_init(const string& dir);
116 int init(string options_str) override;
117 /// compact rocksdb for all keys with a given prefix
118 void compact_prefix(const string& prefix) override {
119 compact_range(prefix, past_prefix(prefix));
120 }
121 void compact_prefix_async(const string& prefix) override {
122 compact_range_async(prefix, past_prefix(prefix));
123 }
124
125 void compact_range(const string& prefix, const string& start, const string& end) override {
126 compact_range(combine_strings(prefix, start), combine_strings(prefix, end));
127 }
128 void compact_range_async(const string& prefix, const string& start, const string& end) override {
129 compact_range_async(combine_strings(prefix, start), combine_strings(prefix, end));
130 }
131
132 RocksDBStore(CephContext *c, const string &path, void *p) :
133 cct(c),
134 logger(NULL),
135 path(path),
136 priv(p),
137 db(NULL),
138 env(static_cast<rocksdb::Env*>(p)),
139 dbstats(NULL),
140 compact_queue_lock("RocksDBStore::compact_thread_lock"),
141 compact_queue_stop(false),
142 compact_thread(this),
143 compact_on_mount(false),
144 disableWAL(false),
145 enable_rmrange(cct->_conf->rocksdb_enable_rmrange),
146 high_pri_watermark(0)
147 {}
148
149 ~RocksDBStore() override;
150
151 static bool check_omap_dir(string &omap_dir);
152 /// Opens underlying db
153 int open(ostream &out) override {
154 return do_open(out, false);
155 }
156 /// Creates underlying db if missing and opens it
157 int create_and_open(ostream &out) override;
158
159 void close() override;
160
161 void split_stats(const std::string &s, char delim, std::vector<std::string> &elems);
162 void get_statistics(Formatter *f) override;
163
164 PerfCounters *get_perf_counters() override
165 {
166 return logger;
167 }
168
169 struct RocksWBHandler: public rocksdb::WriteBatch::Handler {
170 std::string seen ;
171 int num_seen = 0;
172 static string pretty_binary_string(const string& in) {
173 char buf[10];
174 string out;
175 out.reserve(in.length() * 3);
176 enum { NONE, HEX, STRING } mode = NONE;
177 unsigned from = 0, i;
178 for (i=0; i < in.length(); ++i) {
179 if ((in[i] < 32 || (unsigned char)in[i] > 126) ||
180 (mode == HEX && in.length() - i >= 4 &&
181 ((in[i] < 32 || (unsigned char)in[i] > 126) ||
182 (in[i+1] < 32 || (unsigned char)in[i+1] > 126) ||
183 (in[i+2] < 32 || (unsigned char)in[i+2] > 126) ||
184 (in[i+3] < 32 || (unsigned char)in[i+3] > 126)))) {
185
186 if (mode == STRING) {
187 out.append(in.substr(from, i - from));
188 out.push_back('\'');
189 }
190 if (mode != HEX) {
191 out.append("0x");
192 mode = HEX;
193 }
194 if (in.length() - i >= 4) {
195 // print a whole u32 at once
196 snprintf(buf, sizeof(buf), "%08x",
197 (uint32_t)(((unsigned char)in[i] << 24) |
198 ((unsigned char)in[i+1] << 16) |
199 ((unsigned char)in[i+2] << 8) |
200 ((unsigned char)in[i+3] << 0)));
201 i += 3;
202 } else {
203 snprintf(buf, sizeof(buf), "%02x", (int)(unsigned char)in[i]);
204 }
205 out.append(buf);
206 } else {
207 if (mode != STRING) {
208 out.push_back('\'');
209 mode = STRING;
210 from = i;
211 }
212 }
213 }
214 if (mode == STRING) {
215 out.append(in.substr(from, i - from));
216 out.push_back('\'');
217 }
218 return out;
219 }
220 void Put(const rocksdb::Slice& key,
221 const rocksdb::Slice& value) override {
222 string prefix ((key.ToString()).substr(0,1));
223 string key_to_decode ((key.ToString()).substr(2,string::npos));
224 uint64_t size = (value.ToString()).size();
225 seen += "\nPut( Prefix = " + prefix + " key = "
226 + pretty_binary_string(key_to_decode)
227 + " Value size = " + std::to_string(size) + ")";
228 num_seen++;
229 }
230 void SingleDelete(const rocksdb::Slice& key) override {
231 string prefix ((key.ToString()).substr(0,1));
232 string key_to_decode ((key.ToString()).substr(2,string::npos));
233 seen += "\nSingleDelete(Prefix = "+ prefix + " Key = "
234 + pretty_binary_string(key_to_decode) + ")";
235 num_seen++;
236 }
237 void Delete(const rocksdb::Slice& key) override {
238 string prefix ((key.ToString()).substr(0,1));
239 string key_to_decode ((key.ToString()).substr(2,string::npos));
240 seen += "\nDelete( Prefix = " + prefix + " key = "
241 + pretty_binary_string(key_to_decode) + ")";
242
243 num_seen++;
244 }
245 void Merge(const rocksdb::Slice& key,
246 const rocksdb::Slice& value) override {
247 string prefix ((key.ToString()).substr(0,1));
248 string key_to_decode ((key.ToString()).substr(2,string::npos));
249 uint64_t size = (value.ToString()).size();
250 seen += "\nMerge( Prefix = " + prefix + " key = "
251 + pretty_binary_string(key_to_decode) + " Value size = "
252 + std::to_string(size) + ")";
253
254 num_seen++;
255 }
256 bool Continue() override { return num_seen < 50; }
257
258 };
259
260
261 class RocksDBTransactionImpl : public KeyValueDB::TransactionImpl {
262 public:
263 rocksdb::WriteBatch bat;
264 RocksDBStore *db;
265
266 explicit RocksDBTransactionImpl(RocksDBStore *_db);
267 void set(
268 const string &prefix,
269 const string &k,
270 const bufferlist &bl) override;
271 void set(
272 const string &prefix,
273 const char *k,
274 size_t keylen,
275 const bufferlist &bl) override;
276 void rmkey(
277 const string &prefix,
278 const string &k) override;
279 void rmkey(
280 const string &prefix,
281 const char *k,
282 size_t keylen) override;
283 void rm_single_key(
284 const string &prefix,
285 const string &k) override;
286 void rmkeys_by_prefix(
287 const string &prefix
288 ) override;
289 void rm_range_keys(
290 const string &prefix,
291 const string &start,
292 const string &end) override;
293 void merge(
294 const string& prefix,
295 const string& k,
296 const bufferlist &bl) override;
297 };
298
299 KeyValueDB::Transaction get_transaction() override {
300 return std::make_shared<RocksDBTransactionImpl>(this);
301 }
302
303 int submit_transaction(KeyValueDB::Transaction t) override;
304 int submit_transaction_sync(KeyValueDB::Transaction t) override;
305 int get(
306 const string &prefix,
307 const std::set<string> &key,
308 std::map<string, bufferlist> *out
309 ) override;
310 int get(
311 const string &prefix,
312 const string &key,
313 bufferlist *out
314 ) override;
315 int get(
316 const string &prefix,
317 const char *key,
318 size_t keylen,
319 bufferlist *out) override;
320
321
322 class RocksDBWholeSpaceIteratorImpl :
323 public KeyValueDB::WholeSpaceIteratorImpl {
324 protected:
325 rocksdb::Iterator *dbiter;
326 public:
327 explicit RocksDBWholeSpaceIteratorImpl(rocksdb::Iterator *iter) :
328 dbiter(iter) { }
329 //virtual ~RocksDBWholeSpaceIteratorImpl() { }
330 ~RocksDBWholeSpaceIteratorImpl() override;
331
332 int seek_to_first() override;
333 int seek_to_first(const string &prefix) override;
334 int seek_to_last() override;
335 int seek_to_last(const string &prefix) override;
336 int upper_bound(const string &prefix, const string &after) override;
337 int lower_bound(const string &prefix, const string &to) override;
338 bool valid() override;
339 int next() override;
340 int prev() override;
341 string key() override;
342 pair<string,string> raw_key() override;
343 bool raw_key_is_prefixed(const string &prefix) override;
344 bufferlist value() override;
345 bufferptr value_as_ptr() override;
346 int status() override;
347 size_t key_size() override;
348 size_t value_size() override;
349 };
350
351 /// Utility
352 static string combine_strings(const string &prefix, const string &value) {
353 string out = prefix;
354 out.push_back(0);
355 out.append(value);
356 return out;
357 }
358 static void combine_strings(const string &prefix,
359 const char *key, size_t keylen,
360 string *out) {
361 out->reserve(prefix.size() + 1 + keylen);
362 *out = prefix;
363 out->push_back(0);
364 out->append(key, keylen);
365 }
366
367 static int split_key(rocksdb::Slice in, string *prefix, string *key);
368
369 static bufferlist to_bufferlist(rocksdb::Slice in) {
370 bufferlist bl;
371 bl.append(bufferptr(in.data(), in.size()));
372 return bl;
373 }
374
375 static string past_prefix(const string &prefix);
376
377 class MergeOperatorRouter;
378 friend class MergeOperatorRouter;
379 int set_merge_operator(const std::string& prefix,
380 std::shared_ptr<KeyValueDB::MergeOperator> mop) override;
381 string assoc_name; ///< Name of associative operator
382
383 uint64_t get_estimated_size(map<string,uint64_t> &extra) override {
384 DIR *store_dir = opendir(path.c_str());
385 if (!store_dir) {
386 lderr(cct) << __func__ << " something happened opening the store: "
387 << cpp_strerror(errno) << dendl;
388 return 0;
389 }
390
391 uint64_t total_size = 0;
392 uint64_t sst_size = 0;
393 uint64_t log_size = 0;
394 uint64_t misc_size = 0;
395
396 struct dirent *entry = NULL;
397 while ((entry = readdir(store_dir)) != NULL) {
398 string n(entry->d_name);
399
400 if (n == "." || n == "..")
401 continue;
402
403 string fpath = path + '/' + n;
404 struct stat s;
405 int err = stat(fpath.c_str(), &s);
406 if (err < 0)
407 err = -errno;
408 // we may race against rocksdb while reading files; this should only
409 // happen when those files are being updated, data is being shuffled
410 // and files get removed, in which case there's not much of a problem
411 // as we'll get to them next time around.
412 if (err == -ENOENT) {
413 continue;
414 }
415 if (err < 0) {
416 lderr(cct) << __func__ << " error obtaining stats for " << fpath
417 << ": " << cpp_strerror(err) << dendl;
418 goto err;
419 }
420
421 size_t pos = n.find_last_of('.');
422 if (pos == string::npos) {
423 misc_size += s.st_size;
424 continue;
425 }
426
427 string ext = n.substr(pos+1);
428 if (ext == "sst") {
429 sst_size += s.st_size;
430 } else if (ext == "log") {
431 log_size += s.st_size;
432 } else {
433 misc_size += s.st_size;
434 }
435 }
436
437 total_size = sst_size + log_size + misc_size;
438
439 extra["sst"] = sst_size;
440 extra["log"] = log_size;
441 extra["misc"] = misc_size;
442 extra["total"] = total_size;
443
444 err:
445 closedir(store_dir);
446 return total_size;
447 }
448
449 virtual int64_t request_cache_bytes(
450 PriorityCache::Priority pri, uint64_t cache_bytes) const override;
451 virtual int64_t commit_cache_size() override;
452 virtual std::string get_cache_name() const override {
453 return "RocksDB Block Cache";
454 }
455 virtual int64_t get_cache_usage() const override;
456
457
458 int set_cache_size(uint64_t s) override {
459 cache_size = s;
460 set_cache_flag = true;
461 return 0;
462 }
463
464 protected:
465 WholeSpaceIterator _get_iterator() override;
466
467 int set_cache_capacity(int64_t capacity);
468 int64_t get_cache_capacity();
469 };
470
471
472
473 #endif