1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 #ifndef ROCKS_DB_STORE_H
4 #define ROCKS_DB_STORE_H
6 #include "include/types.h"
7 #include "include/buffer_fwd.h"
8 #include "KeyValueDB.h"
13 #include <boost/scoped_ptr.hpp>
14 #include "rocksdb/write_batch.h"
15 #include "rocksdb/perf_context.h"
16 #include "rocksdb/iostats_context.h"
17 #include "rocksdb/statistics.h"
18 #include "rocksdb/table.h"
20 #include "common/errno.h"
21 #include "common/dout.h"
22 #include "include/assert.h"
23 #include "common/Formatter.h"
24 #include "common/Cond.h"
26 #include "common/ceph_context.h"
// Enumerator list with the l_rocksdb_ prefix (presumably perf-counter indices;
// the enum header and its closing brace are not visible in this listing, and
// the embedded line numbers show that some entries are missing from view).
// l_rocksdb_first is pinned to 34300; every enumerator without an explicit
// value takes the value of its predecessor plus one.
30 l_rocksdb_first
= 34300,
34 l_rocksdb_get_latency
,
35 l_rocksdb_submit_latency
,
36 l_rocksdb_submit_sync_latency
,
38 l_rocksdb_compact_range
,
39 l_rocksdb_compact_queue_merge
,
40 l_rocksdb_compact_queue_len
,
41 l_rocksdb_write_wal_time
,
42 l_rocksdb_write_memtable_time
,
43 l_rocksdb_write_delay_time
,
44 l_rocksdb_write_pre_and_post_process_time
,
// Forward declaration only; the enclosing namespace is not visible in this
// listing (the type is later used as rocksdb::BlockBasedTableOptions).
42 struct BlockBasedTableOptions
;
// Declared here, defined in another translation unit. Takes no arguments and
// returns a raw rocksdb::Logger pointer.
// NOTE(review): who owns the returned pointer is not evident from this
// declaration -- confirm at the definition before storing or deleting it.
62 extern rocksdb::Logger
*create_rocksdb_ceph_logger();
65 * Uses RocksDB to implement the KeyValueDB interface
67 class RocksDBStore
: public KeyValueDB
{
// Shared handle to a rocksdb::Statistics object.
74 std::shared_ptr
<rocksdb::Statistics
> dbstats
;
// Block-based table options kept as a member of the store.
75 rocksdb::BlockBasedTableOptions bbt_opts
;
// Cache size, default 0 (unit presumably bytes -- TODO confirm).
78 uint64_t cache_size
= 0;
// Starts false; set_cache_size() below sets it to true.
79 bool set_cache_flag
= false;
// Common open routine; open() calls it with create_if_missing == false.
// `out` is an output stream supplied by the caller.
81 int do_open(ostream
&out
, bool create_if_missing
);
83 // manage async compactions
// Lock and condition variable paired with the compaction queue below.
84 Mutex compact_queue_lock
;
85 Cond compact_queue_cond
;
// Queue of (string, string) pairs; presumably (start, end) key ranges waiting
// to be compacted -- confirm against compact_range_async().
86 list
< pair
<string
,string
> > compact_queue
;
// Initialised to false by the constructor; presumably tells the compaction
// thread to exit -- confirm against compact_thread_entry().
87 bool compact_queue_stop
;
// Thread subclass that runs the store's compaction loop: entry() forwards to
// RocksDBStore::compact_thread_entry() through the stored `db` pointer.
// The declaration of `db`, the return statement of entry() and the closing
// braces are not visible in this listing.
88 class CompactThread
: public Thread
{
// Stores the owning store pointer; explicit to prevent implicit conversion
// from RocksDBStore*.
91 explicit CompactThread(RocksDBStore
*d
) : db(d
) {}
92 void *entry() override
{
93 db
->compact_thread_entry();
// Friendship gives RocksDBStore access to this class's non-public members.
96 friend class RocksDBStore
;
// Body of the compaction thread; invoked from CompactThread::entry().
99 void compact_thread_entry();
// Compaction over the key range [start, end] expressed as full (already
// combined) keys; the prefix-taking overloads further down forward here.
// NOTE(review): whether `end` is inclusive is not visible here -- confirm in
// the implementation.
101 void compact_range(const string
& start
, const string
& end
);
// Asynchronous counterpart of compact_range(); presumably enqueues the range
// on compact_queue for the compaction thread -- confirm in the implementation.
102 void compact_range_async(const string
& start
, const string
& end
);
105 /// compact the underlying rocksdb store
// Initialised to false by the constructor.
106 bool compact_on_mount
;
// Overrides the base-class compact(); defined out of line.
109 void compact() override
;
// Option-parsing helpers that fill in a rocksdb::Options. Return values are
// int status codes (convention not visible here -- TODO confirm).
// NOTE(review): the string arguments are taken by const value, which copies
// them on every call; const references would avoid the copy.
111 int tryInterpret(const string key
, const string val
, rocksdb::Options
&opt
);
112 int ParseOptionsFromString(const string opt_str
, rocksdb::Options
&opt
);
// Static helper taking a directory path; semantics not visible here.
113 static int _test_init(const string
& dir
);
// Overrides the base-class init(); takes the option string by value.
114 int init(string options_str
) override
;
115 /// compact rocksdb for all keys with a given prefix
// Implemented as a range compaction from `prefix` itself up to
// past_prefix(prefix).
116 void compact_prefix(const string
& prefix
) override
{
117 compact_range(prefix
, past_prefix(prefix
));
// Asynchronous variant of compact_prefix(): forwards the same
// [prefix, past_prefix(prefix)] bounds to compact_range_async().
119 void compact_prefix_async(const string
& prefix
) override
{
120 compact_range_async(prefix
, past_prefix(prefix
));
// Prefix-aware overload: builds the two full keys with
// combine_strings(prefix, start) and combine_strings(prefix, end), then
// forwards them to the two-argument compact_range().
123 void compact_range(const string
& prefix
, const string
& start
, const string
& end
) override
{
124 compact_range(combine_strings(prefix
, start
), combine_strings(prefix
, end
));
// Prefix-aware asynchronous overload: combines `prefix` with `start` and with
// `end` via combine_strings(), then forwards the resulting full keys to the
// two-argument compact_range_async().
126 void compact_range_async(const string
& prefix
, const string
& start
, const string
& end
) override
{
127 compact_range_async(combine_strings(prefix
, start
), combine_strings(prefix
, end
));
// Constructor. Only part of the member-initialiser list is visible in this
// listing (the embedded line numbers skip several entries) and the body is
// not shown.
//   c    - Ceph context
//   path - store path (get_estimated_size() opens it as a directory)
//   p    - opaque pointer that is reinterpreted as a rocksdb::Env*
130 RocksDBStore(CephContext
*c
, const string
&path
, void *p
) :
// `p` arrives as void* and is cast without any check; the caller must pass
// either a real rocksdb::Env* or null.
136 env(static_cast<rocksdb::Env
*>(p
)),
// NOTE(review): the lock's name string says "compact_thread_lock" while the
// member is called compact_queue_lock; the string is what shows up wherever
// the Mutex name is reported.
138 compact_queue_lock("RocksDBStore::compact_thread_lock"),
139 compact_queue_stop(false),
// The compaction thread object keeps a back-pointer to this store.
140 compact_thread(this),
141 compact_on_mount(false),
// Read once from configuration at construction time.
143 enable_rmrange(cct
->_conf
->rocksdb_enable_rmrange
)
// Destructor overrides the base-class one; defined out of line.
146 ~RocksDBStore() override
;
// Static check on an omap directory path. The argument is a non-const
// reference, so the implementation may modify it -- confirm at the definition.
148 static bool check_omap_dir(string
&omap_dir
);
149 /// Opens underlying db
// Delegates to do_open() with create_if_missing == false and returns its
// result unchanged.
150 int open(ostream
&out
) override
{
151 return do_open(out
, false);
153 /// Creates underlying db if missing and opens it
154 int create_and_open(ostream
&out
) override
;
// Overrides the base-class close(); defined out of line.
156 void close() override
;
// Splits `s` on the single character `delim`, writing results to `elems`
// (presumably appending one element per field -- confirm at the definition).
158 void split_stats(const std::string
&s
, char delim
, std::vector
<std::string
> &elems
);
// Emits statistics through the supplied Formatter; defined out of line.
159 void get_statistics(Formatter
*f
) override
;
161 struct RocksWBHandler
: public rocksdb::WriteBatch::Handler
{
// Renders an arbitrary byte string in a printable form: runs of printable
// bytes are copied through as text, other bytes are written as hex digits.
// The declarations of `out` and `buf`, several statements and the closing
// braces are not visible in this listing; the comments below cover only the
// visible lines.
164 static string
pretty_binary_string(const string
& in
) {
// Pre-size the output to three times the input length.
167 out
.reserve(in
.length() * 3);
// Rendering state: nothing emitted yet, inside a hex run, or inside a text run.
168 enum { NONE
, HEX
, STRING
} mode
= NONE
;
// `from` marks where the current text run began; `i` is declared outside the
// loop because it is read again after the loop ends.
169 unsigned from
= 0, i
;
170 for (i
=0; i
< in
.length(); ++i
) {
// Take the hex path when the current byte is outside 32..126, or when already
// in HEX mode with at least four bytes left and any of those four bytes is
// outside 32..126. Where plain char is signed, bytes >= 0x80 are negative and
// therefore also satisfy the `< 32` test.
171 if ((in
[i
] < 32 || (unsigned char)in
[i
] > 126) ||
172 (mode
== HEX
&& in
.length() - i
>= 4 &&
173 ((in
[i
] < 32 || (unsigned char)in
[i
] > 126) ||
174 (in
[i
+1] < 32 || (unsigned char)in
[i
+1] > 126) ||
175 (in
[i
+2] < 32 || (unsigned char)in
[i
+2] > 126) ||
176 (in
[i
+3] < 32 || (unsigned char)in
[i
+3] > 126)))) {
// Leaving a text run: flush the pending characters [from, i).
178 if (mode
== STRING
) {
179 out
.append(in
.substr(from
, i
- from
));
// With four or more bytes left, pack them most-significant-first into one
// 32-bit value and format it as eight hex digits.
186 if (in
.length() - i
>= 4) {
187 // print a whole u32 at once
188 snprintf(buf
, sizeof(buf
), "%08x",
189 (uint32_t)(((unsigned char)in
[i
] << 24) |
190 ((unsigned char)in
[i
+1] << 16) |
191 ((unsigned char)in
[i
+2] << 8) |
192 ((unsigned char)in
[i
+3] << 0)));
// Fewer than four bytes left: format the single byte as two hex digits.
195 snprintf(buf
, sizeof(buf
), "%02x", (int)(unsigned char)in
[i
]);
// Printable byte while not yet in a text run (the statements inside this
// branch are not visible in this listing).
199 if (mode
!= STRING
) {
// After the loop: flush a text run that reached the end of the input.
206 if (mode
== STRING
) {
207 out
.append(in
.substr(from
, i
- from
));
// WriteBatch handler callback for a Put record: appends a human-readable line
// describing the record to `seen`.
212 void Put(const rocksdb::Slice
& key
,
213 const rocksdb::Slice
& value
) override
{
// The first byte of the key is reported as the prefix.
214 string
prefix ((key
.ToString()).substr(0,1));
// The key proper is taken from offset 2, skipping the byte at index 1
// (presumably the prefix/key separator -- TODO confirm).
// NOTE(review): std::string::substr(2, ...) throws std::out_of_range when the
// key is shorter than 2 bytes.
215 string
key_to_decode ((key
.ToString()).substr(2,string::npos
));
// Only the length of the value is reported, not its content.
216 uint64_t size
= (value
.ToString()).size();
217 seen
+= "\nPut( Prefix = " + prefix
+ " key = "
218 + pretty_binary_string(key_to_decode
)
219 + " Value size = " + std::to_string(size
) + ")";
// WriteBatch handler callback for a SingleDelete record: appends a
// human-readable line describing the record to `seen`.
222 void SingleDelete(const rocksdb::Slice
& key
) override
{
// The first byte of the key is reported as the prefix.
223 string
prefix ((key
.ToString()).substr(0,1));
// Remainder of the key from offset 2, skipping the byte at index 1.
// NOTE(review): substr(2, ...) throws std::out_of_range for keys shorter
// than 2 bytes.
224 string
key_to_decode ((key
.ToString()).substr(2,string::npos
));
225 seen
+= "\nSingleDelete(Prefix = "+ prefix
+ " Key = "
226 + pretty_binary_string(key_to_decode
) + ")";
// WriteBatch handler callback for a Delete record: appends a human-readable
// line describing the record to `seen`.
229 void Delete(const rocksdb::Slice
& key
) override
{
// The first byte of the key is reported as the prefix.
230 string
prefix ((key
.ToString()).substr(0,1));
// Remainder of the key from offset 2, skipping the byte at index 1.
// NOTE(review): substr(2, ...) throws std::out_of_range for keys shorter
// than 2 bytes.
231 string
key_to_decode ((key
.ToString()).substr(2,string::npos
));
232 seen
+= "\nDelete( Prefix = " + prefix
+ " key = "
233 + pretty_binary_string(key_to_decode
) + ")";
// WriteBatch handler callback for a Merge record: appends a human-readable
// line (prefix, decoded key, value length) to `seen`.
237 void Merge(const rocksdb::Slice
& key
,
238 const rocksdb::Slice
& value
) override
{
// The first byte of the key is reported as the prefix.
239 string
prefix ((key
.ToString()).substr(0,1));
// Remainder of the key from offset 2, skipping the byte at index 1.
// NOTE(review): substr(2, ...) throws std::out_of_range for keys shorter
// than 2 bytes.
240 string
key_to_decode ((key
.ToString()).substr(2,string::npos
));
// Only the length of the merge operand is reported, not its content.
241 uint64_t size
= (value
.ToString()).size();
242 seen
+= "\nMerge( Prefix = " + prefix
+ " key = "
243 + pretty_binary_string(key_to_decode
) + " Value size = "
244 + std::to_string(size
) + ")";
// Returns true while num_seen is below 50 and false from then on.
// (Where num_seen is incremented is not visible in this listing.)
248 bool Continue() override
{ return num_seen
< 50; }
// Transaction implementation for this store, derived from
// KeyValueDB::TransactionImpl. Most method names are missing from this
// listing (only rmkeys_by_prefix is visible); what remains are parameter
// lists, each ending in `override`, i.e. each one overrides a base-class
// virtual.
253 class RocksDBTransactionImpl
: public KeyValueDB::TransactionImpl
{
// The RocksDB write batch held by this transaction.
255 rocksdb::WriteBatch bat
;
// Explicit to prevent implicit conversion from RocksDBStore*.
258 explicit RocksDBTransactionImpl(RocksDBStore
*_db
);
// Overload taking (prefix, ..., bufferlist); method name not visible.
260 const string
&prefix
,
262 const bufferlist
&bl
) override
;
// Second overload taking (prefix, ..., bufferlist); method name not visible.
264 const string
&prefix
,
267 const bufferlist
&bl
) override
;
// Overload taking (prefix, k); method name not visible.
269 const string
&prefix
,
270 const string
&k
) override
;
// Overload taking (prefix, ..., keylen); method name not visible.
272 const string
&prefix
,
274 size_t keylen
) override
;
// Another overload taking (prefix, k); method name not visible.
276 const string
&prefix
,
277 const string
&k
) override
;
// Only the opening of this declaration is visible; its parameter list was
// dropped from the listing.
278 void rmkeys_by_prefix(
// Overload taking (prefix, ..., end); method name not visible.
282 const string
&prefix
,
284 const string
&end
) override
;
// Overload taking (prefix, ..., bufferlist); method name not visible.
286 const string
& prefix
,
288 const bufferlist
&bl
) override
;
// Creates a new RocksDBTransactionImpl bound to this store via
// std::make_shared and returns it as a KeyValueDB::Transaction.
291 KeyValueDB::Transaction
get_transaction() override
{
292 return std::make_shared
<RocksDBTransactionImpl
>(this);
// Submit a transaction to the store; the second variant is the "sync" one
// (exact durability difference is not visible here -- see the definitions).
// The transaction handle is taken by value.
295 int submit_transaction(KeyValueDB::Transaction t
) override
;
296 int submit_transaction_sync(KeyValueDB::Transaction t
) override
;
// Fragments of read-path declarations; the method names and some parameters
// are missing from this listing. The first takes a prefix, a set of keys and
// an output map from key to bufferlist.
298 const string
&prefix
,
299 const std::set
<string
> &key
,
300 std::map
<string
, bufferlist
> *out
// Fragment of another declaration: only its prefix parameter is visible.
303 const string
&prefix
,
// Fragment taking a prefix and writing a single result through a bufferlist
// pointer.
308 const string
&prefix
,
311 bufferlist
*out
) override
;
// Whole-keyspace iterator for this store, derived from
// KeyValueDB::WholeSpaceIteratorImpl and built around a raw rocksdb::Iterator
// pointer. All members shown are declarations only; the constructor's
// initialiser list and the closing brace are not visible in this listing.
314 class RocksDBWholeSpaceIteratorImpl
:
315 public KeyValueDB::WholeSpaceIteratorImpl
{
// Raw pointer to the underlying RocksDB iterator.
// NOTE(review): the destructor is user-declared, which suggests this class
// releases dbiter -- confirm at the definition.
317 rocksdb::Iterator
*dbiter
;
319 explicit RocksDBWholeSpaceIteratorImpl(rocksdb::Iterator
*iter
) :
321 //virtual ~RocksDBWholeSpaceIteratorImpl() { }
322 ~RocksDBWholeSpaceIteratorImpl() override
;
// Positioning operations; each returns an int status. The overloads taking a
// prefix restrict the seek to that prefix.
324 int seek_to_first() override
;
325 int seek_to_first(const string
&prefix
) override
;
326 int seek_to_last() override
;
327 int seek_to_last(const string
&prefix
) override
;
328 int upper_bound(const string
&prefix
, const string
&after
) override
;
329 int lower_bound(const string
&prefix
, const string
&to
) override
;
330 bool valid() override
;
// Accessors for the current entry. raw_key() returns a pair of strings,
// presumably (prefix, key) -- confirm at the definition.
333 string
key() override
;
334 pair
<string
,string
> raw_key() override
;
335 bool raw_key_is_prefixed(const string
&prefix
) override
;
336 bufferlist
value() override
;
337 bufferptr
value_as_ptr() override
;
338 int status() override
;
339 size_t key_size() override
;
340 size_t value_size() override
;
// Builds a full key from a prefix and a value and returns it by value.
// The body is not visible in this listing.
344 static string
combine_strings(const string
&prefix
, const string
&value
) {
// Overload taking the key as a (pointer, length) pair and writing through an
// output pointer `out` (its parameter declaration is not visible here).
350 static void combine_strings(const string
&prefix
,
351 const char *key
, size_t keylen
,
// Reserve room for the prefix, one extra byte, and the key. The extra byte is
// presumably a separator written by a statement not visible here.
353 out
->reserve(prefix
.size() + 1 + keylen
);
// The key bytes are appended last, using the explicit length, so embedded NUL
// bytes in `key` are preserved.
356 out
->append(key
, keylen
);
// Splits a stored key into its prefix and key parts, written through the two
// output pointers. Returns an int status (convention not visible here).
359 static int split_key(rocksdb::Slice in
, string
*prefix
, string
*key
);
// Wraps the bytes of a Slice in a bufferlist by appending a bufferptr
// constructed from (data, size). The declaration of `bl` and the return
// statement are not visible in this listing.
361 static bufferlist
to_bufferlist(rocksdb::Slice in
) {
363 bl
.append(bufferptr(in
.data(), in
.size()));
// Returns the upper-bound key used by compact_prefix() for a given prefix;
// defined out of line.
367 static string
past_prefix(const string
&prefix
);
// Nested class declared here and defined elsewhere; friendship gives it
// access to this store's non-public members.
369 class MergeOperatorRouter
;
370 friend class MergeOperatorRouter
;
// Registers a merge operator for keys under `prefix`. The operator is passed
// as a shared_ptr by value.
371 int set_merge_operator(const std::string
& prefix
,
372 std::shared_ptr
<KeyValueDB::MergeOperator
> mop
) override
;
373 string assoc_name
; ///< Name of associative operator
// Estimates the on-disk size of the store by walking the directory at `path`
// and summing st_size per file, bucketed by file extension. The buckets are
// written into `extra` under the keys "sst", "log", "misc" and "total".
// Several lines (error returns, the declaration of `s`, the condition guarding
// the sst bucket, the return statement, and any closedir call) are not visible
// in this listing.
375 uint64_t get_estimated_size(map
<string
,uint64_t> &extra
) override
{
376 DIR *store_dir
= opendir(path
.c_str());
// Logged when the directory could not be opened; errno supplies the reason.
378 lderr(cct
) << __func__
<< " something happened opening the store: "
379 << cpp_strerror(errno
) << dendl
;
383 uint64_t total_size
= 0;
384 uint64_t sst_size
= 0;
385 uint64_t log_size
= 0;
386 uint64_t misc_size
= 0;
388 struct dirent
*entry
= NULL
;
// readdir() yields one directory entry per call and NULL when exhausted.
389 while ((entry
= readdir(store_dir
)) != NULL
) {
390 string
n(entry
->d_name
);
// Skip the self and parent directory entries.
392 if (n
== "." || n
== "..")
395 string fpath
= path
+ '/' + n
;
// NOTE(review): stat() itself returns -1 on failure and reports the cause in
// errno, so the `err == -ENOENT` test below is only meaningful if `err` is
// converted to -errno by lines not shown in this listing -- confirm.
397 int err
= stat(fpath
.c_str(), &s
);
400 // we may race against rocksdb while reading files; this should only
401 // happen when those files are being updated, data is being shuffled
402 // and files get removed, in which case there's not much of a problem
403 // as we'll get to them next time around.
404 if (err
== -ENOENT
) {
408 lderr(cct
) << __func__
<< " error obtaining stats for " << fpath
409 << ": " << cpp_strerror(err
) << dendl
;
// Classify by the text after the last '.'; names with no '.' count as misc.
413 size_t pos
= n
.find_last_of('.');
414 if (pos
== string::npos
) {
415 misc_size
+= s
.st_size
;
419 string ext
= n
.substr(pos
+1);
// The condition selecting the sst bucket is not visible in this listing.
421 sst_size
+= s
.st_size
;
422 } else if (ext
== "log") {
423 log_size
+= s
.st_size
;
// Any other extension counts as misc.
425 misc_size
+= s
.st_size
;
429 total_size
= sst_size
+ log_size
+ misc_size
;
// Publish the per-bucket totals to the caller's map; existing entries under
// these four keys are overwritten.
431 extra
["sst"] = sst_size
;
432 extra
["log"] = log_size
;
433 extra
["misc"] = misc_size
;
434 extra
["total"] = total_size
;
// Overrides the base-class set_cache_size(). The visible part marks the cache
// size as explicitly set by raising set_cache_flag; the assignment of `s` and
// the return statement are not visible in this listing.
441 int set_cache_size(uint64_t s
) override
{
443 set_cache_flag
= true;
// Overrides the base-class iterator factory; returns a WholeSpaceIterator.
// Defined out of line.
448 WholeSpaceIterator
_get_iterator() override
;