1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 #ifndef ROCKS_DB_STORE_H
4 #define ROCKS_DB_STORE_H
6 #include "include/types.h"
7 #include "include/buffer_fwd.h"
8 #include "KeyValueDB.h"
13 #include <boost/scoped_ptr.hpp>
14 #include "rocksdb/write_batch.h"
15 #include "rocksdb/perf_context.h"
16 #include "rocksdb/iostats_context.h"
17 #include "rocksdb/statistics.h"
18 #include "rocksdb/table.h"
20 #include "common/errno.h"
21 #include "common/dout.h"
22 #include "include/assert.h"
23 #include "common/Formatter.h"
24 #include "common/Cond.h"
26 #include "common/ceph_context.h"
30 l_rocksdb_first
= 34300,
34 l_rocksdb_get_latency
,
35 l_rocksdb_submit_latency
,
36 l_rocksdb_submit_sync_latency
,
38 l_rocksdb_compact_range
,
39 l_rocksdb_compact_queue_merge
,
40 l_rocksdb_compact_queue_len
,
41 l_rocksdb_write_wal_time
,
42 l_rocksdb_write_memtable_time
,
43 l_rocksdb_write_delay_time
,
44 l_rocksdb_write_pre_and_post_process_time
,
59 struct BlockBasedTableOptions
;
62 extern rocksdb::Logger
*create_rocksdb_ceph_logger();
65 * Uses RocksDB to implement the KeyValueDB interface
67 class RocksDBStore
: public KeyValueDB
{
74 std::shared_ptr
<rocksdb::Statistics
> dbstats
;
75 rocksdb::BlockBasedTableOptions bbt_opts
;
78 int do_open(ostream
&out
, bool create_if_missing
);
80 // manage async compactions
81 Mutex compact_queue_lock
;
82 Cond compact_queue_cond
;
83 list
< pair
<string
,string
> > compact_queue
;
84 bool compact_queue_stop
;
85 class CompactThread
: public Thread
{
88 explicit CompactThread(RocksDBStore
*d
) : db(d
) {}
89 void *entry() override
{
90 db
->compact_thread_entry();
93 friend class RocksDBStore
;
96 void compact_thread_entry();
98 void compact_range(const string
& start
, const string
& end
);
99 void compact_range_async(const string
& start
, const string
& end
);
102 /// compact the underlying rocksdb store
103 bool compact_on_mount
;
106 void compact() override
;
108 int tryInterpret(const string key
, const string val
, rocksdb::Options
&opt
);
109 int ParseOptionsFromString(const string opt_str
, rocksdb::Options
&opt
);
110 static int _test_init(const string
& dir
);
111 int init(string options_str
) override
;
112 /// compact rocksdb for all keys with a given prefix
113 void compact_prefix(const string
& prefix
) override
{
114 compact_range(prefix
, past_prefix(prefix
));
116 void compact_prefix_async(const string
& prefix
) override
{
117 compact_range_async(prefix
, past_prefix(prefix
));
120 void compact_range(const string
& prefix
, const string
& start
, const string
& end
) override
{
121 compact_range(combine_strings(prefix
, start
), combine_strings(prefix
, end
));
123 void compact_range_async(const string
& prefix
, const string
& start
, const string
& end
) override
{
124 compact_range_async(combine_strings(prefix
, start
), combine_strings(prefix
, end
));
127 RocksDBStore(CephContext
*c
, const string
&path
, void *p
) :
133 env(static_cast<rocksdb::Env
*>(p
)),
135 compact_queue_lock("RocksDBStore::compact_thread_lock"),
136 compact_queue_stop(false),
137 compact_thread(this),
138 compact_on_mount(false),
140 enable_rmrange(cct
->_conf
->rocksdb_enable_rmrange
)
143 ~RocksDBStore() override
;
145 static bool check_omap_dir(string
&omap_dir
);
146 /// Opens underlying db
147 int open(ostream
&out
) override
{
148 return do_open(out
, false);
150 /// Creates underlying db if missing and opens it
151 int create_and_open(ostream
&out
) override
;
153 void close() override
;
155 void split_stats(const std::string
&s
, char delim
, std::vector
<std::string
> &elems
);
156 void get_statistics(Formatter
*f
) override
;
158 struct RocksWBHandler
: public rocksdb::WriteBatch::Handler
{
161 static string
pretty_binary_string(const string
& in
) {
164 out
.reserve(in
.length() * 3);
165 enum { NONE
, HEX
, STRING
} mode
= NONE
;
166 unsigned from
= 0, i
;
167 for (i
=0; i
< in
.length(); ++i
) {
168 if ((in
[i
] < 32 || (unsigned char)in
[i
] > 126) ||
169 (mode
== HEX
&& in
.length() - i
>= 4 &&
170 ((in
[i
] < 32 || (unsigned char)in
[i
] > 126) ||
171 (in
[i
+1] < 32 || (unsigned char)in
[i
+1] > 126) ||
172 (in
[i
+2] < 32 || (unsigned char)in
[i
+2] > 126) ||
173 (in
[i
+3] < 32 || (unsigned char)in
[i
+3] > 126)))) {
175 if (mode
== STRING
) {
176 out
.append(in
.substr(from
, i
- from
));
183 if (in
.length() - i
>= 4) {
184 // print a whole u32 at once
185 snprintf(buf
, sizeof(buf
), "%08x",
186 (uint32_t)(((unsigned char)in
[i
] << 24) |
187 ((unsigned char)in
[i
+1] << 16) |
188 ((unsigned char)in
[i
+2] << 8) |
189 ((unsigned char)in
[i
+3] << 0)));
192 snprintf(buf
, sizeof(buf
), "%02x", (int)(unsigned char)in
[i
]);
196 if (mode
!= STRING
) {
203 if (mode
== STRING
) {
204 out
.append(in
.substr(from
, i
- from
));
209 void Put(const rocksdb::Slice
& key
,
210 const rocksdb::Slice
& value
) override
{
211 string
prefix ((key
.ToString()).substr(0,1));
212 string
key_to_decode ((key
.ToString()).substr(2,string::npos
));
213 uint64_t size
= (value
.ToString()).size();
214 seen
+= "\nPut( Prefix = " + prefix
+ " key = "
215 + pretty_binary_string(key_to_decode
)
216 + " Value size = " + std::to_string(size
) + ")";
219 void SingleDelete(const rocksdb::Slice
& key
) override
{
220 string
prefix ((key
.ToString()).substr(0,1));
221 string
key_to_decode ((key
.ToString()).substr(2,string::npos
));
222 seen
+= "\nSingleDelete(Prefix = "+ prefix
+ " Key = "
223 + pretty_binary_string(key_to_decode
) + ")";
226 void Delete(const rocksdb::Slice
& key
) override
{
227 string
prefix ((key
.ToString()).substr(0,1));
228 string
key_to_decode ((key
.ToString()).substr(2,string::npos
));
229 seen
+= "\nDelete( Prefix = " + prefix
+ " key = "
230 + pretty_binary_string(key_to_decode
) + ")";
234 void Merge(const rocksdb::Slice
& key
,
235 const rocksdb::Slice
& value
) override
{
236 string
prefix ((key
.ToString()).substr(0,1));
237 string
key_to_decode ((key
.ToString()).substr(2,string::npos
));
238 uint64_t size
= (value
.ToString()).size();
239 seen
+= "\nMerge( Prefix = " + prefix
+ " key = "
240 + pretty_binary_string(key_to_decode
) + " Value size = "
241 + std::to_string(size
) + ")";
245 bool Continue() override
{ return num_seen
< 50; }
250 class RocksDBTransactionImpl
: public KeyValueDB::TransactionImpl
{
252 rocksdb::WriteBatch bat
;
255 explicit RocksDBTransactionImpl(RocksDBStore
*_db
);
257 const string
&prefix
,
259 const bufferlist
&bl
) override
;
261 const string
&prefix
,
264 const bufferlist
&bl
) override
;
266 const string
&prefix
,
267 const string
&k
) override
;
269 const string
&prefix
,
271 size_t keylen
) override
;
273 const string
&prefix
,
274 const string
&k
) override
;
275 void rmkeys_by_prefix(
279 const string
&prefix
,
281 const string
&end
) override
;
283 const string
& prefix
,
285 const bufferlist
&bl
) override
;
288 KeyValueDB::Transaction
get_transaction() override
{
289 return std::make_shared
<RocksDBTransactionImpl
>(this);
292 int submit_transaction(KeyValueDB::Transaction t
) override
;
293 int submit_transaction_sync(KeyValueDB::Transaction t
) override
;
295 const string
&prefix
,
296 const std::set
<string
> &key
,
297 std::map
<string
, bufferlist
> *out
300 const string
&prefix
,
305 const string
&prefix
,
308 bufferlist
*out
) override
;
311 class RocksDBWholeSpaceIteratorImpl
:
312 public KeyValueDB::WholeSpaceIteratorImpl
{
314 rocksdb::Iterator
*dbiter
;
316 explicit RocksDBWholeSpaceIteratorImpl(rocksdb::Iterator
*iter
) :
318 //virtual ~RocksDBWholeSpaceIteratorImpl() { }
319 ~RocksDBWholeSpaceIteratorImpl() override
;
321 int seek_to_first() override
;
322 int seek_to_first(const string
&prefix
) override
;
323 int seek_to_last() override
;
324 int seek_to_last(const string
&prefix
) override
;
325 int upper_bound(const string
&prefix
, const string
&after
) override
;
326 int lower_bound(const string
&prefix
, const string
&to
) override
;
327 bool valid() override
;
330 string
key() override
;
331 pair
<string
,string
> raw_key() override
;
332 bool raw_key_is_prefixed(const string
&prefix
) override
;
333 bufferlist
value() override
;
334 bufferptr
value_as_ptr() override
;
335 int status() override
;
336 size_t key_size() override
;
337 size_t value_size() override
;
341 static string
combine_strings(const string
&prefix
, const string
&value
) {
347 static void combine_strings(const string
&prefix
,
348 const char *key
, size_t keylen
,
350 out
->reserve(prefix
.size() + 1 + keylen
);
353 out
->append(key
, keylen
);
356 static int split_key(rocksdb::Slice in
, string
*prefix
, string
*key
);
358 static bufferlist
to_bufferlist(rocksdb::Slice in
) {
360 bl
.append(bufferptr(in
.data(), in
.size()));
364 static string
past_prefix(const string
&prefix
);
366 class MergeOperatorRouter
;
367 friend class MergeOperatorRouter
;
368 int set_merge_operator(const std::string
& prefix
,
369 std::shared_ptr
<KeyValueDB::MergeOperator
> mop
) override
;
370 string assoc_name
; ///< Name of associative operator
372 uint64_t get_estimated_size(map
<string
,uint64_t> &extra
) override
{
373 DIR *store_dir
= opendir(path
.c_str());
375 lderr(cct
) << __func__
<< " something happened opening the store: "
376 << cpp_strerror(errno
) << dendl
;
380 uint64_t total_size
= 0;
381 uint64_t sst_size
= 0;
382 uint64_t log_size
= 0;
383 uint64_t misc_size
= 0;
385 struct dirent
*entry
= NULL
;
386 while ((entry
= readdir(store_dir
)) != NULL
) {
387 string
n(entry
->d_name
);
389 if (n
== "." || n
== "..")
392 string fpath
= path
+ '/' + n
;
394 int err
= stat(fpath
.c_str(), &s
);
397 // we may race against rocksdb while reading files; this should only
398 // happen when those files are being updated, data is being shuffled
399 // and files get removed, in which case there's not much of a problem
400 // as we'll get to them next time around.
401 if (err
== -ENOENT
) {
405 lderr(cct
) << __func__
<< " error obtaining stats for " << fpath
406 << ": " << cpp_strerror(err
) << dendl
;
410 size_t pos
= n
.find_last_of('.');
411 if (pos
== string::npos
) {
412 misc_size
+= s
.st_size
;
416 string ext
= n
.substr(pos
+1);
418 sst_size
+= s
.st_size
;
419 } else if (ext
== "log") {
420 log_size
+= s
.st_size
;
422 misc_size
+= s
.st_size
;
426 total_size
= sst_size
+ log_size
+ misc_size
;
428 extra
["sst"] = sst_size
;
429 extra
["log"] = log_size
;
430 extra
["misc"] = misc_size
;
431 extra
["total"] = total_size
;
440 WholeSpaceIterator
_get_iterator() override
;