]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | #ifndef ROCKS_DB_STORE_H | |
4 | #define ROCKS_DB_STORE_H | |
5 | ||
6 | #include "include/types.h" | |
7 | #include "include/buffer_fwd.h" | |
8 | #include "KeyValueDB.h" | |
9 | #include <set> | |
10 | #include <map> | |
11 | #include <string> | |
12 | #include <memory> | |
13 | #include <boost/scoped_ptr.hpp> | |
14 | #include "rocksdb/write_batch.h" | |
15 | #include "rocksdb/perf_context.h" | |
16 | #include "rocksdb/iostats_context.h" | |
17 | #include "rocksdb/statistics.h" | |
18 | #include "rocksdb/table.h" | |
19 | #include <errno.h> | |
20 | #include "common/errno.h" | |
21 | #include "common/dout.h" | |
22 | #include "include/assert.h" | |
23 | #include "common/Formatter.h" | |
24 | #include "common/Cond.h" | |
25 | ||
26 | #include "common/ceph_context.h" | |
// Perf counter indices used by RocksDBStore. Only l_rocksdb_first has an
// explicit value; each later enumerator is one greater than its predecessor,
// so the order below determines every counter's numeric index.
enum {
  l_rocksdb_first = 34300,

  l_rocksdb_gets,
  l_rocksdb_txns,
  l_rocksdb_txns_sync,

  l_rocksdb_get_latency,
  l_rocksdb_submit_latency,
  l_rocksdb_submit_sync_latency,

  l_rocksdb_compact,
  l_rocksdb_compact_range,
  l_rocksdb_compact_queue_merge,
  l_rocksdb_compact_queue_len,

  l_rocksdb_write_wal_time,
  l_rocksdb_write_memtable_time,
  l_rocksdb_write_delay_time,
  l_rocksdb_write_pre_and_post_process_time,

  l_rocksdb_last
};

// Forward declaration; this header only needs PerfCounters as an
// incomplete type.
class PerfCounters;
47 | ||
// Forward declarations of RocksDB types named in this header.
namespace rocksdb {
  class Cache;
  class DB;
  class Env;
  class FilterPolicy;
  class Iterator;
  class Logger;
  class Slice;
  class Snapshot;
  class WriteBatch;

  struct BlockBasedTableOptions;
  struct Options;
} // namespace rocksdb

