// Retrieved from: git.proxmox.com Git - ceph.git/blob - ceph/src/kv/RocksDBStore.h
// Revision: c437badebabbbf76887094ec2c53fe0aae00a3ef
// Path: [ceph.git] / ceph / src / kv / RocksDBStore.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 #ifndef ROCKS_DB_STORE_H
4 #define ROCKS_DB_STORE_H
5
#include "include/types.h"
#include "include/buffer_fwd.h"
#include "KeyValueDB.h"
#include <set>
#include <map>
#include <list>
#include <vector>
#include <string>
#include <memory>
#include <boost/scoped_ptr.hpp>
#include "rocksdb/write_batch.h"
#include "rocksdb/perf_context.h"
#include "rocksdb/iostats_context.h"
#include "rocksdb/statistics.h"
#include "rocksdb/table.h"
#include <errno.h>
#include <cstdio>
#include <dirent.h>
#include <sys/stat.h>
#include "common/errno.h"
#include "common/dout.h"
#include "include/assert.h"
#include "common/Formatter.h"
#include "common/Cond.h"

#include "common/ceph_context.h"
27 class PerfCounters;
28
// Index space for the RocksDBStore counters, bounded by the
// l_rocksdb_first / l_rocksdb_last sentinels.  Entries between the sentinels
// take consecutive values (34301, 34302, ...); presumably they index the
// PerfCounters `logger` owned by RocksDBStore — confirm in the .cpp.
enum {
  l_rocksdb_first = 34300,

  // operation counts
  l_rocksdb_gets,
  l_rocksdb_txns,
  l_rocksdb_txns_sync,

  // latencies
  l_rocksdb_get_latency,
  l_rocksdb_submit_latency,
  l_rocksdb_submit_sync_latency,

  // compaction
  l_rocksdb_compact,
  l_rocksdb_compact_range,
  l_rocksdb_compact_queue_merge,
  l_rocksdb_compact_queue_len,

  // write-path timings
  l_rocksdb_write_wal_time,
  l_rocksdb_write_memtable_time,
  l_rocksdb_write_delay_time,
  l_rocksdb_write_pre_and_post_process_time,

  l_rocksdb_last
};
47
// Forward declarations of RocksDB types named in this header.  The class-key
// (class vs struct) of each must match RocksDB's own definition.
namespace rocksdb {
  // core handles
  class DB;
  class Env;
  class Iterator;
  class Snapshot;

  // data / batching
  class Slice;
  class WriteBatch;

