1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 #ifndef ROCKS_DB_STORE_H
4 #define ROCKS_DB_STORE_H
6 #include "include/types.h"
7 #include "include/buffer_fwd.h"
8 #include "KeyValueDB.h"
13 #include <boost/scoped_ptr.hpp>
14 #include "rocksdb/write_batch.h"
15 #include "rocksdb/perf_context.h"
16 #include "rocksdb/iostats_context.h"
17 #include "rocksdb/statistics.h"
18 #include "rocksdb/table.h"
20 #include "common/errno.h"
21 #include "common/dout.h"
22 #include "include/assert.h"
23 #include "common/Formatter.h"
24 #include "common/Cond.h"
26 #include "common/ceph_context.h"
30 l_rocksdb_first
= 34300,
34 l_rocksdb_get_latency
,
35 l_rocksdb_submit_latency
,
36 l_rocksdb_submit_sync_latency
,
38 l_rocksdb_compact_range
,
39 l_rocksdb_compact_queue_merge
,
40 l_rocksdb_compact_queue_len
,
41 l_rocksdb_write_wal_time
,
42 l_rocksdb_write_memtable_time
,
43 l_rocksdb_write_delay_time
,
44 l_rocksdb_write_pre_and_post_process_time
,
59 struct BlockBasedTableOptions
;
62 extern rocksdb::Logger
*create_rocksdb_ceph_logger();
65 * Uses RocksDB to implement the KeyValueDB interface
67 class RocksDBStore
: public KeyValueDB
{
74 std::shared_ptr
<rocksdb::Statistics
> dbstats
;
75 rocksdb::BlockBasedTableOptions bbt_opts
;
78 uint64_t cache_size
= 0;
80 int do_open(ostream
&out
, bool create_if_missing
);
82 // manage async compactions
83 Mutex compact_queue_lock
;
84 Cond compact_queue_cond
;
85 list
< pair
<string
,string
> > compact_queue
;
86 bool compact_queue_stop
;
87 class CompactThread
: public Thread
{
90 explicit CompactThread(RocksDBStore
*d
) : db(d
) {}
91 void *entry() override
{
92 db
->compact_thread_entry();
95 friend class RocksDBStore
;
98 void compact_thread_entry();
100 void compact_range(const string
& start
, const string
& end
);
101 void compact_range_async(const string
& start
, const string
& end
);
104 /// compact the underlying rocksdb store
105 bool compact_on_mount
;
108 void compact() override
;
110 int tryInterpret(const string key
, const string val
, rocksdb::Options
&opt
);
111 int ParseOptionsFromString(const string opt_str
, rocksdb::Options
&opt
);
112 static int _test_init(const string
& dir
);
113 int init(string options_str
) override
;
114 /// compact rocksdb for all keys with a given prefix
115 void compact_prefix(const string
& prefix
) override
{
116 compact_range(prefix
, past_prefix(prefix
));
118 void compact_prefix_async(const string
& prefix
) override
{
119 compact_range_async(prefix
, past_prefix(prefix
));
122 void compact_range(const string
& prefix
, const string
& start
, const string
& end
) override
{
123 compact_range(combine_strings(prefix
, start
), combine_strings(prefix
, end
));
125 void compact_range_async(const string
& prefix
, const string
& start
, const string
& end
) override
{
126 compact_range_async(combine_strings(prefix
, start
), combine_strings(prefix
, end
));
129 RocksDBStore(CephContext
*c
, const string
&path
, void *p
) :
135 env(static_cast<rocksdb::Env
*>(p
)),
137 compact_queue_lock("RocksDBStore::compact_thread_lock"),
138 compact_queue_stop(false),
139 compact_thread(this),
140 compact_on_mount(false),
142 enable_rmrange(cct
->_conf
->rocksdb_enable_rmrange
)
145 ~RocksDBStore() override
;
147 static bool check_omap_dir(string
&omap_dir
);
148 /// Opens underlying db
149 int open(ostream
&out
) override
{
150 return do_open(out
, false);
152 /// Creates underlying db if missing and opens it
153 int create_and_open(ostream
&out
) override
;
155 void close() override
;
157 void split_stats(const std::string
&s
, char delim
, std::vector
<std::string
> &elems
);
158 void get_statistics(Formatter
*f
) override
;
160 struct RocksWBHandler
: public rocksdb::WriteBatch::Handler
{
163 static string
pretty_binary_string(const string
& in
) {
166 out
.reserve(in
.length() * 3);
167 enum { NONE
, HEX
, STRING
} mode
= NONE
;
168 unsigned from
= 0, i
;
169 for (i
=0; i
< in
.length(); ++i
) {
170 if ((in
[i
] < 32 || (unsigned char)in
[i
] > 126) ||
171 (mode
== HEX
&& in
.length() - i
>= 4 &&
172 ((in
[i
] < 32 || (unsigned char)in
[i
] > 126) ||
173 (in
[i
+1] < 32 || (unsigned char)in
[i
+1] > 126) ||
174 (in
[i
+2] < 32 || (unsigned char)in
[i
+2] > 126) ||
175 (in
[i
+3] < 32 || (unsigned char)in
[i
+3] > 126)))) {
177 if (mode
== STRING
) {
178 out
.append(in
.substr(from
, i
- from
));
185 if (in
.length() - i
>= 4) {
186 // print a whole u32 at once
187 snprintf(buf
, sizeof(buf
), "%08x",
188 (uint32_t)(((unsigned char)in
[i
] << 24) |
189 ((unsigned char)in
[i
+1] << 16) |
190 ((unsigned char)in
[i
+2] << 8) |
191 ((unsigned char)in
[i
+3] << 0)));
194 snprintf(buf
, sizeof(buf
), "%02x", (int)(unsigned char)in
[i
]);
198 if (mode
!= STRING
) {
205 if (mode
== STRING
) {
206 out
.append(in
.substr(from
, i
- from
));
211 void Put(const rocksdb::Slice
& key
,
212 const rocksdb::Slice
& value
) override
{
213 string
prefix ((key
.ToString()).substr(0,1));
214 string
key_to_decode ((key
.ToString()).substr(2,string::npos
));
215 uint64_t size
= (value
.ToString()).size();
216 seen
+= "\nPut( Prefix = " + prefix
+ " key = "
217 + pretty_binary_string(key_to_decode
)
218 + " Value size = " + std::to_string(size
) + ")";
221 void SingleDelete(const rocksdb::Slice
& key
) override
{
222 string
prefix ((key
.ToString()).substr(0,1));
223 string
key_to_decode ((key
.ToString()).substr(2,string::npos
));
224 seen
+= "\nSingleDelete(Prefix = "+ prefix
+ " Key = "
225 + pretty_binary_string(key_to_decode
) + ")";
228 void Delete(const rocksdb::Slice
& key
) override
{
229 string
prefix ((key
.ToString()).substr(0,1));
230 string
key_to_decode ((key
.ToString()).substr(2,string::npos
));
231 seen
+= "\nDelete( Prefix = " + prefix
+ " key = "
232 + pretty_binary_string(key_to_decode
) + ")";
236 void Merge(const rocksdb::Slice
& key
,
237 const rocksdb::Slice
& value
) override
{
238 string
prefix ((key
.ToString()).substr(0,1));
239 string
key_to_decode ((key
.ToString()).substr(2,string::npos
));
240 uint64_t size
= (value
.ToString()).size();
241 seen
+= "\nMerge( Prefix = " + prefix
+ " key = "
242 + pretty_binary_string(key_to_decode
) + " Value size = "
243 + std::to_string(size
) + ")";
247 bool Continue() override
{ return num_seen
< 50; }
252 class RocksDBTransactionImpl
: public KeyValueDB::TransactionImpl
{
254 rocksdb::WriteBatch bat
;
257 explicit RocksDBTransactionImpl(RocksDBStore
*_db
);
259 const string
&prefix
,
261 const bufferlist
&bl
) override
;
263 const string
&prefix
,
266 const bufferlist
&bl
) override
;
268 const string
&prefix
,
269 const string
&k
) override
;
271 const string
&prefix
,
273 size_t keylen
) override
;
275 const string
&prefix
,
276 const string
&k
) override
;
277 void rmkeys_by_prefix(
281 const string
&prefix
,
283 const string
&end
) override
;
285 const string
& prefix
,
287 const bufferlist
&bl
) override
;
290 KeyValueDB::Transaction
get_transaction() override
{
291 return std::make_shared
<RocksDBTransactionImpl
>(this);
294 int submit_transaction(KeyValueDB::Transaction t
) override
;
295 int submit_transaction_sync(KeyValueDB::Transaction t
) override
;
297 const string
&prefix
,
298 const std::set
<string
> &key
,
299 std::map
<string
, bufferlist
> *out
302 const string
&prefix
,
307 const string
&prefix
,
310 bufferlist
*out
) override
;
313 class RocksDBWholeSpaceIteratorImpl
:
314 public KeyValueDB::WholeSpaceIteratorImpl
{
316 rocksdb::Iterator
*dbiter
;
318 explicit RocksDBWholeSpaceIteratorImpl(rocksdb::Iterator
*iter
) :
320 //virtual ~RocksDBWholeSpaceIteratorImpl() { }
321 ~RocksDBWholeSpaceIteratorImpl() override
;
323 int seek_to_first() override
;
324 int seek_to_first(const string
&prefix
) override
;
325 int seek_to_last() override
;
326 int seek_to_last(const string
&prefix
) override
;
327 int upper_bound(const string
&prefix
, const string
&after
) override
;
328 int lower_bound(const string
&prefix
, const string
&to
) override
;
329 bool valid() override
;
332 string
key() override
;
333 pair
<string
,string
> raw_key() override
;
334 bool raw_key_is_prefixed(const string
&prefix
) override
;
335 bufferlist
value() override
;
336 bufferptr
value_as_ptr() override
;
337 int status() override
;
338 size_t key_size() override
;
339 size_t value_size() override
;
343 static string
combine_strings(const string
&prefix
, const string
&value
) {
349 static void combine_strings(const string
&prefix
,
350 const char *key
, size_t keylen
,
352 out
->reserve(prefix
.size() + 1 + keylen
);
355 out
->append(key
, keylen
);
358 static int split_key(rocksdb::Slice in
, string
*prefix
, string
*key
);
360 static bufferlist
to_bufferlist(rocksdb::Slice in
) {
362 bl
.append(bufferptr(in
.data(), in
.size()));
366 static string
past_prefix(const string
&prefix
);
368 class MergeOperatorRouter
;
369 friend class MergeOperatorRouter
;
370 int set_merge_operator(const std::string
& prefix
,
371 std::shared_ptr
<KeyValueDB::MergeOperator
> mop
) override
;
372 string assoc_name
; ///< Name of associative operator
374 uint64_t get_estimated_size(map
<string
,uint64_t> &extra
) override
{
375 DIR *store_dir
= opendir(path
.c_str());
377 lderr(cct
) << __func__
<< " something happened opening the store: "
378 << cpp_strerror(errno
) << dendl
;
382 uint64_t total_size
= 0;
383 uint64_t sst_size
= 0;
384 uint64_t log_size
= 0;
385 uint64_t misc_size
= 0;
387 struct dirent
*entry
= NULL
;
388 while ((entry
= readdir(store_dir
)) != NULL
) {
389 string
n(entry
->d_name
);
391 if (n
== "." || n
== "..")
394 string fpath
= path
+ '/' + n
;
396 int err
= stat(fpath
.c_str(), &s
);
399 // we may race against rocksdb while reading files; this should only
400 // happen when those files are being updated, data is being shuffled
401 // and files get removed, in which case there's not much of a problem
402 // as we'll get to them next time around.
403 if (err
== -ENOENT
) {
407 lderr(cct
) << __func__
<< " error obtaining stats for " << fpath
408 << ": " << cpp_strerror(err
) << dendl
;
412 size_t pos
= n
.find_last_of('.');
413 if (pos
== string::npos
) {
414 misc_size
+= s
.st_size
;
418 string ext
= n
.substr(pos
+1);
420 sst_size
+= s
.st_size
;
421 } else if (ext
== "log") {
422 log_size
+= s
.st_size
;
424 misc_size
+= s
.st_size
;
428 total_size
= sst_size
+ log_size
+ misc_size
;
430 extra
["sst"] = sst_size
;
431 extra
["log"] = log_size
;
432 extra
["misc"] = misc_size
;
433 extra
["total"] = total_size
;
440 int set_cache_size(uint64_t s
) override
{
446 WholeSpaceIterator
_get_iterator() override
;