// Returns a rocksdb::Logger; the definition lives outside this header.
extern rocksdb::Logger *create_rocksdb_ceph_logger();
63 | ||
64 | /** | |
65 | * Uses RocksDB to implement the KeyValueDB interface | |
66 | */ | |
67 | class RocksDBStore : public KeyValueDB { | |
68 | CephContext *cct; | |
69 | PerfCounters *logger; | |
70 | string path; | |
71 | void *priv; | |
72 | rocksdb::DB *db; | |
73 | rocksdb::Env *env; | |
74 | std::shared_ptr<rocksdb::Statistics> dbstats; | |
75 | rocksdb::BlockBasedTableOptions bbt_opts; | |
76 | string options_str; | |
77 | ||
31f18b77 | 78 | uint64_t cache_size = 0; |
224ce89b | 79 | bool set_cache_flag = false; |
31f18b77 | 80 | |
7c673cae FG |
81 | int do_open(ostream &out, bool create_if_missing); |
82 | ||
83 | // manage async compactions | |
84 | Mutex compact_queue_lock; | |
85 | Cond compact_queue_cond; | |
86 | list< pair<string,string> > compact_queue; | |
87 | bool compact_queue_stop; | |
88 | class CompactThread : public Thread { | |
89 | RocksDBStore *db; | |
90 | public: | |
91 | explicit CompactThread(RocksDBStore *d) : db(d) {} | |
92 | void *entry() override { | |
93 | db->compact_thread_entry(); | |
94 | return NULL; | |
95 | } | |
96 | friend class RocksDBStore; | |
97 | } compact_thread; | |
98 | ||
99 | void compact_thread_entry(); | |
100 | ||
101 | void compact_range(const string& start, const string& end); | |
102 | void compact_range_async(const string& start, const string& end); | |
103 | ||
104 | public: | |
105 | /// compact the underlying rocksdb store | |
106 | bool compact_on_mount; | |
107 | bool disableWAL; | |
108 | bool enable_rmrange; | |
109 | void compact() override; | |
110 | ||
111 | int tryInterpret(const string key, const string val, rocksdb::Options &opt); | |
112 | int ParseOptionsFromString(const string opt_str, rocksdb::Options &opt); | |
113 | static int _test_init(const string& dir); | |
114 | int init(string options_str) override; | |
115 | /// compact rocksdb for all keys with a given prefix | |
116 | void compact_prefix(const string& prefix) override { | |
117 | compact_range(prefix, past_prefix(prefix)); | |
118 | } | |
119 | void compact_prefix_async(const string& prefix) override { | |
120 | compact_range_async(prefix, past_prefix(prefix)); | |
121 | } | |
122 | ||
123 | void compact_range(const string& prefix, const string& start, const string& end) override { | |
124 | compact_range(combine_strings(prefix, start), combine_strings(prefix, end)); | |
125 | } | |
126 | void compact_range_async(const string& prefix, const string& start, const string& end) override { | |
127 | compact_range_async(combine_strings(prefix, start), combine_strings(prefix, end)); | |
128 | } | |
129 | ||
130 | RocksDBStore(CephContext *c, const string &path, void *p) : | |
131 | cct(c), | |
132 | logger(NULL), | |
133 | path(path), | |
134 | priv(p), | |
135 | db(NULL), | |
136 | env(static_cast<rocksdb::Env*>(p)), | |
137 | dbstats(NULL), | |
138 | compact_queue_lock("RocksDBStore::compact_thread_lock"), | |
139 | compact_queue_stop(false), | |
140 | compact_thread(this), | |
141 | compact_on_mount(false), | |
142 | disableWAL(false), | |
143 | enable_rmrange(cct->_conf->rocksdb_enable_rmrange) | |
144 | {} | |
145 | ||
146 | ~RocksDBStore() override; | |
147 | ||
148 | static bool check_omap_dir(string &omap_dir); | |
149 | /// Opens underlying db | |
150 | int open(ostream &out) override { | |
151 | return do_open(out, false); | |
152 | } | |
153 | /// Creates underlying db if missing and opens it | |
154 | int create_and_open(ostream &out) override; | |
155 | ||
156 | void close() override; | |
157 | ||
158 | void split_stats(const std::string &s, char delim, std::vector<std::string> &elems); | |
159 | void get_statistics(Formatter *f) override; | |
160 | ||
161 | struct RocksWBHandler: public rocksdb::WriteBatch::Handler { | |
162 | std::string seen ; | |
163 | int num_seen = 0; | |
164 | static string pretty_binary_string(const string& in) { | |
165 | char buf[10]; | |
166 | string out; | |
167 | out.reserve(in.length() * 3); | |
168 | enum { NONE, HEX, STRING } mode = NONE; | |
169 | unsigned from = 0, i; | |
170 | for (i=0; i < in.length(); ++i) { | |
171 | if ((in[i] < 32 || (unsigned char)in[i] > 126) || | |
172 | (mode == HEX && in.length() - i >= 4 && | |
173 | ((in[i] < 32 || (unsigned char)in[i] > 126) || | |
174 | (in[i+1] < 32 || (unsigned char)in[i+1] > 126) || | |
175 | (in[i+2] < 32 || (unsigned char)in[i+2] > 126) || | |
176 | (in[i+3] < 32 || (unsigned char)in[i+3] > 126)))) { | |
177 | ||
178 | if (mode == STRING) { | |
179 | out.append(in.substr(from, i - from)); | |
180 | out.push_back('\''); | |
181 | } | |
182 | if (mode != HEX) { | |
183 | out.append("0x"); | |
184 | mode = HEX; | |
185 | } | |
186 | if (in.length() - i >= 4) { | |
187 | // print a whole u32 at once | |
188 | snprintf(buf, sizeof(buf), "%08x", | |
189 | (uint32_t)(((unsigned char)in[i] << 24) | | |
190 | ((unsigned char)in[i+1] << 16) | | |
191 | ((unsigned char)in[i+2] << 8) | | |
192 | ((unsigned char)in[i+3] << 0))); | |
193 | i += 3; | |
194 | } else { | |
195 | snprintf(buf, sizeof(buf), "%02x", (int)(unsigned char)in[i]); | |
196 | } | |
197 | out.append(buf); | |
198 | } else { | |
199 | if (mode != STRING) { | |
200 | out.push_back('\''); | |
201 | mode = STRING; | |
202 | from = i; | |
203 | } | |
204 | } | |
205 | } | |
206 | if (mode == STRING) { | |
207 | out.append(in.substr(from, i - from)); | |
208 | out.push_back('\''); | |
209 | } | |
210 | return out; | |
211 | } | |
212 | void Put(const rocksdb::Slice& key, | |
213 | const rocksdb::Slice& value) override { | |
214 | string prefix ((key.ToString()).substr(0,1)); | |
215 | string key_to_decode ((key.ToString()).substr(2,string::npos)); | |
216 | uint64_t size = (value.ToString()).size(); | |
217 | seen += "\nPut( Prefix = " + prefix + " key = " | |
218 | + pretty_binary_string(key_to_decode) | |
219 | + " Value size = " + std::to_string(size) + ")"; | |
220 | num_seen++; | |
221 | } | |
222 | void SingleDelete(const rocksdb::Slice& key) override { | |
223 | string prefix ((key.ToString()).substr(0,1)); | |
224 | string key_to_decode ((key.ToString()).substr(2,string::npos)); | |
225 | seen += "\nSingleDelete(Prefix = "+ prefix + " Key = " | |
226 | + pretty_binary_string(key_to_decode) + ")"; | |
227 | num_seen++; | |
228 | } | |
229 | void Delete(const rocksdb::Slice& key) override { | |
230 | string prefix ((key.ToString()).substr(0,1)); | |
231 | string key_to_decode ((key.ToString()).substr(2,string::npos)); | |
232 | seen += "\nDelete( Prefix = " + prefix + " key = " | |
233 | + pretty_binary_string(key_to_decode) + ")"; | |
234 | ||
235 | num_seen++; | |
236 | } | |
237 | void Merge(const rocksdb::Slice& key, | |
238 | const rocksdb::Slice& value) override { | |
239 | string prefix ((key.ToString()).substr(0,1)); | |
240 | string key_to_decode ((key.ToString()).substr(2,string::npos)); | |
241 | uint64_t size = (value.ToString()).size(); | |
242 | seen += "\nMerge( Prefix = " + prefix + " key = " | |
243 | + pretty_binary_string(key_to_decode) + " Value size = " | |
244 | + std::to_string(size) + ")"; | |
245 | ||
246 | num_seen++; | |
247 | } | |
248 | bool Continue() override { return num_seen < 50; } | |
249 | ||
250 | }; | |
251 | ||
252 | ||
253 | class RocksDBTransactionImpl : public KeyValueDB::TransactionImpl { | |
254 | public: | |
255 | rocksdb::WriteBatch bat; | |
256 | RocksDBStore *db; | |
257 | ||
258 | explicit RocksDBTransactionImpl(RocksDBStore *_db); | |
259 | void set( | |
260 | const string &prefix, | |
261 | const string &k, | |
262 | const bufferlist &bl) override; | |
263 | void set( | |
264 | const string &prefix, | |
265 | const char *k, | |
266 | size_t keylen, | |
267 | const bufferlist &bl) override; | |
268 | void rmkey( | |
269 | const string &prefix, | |
270 | const string &k) override; | |
271 | void rmkey( | |
272 | const string &prefix, | |
273 | const char *k, | |
274 | size_t keylen) override; | |
275 | void rm_single_key( | |
276 | const string &prefix, | |
277 | const string &k) override; | |
278 | void rmkeys_by_prefix( | |
279 | const string &prefix | |
280 | ) override; | |
281 | void rm_range_keys( | |
282 | const string &prefix, | |
283 | const string &start, | |
284 | const string &end) override; | |
285 | void merge( | |
286 | const string& prefix, | |
287 | const string& k, | |
288 | const bufferlist &bl) override; | |
289 | }; | |
290 | ||
291 | KeyValueDB::Transaction get_transaction() override { | |
292 | return std::make_shared<RocksDBTransactionImpl>(this); | |
293 | } | |
294 | ||
295 | int submit_transaction(KeyValueDB::Transaction t) override; | |
296 | int submit_transaction_sync(KeyValueDB::Transaction t) override; | |
297 | int get( | |
298 | const string &prefix, | |
299 | const std::set<string> &key, | |
300 | std::map<string, bufferlist> *out | |
301 | ) override; | |
302 | int get( | |
303 | const string &prefix, | |
304 | const string &key, | |
305 | bufferlist *out | |
306 | ) override; | |
307 | int get( | |
308 | const string &prefix, | |
309 | const char *key, | |
310 | size_t keylen, | |
311 | bufferlist *out) override; | |
312 | ||
313 | ||
314 | class RocksDBWholeSpaceIteratorImpl : | |
315 | public KeyValueDB::WholeSpaceIteratorImpl { | |
316 | protected: | |
317 | rocksdb::Iterator *dbiter; | |
318 | public: | |
319 | explicit RocksDBWholeSpaceIteratorImpl(rocksdb::Iterator *iter) : | |
320 | dbiter(iter) { } | |
321 | //virtual ~RocksDBWholeSpaceIteratorImpl() { } | |
322 | ~RocksDBWholeSpaceIteratorImpl() override; | |
323 | ||
324 | int seek_to_first() override; | |
325 | int seek_to_first(const string &prefix) override; | |
326 | int seek_to_last() override; | |
327 | int seek_to_last(const string &prefix) override; | |
328 | int upper_bound(const string &prefix, const string &after) override; | |
329 | int lower_bound(const string &prefix, const string &to) override; | |
330 | bool valid() override; | |
331 | int next() override; | |
332 | int prev() override; | |
333 | string key() override; | |
334 | pair<string,string> raw_key() override; | |
335 | bool raw_key_is_prefixed(const string &prefix) override; | |
336 | bufferlist value() override; | |
337 | bufferptr value_as_ptr() override; | |
338 | int status() override; | |
339 | size_t key_size() override; | |
340 | size_t value_size() override; | |
341 | }; | |
342 | ||
343 | /// Utility | |
344 | static string combine_strings(const string &prefix, const string &value) { | |
345 | string out = prefix; | |
346 | out.push_back(0); | |
347 | out.append(value); | |
348 | return out; | |
349 | } | |
350 | static void combine_strings(const string &prefix, | |
351 | const char *key, size_t keylen, | |
352 | string *out) { | |
353 | out->reserve(prefix.size() + 1 + keylen); | |
354 | *out = prefix; | |
355 | out->push_back(0); | |
356 | out->append(key, keylen); | |
357 | } | |
358 | ||
359 | static int split_key(rocksdb::Slice in, string *prefix, string *key); | |
360 | ||
361 | static bufferlist to_bufferlist(rocksdb::Slice in) { | |
362 | bufferlist bl; | |
363 | bl.append(bufferptr(in.data(), in.size())); | |
364 | return bl; | |
365 | } | |
366 | ||
367 | static string past_prefix(const string &prefix); | |
368 | ||
369 | class MergeOperatorRouter; | |
370 | friend class MergeOperatorRouter; | |
371 | int set_merge_operator(const std::string& prefix, | |
372 | std::shared_ptr<KeyValueDB::MergeOperator> mop) override; | |
373 | string assoc_name; ///< Name of associative operator | |
374 | ||
375 | uint64_t get_estimated_size(map<string,uint64_t> &extra) override { | |
376 | DIR *store_dir = opendir(path.c_str()); | |
377 | if (!store_dir) { | |
378 | lderr(cct) << __func__ << " something happened opening the store: " | |
379 | << cpp_strerror(errno) << dendl; | |
380 | return 0; | |
381 | } | |
382 | ||
383 | uint64_t total_size = 0; | |
384 | uint64_t sst_size = 0; | |
385 | uint64_t log_size = 0; | |
386 | uint64_t misc_size = 0; | |
387 | ||
388 | struct dirent *entry = NULL; | |
389 | while ((entry = readdir(store_dir)) != NULL) { | |
390 | string n(entry->d_name); | |
391 | ||
392 | if (n == "." || n == "..") | |
393 | continue; | |
394 | ||
395 | string fpath = path + '/' + n; | |
396 | struct stat s; | |
397 | int err = stat(fpath.c_str(), &s); | |
398 | if (err < 0) | |
399 | err = -errno; | |
400 | // we may race against rocksdb while reading files; this should only | |
401 | // happen when those files are being updated, data is being shuffled | |
402 | // and files get removed, in which case there's not much of a problem | |
403 | // as we'll get to them next time around. | |
404 | if (err == -ENOENT) { | |
405 | continue; | |
406 | } | |
407 | if (err < 0) { | |
408 | lderr(cct) << __func__ << " error obtaining stats for " << fpath | |
409 | << ": " << cpp_strerror(err) << dendl; | |
410 | goto err; | |
411 | } | |
412 | ||
413 | size_t pos = n.find_last_of('.'); | |
414 | if (pos == string::npos) { | |
415 | misc_size += s.st_size; | |
416 | continue; | |
417 | } | |
418 | ||
419 | string ext = n.substr(pos+1); | |
420 | if (ext == "sst") { | |
421 | sst_size += s.st_size; | |
422 | } else if (ext == "log") { | |
423 | log_size += s.st_size; | |
424 | } else { | |
425 | misc_size += s.st_size; | |
426 | } | |
427 | } | |
428 | ||
429 | total_size = sst_size + log_size + misc_size; | |
430 | ||
431 | extra["sst"] = sst_size; | |
432 | extra["log"] = log_size; | |
433 | extra["misc"] = misc_size; | |
434 | extra["total"] = total_size; | |
435 | ||
436 | err: | |
437 | closedir(store_dir); | |
438 | return total_size; | |
439 | } | |
440 | ||
31f18b77 FG |
441 | int set_cache_size(uint64_t s) override { |
442 | cache_size = s; | |
224ce89b | 443 | set_cache_flag = true; |
31f18b77 FG |
444 | return 0; |
445 | } | |
7c673cae FG |
446 | |
447 | protected: | |
448 | WholeSpaceIterator _get_iterator() override; | |
449 | }; | |
450 | ||
451 | ||
452 | ||
453 | #endif |