  // tuning and infrastructure
  class Cache;
  class FilterPolicy;
  class Logger;
  struct BlockBasedTableOptions;
  struct Options;
}
61
62 extern rocksdb::Logger *create_rocksdb_ceph_logger();
63
64 /**
65 * Uses RocksDB to implement the KeyValueDB interface
66 */
67 class RocksDBStore : public KeyValueDB {
68 CephContext *cct;
69 PerfCounters *logger;
70 string path;
71 void *priv;
72 rocksdb::DB *db;
73 rocksdb::Env *env;
74 std::shared_ptr<rocksdb::Statistics> dbstats;
75 rocksdb::BlockBasedTableOptions bbt_opts;
76 string options_str;
77
78 uint64_t cache_size = 0;
79
80 int do_open(ostream &out, bool create_if_missing);
81
82 // manage async compactions
83 Mutex compact_queue_lock;
84 Cond compact_queue_cond;
85 list< pair<string,string> > compact_queue;
86 bool compact_queue_stop;
87 class CompactThread : public Thread {
88 RocksDBStore *db;
89 public:
90 explicit CompactThread(RocksDBStore *d) : db(d) {}
91 void *entry() override {
92 db->compact_thread_entry();
93 return NULL;
94 }
95 friend class RocksDBStore;
96 } compact_thread;
97
98 void compact_thread_entry();
99
100 void compact_range(const string& start, const string& end);
101 void compact_range_async(const string& start, const string& end);
102
103 public:
104 /// compact the underlying rocksdb store
105 bool compact_on_mount;
106 bool disableWAL;
107 bool enable_rmrange;
108 void compact() override;
109
110 int tryInterpret(const string key, const string val, rocksdb::Options &opt);
111 int ParseOptionsFromString(const string opt_str, rocksdb::Options &opt);
112 static int _test_init(const string& dir);
113 int init(string options_str) override;
114 /// compact rocksdb for all keys with a given prefix
115 void compact_prefix(const string& prefix) override {
116 compact_range(prefix, past_prefix(prefix));
117 }
118 void compact_prefix_async(const string& prefix) override {
119 compact_range_async(prefix, past_prefix(prefix));
120 }
121
122 void compact_range(const string& prefix, const string& start, const string& end) override {
123 compact_range(combine_strings(prefix, start), combine_strings(prefix, end));
124 }
125 void compact_range_async(const string& prefix, const string& start, const string& end) override {
126 compact_range_async(combine_strings(prefix, start), combine_strings(prefix, end));
127 }
128
129 RocksDBStore(CephContext *c, const string &path, void *p) :
130 cct(c),
131 logger(NULL),
132 path(path),
133 priv(p),
134 db(NULL),
135 env(static_cast<rocksdb::Env*>(p)),
136 dbstats(NULL),
137 compact_queue_lock("RocksDBStore::compact_thread_lock"),
138 compact_queue_stop(false),
139 compact_thread(this),
140 compact_on_mount(false),
141 disableWAL(false),
142 enable_rmrange(cct->_conf->rocksdb_enable_rmrange)
143 {}
144
145 ~RocksDBStore() override;
146
147 static bool check_omap_dir(string &omap_dir);
148 /// Opens underlying db
149 int open(ostream &out) override {
150 return do_open(out, false);
151 }
152 /// Creates underlying db if missing and opens it
153 int create_and_open(ostream &out) override;
154
155 void close() override;
156
157 void split_stats(const std::string &s, char delim, std::vector<std::string> &elems);
158 void get_statistics(Formatter *f) override;
159
160 struct RocksWBHandler: public rocksdb::WriteBatch::Handler {
161 std::string seen ;
162 int num_seen = 0;
163 static string pretty_binary_string(const string& in) {
164 char buf[10];
165 string out;
166 out.reserve(in.length() * 3);
167 enum { NONE, HEX, STRING } mode = NONE;
168 unsigned from = 0, i;
169 for (i=0; i < in.length(); ++i) {
170 if ((in[i] < 32 || (unsigned char)in[i] > 126) ||
171 (mode == HEX && in.length() - i >= 4 &&
172 ((in[i] < 32 || (unsigned char)in[i] > 126) ||
173 (in[i+1] < 32 || (unsigned char)in[i+1] > 126) ||
174 (in[i+2] < 32 || (unsigned char)in[i+2] > 126) ||
175 (in[i+3] < 32 || (unsigned char)in[i+3] > 126)))) {
176
177 if (mode == STRING) {
178 out.append(in.substr(from, i - from));
179 out.push_back('\'');
180 }
181 if (mode != HEX) {
182 out.append("0x");
183 mode = HEX;
184 }
185 if (in.length() - i >= 4) {
186 // print a whole u32 at once
187 snprintf(buf, sizeof(buf), "%08x",
188 (uint32_t)(((unsigned char)in[i] << 24) |
189 ((unsigned char)in[i+1] << 16) |
190 ((unsigned char)in[i+2] << 8) |
191 ((unsigned char)in[i+3] << 0)));
192 i += 3;
193 } else {
194 snprintf(buf, sizeof(buf), "%02x", (int)(unsigned char)in[i]);
195 }
196 out.append(buf);
197 } else {
198 if (mode != STRING) {
199 out.push_back('\'');
200 mode = STRING;
201 from = i;
202 }
203 }
204 }
205 if (mode == STRING) {
206 out.append(in.substr(from, i - from));
207 out.push_back('\'');
208 }
209 return out;
210 }
211 void Put(const rocksdb::Slice& key,
212 const rocksdb::Slice& value) override {
213 string prefix ((key.ToString()).substr(0,1));
214 string key_to_decode ((key.ToString()).substr(2,string::npos));
215 uint64_t size = (value.ToString()).size();
216 seen += "\nPut( Prefix = " + prefix + " key = "
217 + pretty_binary_string(key_to_decode)
218 + " Value size = " + std::to_string(size) + ")";
219 num_seen++;
220 }
221 void SingleDelete(const rocksdb::Slice& key) override {
222 string prefix ((key.ToString()).substr(0,1));
223 string key_to_decode ((key.ToString()).substr(2,string::npos));
224 seen += "\nSingleDelete(Prefix = "+ prefix + " Key = "
225 + pretty_binary_string(key_to_decode) + ")";
226 num_seen++;
227 }
228 void Delete(const rocksdb::Slice& key) override {
229 string prefix ((key.ToString()).substr(0,1));
230 string key_to_decode ((key.ToString()).substr(2,string::npos));
231 seen += "\nDelete( Prefix = " + prefix + " key = "
232 + pretty_binary_string(key_to_decode) + ")";
233
234 num_seen++;
235 }
236 void Merge(const rocksdb::Slice& key,
237 const rocksdb::Slice& value) override {
238 string prefix ((key.ToString()).substr(0,1));
239 string key_to_decode ((key.ToString()).substr(2,string::npos));
240 uint64_t size = (value.ToString()).size();
241 seen += "\nMerge( Prefix = " + prefix + " key = "
242 + pretty_binary_string(key_to_decode) + " Value size = "
243 + std::to_string(size) + ")";
244
245 num_seen++;
246 }
247 bool Continue() override { return num_seen < 50; }
248
249 };
250
251
252 class RocksDBTransactionImpl : public KeyValueDB::TransactionImpl {
253 public:
254 rocksdb::WriteBatch bat;
255 RocksDBStore *db;
256
257 explicit RocksDBTransactionImpl(RocksDBStore *_db);
258 void set(
259 const string &prefix,
260 const string &k,
261 const bufferlist &bl) override;
262 void set(
263 const string &prefix,
264 const char *k,
265 size_t keylen,
266 const bufferlist &bl) override;
267 void rmkey(
268 const string &prefix,
269 const string &k) override;
270 void rmkey(
271 const string &prefix,
272 const char *k,
273 size_t keylen) override;
274 void rm_single_key(
275 const string &prefix,
276 const string &k) override;
277 void rmkeys_by_prefix(
278 const string &prefix
279 ) override;
280 void rm_range_keys(
281 const string &prefix,
282 const string &start,
283 const string &end) override;
284 void merge(
285 const string& prefix,
286 const string& k,
287 const bufferlist &bl) override;
288 };
289
290 KeyValueDB::Transaction get_transaction() override {
291 return std::make_shared<RocksDBTransactionImpl>(this);
292 }
293
294 int submit_transaction(KeyValueDB::Transaction t) override;
295 int submit_transaction_sync(KeyValueDB::Transaction t) override;
296 int get(
297 const string &prefix,
298 const std::set<string> &key,
299 std::map<string, bufferlist> *out
300 ) override;
301 int get(
302 const string &prefix,
303 const string &key,
304 bufferlist *out
305 ) override;
306 int get(
307 const string &prefix,
308 const char *key,
309 size_t keylen,
310 bufferlist *out) override;
311
312
313 class RocksDBWholeSpaceIteratorImpl :
314 public KeyValueDB::WholeSpaceIteratorImpl {
315 protected:
316 rocksdb::Iterator *dbiter;
317 public:
318 explicit RocksDBWholeSpaceIteratorImpl(rocksdb::Iterator *iter) :
319 dbiter(iter) { }
320 //virtual ~RocksDBWholeSpaceIteratorImpl() { }
321 ~RocksDBWholeSpaceIteratorImpl() override;
322
323 int seek_to_first() override;
324 int seek_to_first(const string &prefix) override;
325 int seek_to_last() override;
326 int seek_to_last(const string &prefix) override;
327 int upper_bound(const string &prefix, const string &after) override;
328 int lower_bound(const string &prefix, const string &to) override;
329 bool valid() override;
330 int next() override;
331 int prev() override;
332 string key() override;
333 pair<string,string> raw_key() override;
334 bool raw_key_is_prefixed(const string &prefix) override;
335 bufferlist value() override;
336 bufferptr value_as_ptr() override;
337 int status() override;
338 size_t key_size() override;
339 size_t value_size() override;
340 };
341
342 /// Utility
343 static string combine_strings(const string &prefix, const string &value) {
344 string out = prefix;
345 out.push_back(0);
346 out.append(value);
347 return out;
348 }
349 static void combine_strings(const string &prefix,
350 const char *key, size_t keylen,
351 string *out) {
352 out->reserve(prefix.size() + 1 + keylen);
353 *out = prefix;
354 out->push_back(0);
355 out->append(key, keylen);
356 }
357
358 static int split_key(rocksdb::Slice in, string *prefix, string *key);
359
360 static bufferlist to_bufferlist(rocksdb::Slice in) {
361 bufferlist bl;
362 bl.append(bufferptr(in.data(), in.size()));
363 return bl;
364 }
365
366 static string past_prefix(const string &prefix);
367
368 class MergeOperatorRouter;
369 friend class MergeOperatorRouter;
370 int set_merge_operator(const std::string& prefix,
371 std::shared_ptr<KeyValueDB::MergeOperator> mop) override;
372 string assoc_name; ///< Name of associative operator
373
374 uint64_t get_estimated_size(map<string,uint64_t> &extra) override {
375 DIR *store_dir = opendir(path.c_str());
376 if (!store_dir) {
377 lderr(cct) << __func__ << " something happened opening the store: "
378 << cpp_strerror(errno) << dendl;
379 return 0;
380 }
381
382 uint64_t total_size = 0;
383 uint64_t sst_size = 0;
384 uint64_t log_size = 0;
385 uint64_t misc_size = 0;
386
387 struct dirent *entry = NULL;
388 while ((entry = readdir(store_dir)) != NULL) {
389 string n(entry->d_name);
390
391 if (n == "." || n == "..")
392 continue;
393
394 string fpath = path + '/' + n;
395 struct stat s;
396 int err = stat(fpath.c_str(), &s);
397 if (err < 0)
398 err = -errno;
399 // we may race against rocksdb while reading files; this should only
400 // happen when those files are being updated, data is being shuffled
401 // and files get removed, in which case there's not much of a problem
402 // as we'll get to them next time around.
403 if (err == -ENOENT) {
404 continue;
405 }
406 if (err < 0) {
407 lderr(cct) << __func__ << " error obtaining stats for " << fpath
408 << ": " << cpp_strerror(err) << dendl;
409 goto err;
410 }
411
412 size_t pos = n.find_last_of('.');
413 if (pos == string::npos) {
414 misc_size += s.st_size;
415 continue;
416 }
417
418 string ext = n.substr(pos+1);
419 if (ext == "sst") {
420 sst_size += s.st_size;
421 } else if (ext == "log") {
422 log_size += s.st_size;
423 } else {
424 misc_size += s.st_size;
425 }
426 }
427
428 total_size = sst_size + log_size + misc_size;
429
430 extra["sst"] = sst_size;
431 extra["log"] = log_size;
432 extra["misc"] = misc_size;
433 extra["total"] = total_size;
434
435 err:
436 closedir(store_dir);
437 return total_size;
438 }
439
440 int set_cache_size(uint64_t s) override {
441 cache_size = s;
442 return 0;
443 }
444
445 protected:
446 WholeSpaceIterator _get_iterator() override;
447 };
448
449
450
451 #endif