1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
10 #include <sys/types.h>
13 #include "rocksdb/db.h"
14 #include "rocksdb/table.h"
15 #include "rocksdb/env.h"
16 #include "rocksdb/slice.h"
17 #include "rocksdb/cache.h"
18 #include "rocksdb/filter_policy.h"
19 #include "rocksdb/utilities/convenience.h"
20 #include "rocksdb/merge_operator.h"
22 #include "common/perf_counters.h"
23 #include "common/debug.h"
24 #include "include/str_list.h"
25 #include "include/stringify.h"
26 #include "include/str_map.h"
27 #include "KeyValueDB.h"
28 #include "RocksDBStore.h"
30 #include "common/debug.h"
32 #define dout_context cct
33 #define dout_subsys ceph_subsys_rocksdb
35 #define dout_prefix *_dout << "rocksdb: "
38 // One of these per rocksdb instance; it dispatches each merge to the operator registered for the key's prefix
// Merge operator handed to RocksDB. It forwards every merge to the operator
// registered in store.merge_ops whose prefix matches the start of the key.
40 class RocksDBStore::MergeOperatorRouter
: public rocksdb::AssociativeMergeOperator
{
// Rebuilds store.assoc_name from all registered operators and returns a
// pointer to its characters. Each entry is emitted as ".<prefix>:<name>",
// iterating the intermediate map keyed by prefix.
43 const char *Name() const override
{
44 // Construct a name that rocksDB will validate against. We want to
45 // do this in a way that doesn't constrain the ordering of calls
46 // to set_merge_operator, so sort the merge operators and then
47 // construct a name from all of those parts.
48 store
.assoc_name
.clear();
49 map
<std::string
,std::string
> names
;
50 for (auto& p
: store
.merge_ops
) names
[p
.first
] = p
.second
->name();
51 for (auto& p
: names
) {
52 store
.assoc_name
+= '.';
53 store
.assoc_name
+= p
.first
;
54 store
.assoc_name
+= ':';
55 store
.assoc_name
+= p
.second
;
57 return store
.assoc_name
.c_str();
// Binds the router to the store whose merge_ops it consults.
60 MergeOperatorRouter(RocksDBStore
&_store
) : store(_store
) {}
// Scans store.merge_ops for an entry whose prefix equals the leading bytes
// of key and is followed by a 0 byte, then calls that operator's merge()
// (passing existing_value) or merge_nonexistent() (passing only value).
// NOTE(review): the match reads p.first.length() + 1 bytes from key.data()
// and no check against key.size() is visible here - confirm keys are
// always at least that long.
62 bool Merge(const rocksdb::Slice
& key
,
63 const rocksdb::Slice
* existing_value
,
64 const rocksdb::Slice
& value
,
65 std::string
* new_value
,
66 rocksdb::Logger
* logger
) const override
{
68 for (auto& p
: store
.merge_ops
) {
69 if (p
.first
.compare(0, p
.first
.length(),
70 key
.data(), p
.first
.length()) == 0 &&
71 key
.data()[p
.first
.length()] == 0) {
73 p
.second
->merge(existing_value
->data(), existing_value
->size(),
74 value
.data(), value
.size(),
77 p
.second
->merge_nonexistent(value
.data(), value
.size(), new_value
);
// Registers mop as the merge operator for a key prefix by appending the
// (prefix, mop) pair to merge_ops. Asserts that db is still null, i.e. this
// must happen before the database is opened.
87 int RocksDBStore::set_merge_operator(
89 std::shared_ptr
<KeyValueDB::MergeOperator
> mop
)
91 // If you fail here, it's because you can't do this on an open database
92 assert(db
== nullptr);
93 merge_ops
.push_back(std::make_pair(prefix
,mop
));
// rocksdb::Logger implementation that formats RocksDB log messages with
// vsnprintf and writes them to the Ceph debug log stream.
97 class CephRocksdbLogger
: public rocksdb::Logger
{
// Remembers the CephContext used for logging.
100 explicit CephRocksdbLogger(CephContext
*c
) : cct(c
) {
103 ~CephRocksdbLogger() override
{
107 // Write an entry to the log file with the specified format.
// This overload forwards to the level-aware overload with INFO_LEVEL.
108 void Logv(const char* format
, va_list ap
) override
{
109 Logv(rocksdb::INFO_LEVEL
, format
, ap
);
112 // Write an entry to the log file with the specified log level
113 // and format. Any log with level under the internal log level
114 // of *this (see @SetInfoLogLevel and @GetInfoLogLevel) will not be
116 void Logv(const rocksdb::InfoLogLevel log_level
, const char* format
,
117 va_list ap
) override
{
// v decreases as log_level increases; presumably it is used as the Ceph
// debug level for the message - TODO confirm where v is consumed.
118 int v
= rocksdb::NUM_INFO_LOG_LEVELS
- log_level
- 1;
121 vsnprintf(buf
, sizeof(buf
), format
, ap
);
122 *_dout
<< buf
<< dendl
;
126 rocksdb::Logger
*create_rocksdb_ceph_logger()
128 return new CephRocksdbLogger(g_ceph_context
);
// Converts val into a boolean stored in b_val. The words "false" and "true"
// are matched case-insensitively; any other input is parsed as a base-10
// integer with strict_strtol (presumably non-zero means true - TODO confirm).
131 int string2bool(string val
, bool &b_val
)
133 if (strcasecmp(val
.c_str(), "false") == 0) {
136 } else if (strcasecmp(val
.c_str(), "true") == 0) {
141 int b
= strict_strtol(val
.c_str(), 10, &err
);
// Handles the option keys that this store interprets itself:
//   compaction_threads - size of the env's LOW priority thread pool
//   flusher_threads    - size of the env's HIGH priority thread pool
//   compact_on_mount   - boolean, stored in compact_on_mount
//   disableWAL         - boolean, stored in disableWAL
// Thread counts are parsed with strict_sistrtoll, booleans with string2bool.
149 int RocksDBStore::tryInterpret(const string key
, const string val
, rocksdb::Options
&opt
)
151 if (key
== "compaction_threads") {
153 int f
= strict_sistrtoll(val
.c_str(), &err
);
156 //Low priority threadpool is used for compaction
157 opt
.env
->SetBackgroundThreads(f
, rocksdb::Env::Priority::LOW
);
158 } else if (key
== "flusher_threads") {
160 int f
= strict_sistrtoll(val
.c_str(), &err
);
163 //High priority threadpool is used for flusher
164 opt
.env
->SetBackgroundThreads(f
, rocksdb::Env::Priority::HIGH
);
165 } else if (key
== "compact_on_mount") {
166 int ret
= string2bool(val
, compact_on_mount
);
169 } else if (key
== "disableWAL") {
170 int ret
= string2bool(val
, disableWAL
);
174 //unrecognized config options.
// Splits opt_str into key/value pairs (delimiters ",\n;") and applies each
// pair to opt. Every pair is first handed to rocksdb::GetOptionsFromString
// as "key=value"; tryInterpret() is the fallback for options RocksDB does
// not recognize. Each option that is set is logged at level 0.
180 int RocksDBStore::ParseOptionsFromString(const string opt_str
, rocksdb::Options
&opt
)
182 map
<string
, string
> str_map
;
183 int r
= get_str_map(opt_str
, &str_map
, ",\n;");
186 map
<string
, string
>::iterator it
;
187 for(it
= str_map
.begin(); it
!= str_map
.end(); ++it
) {
188 string this_opt
= it
->first
+ "=" + it
->second
;
189 rocksdb::Status status
= rocksdb::GetOptionsFromString(opt
, this_opt
, &opt
);
191 //unrecognized by rocksdb, try to interpret by ourselves.
192 r
= tryInterpret(it
->first
, it
->second
, opt
);
// The RocksDB status text is written to the error log; presumably only
// when tryInterpret also failed - TODO confirm.
194 derr
<< status
.ToString() << dendl
;
198 lgeneric_dout(cct
, 0) << " set rocksdb option " << it
->first
199 << " = " << it
->second
<< dendl
;
// Stores the option string in options_str and, when it is non-empty, parses
// it into a local rocksdb::Options (presumably to validate it before the
// database is opened - TODO confirm).
204 int RocksDBStore::init(string _options_str
)
206 options_str
= _options_str
;
207 rocksdb::Options opt
;
209 if (options_str
.length()) {
210 int r
= ParseOptionsFromString(options_str
, opt
);
// Creates the database directory and then opens the store through
// do_open(out, true). Two creation paths are visible: env->NewDirectory()
// and ::mkdir(path, 0755); presumably the first is used when a custom env is
// set - TODO confirm. An mkdir failure other than EEXIST is logged.
218 int RocksDBStore::create_and_open(ostream
&out
)
221 unique_ptr
<rocksdb::Directory
> dir
;
222 env
->NewDirectory(path
, &dir
);
224 int r
= ::mkdir(path
.c_str(), 0755);
227 if (r
< 0 && r
!= -EEXIST
) {
228 derr
<< __func__
<< " failed to create " << path
<< ": " << cpp_strerror(r
)
233 return do_open(out
, true);
// Builds the rocksdb::Options for this store from options_str and the
// rocksdb_* configuration values, opens the database at path, registers the
// perf counters and optionally compacts the store.
//   out               - output stream supplied by the caller
//   create_if_missing - copied into opt.create_if_missing
236 int RocksDBStore::do_open(ostream
&out
, bool create_if_missing
)
238 rocksdb::Options opt
;
239 rocksdb::Status status
;
241 if (options_str
.length()) {
242 int r
= ParseOptionsFromString(options_str
, opt
);
// RocksDB statistics are collected only when rocksdb_perf is enabled.
248 if (g_conf
->rocksdb_perf
) {
249 dbstats
= rocksdb::CreateDBStatistics();
250 opt
.statistics
= dbstats
;
253 opt
.create_if_missing
= create_if_missing
;
// Optionally keep the write-ahead log in "<path>.wal".
254 if (g_conf
->rocksdb_separate_wal_dir
) {
255 opt
.wal_dir
= path
+ ".wal";
// rocksdb_db_paths is split on ';', space and tab; each item must have the
// form "<path>,<size>" and is added to opt.db_paths.
257 if (g_conf
->get_val
<std::string
>("rocksdb_db_paths").length()) {
259 get_str_list(g_conf
->get_val
<std::string
>("rocksdb_db_paths"), "; \t", paths
);
260 for (auto& p
: paths
) {
261 size_t pos
= p
.find(',');
262 if (pos
== std::string::npos
) {
263 derr
<< __func__
<< " invalid db path item " << p
<< " in "
264 << g_conf
->get_val
<std::string
>("rocksdb_db_paths") << dendl
;
267 string path
= p
.substr(0, pos
);
268 string size_str
= p
.substr(pos
+ 1);
269 uint64_t size
= atoll(size_str
.c_str());
271 derr
<< __func__
<< " invalid db path item " << p
<< " in "
272 << g_conf
->get_val
<std::string
>("rocksdb_db_paths") << dendl
;
275 opt
.db_paths
.push_back(rocksdb::DbPath(path
, size
));
276 dout(10) << __func__
<< " db_path " << path
<< " size " << size
<< dendl
;
// Optionally route RocksDB's own log output into the Ceph log.
280 if (g_conf
->rocksdb_log_to_ceph_log
) {
281 opt
.info_log
.reset(new CephRocksdbLogger(g_ceph_context
));
// priv is used as a custom rocksdb::Env (presumably only when it is
// non-null - TODO confirm).
285 dout(10) << __func__
<< " using custom Env " << priv
<< dendl
;
286 opt
.env
= static_cast<rocksdb::Env
*>(priv
);
// Block-based table setup: LRU block cache, block size and, when
// kstore_rocksdb_bloom_bits_per_key > 0, a bloom filter policy.
289 auto cache
= rocksdb::NewLRUCache(g_conf
->rocksdb_cache_size
, g_conf
->rocksdb_cache_shard_bits
);
290 bbt_opts
.block_size
= g_conf
->rocksdb_block_size
;
291 bbt_opts
.block_cache
= cache
;
292 if (g_conf
->kstore_rocksdb_bloom_bits_per_key
> 0) {
293 dout(10) << __func__
<< " set bloom filter bits per key to "
294 << g_conf
->kstore_rocksdb_bloom_bits_per_key
<< dendl
;
295 bbt_opts
.filter_policy
.reset(rocksdb::NewBloomFilterPolicy(g_conf
->kstore_rocksdb_bloom_bits_per_key
));
297 opt
.table_factory
.reset(rocksdb::NewBlockBasedTableFactory(bbt_opts
));
298 dout(10) << __func__
<< " set block size to " << g_conf
->rocksdb_block_size
299 << " cache size to " << g_conf
->rocksdb_cache_size
300 << " num of cache shards to " << (1 << g_conf
->rocksdb_cache_shard_bits
) << dendl
;
// Install the prefix-routing merge operator, then open the database.
302 opt
.merge_operator
.reset(new MergeOperatorRouter(*this));
303 status
= rocksdb::DB::Open(opt
, path
, &db
);
305 derr
<< status
.ToString() << dendl
;
// Define the perf counters of this store and add them to the collection.
309 PerfCountersBuilder
plb(g_ceph_context
, "rocksdb", l_rocksdb_first
, l_rocksdb_last
);
310 plb
.add_u64_counter(l_rocksdb_gets
, "get", "Gets");
311 plb
.add_u64_counter(l_rocksdb_txns
, "submit_transaction", "Submit transactions");
312 plb
.add_u64_counter(l_rocksdb_txns_sync
, "submit_transaction_sync", "Submit transactions sync");
313 plb
.add_time_avg(l_rocksdb_get_latency
, "get_latency", "Get latency");
314 plb
.add_time_avg(l_rocksdb_submit_latency
, "submit_latency", "Submit Latency");
315 plb
.add_time_avg(l_rocksdb_submit_sync_latency
, "submit_sync_latency", "Submit Sync Latency");
316 plb
.add_u64_counter(l_rocksdb_compact
, "compact", "Compactions");
317 plb
.add_u64_counter(l_rocksdb_compact_range
, "compact_range", "Compactions by range");
318 plb
.add_u64_counter(l_rocksdb_compact_queue_merge
, "compact_queue_merge", "Mergings of ranges in compaction queue");
319 plb
.add_u64(l_rocksdb_compact_queue_len
, "compact_queue_len", "Length of compaction queue");
320 plb
.add_time_avg(l_rocksdb_write_wal_time
, "rocksdb_write_wal_time", "Rocksdb write wal time");
321 plb
.add_time_avg(l_rocksdb_write_memtable_time
, "rocksdb_write_memtable_time", "Rocksdb write memtable time");
322 plb
.add_time_avg(l_rocksdb_write_delay_time
, "rocksdb_write_delay_time", "Rocksdb write delay time");
323 plb
.add_time_avg(l_rocksdb_write_pre_and_post_process_time
,
324 "rocksdb_write_pre_and_post_time", "total time spent on writing a record, excluding write process");
325 logger
= plb
.create_perf_counters();
326 cct
->get_perfcounters_collection()->add(logger
);
// compact_on_mount can be set through the option string (see tryInterpret).
328 if (compact_on_mount
) {
329 derr
<< "Compacting rocksdb store..." << dendl
;
331 derr
<< "Finished compacting rocksdb store" << dendl
;
// Probes whether a RocksDB database can be opened in dir, creating it if it
// does not exist. Returns 0 when the open status is ok and -EIO otherwise.
336 int RocksDBStore::_test_init(const string
& dir
)
338 rocksdb::Options options
;
339 options
.create_if_missing
= true;
341 rocksdb::Status status
= rocksdb::DB::Open(options
, dir
, &db
);
344 return status
.ok() ? 0 : -EIO
;
// Destructor. The visible teardown deletes priv after casting it back to
// rocksdb::Env* (presumably only when a custom Env was installed - TODO
// confirm).
347 RocksDBStore::~RocksDBStore()
352 // Ensure db is destroyed before dependent db_cache and filterpolicy
357 delete static_cast<rocksdb::Env
*>(priv
);
// Shuts down the background compaction thread, if it was started, and
// removes this store's perf counters from the collection.
361 void RocksDBStore::close()
363 // stop compaction thread
364 compact_queue_lock
.Lock();
365 if (compact_thread
.is_started()) {
366 compact_queue_stop
= true;
367 compact_queue_cond
.Signal();
// The lock is released before join(): compact_thread_entry() holds
// compact_queue_lock while it checks compact_queue_stop.
368 compact_queue_lock
.Unlock();
369 compact_thread
.join();
371 compact_queue_lock
.Unlock();
375 cct
->get_perfcounters_collection()->remove(logger
);
// Appends to elems every delim-separated piece that std::getline reads from
// the string stream ss (presumably loaded with s - TODO confirm).
378 void RocksDBStore::split_stats(const std::string
&s
, char delim
, std::vector
<std::string
> &elems
) {
379 std::stringstream ss
;
382 while (std::getline(ss
, item
, delim
)) {
383 elems
.push_back(item
);
// Dumps RocksDB statistics into the formatter f. Each group is emitted only
// when its rocksdb_collect_* option is set; when rocksdb_perf is disabled a
// message is logged at level 20 instead.
387 void RocksDBStore::get_statistics(Formatter
*f
)
389 if (!g_conf
->rocksdb_perf
) {
390 dout(20) << __func__
<< "RocksDB perf is disabled, can't probe for stats"
// Compaction statistics: the "rocksdb.stats" property, one dump per line.
395 if (g_conf
->rocksdb_collect_compaction_stats
) {
396 std::string stat_str
;
397 bool status
= db
->GetProperty("rocksdb.stats", &stat_str
);
399 f
->open_object_section("rocksdb_statistics");
400 f
->dump_string("rocksdb_compaction_statistics", "");
401 vector
<string
> stats
;
402 split_stats(stat_str
, '\n', stats
);
403 for (auto st
:stats
) {
404 f
->dump_string("", st
);
// Extended statistics: dbstats->ToString() plus this store's perf counters.
409 if (g_conf
->rocksdb_collect_extended_stats
) {
411 f
->open_object_section("rocksdb_extended_statistics");
412 string stat_str
= dbstats
->ToString();
413 vector
<string
> stats
;
414 split_stats(stat_str
, '\n', stats
);
415 f
->dump_string("rocksdb_extended_statistics", "");
416 for (auto st
:stats
) {
417 f
->dump_string(".", st
);
421 f
->open_object_section("rocksdbstore_perf_counters");
422 logger
->dump_formatted(f
,0);
// Memory statistics: block cache usage, pinned usage and memtable size.
425 if (g_conf
->rocksdb_collect_memory_stats
) {
426 f
->open_object_section("rocksdb_memtable_statistics");
427 std::string
str(stringify(bbt_opts
.block_cache
->GetUsage()));
428 f
->dump_string("block_cache_usage", str
.data());
// NOTE(review): str is appended to here; confirm it is cleared after the
// previous dump so the two values are not concatenated.
430 str
.append(stringify(bbt_opts
.block_cache
->GetPinnedUsage()));
431 f
->dump_string("block_cache_pinned_blocks_usage", str
);
433 db
->GetProperty("rocksdb.cur-size-all-mem-tables", &str
);
434 f
->dump_string("rocksdb_memtable_usage", str
);
// Writes the batch held by transaction t to RocksDB. The write options only
// carry disableWAL; no sync is requested here. Increments l_rocksdb_txns
// and records the elapsed time in l_rocksdb_submit_latency.
// Returns 0 when the write status is ok, -1 otherwise.
439 int RocksDBStore::submit_transaction(KeyValueDB::Transaction t
)
441 utime_t start
= ceph_clock_now();
442 // enable rocksdb breakdown
443 // considering performance overhead, default is disabled
444 if (g_conf
->rocksdb_perf
) {
445 rocksdb::SetPerfLevel(rocksdb::PerfLevel::kEnableTimeExceptForMutex
);
446 rocksdb::perf_context
.Reset();
449 RocksDBTransactionImpl
* _t
=
450 static_cast<RocksDBTransactionImpl
*>(t
.get());
451 rocksdb::WriteOptions woptions
;
452 woptions
.disableWAL
= disableWAL
;
// Debug level 30: log the contents of the batch before it is written.
453 lgeneric_subdout(cct
, rocksdb
, 30) << __func__
;
454 RocksWBHandler bat_txc
;
455 _t
->bat
.Iterate(&bat_txc
);
456 *_dout
<< " Rocksdb transaction: " << bat_txc
.seen
<< dendl
;
458 rocksdb::Status s
= db
->Write(woptions
, &_t
->bat
);
// Error report with status, code and batch contents (presumably emitted
// only when the write failed - TODO confirm).
460 RocksWBHandler rocks_txc
;
461 _t
->bat
.Iterate(&rocks_txc
);
462 derr
<< __func__
<< " error: " << s
.ToString() << " code = " << s
.code()
463 << " Rocksdb transaction: " << rocks_txc
.seen
<< dendl
;
465 utime_t lat
= ceph_clock_now() - start
;
// With rocksdb_perf enabled, the perf_context timings are divided by 1e9
// (presumably nanoseconds to seconds) and added to the perf counters.
467 if (g_conf
->rocksdb_perf
) {
468 utime_t write_memtable_time
;
469 utime_t write_delay_time
;
470 utime_t write_wal_time
;
471 utime_t write_pre_and_post_process_time
;
472 write_wal_time
.set_from_double(
473 static_cast<double>(rocksdb::perf_context
.write_wal_time
)/1000000000);
474 write_memtable_time
.set_from_double(
475 static_cast<double>(rocksdb::perf_context
.write_memtable_time
)/1000000000);
476 write_delay_time
.set_from_double(
477 static_cast<double>(rocksdb::perf_context
.write_delay_time
)/1000000000);
478 write_pre_and_post_process_time
.set_from_double(
479 static_cast<double>(rocksdb::perf_context
.write_pre_and_post_process_time
)/1000000000);
480 logger
->tinc(l_rocksdb_write_memtable_time
, write_memtable_time
);
481 logger
->tinc(l_rocksdb_write_delay_time
, write_delay_time
);
482 logger
->tinc(l_rocksdb_write_wal_time
, write_wal_time
);
483 logger
->tinc(l_rocksdb_write_pre_and_post_process_time
, write_pre_and_post_process_time
);
486 logger
->inc(l_rocksdb_txns
);
487 logger
->tinc(l_rocksdb_submit_latency
, lat
);
489 return s
.ok() ? 0 : -1;
// Same flow as submit_transaction, but the write is issued with
// woptions.sync = true. Increments l_rocksdb_txns_sync and records the
// elapsed time in l_rocksdb_submit_sync_latency.
// Returns 0 when the write status is ok, -1 otherwise.
492 int RocksDBStore::submit_transaction_sync(KeyValueDB::Transaction t
)
494 utime_t start
= ceph_clock_now();
495 // enable rocksdb breakdown
496 // considering performance overhead, default is disabled
497 if (g_conf
->rocksdb_perf
) {
498 rocksdb::SetPerfLevel(rocksdb::PerfLevel::kEnableTimeExceptForMutex
);
499 rocksdb::perf_context
.Reset();
502 RocksDBTransactionImpl
* _t
=
503 static_cast<RocksDBTransactionImpl
*>(t
.get());
504 rocksdb::WriteOptions woptions
;
505 woptions
.sync
= true;
506 woptions
.disableWAL
= disableWAL
;
// Debug level 30: log the contents of the batch before it is written.
507 lgeneric_subdout(cct
, rocksdb
, 30) << __func__
;
508 RocksWBHandler bat_txc
;
509 _t
->bat
.Iterate(&bat_txc
);
510 *_dout
<< " Rocksdb transaction: " << bat_txc
.seen
<< dendl
;
512 rocksdb::Status s
= db
->Write(woptions
, &_t
->bat
);
// Error report with status, code and batch contents (presumably emitted
// only when the write failed - TODO confirm).
514 RocksWBHandler rocks_txc
;
515 _t
->bat
.Iterate(&rocks_txc
);
516 derr
<< __func__
<< " error: " << s
.ToString() << " code = " << s
.code()
517 << " Rocksdb transaction: " << rocks_txc
.seen
<< dendl
;
519 utime_t lat
= ceph_clock_now() - start
;
// With rocksdb_perf enabled, the perf_context timings are divided by 1e9
// (presumably nanoseconds to seconds) and added to the perf counters.
521 if (g_conf
->rocksdb_perf
) {
522 utime_t write_memtable_time
;
523 utime_t write_delay_time
;
524 utime_t write_wal_time
;
525 utime_t write_pre_and_post_process_time
;
526 write_wal_time
.set_from_double(
527 static_cast<double>(rocksdb::perf_context
.write_wal_time
)/1000000000);
528 write_memtable_time
.set_from_double(
529 static_cast<double>(rocksdb::perf_context
.write_memtable_time
)/1000000000);
530 write_delay_time
.set_from_double(
531 static_cast<double>(rocksdb::perf_context
.write_delay_time
)/1000000000);
532 write_pre_and_post_process_time
.set_from_double(
533 static_cast<double>(rocksdb::perf_context
.write_pre_and_post_process_time
)/1000000000);
534 logger
->tinc(l_rocksdb_write_memtable_time
, write_memtable_time
);
535 logger
->tinc(l_rocksdb_write_delay_time
, write_delay_time
);
536 logger
->tinc(l_rocksdb_write_wal_time
, write_wal_time
);
537 logger
->tinc(l_rocksdb_write_pre_and_post_process_time
, write_pre_and_post_process_time
);
540 logger
->inc(l_rocksdb_txns_sync
);
541 logger
->tinc(l_rocksdb_submit_sync_latency
, lat
);
543 return s
.ok() ? 0 : -1;
// Constructs a transaction for the given store (presumably _db is kept for
// later use by the transaction's methods - TODO confirm).
546 RocksDBStore::RocksDBTransactionImpl::RocksDBTransactionImpl(RocksDBStore
*_db
)
// Queues a Put of to_set_bl under the key combine_strings(prefix, k).
// A contiguous, non-empty bufferlist is passed to RocksDB straight from its
// first buffer; the other path works on a local copy so that the
// non-const c_str() can be called.
551 void RocksDBStore::RocksDBTransactionImpl::set(
552 const string
&prefix
,
554 const bufferlist
&to_set_bl
)
556 string key
= combine_strings(prefix
, k
);
558 // bufferlist::c_str() is non-constant, so we can't call c_str()
559 if (to_set_bl
.is_contiguous() && to_set_bl
.length() > 0) {
560 bat
.Put(rocksdb::Slice(key
),
561 rocksdb::Slice(to_set_bl
.buffers().front().c_str(),
562 to_set_bl
.length()));
565 bufferlist val
= to_set_bl
;
566 bat
.Put(rocksdb::Slice(key
),
567 rocksdb::Slice(val
.c_str(), val
.length()));
// Overload of set() taking the key as a (k, keylen) buffer. The combined
// key is built into key by combine_strings(prefix, k, keylen, &key); the
// value handling is the same two-path Put as the string-key overload.
571 void RocksDBStore::RocksDBTransactionImpl::set(
572 const string
&prefix
,
573 const char *k
, size_t keylen
,
574 const bufferlist
&to_set_bl
)
577 combine_strings(prefix
, k
, keylen
, &key
);
579 // bufferlist::c_str() is non-constant, so we can't call c_str()
580 if (to_set_bl
.is_contiguous() && to_set_bl
.length() > 0) {
581 bat
.Put(rocksdb::Slice(key
),
582 rocksdb::Slice(to_set_bl
.buffers().front().c_str(),
583 to_set_bl
.length()));
586 bufferlist val
= to_set_bl
;
587 bat
.Put(rocksdb::Slice(key
),
588 rocksdb::Slice(val
.c_str(), val
.length()));
// Queues a Delete for the key combine_strings(prefix, k).
592 void RocksDBStore::RocksDBTransactionImpl::rmkey(const string
&prefix
,
595 bat
.Delete(combine_strings(prefix
, k
));
// Buffer-key overload of rmkey(): builds the combined key from prefix and
// (k, keylen) into key (presumably followed by a Delete of that key - TODO
// confirm).
598 void RocksDBStore::RocksDBTransactionImpl::rmkey(const string
&prefix
,
603 combine_strings(prefix
, k
, keylen
, &key
);
// Queues a SingleDelete for the key combine_strings(prefix, k).
607 void RocksDBStore::RocksDBTransactionImpl::rm_single_key(const string
&prefix
,
610 bat
.SingleDelete(combine_strings(prefix
, k
));
// Walks the prefix iterator from its first entry and queues a Delete for
// each key it yields.
613 void RocksDBStore::RocksDBTransactionImpl::rmkeys_by_prefix(const string
&prefix
)
615 KeyValueDB::Iterator it
= db
->get_iterator(prefix
);
616 for (it
->seek_to_first();
619 bat
.Delete(combine_strings(prefix
, it
->key()));
// Removes the keys of prefix that lie between start and end. When
// db->enable_rmrange is set a single DeleteRange is queued; the other path
// walks the iterator from lower_bound(start) and queues one Delete per key,
// testing each key against end (it->key() >= end).
623 void RocksDBStore::RocksDBTransactionImpl::rm_range_keys(const string
&prefix
,
627 if (db
->enable_rmrange
) {
628 bat
.DeleteRange(combine_strings(prefix
, start
), combine_strings(prefix
, end
));
630 auto it
= db
->get_iterator(prefix
);
631 it
->lower_bound(start
);
632 while (it
->valid()) {
633 if (it
->key() >= end
) {
636 bat
.Delete(combine_strings(prefix
, it
->key()));
// Queues a Merge of to_set_bl into the key combine_strings(prefix, k).
// A contiguous, non-empty bufferlist is passed straight from its first
// buffer; the other path works on a local copy so that the non-const
// c_str() can be called.
642 void RocksDBStore::RocksDBTransactionImpl::merge(
643 const string
&prefix
,
645 const bufferlist
&to_set_bl
)
647 string key
= combine_strings(prefix
, k
);
649 // bufferlist::c_str() is non-constant, so we can't call c_str()
650 if (to_set_bl
.is_contiguous() && to_set_bl
.length() > 0) {
651 bat
.Merge(rocksdb::Slice(key
),
652 rocksdb::Slice(to_set_bl
.buffers().front().c_str(),
653 to_set_bl
.length()));
656 bufferlist val
= to_set_bl
;
657 bat
.Merge(rocksdb::Slice(key
),
658 rocksdb::Slice(val
.c_str(), val
.length()));
662 //gets will bypass RocksDB row cache, since it uses iterator
// Multi-key lookup: for each entry of keys, reads combine_strings(prefix,
// key) with db->Get and appends the value to (*out)[key] (presumably only
// when the status is ok - TODO confirm). Increments l_rocksdb_gets once and
// records the total elapsed time in l_rocksdb_get_latency.
// NOTE(review): the comment preceding this function mentions an iterator,
// but the visible code calls db->Get per key - confirm it is current.
663 int RocksDBStore::get(
664 const string
&prefix
,
665 const std::set
<string
> &keys
,
666 std::map
<string
, bufferlist
> *out
)
668 utime_t start
= ceph_clock_now();
669 for (std::set
<string
>::const_iterator i
= keys
.begin();
670 i
!= keys
.end(); ++i
) {
672 std::string bound
= combine_strings(prefix
, *i
);
673 auto status
= db
->Get(rocksdb::ReadOptions(), rocksdb::Slice(bound
), &value
);
675 (*out
)[*i
].append(value
);
677 utime_t lat
= ceph_clock_now() - start
;
678 logger
->inc(l_rocksdb_gets
);
679 logger
->tinc(l_rocksdb_get_latency
, lat
);
// Single-key lookup under prefix. out must be non-null and empty on entry
// (asserted). The value is read with db->Get on combine_strings(prefix,
// key); the get counter and get latency are updated afterwards.
683 int RocksDBStore::get(
684 const string
&prefix
,
688 assert(out
&& (out
->length() == 0));
689 utime_t start
= ceph_clock_now();
693 k
= combine_strings(prefix
, key
);
694 s
= db
->Get(rocksdb::ReadOptions(), rocksdb::Slice(k
), &value
);
700 utime_t lat
= ceph_clock_now() - start
;
701 logger
->inc(l_rocksdb_gets
);
702 logger
->tinc(l_rocksdb_get_latency
, lat
);
// Single-key lookup taking the key as a (key, keylen) buffer. out must be
// non-null and empty on entry (asserted). The combined key is built into k
// and read with db->Get; the get counter and get latency are updated
// afterwards.
706 int RocksDBStore::get(
707 const string
& prefix
,
712 assert(out
&& (out
->length() == 0));
713 utime_t start
= ceph_clock_now();
716 combine_strings(prefix
, key
, keylen
, &k
);
718 s
= db
->Get(rocksdb::ReadOptions(), rocksdb::Slice(k
), &value
);
724 utime_t lat
= ceph_clock_now() - start
;
725 logger
->inc(l_rocksdb_gets
);
726 logger
->tinc(l_rocksdb_get_latency
, lat
);
// Splits a stored key of the form <prefix> '\0' <key> at the first 0 byte.
//   in     - raw key as stored in RocksDB
//   prefix - receives the bytes before the separator
//   key    - receives the bytes after the separator
// key() in this file passes 0 for prefix, so the outputs are presumably
// optional - TODO confirm the null checks.
730 int RocksDBStore::split_key(rocksdb::Slice in
, string
*prefix
, string
*key
)
732 size_t prefix_len
= 0;
734 // Find separator inside Slice
735 char* separator
= (char*) memchr(in
.data(), 0, in
.size());
736 if (separator
== NULL
)
738 prefix_len
= size_t(separator
- in
.data());
739 if (prefix_len
>= in
.size())
742 // Fetch prefix and/or key directly from Slice
744 *prefix
= string(in
.data(), prefix_len
);
// The key is everything after the separator byte.
746 *key
= string(separator
+1, in
.size()-prefix_len
-1);
750 void RocksDBStore::compact()
752 logger
->inc(l_rocksdb_compact
);
753 rocksdb::CompactRangeOptions options
;
754 db
->CompactRange(options
, nullptr, nullptr);
// Body of the background compaction thread. While compact_queue_stop is
// unset it pops ranges from compact_queue and compacts them, releasing
// compact_queue_lock around each compact_range() call, and waits on
// compact_queue_cond between rounds.
758 void RocksDBStore::compact_thread_entry()
760 compact_queue_lock
.Lock();
761 while (!compact_queue_stop
) {
762 while (!compact_queue
.empty()) {
763 pair
<string
,string
> range
= compact_queue
.front();
764 compact_queue
.pop_front();
765 logger
->set(l_rocksdb_compact_queue_len
, compact_queue
.size());
// Compaction runs without the queue lock held.
766 compact_queue_lock
.Unlock();
767 logger
->inc(l_rocksdb_compact_range
);
768 compact_range(range
.first
, range
.second
);
769 compact_queue_lock
.Lock();
772 compact_queue_cond
.Wait(compact_queue_lock
);
774 compact_queue_lock
.Unlock();
// Queues the range (start, end) for background compaction under
// compact_queue_lock. A queued range that overlaps the new one on one side
// is replaced by the merged range; if nothing merges, the new range is
// appended. The compaction thread is then signalled and, if it has not been
// started yet, created.
777 void RocksDBStore::compact_range_async(const string
& start
, const string
& end
)
779 Mutex::Locker
l(compact_queue_lock
);
781 // try to merge adjacent ranges. this is O(n), but the queue should
782 // be short. note that we do not cover all overlap cases and merge
783 // opportunities here, but we capture the ones we currently need.
784 list
< pair
<string
,string
> >::iterator p
= compact_queue
.begin();
785 while (p
!= compact_queue
.end()) {
// An identical range is already queued.
786 if (p
->first
== start
&& p
->second
== end
) {
790 if (p
->first
<= end
&& p
->first
> start
) {
791 // merge with existing range to the right
792 compact_queue
.push_back(make_pair(start
, p
->second
));
793 compact_queue
.erase(p
);
794 logger
->inc(l_rocksdb_compact_queue_merge
);
797 if (p
->second
>= start
&& p
->second
< end
) {
798 // merge with existing range to the left
799 compact_queue
.push_back(make_pair(p
->first
, end
));
800 compact_queue
.erase(p
);
801 logger
->inc(l_rocksdb_compact_queue_merge
);
806 if (p
== compact_queue
.end()) {
807 // no merge, new entry.
808 compact_queue
.push_back(make_pair(start
, end
));
809 logger
->set(l_rocksdb_compact_queue_len
, compact_queue
.size());
811 compact_queue_cond
.Signal();
812 if (!compact_thread
.is_started()) {
813 compact_thread
.create("rstore_compact");
// Tries to open a RocksDB database at omap_dir, creating it if it is
// missing (the result is presumably derived from the open status - TODO
// confirm).
816 bool RocksDBStore::check_omap_dir(string
&omap_dir
)
818 rocksdb::Options options
;
819 options
.create_if_missing
= true;
821 rocksdb::Status status
= rocksdb::DB::Open(options
, omap_dir
, &db
);
826 void RocksDBStore::compact_range(const string
& start
, const string
& end
)
828 rocksdb::CompactRangeOptions options
;
829 rocksdb::Slice
cstart(start
);
830 rocksdb::Slice
cend(end
);
831 db
->CompactRange(options
, &cstart
, &cend
);
// Destructor of the whole-space iterator (presumably releases dbiter - TODO
// confirm).
833 RocksDBStore::RocksDBWholeSpaceIteratorImpl::~RocksDBWholeSpaceIteratorImpl()
837 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_first()
839 dbiter
->SeekToFirst();
840 return dbiter
->status().ok() ? 0 : -1;
842 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_first(const string
&prefix
)
844 rocksdb::Slice
slice_prefix(prefix
);
845 dbiter
->Seek(slice_prefix
);
846 return dbiter
->status().ok() ? 0 : -1;
848 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_last()
850 dbiter
->SeekToLast();
851 return dbiter
->status().ok() ? 0 : -1;
// Positions the iterator relative to the end of prefix: it seeks to
// past_prefix(prefix) and, when that leaves the iterator invalid, falls back
// to SeekToLast() (the valid case presumably steps back one entry - TODO
// confirm). Returns 0 if the iterator status is ok, -1 otherwise.
853 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_last(const string
&prefix
)
855 string limit
= past_prefix(prefix
);
856 rocksdb::Slice
slice_limit(limit
);
857 dbiter
->Seek(slice_limit
);
859 if (!dbiter
->Valid()) {
860 dbiter
->SeekToLast();
864 return dbiter
->status().ok() ? 0 : -1;
// Starts from lower_bound(prefix, after) and then tests whether the
// iterator landed exactly on (prefix, after) (presumably stepping past that
// entry when it did - TODO confirm).
// Returns 0 if the iterator status is ok, -1 otherwise.
866 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::upper_bound(const string
&prefix
, const string
&after
)
868 lower_bound(prefix
, after
);
870 pair
<string
,string
> key
= raw_key();
871 if (key
.first
== prefix
&& key
.second
== after
)
874 return dbiter
->status().ok() ? 0 : -1;
876 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::lower_bound(const string
&prefix
, const string
&to
)
878 string bound
= combine_strings(prefix
, to
);
879 rocksdb::Slice
slice_bound(bound
);
880 dbiter
->Seek(slice_bound
);
881 return dbiter
->status().ok() ? 0 : -1;
883 bool RocksDBStore::RocksDBWholeSpaceIteratorImpl::valid()
885 return dbiter
->Valid();
// Returns 0 if the iterator status is ok, -1 otherwise (the step to the
// following entry presumably happens before this - TODO confirm).
887 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::next()
892 return dbiter
->status().ok() ? 0 : -1;
// Returns 0 if the iterator status is ok, -1 otherwise (the step to the
// preceding entry presumably happens before this - TODO confirm).
894 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::prev()
899 return dbiter
->status().ok() ? 0 : -1;
// Extracts the key part of the current raw key into out_key via split_key,
// passing 0 for the prefix output so the prefix is not requested.
901 string
RocksDBStore::RocksDBWholeSpaceIteratorImpl::key()
904 split_key(dbiter
->key(), 0, &out_key
);
// Returns the current entry's (prefix, key) pair as produced by split_key.
907 pair
<string
,string
> RocksDBStore::RocksDBWholeSpaceIteratorImpl::raw_key()
910 split_key(dbiter
->key(), &prefix
, &key
);
911 return make_pair(prefix
, key
);
// Tests whether the current raw key starts with prefix followed by a '\0'
// separator, comparing bytes in place without copying the key.
914 bool RocksDBStore::RocksDBWholeSpaceIteratorImpl::raw_key_is_prefixed(const string
&prefix
) {
915 // Look for "prefix\0" right in rocksdb::Slice
916 rocksdb::Slice key
= dbiter
->key();
// The size check keeps the separator lookup inside the key.
917 if ((key
.size() > prefix
.length()) && (key
[prefix
.length()] == '\0')) {
918 return memcmp(key
.data(), prefix
.c_str(), prefix
.length()) == 0;
924 bufferlist
RocksDBStore::RocksDBWholeSpaceIteratorImpl::value()
926 return to_bufferlist(dbiter
->value());
929 size_t RocksDBStore::RocksDBWholeSpaceIteratorImpl::key_size()
931 return dbiter
->key().size();
934 size_t RocksDBStore::RocksDBWholeSpaceIteratorImpl::value_size()
936 return dbiter
->value().size();
939 bufferptr
RocksDBStore::RocksDBWholeSpaceIteratorImpl::value_as_ptr()
941 rocksdb::Slice val
= dbiter
->value();
942 return bufferptr(val
.data(), val
.size());
945 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::status()
947 return dbiter
->status().ok() ? 0 : -1;
// Builds a limit string starting from a copy of prefix; seek_to_last(prefix)
// seeks to the result (presumably a byte is appended so the limit orders
// after every key of the prefix - TODO confirm).
950 string
RocksDBStore::past_prefix(const string
&prefix
)
952 string limit
= prefix
;
// Returns a shared whole-space iterator wrapping a new RocksDB iterator
// created with default ReadOptions.
957 RocksDBStore::WholeSpaceIterator
RocksDBStore::_get_iterator()
959 return std::make_shared
<RocksDBWholeSpaceIteratorImpl
>(
960 db
->NewIterator(rocksdb::ReadOptions()));