1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 #ifndef ROCKS_DB_STORE_H
4 #define ROCKS_DB_STORE_H
6 #include "include/types.h"
7 #include "include/buffer_fwd.h"
8 #include "KeyValueDB.h"
13 #include <boost/scoped_ptr.hpp>
14 #include "rocksdb/write_batch.h"
15 #include "rocksdb/perf_context.h"
16 #include "rocksdb/iostats_context.h"
17 #include "rocksdb/statistics.h"
18 #include "rocksdb/table.h"
19 #include "kv/rocksdb_cache/BinnedLRUCache.h"
21 #include "common/errno.h"
22 #include "common/dout.h"
23 #include "include/ceph_assert.h"
24 #include "include/common_fwd.h"
25 #include "common/Formatter.h"
26 #include "common/Cond.h"
27 #include "common/ceph_context.h"
28 #include "common/PriorityCache.h"
32 l_rocksdb_first
= 34300,
36 l_rocksdb_get_latency
,
37 l_rocksdb_submit_latency
,
38 l_rocksdb_submit_sync_latency
,
40 l_rocksdb_compact_range
,
41 l_rocksdb_compact_queue_merge
,
42 l_rocksdb_compact_queue_len
,
43 l_rocksdb_write_wal_time
,
44 l_rocksdb_write_memtable_time
,
45 l_rocksdb_write_delay_time
,
46 l_rocksdb_write_pre_and_post_process_time
,
60 class ColumnFamilyHandle
;
62 struct BlockBasedTableOptions
;
64 struct ColumnFamilyOptions
;
67 extern rocksdb::Logger
*create_rocksdb_ceph_logger();
70 * Uses RocksDB to implement the KeyValueDB interface
72 class RocksDBStore
: public KeyValueDB
{
76 map
<string
,string
> kv_options
;
80 std::shared_ptr
<rocksdb::Statistics
> dbstats
;
81 rocksdb::BlockBasedTableOptions bbt_opts
;
84 uint64_t cache_size
= 0;
85 bool set_cache_flag
= false;
87 bool must_close_default_cf
= false;
88 rocksdb::ColumnFamilyHandle
*default_cf
= nullptr;
90 int submit_common(rocksdb::WriteOptions
& woptions
, KeyValueDB::Transaction t
);
91 int install_cf_mergeop(const string
&cf_name
, rocksdb::ColumnFamilyOptions
*cf_opt
);
93 int do_open(ostream
&out
, bool create_if_missing
, bool open_readonly
,
94 const vector
<ColumnFamily
>* cfs
= nullptr);
95 int load_rocksdb_options(bool create_if_missing
, rocksdb::Options
& opt
);
97 // manage async compactions
98 ceph::mutex compact_queue_lock
=
99 ceph::make_mutex("RocksDBStore::compact_thread_lock");
100 ceph::condition_variable compact_queue_cond
;
101 list
< pair
<string
,string
> > compact_queue
;
102 bool compact_queue_stop
;
103 class CompactThread
: public Thread
{
106 explicit CompactThread(RocksDBStore
*d
) : db(d
) {}
107 void *entry() override
{
108 db
->compact_thread_entry();
111 friend class RocksDBStore
;
114 void compact_thread_entry();
116 void compact_range(const string
& start
, const string
& end
);
117 void compact_range_async(const string
& start
, const string
& end
);
118 int tryInterpret(const string
& key
, const string
& val
, rocksdb::Options
& opt
);
121 /// compact the underlying rocksdb store
122 bool compact_on_mount
;
124 const uint64_t delete_range_threshold
;
125 void compact() override
;
127 void compact_async() override
{
128 compact_range_async(string(), string());
131 int ParseOptionsFromString(const string
& opt_str
, rocksdb::Options
& opt
);
132 static int ParseOptionsFromStringStatic(
134 const string
& opt_str
,
135 rocksdb::Options
&opt
,
136 function
<int(const string
&, const string
&, rocksdb::Options
&)> interp
);
137 static int _test_init(const string
& dir
);
138 int init(string options_str
) override
;
139 /// compact rocksdb for all keys with a given prefix
140 void compact_prefix(const string
& prefix
) override
{
141 compact_range(prefix
, past_prefix(prefix
));
143 void compact_prefix_async(const string
& prefix
) override
{
144 compact_range_async(prefix
, past_prefix(prefix
));
147 void compact_range(const string
& prefix
, const string
& start
, const string
& end
) override
{
148 compact_range(combine_strings(prefix
, start
), combine_strings(prefix
, end
));
150 void compact_range_async(const string
& prefix
, const string
& start
, const string
& end
) override
{
151 compact_range_async(combine_strings(prefix
, start
), combine_strings(prefix
, end
));
154 RocksDBStore(CephContext
*c
, const string
&path
, map
<string
,string
> opt
, void *p
) :
161 env(static_cast<rocksdb::Env
*>(p
)),
163 compact_queue_stop(false),
164 compact_thread(this),
165 compact_on_mount(false),
167 delete_range_threshold(cct
->_conf
.get_val
<uint64_t>("rocksdb_delete_range_threshold"))
170 ~RocksDBStore() override
;
172 static bool check_omap_dir(string
&omap_dir
);
173 /// Opens underlying db
174 int open(ostream
&out
, const vector
<ColumnFamily
>& cfs
= {}) override
{
175 return do_open(out
, false, false, &cfs
);
177 /// Creates underlying db if missing and opens it
178 int create_and_open(ostream
&out
,
179 const vector
<ColumnFamily
>& cfs
= {}) override
;
181 int open_read_only(ostream
&out
, const vector
<ColumnFamily
>& cfs
= {}) override
{
182 return do_open(out
, false, true, &cfs
);
185 void close() override
;
187 rocksdb::ColumnFamilyHandle
*get_cf_handle(const std::string
& cf_name
) {
188 auto iter
= cf_handles
.find(cf_name
);
189 if (iter
== cf_handles
.end())
192 return static_cast<rocksdb::ColumnFamilyHandle
*>(iter
->second
);
194 int repair(std::ostream
&out
) override
;
195 void split_stats(const std::string
&s
, char delim
, std::vector
<std::string
> &elems
);
196 void get_statistics(Formatter
*f
) override
;
198 PerfCounters
*get_perf_counters() override
204 const std::string
&property
,
205 uint64_t *out
) final
;
207 int64_t estimate_prefix_size(const string
& prefix
,
208 const string
& key_prefix
) override
;
210 struct RocksWBHandler
: public rocksdb::WriteBatch::Handler
{
213 static string
pretty_binary_string(const string
& in
) {
216 out
.reserve(in
.length() * 3);
217 enum { NONE
, HEX
, STRING
} mode
= NONE
;
218 unsigned from
= 0, i
;
219 for (i
=0; i
< in
.length(); ++i
) {
220 if ((in
[i
] < 32 || (unsigned char)in
[i
] > 126) ||
221 (mode
== HEX
&& in
.length() - i
>= 4 &&
222 ((in
[i
] < 32 || (unsigned char)in
[i
] > 126) ||
223 (in
[i
+1] < 32 || (unsigned char)in
[i
+1] > 126) ||
224 (in
[i
+2] < 32 || (unsigned char)in
[i
+2] > 126) ||
225 (in
[i
+3] < 32 || (unsigned char)in
[i
+3] > 126)))) {
227 if (mode
== STRING
) {
228 out
.append(in
.substr(from
, i
- from
));
235 if (in
.length() - i
>= 4) {
236 // print a whole u32 at once
237 snprintf(buf
, sizeof(buf
), "%08x",
238 (uint32_t)(((unsigned char)in
[i
] << 24) |
239 ((unsigned char)in
[i
+1] << 16) |
240 ((unsigned char)in
[i
+2] << 8) |
241 ((unsigned char)in
[i
+3] << 0)));
244 snprintf(buf
, sizeof(buf
), "%02x", (int)(unsigned char)in
[i
]);
248 if (mode
!= STRING
) {
255 if (mode
== STRING
) {
256 out
.append(in
.substr(from
, i
- from
));
261 void Put(const rocksdb::Slice
& key
,
262 const rocksdb::Slice
& value
) override
{
263 string
prefix ((key
.ToString()).substr(0,1));
264 string
key_to_decode ((key
.ToString()).substr(2,string::npos
));
265 uint64_t size
= (value
.ToString()).size();
266 seen
+= "\nPut( Prefix = " + prefix
+ " key = "
267 + pretty_binary_string(key_to_decode
)
268 + " Value size = " + std::to_string(size
) + ")";
271 void SingleDelete(const rocksdb::Slice
& key
) override
{
272 string
prefix ((key
.ToString()).substr(0,1));
273 string
key_to_decode ((key
.ToString()).substr(2,string::npos
));
274 seen
+= "\nSingleDelete(Prefix = "+ prefix
+ " Key = "
275 + pretty_binary_string(key_to_decode
) + ")";
278 void Delete(const rocksdb::Slice
& key
) override
{
279 string
prefix ((key
.ToString()).substr(0,1));
280 string
key_to_decode ((key
.ToString()).substr(2,string::npos
));
281 seen
+= "\nDelete( Prefix = " + prefix
+ " key = "
282 + pretty_binary_string(key_to_decode
) + ")";
286 void Merge(const rocksdb::Slice
& key
,
287 const rocksdb::Slice
& value
) override
{
288 string
prefix ((key
.ToString()).substr(0,1));
289 string
key_to_decode ((key
.ToString()).substr(2,string::npos
));
290 uint64_t size
= (value
.ToString()).size();
291 seen
+= "\nMerge( Prefix = " + prefix
+ " key = "
292 + pretty_binary_string(key_to_decode
) + " Value size = "
293 + std::to_string(size
) + ")";
297 bool Continue() override
{ return num_seen
< 50; }
301 class RocksDBTransactionImpl
: public KeyValueDB::TransactionImpl
{
303 rocksdb::WriteBatch bat
;
306 explicit RocksDBTransactionImpl(RocksDBStore
*_db
);
309 rocksdb::WriteBatch
& bat
,
310 rocksdb::ColumnFamilyHandle
*cf
,
312 const bufferlist
&to_set_bl
);
315 const string
&prefix
,
317 const bufferlist
&bl
) override
;
319 const string
&prefix
,
322 const bufferlist
&bl
) override
;
324 const string
&prefix
,
325 const string
&k
) override
;
327 const string
&prefix
,
329 size_t keylen
) override
;
331 const string
&prefix
,
332 const string
&k
) override
;
333 void rmkeys_by_prefix(
337 const string
&prefix
,
339 const string
&end
) override
;
341 const string
& prefix
,
343 const bufferlist
&bl
) override
;
346 KeyValueDB::Transaction
get_transaction() override
{
347 return std::make_shared
<RocksDBTransactionImpl
>(this);
350 int submit_transaction(KeyValueDB::Transaction t
) override
;
351 int submit_transaction_sync(KeyValueDB::Transaction t
) override
;
353 const string
&prefix
,
354 const std::set
<string
> &key
,
355 std::map
<string
, bufferlist
> *out
358 const string
&prefix
,
363 const string
&prefix
,
366 bufferlist
*out
) override
;
369 class RocksDBWholeSpaceIteratorImpl
:
370 public KeyValueDB::WholeSpaceIteratorImpl
{
372 rocksdb::Iterator
*dbiter
;
374 explicit RocksDBWholeSpaceIteratorImpl(rocksdb::Iterator
*iter
) :
376 //virtual ~RocksDBWholeSpaceIteratorImpl() { }
377 ~RocksDBWholeSpaceIteratorImpl() override
;
379 int seek_to_first() override
;
380 int seek_to_first(const string
&prefix
) override
;
381 int seek_to_last() override
;
382 int seek_to_last(const string
&prefix
) override
;
383 int upper_bound(const string
&prefix
, const string
&after
) override
;
384 int lower_bound(const string
&prefix
, const string
&to
) override
;
385 bool valid() override
;
388 string
key() override
;
389 pair
<string
,string
> raw_key() override
;
390 bool raw_key_is_prefixed(const string
&prefix
) override
;
391 bufferlist
value() override
;
392 bufferptr
value_as_ptr() override
;
393 int status() override
;
394 size_t key_size() override
;
395 size_t value_size() override
;
398 Iterator
get_iterator(const std::string
& prefix
) override
;
401 static string
combine_strings(const string
&prefix
, const string
&value
) {
407 static void combine_strings(const string
&prefix
,
408 const char *key
, size_t keylen
,
410 out
->reserve(prefix
.size() + 1 + keylen
);
413 out
->append(key
, keylen
);
416 static int split_key(rocksdb::Slice in
, string
*prefix
, string
*key
);
418 static string
past_prefix(const string
&prefix
);
420 class MergeOperatorRouter
;
421 class MergeOperatorLinker
;
422 friend class MergeOperatorRouter
;
423 int set_merge_operator(
424 const std::string
& prefix
,
425 std::shared_ptr
<KeyValueDB::MergeOperator
> mop
) override
;
426 string assoc_name
; ///< Name of associative operator
428 uint64_t get_estimated_size(map
<string
,uint64_t> &extra
) override
{
429 DIR *store_dir
= opendir(path
.c_str());
431 lderr(cct
) << __func__
<< " something happened opening the store: "
432 << cpp_strerror(errno
) << dendl
;
436 uint64_t total_size
= 0;
437 uint64_t sst_size
= 0;
438 uint64_t log_size
= 0;
439 uint64_t misc_size
= 0;
441 struct dirent
*entry
= NULL
;
442 while ((entry
= readdir(store_dir
)) != NULL
) {
443 string
n(entry
->d_name
);
445 if (n
== "." || n
== "..")
448 string fpath
= path
+ '/' + n
;
450 int err
= stat(fpath
.c_str(), &s
);
453 // we may race against rocksdb while reading files; this should only
454 // happen when those files are being updated, data is being shuffled
455 // and files get removed, in which case there's not much of a problem
456 // as we'll get to them next time around.
457 if (err
== -ENOENT
) {
461 lderr(cct
) << __func__
<< " error obtaining stats for " << fpath
462 << ": " << cpp_strerror(err
) << dendl
;
466 size_t pos
= n
.find_last_of('.');
467 if (pos
== string::npos
) {
468 misc_size
+= s
.st_size
;
472 string ext
= n
.substr(pos
+1);
474 sst_size
+= s
.st_size
;
475 } else if (ext
== "log") {
476 log_size
+= s
.st_size
;
478 misc_size
+= s
.st_size
;
482 total_size
= sst_size
+ log_size
+ misc_size
;
484 extra
["sst"] = sst_size
;
485 extra
["log"] = log_size
;
486 extra
["misc"] = misc_size
;
487 extra
["total"] = total_size
;
494 virtual int64_t get_cache_usage() const override
{
495 return static_cast<int64_t>(bbt_opts
.block_cache
->GetUsage());
498 int set_cache_size(uint64_t s
) override
{
500 set_cache_flag
= true;
504 virtual std::shared_ptr
<PriorityCache::PriCache
> get_priority_cache()
506 return dynamic_pointer_cast
<PriorityCache::PriCache
>(
507 bbt_opts
.block_cache
);
510 WholeSpaceIterator
get_wholespace_iterator() override
;