1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
10 #include <sys/types.h>
13 #include "rocksdb/db.h"
14 #include "rocksdb/table.h"
15 #include "rocksdb/env.h"
16 #include "rocksdb/slice.h"
17 #include "rocksdb/cache.h"
18 #include "rocksdb/filter_policy.h"
19 #include "rocksdb/utilities/convenience.h"
20 #include "rocksdb/merge_operator.h"
21 #include "kv/rocksdb_cache/BinnedLRUCache.h"
24 #include "common/perf_counters.h"
25 #include "common/PriorityCache.h"
26 #include "include/str_list.h"
27 #include "include/stringify.h"
28 #include "include/str_map.h"
29 #include "KeyValueDB.h"
30 #include "RocksDBStore.h"
32 #include "common/debug.h"
34 #define dout_context cct
35 #define dout_subsys ceph_subsys_rocksdb
37 #define dout_prefix *_dout << "rocksdb: "
// Build a zero-copy rocksdb::SliceParts view over every buffer segment of bl.
// *slices must already be sized to bl.buffers().size(); each slice aliases
// bl's memory, so bl must outlive the returned SliceParts.
// NOTE(review): the slice index 'n' is declared/advanced on lines not visible
// in this view — confirm against the full file.
39 static rocksdb::SliceParts
prepare_sliceparts(const bufferlist
&bl
,
40 vector
<rocksdb::Slice
> *slices
)
43 for (auto& buf
: bl
.buffers()) {
// Point slice n at the raw segment; no data is copied.
44 (*slices
)[n
].data_
= buf
.c_str();
45 (*slices
)[n
].size_
= buf
.length();
48 return rocksdb::SliceParts(slices
->data(), slices
->size());
52 // One of these per rocksdb instance, implements the merge operator prefix stuff
// Routes rocksdb merge callbacks to the per-prefix KeyValueDB::MergeOperator
// instances registered via RocksDBStore::set_merge_operator().
54 class RocksDBStore::MergeOperatorRouter
: public rocksdb::AssociativeMergeOperator
{
const char *Name() const override
{
58 // Construct a name that rocksDB will validate against. We want to
59 // do this in a way that doesn't constrain the ordering of calls
60 // to set_merge_operator, so sort the merge operators and then
61 // construct a name from all of those parts.
62 store
.assoc_name
.clear();
// std::map keeps operator names sorted, making the result independent of
// registration order.
63 map
<std::string
,std::string
> names
;
64 for (auto& p
: store
.merge_ops
) names
[p
.first
] = p
.second
->name();
65 for (auto& p
: names
) {
66 store
.assoc_name
+= '.';
67 store
.assoc_name
+= p
.first
;
68 store
.assoc_name
+= ':';
69 store
.assoc_name
+= p
.second
;
// The name is cached in store.assoc_name so the returned pointer stays
// valid after this call returns.
71 return store
.assoc_name
.c_str();
74 MergeOperatorRouter(RocksDBStore
&_store
) : store(_store
) {}
// Dispatch a merge to the operator whose registered prefix (terminated by
// the embedded NUL separator) matches the start of the combined key.
76 bool Merge(const rocksdb::Slice
& key
,
77 const rocksdb::Slice
* existing_value
,
78 const rocksdb::Slice
& value
,
79 std::string
* new_value
,
80 rocksdb::Logger
* logger
) const override
{
82 for (auto& p
: store
.merge_ops
) {
// Match "prefix\0" at the front of the combined key.
83 if (p
.first
.compare(0, p
.first
.length(),
84 key
.data(), p
.first
.length()) == 0 &&
85 key
.data()[p
.first
.length()] == 0) {
// NOTE(review): the existing_value null-check branch appears on lines
// elided from this view — confirm against the full file.
87 p
.second
->merge(existing_value
->data(), existing_value
->size(),
88 value
.data(), value
.size(),
// No existing value: let the operator synthesize one from 'value'.
91 p
.second
->merge_nonexistent(value
.data(), value
.size(), new_value
);
// Register a merge operator to handle all keys beginning with 'prefix'.
// Must be called before the database is opened (see assert below).
101 int RocksDBStore::set_merge_operator(
102 const string
& prefix
,
103 std::shared_ptr
<KeyValueDB::MergeOperator
> mop
)
105 // If you fail here, it's because you can't do this on an open database
106 assert(db
== nullptr);
107 merge_ops
.push_back(std::make_pair(prefix
,mop
));
// Adapter that forwards rocksdb's internal log output into the Ceph log.
111 class CephRocksdbLogger
: public rocksdb::Logger
{
114 explicit CephRocksdbLogger(CephContext
*c
) : cct(c
) {
117 ~CephRocksdbLogger() override
{
121 // Write an entry to the log file with the specified format.
122 void Logv(const char* format
, va_list ap
) override
{
// Default overload: log at rocksdb INFO level.
123 Logv(rocksdb::INFO_LEVEL
, format
, ap
);
126 // Write an entry to the log file with the specified log level
127 // and format. Any log with level under the internal log level
128 // of *this (see @SetInfoLogLevel and @GetInfoLogLevel) will not be
130 void Logv(const rocksdb::InfoLogLevel log_level
, const char* format
,
131 va_list ap
) override
{
// Map rocksdb's log level onto a Ceph debug level: higher rocksdb
// severity yields a lower (more important) Ceph verbosity value.
132 int v
= rocksdb::NUM_INFO_LOG_LEVELS
- log_level
- 1;
// Format into a fixed-size stack buffer, then emit via dout machinery.
135 vsnprintf(buf
, sizeof(buf
), format
, ap
);
136 *_dout
<< buf
<< dendl
;
// Factory: build a rocksdb logger bound to the global Ceph context.
// Caller takes ownership of the returned Logger.
140 rocksdb::Logger
*create_rocksdb_ceph_logger()
142 return new CephRocksdbLogger(g_ceph_context
);
// Parse "true"/"false" (case-insensitive) or a base-10 integer into b_val.
// Returns 0 on success; failure path is on lines elided from this view.
145 static int string2bool(const string
&val
, bool &b_val
)
147 if (strcasecmp(val
.c_str(), "false") == 0) {
150 } else if (strcasecmp(val
.c_str(), "true") == 0) {
// Neither keyword matched: fall back to numeric parsing.
155 int b
= strict_strtol(val
.c_str(), 10, &err
);
// Interpret Ceph-specific option keys that rocksdb's own option parser does
// not recognize (thread pool sizes, compact_on_mount, disableWAL).
163 int RocksDBStore::tryInterpret(const string
&key
, const string
&val
, rocksdb::Options
&opt
)
165 if (key
== "compaction_threads") {
167 int f
= strict_iecstrtoll(val
.c_str(), &err
);
170 //Low priority threadpool is used for compaction
171 opt
.env
->SetBackgroundThreads(f
, rocksdb::Env::Priority::LOW
);
172 } else if (key
== "flusher_threads") {
174 int f
= strict_iecstrtoll(val
.c_str(), &err
);
177 //High priority threadpool is used for flusher
178 opt
.env
->SetBackgroundThreads(f
, rocksdb::Env::Priority::HIGH
);
179 } else if (key
== "compact_on_mount") {
180 int ret
= string2bool(val
, compact_on_mount
);
183 } else if (key
== "disableWAL") {
184 int ret
= string2bool(val
, disableWAL
);
188 //unrecognized config options.
// Parse a ",\n;"-separated key=value option string into 'opt'. Options that
// rocksdb itself does not recognize are handed to tryInterpret() for
// Ceph-specific handling.
194 int RocksDBStore::ParseOptionsFromString(const string
&opt_str
, rocksdb::Options
&opt
)
196 map
<string
, string
> str_map
;
197 int r
= get_str_map(opt_str
, &str_map
, ",\n;");
200 map
<string
, string
>::iterator it
;
201 for(it
= str_map
.begin(); it
!= str_map
.end(); ++it
) {
// Re-assemble "key=value" for rocksdb's string-based option parser.
202 string this_opt
= it
->first
+ "=" + it
->second
;
203 rocksdb::Status status
= rocksdb::GetOptionsFromString(opt
, this_opt
, &opt
);
205 //unrecognized by rocksdb, try to interpret by ourselves.
206 r
= tryInterpret(it
->first
, it
->second
, opt
);
208 derr
<< status
.ToString() << dendl
;
212 lgeneric_dout(cct
, 0) << " set rocksdb option " << it
->first
213 << " = " << it
->second
<< dendl
;
// Record the option string for later use; validate it now by parsing into a
// throwaway rocksdb::Options (the real parse happens again in do_open()).
218 int RocksDBStore::init(string _options_str
)
220 options_str
= _options_str
;
221 rocksdb::Options opt
;
223 if (options_str
.length()) {
224 int r
= ParseOptionsFromString(options_str
, opt
);
// Create the backing directory (tolerating EEXIST) and then open the
// database with create_if_missing=true via do_open().
232 int RocksDBStore::create_and_open(ostream
&out
)
235 unique_ptr
<rocksdb::Directory
> dir
;
236 env
->NewDirectory(path
, &dir
);
238 int r
= ::mkdir(path
.c_str(), 0755);
// An already-existing directory is fine; any other mkdir failure is fatal.
241 if (r
< 0 && r
!= -EEXIST
) {
242 derr
<< __func__
<< " failed to create " << path
<< ": " << cpp_strerror(r
)
247 return do_open(out
, true);
// Open (or create) the rocksdb instance: parse the stored option string,
// configure statistics, db_paths, logging, block/row caches and the
// block-based table format, open the DB, register perf counters, and
// optionally run an initial compaction.
250 int RocksDBStore::do_open(ostream
&out
, bool create_if_missing
)
252 rocksdb::Options opt
;
253 rocksdb::Status status
;
255 if (options_str
.length()) {
256 int r
= ParseOptionsFromString(options_str
, opt
);
// Optional rocksdb-internal statistics collection (rocksdb_perf).
262 if (g_conf
->rocksdb_perf
) {
263 dbstats
= rocksdb::CreateDBStatistics();
264 opt
.statistics
= dbstats
;
267 opt
.create_if_missing
= create_if_missing
;
268 if (g_conf
->rocksdb_separate_wal_dir
) {
269 opt
.wal_dir
= path
+ ".wal";
// Optional tiered storage: rocksdb_db_paths is a list of "path,size"
// entries separated by "; \t".
271 if (g_conf
->get_val
<std::string
>("rocksdb_db_paths").length()) {
273 get_str_list(g_conf
->get_val
<std::string
>("rocksdb_db_paths"), "; \t", paths
);
274 for (auto& p
: paths
) {
275 size_t pos
= p
.find(',');
276 if (pos
== std::string::npos
) {
277 derr
<< __func__
<< " invalid db path item " << p
<< " in "
278 << g_conf
->get_val
<std::string
>("rocksdb_db_paths") << dendl
;
// Split "path,size"; note this local 'path' shadows the member.
281 string path
= p
.substr(0, pos
);
282 string size_str
= p
.substr(pos
+ 1);
283 uint64_t size
= atoll(size_str
.c_str());
285 derr
<< __func__
<< " invalid db path item " << p
<< " in "
286 << g_conf
->get_val
<std::string
>("rocksdb_db_paths") << dendl
;
289 opt
.db_paths
.push_back(rocksdb::DbPath(path
, size
));
290 dout(10) << __func__
<< " db_path " << path
<< " size " << size
<< dendl
;
// Route rocksdb's own log output into the Ceph log if requested.
294 if (g_conf
->rocksdb_log_to_ceph_log
) {
295 opt
.info_log
.reset(new CephRocksdbLogger(g_ceph_context
));
// 'priv' optionally carries a caller-supplied custom rocksdb::Env.
299 dout(10) << __func__
<< " using custom Env " << priv
<< dendl
;
300 opt
.env
= static_cast<rocksdb::Env
*>(priv
);
// Split the configured cache budget between row cache and block cache
// according to rocksdb_cache_row_ratio.
304 if (!set_cache_flag
) {
305 cache_size
= g_conf
->rocksdb_cache_size
;
307 uint64_t row_cache_size
= cache_size
* g_conf
->rocksdb_cache_row_ratio
;
308 uint64_t block_cache_size
= cache_size
- row_cache_size
;
// Select the block cache implementation by rocksdb_cache_type.
310 if (g_conf
->rocksdb_cache_type
== "binned_lru") {
311 bbt_opts
.block_cache
= rocksdb_cache::NewBinnedLRUCache(
313 g_conf
->rocksdb_cache_shard_bits
);
314 } else if (g_conf
->rocksdb_cache_type
== "lru") {
315 bbt_opts
.block_cache
= rocksdb::NewLRUCache(
317 g_conf
->rocksdb_cache_shard_bits
);
318 } else if (g_conf
->rocksdb_cache_type
== "clock") {
319 bbt_opts
.block_cache
= rocksdb::NewClockCache(
321 g_conf
->rocksdb_cache_shard_bits
);
// NewClockCache returns null when rocksdb was built without TBB support.
322 if (!bbt_opts
.block_cache
) {
323 derr
<< "rocksdb_cache_type '" << g_conf
->rocksdb_cache_type
324 << "' chosen, but RocksDB not compiled with LibTBB. "
329 derr
<< "unrecognized rocksdb_cache_type '" << g_conf
->rocksdb_cache_type
333 bbt_opts
.block_size
= g_conf
->rocksdb_block_size
;
335 if (row_cache_size
> 0)
336 opt
.row_cache
= rocksdb::NewLRUCache(row_cache_size
,
337 g_conf
->rocksdb_cache_shard_bits
);
// Optional bloom filter on SST blocks.
338 uint64_t bloom_bits
= g_conf
->get_val
<uint64_t>("rocksdb_bloom_bits_per_key");
339 if (bloom_bits
> 0) {
340 dout(10) << __func__
<< " set bloom filter bits per key to "
341 << bloom_bits
<< dendl
;
342 bbt_opts
.filter_policy
.reset(rocksdb::NewBloomFilterPolicy(bloom_bits
));
// SST index format selection.
344 if (g_conf
->get_val
<std::string
>("rocksdb_index_type") == "binary_search")
345 bbt_opts
.index_type
= rocksdb::BlockBasedTableOptions::IndexType::kBinarySearch
;
346 if (g_conf
->get_val
<std::string
>("rocksdb_index_type") == "hash_search")
347 bbt_opts
.index_type
= rocksdb::BlockBasedTableOptions::IndexType::kHashSearch
;
348 if (g_conf
->get_val
<std::string
>("rocksdb_index_type") == "two_level")
349 bbt_opts
.index_type
= rocksdb::BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch
;
350 bbt_opts
.cache_index_and_filter_blocks
=
351 g_conf
->get_val
<bool>("rocksdb_cache_index_and_filter_blocks");
352 bbt_opts
.cache_index_and_filter_blocks_with_high_priority
=
353 g_conf
->get_val
<bool>("rocksdb_cache_index_and_filter_blocks_with_high_priority");
354 bbt_opts
.partition_filters
= g_conf
->get_val
<bool>("rocksdb_partition_filters");
355 if (g_conf
->get_val
<uint64_t>("rocksdb_metadata_block_size") > 0)
356 bbt_opts
.metadata_block_size
= g_conf
->get_val
<uint64_t>("rocksdb_metadata_block_size");
357 bbt_opts
.pin_l0_filter_and_index_blocks_in_cache
=
358 g_conf
->get_val
<bool>("rocksdb_pin_l0_filter_and_index_blocks_in_cache");
360 opt
.table_factory
.reset(rocksdb::NewBlockBasedTableFactory(bbt_opts
));
361 dout(10) << __func__
<< " block size " << g_conf
->rocksdb_block_size
362 << ", block_cache size " << byte_u_t(block_cache_size
)
363 << ", row_cache size " << byte_u_t(row_cache_size
)
365 << (1 << g_conf
->rocksdb_cache_shard_bits
)
366 << ", type " << g_conf
->rocksdb_cache_type
// Install the prefix-routing merge operator and open the database.
369 opt
.merge_operator
.reset(new MergeOperatorRouter(*this));
370 status
= rocksdb::DB::Open(opt
, path
, &db
);
372 derr
<< status
.ToString() << dendl
;
// Register per-instance perf counters.
376 PerfCountersBuilder
plb(g_ceph_context
, "rocksdb", l_rocksdb_first
, l_rocksdb_last
);
377 plb
.add_u64_counter(l_rocksdb_gets
, "get", "Gets");
378 plb
.add_u64_counter(l_rocksdb_txns
, "submit_transaction", "Submit transactions");
379 plb
.add_u64_counter(l_rocksdb_txns_sync
, "submit_transaction_sync", "Submit transactions sync");
380 plb
.add_time_avg(l_rocksdb_get_latency
, "get_latency", "Get latency");
381 plb
.add_time_avg(l_rocksdb_submit_latency
, "submit_latency", "Submit Latency");
382 plb
.add_time_avg(l_rocksdb_submit_sync_latency
, "submit_sync_latency", "Submit Sync Latency");
383 plb
.add_u64_counter(l_rocksdb_compact
, "compact", "Compactions");
384 plb
.add_u64_counter(l_rocksdb_compact_range
, "compact_range", "Compactions by range");
385 plb
.add_u64_counter(l_rocksdb_compact_queue_merge
, "compact_queue_merge", "Mergings of ranges in compaction queue");
386 plb
.add_u64(l_rocksdb_compact_queue_len
, "compact_queue_len", "Length of compaction queue");
387 plb
.add_time_avg(l_rocksdb_write_wal_time
, "rocksdb_write_wal_time", "Rocksdb write wal time");
388 plb
.add_time_avg(l_rocksdb_write_memtable_time
, "rocksdb_write_memtable_time", "Rocksdb write memtable time");
389 plb
.add_time_avg(l_rocksdb_write_delay_time
, "rocksdb_write_delay_time", "Rocksdb write delay time");
390 plb
.add_time_avg(l_rocksdb_write_pre_and_post_process_time
,
391 "rocksdb_write_pre_and_post_time", "total time spent on writing a record, excluding write process");
392 logger
= plb
.create_perf_counters();
393 cct
->get_perfcounters_collection()->add(logger
);
// Optionally compact the whole store immediately after opening.
395 if (compact_on_mount
) {
396 derr
<< "Compacting rocksdb store..." << dendl
;
398 derr
<< "Finished compacting rocksdb store" << dendl
;
// Probe helper: attempt to open (creating if needed) a database at 'dir' to
// verify rocksdb works there; returns 0 on success, -EIO otherwise.
403 int RocksDBStore::_test_init(const string
& dir
)
405 rocksdb::Options options
;
406 options
.create_if_missing
= true;
408 rocksdb::Status status
= rocksdb::DB::Open(options
, dir
, &db
);
411 return status
.ok() ? 0 : -EIO
;
414 RocksDBStore::~RocksDBStore()
419 // Ensure db is destroyed before dependent db_cache and filterpolicy
// The caller-supplied custom Env (if any) was handed in via 'priv' and is
// owned by this store, so it is freed here.
424 delete static_cast<rocksdb::Env
*>(priv
);
// Shut down: stop the background compaction thread (if running) and
// deregister this instance's perf counters.
428 void RocksDBStore::close()
430 // stop compaction thread
431 compact_queue_lock
.Lock();
432 if (compact_thread
.is_started()) {
433 compact_queue_stop
= true;
434 compact_queue_cond
.Signal();
// Release the lock before joining so the worker can finish its loop.
435 compact_queue_lock
.Unlock();
436 compact_thread
.join();
438 compact_queue_lock
.Unlock();
442 cct
->get_perfcounters_collection()->remove(logger
);
// Split 's' on 'delim', appending each piece to 'elems'.
445 void RocksDBStore::split_stats(const std::string
&s
, char delim
, std::vector
<std::string
> &elems
) {
446 std::stringstream ss
;
449 while (std::getline(ss
, item
, delim
)) {
450 elems
.push_back(item
);
// Dump rocksdb statistics into the Formatter. Requires rocksdb_perf; each
// sub-section (compaction, extended, memory) is gated by its own config flag.
454 void RocksDBStore::get_statistics(Formatter
*f
)
456 if (!g_conf
->rocksdb_perf
) {
457 dout(20) << __func__
<< "RocksDB perf is disabled, can't probe for stats"
// Per-level compaction statistics via the "rocksdb.stats" property.
462 if (g_conf
->rocksdb_collect_compaction_stats
) {
463 std::string stat_str
;
464 bool status
= db
->GetProperty("rocksdb.stats", &stat_str
);
466 f
->open_object_section("rocksdb_statistics");
467 f
->dump_string("rocksdb_compaction_statistics", "");
468 vector
<string
> stats
;
469 split_stats(stat_str
, '\n', stats
);
470 for (auto st
:stats
) {
471 f
->dump_string("", st
);
// Extended statistics from the dbstats object plus our perf counters.
476 if (g_conf
->rocksdb_collect_extended_stats
) {
478 f
->open_object_section("rocksdb_extended_statistics");
479 string stat_str
= dbstats
->ToString();
480 vector
<string
> stats
;
481 split_stats(stat_str
, '\n', stats
);
482 f
->dump_string("rocksdb_extended_statistics", "");
483 for (auto st
:stats
) {
484 f
->dump_string(".", st
);
488 f
->open_object_section("rocksdbstore_perf_counters");
489 logger
->dump_formatted(f
,0);
// Block cache and memtable memory usage.
492 if (g_conf
->rocksdb_collect_memory_stats
) {
493 f
->open_object_section("rocksdb_memtable_statistics");
494 std::string
str(stringify(bbt_opts
.block_cache
->GetUsage()));
495 f
->dump_string("block_cache_usage", str
.data());
497 str
.append(stringify(bbt_opts
.block_cache
->GetPinnedUsage()));
498 f
->dump_string("block_cache_pinned_blocks_usage", str
);
500 db
->GetProperty("rocksdb.cur-size-all-mem-tables", &str
);
501 f
->dump_string("rocksdb_memtable_usage", str
);
// Apply the transaction's accumulated WriteBatch asynchronously (no fsync);
// records submit latency and, when rocksdb_perf is set, a per-phase write
// time breakdown. Returns 0 on success, -1 on error.
506 int RocksDBStore::submit_transaction(KeyValueDB::Transaction t
)
508 utime_t start
= ceph_clock_now();
509 // enable rocksdb breakdown
510 // considering performance overhead, default is disabled
511 if (g_conf
->rocksdb_perf
) {
512 rocksdb::SetPerfLevel(rocksdb::PerfLevel::kEnableTimeExceptForMutex
);
513 rocksdb::perf_context
.Reset();
516 RocksDBTransactionImpl
* _t
=
517 static_cast<RocksDBTransactionImpl
*>(t
.get());
518 rocksdb::WriteOptions woptions
;
519 woptions
.disableWAL
= disableWAL
;
// Trace the batch contents at high debug verbosity.
520 lgeneric_subdout(cct
, rocksdb
, 30) << __func__
;
521 RocksWBHandler bat_txc
;
522 _t
->bat
.Iterate(&bat_txc
);
523 *_dout
<< " Rocksdb transaction: " << bat_txc
.seen
<< dendl
;
// Apply the whole batch in one rocksdb write.
525 rocksdb::Status s
= db
->Write(woptions
, &_t
->bat
);
// On failure, dump the batch contents for diagnosis.
// NOTE(review): the failure guard around this dump is on lines elided from
// this view — confirm against the full file.
527 RocksWBHandler rocks_txc
;
528 _t
->bat
.Iterate(&rocks_txc
);
529 derr
<< __func__
<< " error: " << s
.ToString() << " code = " << s
.code()
530 << " Rocksdb transaction: " << rocks_txc
.seen
<< dendl
;
532 utime_t lat
= ceph_clock_now() - start
;
534 if (g_conf
->rocksdb_perf
) {
535 utime_t write_memtable_time
;
536 utime_t write_delay_time
;
537 utime_t write_wal_time
;
538 utime_t write_pre_and_post_process_time
;
// perf_context reports nanoseconds; convert to seconds for utime_t.
539 write_wal_time
.set_from_double(
540 static_cast<double>(rocksdb::perf_context
.write_wal_time
)/1000000000);
541 write_memtable_time
.set_from_double(
542 static_cast<double>(rocksdb::perf_context
.write_memtable_time
)/1000000000);
543 write_delay_time
.set_from_double(
544 static_cast<double>(rocksdb::perf_context
.write_delay_time
)/1000000000);
545 write_pre_and_post_process_time
.set_from_double(
546 static_cast<double>(rocksdb::perf_context
.write_pre_and_post_process_time
)/1000000000);
547 logger
->tinc(l_rocksdb_write_memtable_time
, write_memtable_time
);
548 logger
->tinc(l_rocksdb_write_delay_time
, write_delay_time
);
549 logger
->tinc(l_rocksdb_write_wal_time
, write_wal_time
);
550 logger
->tinc(l_rocksdb_write_pre_and_post_process_time
, write_pre_and_post_process_time
);
553 logger
->inc(l_rocksdb_txns
);
554 logger
->tinc(l_rocksdb_submit_latency
, lat
);
556 return s
.ok() ? 0 : -1;
// Synchronous variant of submit_transaction(): identical flow but with
// woptions.sync = true so the write is durable before returning; accounts
// to the *_sync perf counters instead.
559 int RocksDBStore::submit_transaction_sync(KeyValueDB::Transaction t
)
561 utime_t start
= ceph_clock_now();
562 // enable rocksdb breakdown
563 // considering performance overhead, default is disabled
564 if (g_conf
->rocksdb_perf
) {
565 rocksdb::SetPerfLevel(rocksdb::PerfLevel::kEnableTimeExceptForMutex
);
566 rocksdb::perf_context
.Reset();
569 RocksDBTransactionImpl
* _t
=
570 static_cast<RocksDBTransactionImpl
*>(t
.get());
571 rocksdb::WriteOptions woptions
;
// The one behavioral difference from submit_transaction(): sync write.
572 woptions
.sync
= true;
573 woptions
.disableWAL
= disableWAL
;
574 lgeneric_subdout(cct
, rocksdb
, 30) << __func__
;
575 RocksWBHandler bat_txc
;
576 _t
->bat
.Iterate(&bat_txc
);
577 *_dout
<< " Rocksdb transaction: " << bat_txc
.seen
<< dendl
;
579 rocksdb::Status s
= db
->Write(woptions
, &_t
->bat
);
// On failure, dump the batch contents for diagnosis.
// NOTE(review): the failure guard around this dump is on lines elided from
// this view — confirm against the full file.
581 RocksWBHandler rocks_txc
;
582 _t
->bat
.Iterate(&rocks_txc
);
583 derr
<< __func__
<< " error: " << s
.ToString() << " code = " << s
.code()
584 << " Rocksdb transaction: " << rocks_txc
.seen
<< dendl
;
586 utime_t lat
= ceph_clock_now() - start
;
588 if (g_conf
->rocksdb_perf
) {
589 utime_t write_memtable_time
;
590 utime_t write_delay_time
;
591 utime_t write_wal_time
;
592 utime_t write_pre_and_post_process_time
;
// perf_context reports nanoseconds; convert to seconds for utime_t.
593 write_wal_time
.set_from_double(
594 static_cast<double>(rocksdb::perf_context
.write_wal_time
)/1000000000);
595 write_memtable_time
.set_from_double(
596 static_cast<double>(rocksdb::perf_context
.write_memtable_time
)/1000000000);
597 write_delay_time
.set_from_double(
598 static_cast<double>(rocksdb::perf_context
.write_delay_time
)/1000000000);
599 write_pre_and_post_process_time
.set_from_double(
600 static_cast<double>(rocksdb::perf_context
.write_pre_and_post_process_time
)/1000000000);
601 logger
->tinc(l_rocksdb_write_memtable_time
, write_memtable_time
);
602 logger
->tinc(l_rocksdb_write_delay_time
, write_delay_time
);
603 logger
->tinc(l_rocksdb_write_wal_time
, write_wal_time
);
604 logger
->tinc(l_rocksdb_write_pre_and_post_process_time
, write_pre_and_post_process_time
);
607 logger
->inc(l_rocksdb_txns_sync
);
608 logger
->tinc(l_rocksdb_submit_sync_latency
, lat
);
610 return s
.ok() ? 0 : -1;
// Transaction impl: operations accumulate in a rocksdb::WriteBatch ('bat')
// and are applied atomically by submit_transaction[_sync]().
613 RocksDBStore::RocksDBTransactionImpl::RocksDBTransactionImpl(RocksDBStore
*_db
)
// Queue a Put of (prefix, k) -> to_set_bl into the write batch.
618 void RocksDBStore::RocksDBTransactionImpl::set(
619 const string
&prefix
,
621 const bufferlist
&to_set_bl
)
623 string key
= combine_strings(prefix
, k
);
625 // bufferlist::c_str() is non-constant, so we can't call c_str()
626 if (to_set_bl
.is_contiguous() && to_set_bl
.length() > 0) {
// Single contiguous segment: hand rocksdb one slice, no copy or flatten.
627 bat
.Put(rocksdb::Slice(key
),
628 rocksdb::Slice(to_set_bl
.buffers().front().c_str(),
629 to_set_bl
.length()));
// Fragmented bufferlist: use SliceParts so we avoid flattening it.
631 rocksdb::Slice
key_slice(key
);
632 vector
<rocksdb::Slice
> value_slices(to_set_bl
.buffers().size());
633 bat
.Put(nullptr, rocksdb::SliceParts(&key_slice
, 1),
634 prepare_sliceparts(to_set_bl
, &value_slices
));
// (k, keylen) variant: queue a Put of (prefix, k[0..keylen)) -> to_set_bl.
638 void RocksDBStore::RocksDBTransactionImpl::set(
639 const string
&prefix
,
640 const char *k
, size_t keylen
,
641 const bufferlist
&to_set_bl
)
644 combine_strings(prefix
, k
, keylen
, &key
);
646 // bufferlist::c_str() is non-constant, so we can't call c_str()
647 if (to_set_bl
.is_contiguous() && to_set_bl
.length() > 0) {
// Single contiguous segment: hand rocksdb one slice, no copy or flatten.
648 bat
.Put(rocksdb::Slice(key
),
649 rocksdb::Slice(to_set_bl
.buffers().front().c_str(),
650 to_set_bl
.length()));
// Fragmented bufferlist: use SliceParts so we avoid flattening it.
652 rocksdb::Slice
key_slice(key
);
653 vector
<rocksdb::Slice
> value_slices(to_set_bl
.buffers().size());
654 bat
.Put(nullptr, rocksdb::SliceParts(&key_slice
, 1),
655 prepare_sliceparts(to_set_bl
, &value_slices
));
// Queue a Delete of the combined key (prefix, k).
659 void RocksDBStore::RocksDBTransactionImpl::rmkey(const string
&prefix
,
662 bat
.Delete(combine_strings(prefix
, k
));
// (k, keylen) variant: queue a Delete of (prefix, k[0..keylen)).
665 void RocksDBStore::RocksDBTransactionImpl::rmkey(const string
&prefix
,
670 combine_strings(prefix
, k
, keylen
, &key
);
// Queue a rocksdb SingleDelete for (prefix, k) — cheaper than Delete, but
// per rocksdb's contract only valid when the key has not been overwritten
// or merged since it was last written.
674 void RocksDBStore::RocksDBTransactionImpl::rm_single_key(const string
&prefix
,
677 bat
.SingleDelete(combine_strings(prefix
, k
));
// Delete every key under 'prefix': one DeleteRange when enabled, otherwise
// iterate the prefix and queue individual Deletes.
680 void RocksDBStore::RocksDBTransactionImpl::rmkeys_by_prefix(const string
&prefix
)
682 if (db
->enable_rmrange
) {
// '\x01' sorts just after the '\0' prefix separator, so
// [prefix\0, endprefix\0) bounds exactly the keys with this prefix.
683 string endprefix
= prefix
;
684 endprefix
.push_back('\x01');
685 bat
.DeleteRange(combine_strings(prefix
, string()),
686 combine_strings(endprefix
, string()));
688 KeyValueDB::Iterator it
= db
->get_iterator(prefix
);
689 for (it
->seek_to_first();
692 bat
.Delete(combine_strings(prefix
, it
->key()));
// Delete keys in [start, end) under 'prefix': one DeleteRange when enabled,
// otherwise iterate from 'start' and queue Deletes until 'end' is reached.
697 void RocksDBStore::RocksDBTransactionImpl::rm_range_keys(const string
&prefix
,
701 if (db
->enable_rmrange
) {
702 bat
.DeleteRange(combine_strings(prefix
, start
), combine_strings(prefix
, end
));
704 auto it
= db
->get_iterator(prefix
);
705 it
->lower_bound(start
);
706 while (it
->valid()) {
// Stop at the exclusive upper bound.
707 if (it
->key() >= end
) {
710 bat
.Delete(combine_strings(prefix
, it
->key()));
// Queue a Merge of (prefix, k) with to_set_bl; resolved at read/compaction
// time by the MergeOperatorRouter registered for this prefix.
716 void RocksDBStore::RocksDBTransactionImpl::merge(
717 const string
&prefix
,
719 const bufferlist
&to_set_bl
)
721 string key
= combine_strings(prefix
, k
);
723 // bufferlist::c_str() is non-constant, so we can't call c_str()
724 if (to_set_bl
.is_contiguous() && to_set_bl
.length() > 0) {
// Single contiguous segment: hand rocksdb one slice, no copy or flatten.
725 bat
.Merge(rocksdb::Slice(key
),
726 rocksdb::Slice(to_set_bl
.buffers().front().c_str(),
727 to_set_bl
.length()));
// Fragmented bufferlist: use SliceParts so we avoid flattening it.
730 rocksdb::Slice
key_slice(key
);
731 vector
<rocksdb::Slice
> value_slices(to_set_bl
.buffers().size());
732 bat
.Merge(nullptr, rocksdb::SliceParts(&key_slice
, 1),
733 prepare_sliceparts(to_set_bl
, &value_slices
));
737 //gets will bypass RocksDB row cache, since it uses iterator
// Batch point-lookup: fetch each requested key under 'prefix'. Keys that
// are absent are simply left out of *out; an IO error aborts the process.
738 int RocksDBStore::get(
739 const string
&prefix
,
740 const std::set
<string
> &keys
,
741 std::map
<string
, bufferlist
> *out
)
743 utime_t start
= ceph_clock_now();
744 for (std::set
<string
>::const_iterator i
= keys
.begin();
745 i
!= keys
.end(); ++i
) {
747 std::string bound
= combine_strings(prefix
, *i
);
748 auto status
= db
->Get(rocksdb::ReadOptions(), rocksdb::Slice(bound
), &value
);
750 (*out
)[*i
].append(value
);
751 } else if (status
.IsIOError()) {
752 ceph_abort_msg(cct
, status
.ToString());
// Account the whole batch as one 'get' for latency purposes.
756 utime_t lat
= ceph_clock_now() - start
;
757 logger
->inc(l_rocksdb_gets
);
758 logger
->tinc(l_rocksdb_get_latency
, lat
);
// Single-key fetch of (prefix, key); *out must arrive empty. Not-found is
// reported via the return value (success/return lines elided in this view);
// any error other than not-found aborts the process.
762 int RocksDBStore::get(
763 const string
&prefix
,
767 assert(out
&& (out
->length() == 0));
768 utime_t start
= ceph_clock_now();
772 k
= combine_strings(prefix
, key
);
773 s
= db
->Get(rocksdb::ReadOptions(), rocksdb::Slice(k
), &value
);
776 } else if (s
.IsNotFound()) {
779 ceph_abort_msg(cct
, s
.ToString());
781 utime_t lat
= ceph_clock_now() - start
;
782 logger
->inc(l_rocksdb_gets
);
783 logger
->tinc(l_rocksdb_get_latency
, lat
);
// (key, keylen) variant of the single-key fetch; same semantics as above:
// *out must arrive empty, not-found is reported via the return value, and
// other errors abort the process.
787 int RocksDBStore::get(
788 const string
& prefix
,
793 assert(out
&& (out
->length() == 0));
794 utime_t start
= ceph_clock_now();
797 combine_strings(prefix
, key
, keylen
, &k
);
799 s
= db
->Get(rocksdb::ReadOptions(), rocksdb::Slice(k
), &value
);
802 } else if (s
.IsNotFound()) {
805 ceph_abort_msg(cct
, s
.ToString());
807 utime_t lat
= ceph_clock_now() - start
;
808 logger
->inc(l_rocksdb_gets
);
809 logger
->tinc(l_rocksdb_get_latency
, lat
);
// Split a combined "prefix\0key" slice into its prefix and key components.
// Either output pointer may describe the part the caller wants; error
// returns for a missing/trailing separator are on lines elided in this view.
813 int RocksDBStore::split_key(rocksdb::Slice in
, string
*prefix
, string
*key
)
815 size_t prefix_len
= 0;
817 // Find separator inside Slice
818 char* separator
= (char*) memchr(in
.data(), 0, in
.size());
819 if (separator
== NULL
)
821 prefix_len
= size_t(separator
- in
.data());
822 if (prefix_len
>= in
.size())
825 // Fetch prefix and/or key directly from Slice
827 *prefix
= string(in
.data(), prefix_len
);
829 *key
= string(separator
+1, in
.size()-prefix_len
-1);
// Synchronously compact the entire database.
833 void RocksDBStore::compact()
835 logger
->inc(l_rocksdb_compact
);
836 rocksdb::CompactRangeOptions options
;
// nullptr bounds mean "compact the whole key space".
837 db
->CompactRange(options
, nullptr, nullptr);
// Background worker: drain queued compaction ranges, waiting on the condvar
// when the queue is empty, until compact_queue_stop is set by close().
841 void RocksDBStore::compact_thread_entry()
843 compact_queue_lock
.Lock();
844 while (!compact_queue_stop
) {
845 while (!compact_queue
.empty()) {
846 pair
<string
,string
> range
= compact_queue
.front();
847 compact_queue
.pop_front();
848 logger
->set(l_rocksdb_compact_queue_len
, compact_queue
.size());
// Drop the lock during the (potentially long) compaction so producers
// can keep queueing ranges.
849 compact_queue_lock
.Unlock();
850 logger
->inc(l_rocksdb_compact_range
);
851 compact_range(range
.first
, range
.second
);
852 compact_queue_lock
.Lock();
855 compact_queue_cond
.Wait(compact_queue_lock
);
857 compact_queue_lock
.Unlock();
// Queue [start, end] for background compaction, merging with an existing
// overlapping queued range where possible, and lazily start the worker
// thread on first use.
860 void RocksDBStore::compact_range_async(const string
& start
, const string
& end
)
862 Mutex::Locker
l(compact_queue_lock
);
864 // try to merge adjacent ranges. this is O(n), but the queue should
865 // be short. note that we do not cover all overlap cases and merge
866 // opportunities here, but we capture the ones we currently need.
867 list
< pair
<string
,string
> >::iterator p
= compact_queue
.begin();
868 while (p
!= compact_queue
.end()) {
// Exact duplicate: nothing to add.
869 if (p
->first
== start
&& p
->second
== end
) {
873 if (p
->first
<= end
&& p
->first
> start
) {
874 // merge with existing range to the right
875 compact_queue
.push_back(make_pair(start
, p
->second
));
876 compact_queue
.erase(p
);
877 logger
->inc(l_rocksdb_compact_queue_merge
);
880 if (p
->second
>= start
&& p
->second
< end
) {
881 // merge with existing range to the left
882 compact_queue
.push_back(make_pair(p
->first
, end
));
883 compact_queue
.erase(p
);
884 logger
->inc(l_rocksdb_compact_queue_merge
);
889 if (p
== compact_queue
.end()) {
890 // no merge, new entry.
891 compact_queue
.push_back(make_pair(start
, end
));
892 logger
->set(l_rocksdb_compact_queue_len
, compact_queue
.size());
// Wake the worker, starting it on first use.
894 compact_queue_cond
.Signal();
895 if (!compact_thread
.is_started()) {
896 compact_thread
.create("rstore_compact");
// Verify that a rocksdb instance can be opened (creating it if needed) at
// omap_dir.
899 bool RocksDBStore::check_omap_dir(string
&omap_dir
)
901 rocksdb::Options options
;
902 options
.create_if_missing
= true;
904 rocksdb::Status status
= rocksdb::DB::Open(options
, omap_dir
, &db
);
// Synchronously compact the raw key range [start, end].
909 void RocksDBStore::compact_range(const string
& start
, const string
& end
)
911 rocksdb::CompactRangeOptions options
;
912 rocksdb::Slice
cstart(start
);
913 rocksdb::Slice
cend(end
);
914 db
->CompactRange(options
, &cstart
, &cend
);
// Report how many additional cache bytes this store wants at the given
// PriorityCache priority, rounded to chunk_bytes granules and reduced by
// what is already assigned.
917 int64_t RocksDBStore::request_cache_bytes(PriorityCache::Priority pri
, uint64_t chunk_bytes
) const
919 auto cache
= bbt_opts
.block_cache
;
921 int64_t assigned
= get_cache_bytes(pri
);
925 // PRI0 is for rocksdb's high priority items (indexes/filters)
926 case PriorityCache::Priority::PRI0
:
928 usage
+= cache
->GetPinnedUsage();
// binned_lru additionally tracks a dedicated high-priority pool.
929 if (g_conf
->rocksdb_cache_type
== "binned_lru") {
931 std::static_pointer_cast
<rocksdb_cache::BinnedLRUCache
>(cache
);
932 usage
+= binned_cache
->GetHighPriPoolUsage();
936 // All other cache items are currently shoved into the LAST priority.
937 case PriorityCache::Priority::LAST
:
// Everything not pinned and not in the high-pri pool.
939 usage
= get_cache_usage() - cache
->GetPinnedUsage();
940 if (g_conf
->rocksdb_cache_type
== "binned_lru") {
942 std::static_pointer_cast
<rocksdb_cache::BinnedLRUCache
>(cache
);
943 usage
-= binned_cache
->GetHighPriPoolUsage();
// Round up to chunks and subtract what this priority already holds.
950 request
= PriorityCache::get_chunk(usage
, chunk_bytes
);
951 request
= (request
> assigned
) ? request
- assigned
: 0;
952 dout(10) << __func__
<< " Priority: " << static_cast<uint32_t>(pri
)
953 << " Usage: " << usage
<< " Request: " << request
<< dendl
;
// Current total memory used by the block cache, in bytes.
957 int64_t RocksDBStore::get_cache_usage() const
959 return static_cast<int64_t>(bbt_opts
.block_cache
->GetUsage());
// Apply the PriorityCache-assigned byte totals: resize the block cache and,
// for the binned LRU implementation, recompute the high-priority pool ratio.
962 int64_t RocksDBStore::commit_cache_size()
964 size_t old_bytes
= bbt_opts
.block_cache
->GetCapacity();
965 int64_t total_bytes
= get_cache_bytes();
966 dout(10) << __func__
<< " old: " << old_bytes
967 << " new: " << total_bytes
<< dendl
;
968 bbt_opts
.block_cache
->SetCapacity((size_t) total_bytes
);
970 // Set the high priority pool ratio if this is the binned LRU cache.
971 if (g_conf
->rocksdb_cache_type
== "binned_lru") {
973 std::static_pointer_cast
<rocksdb_cache::BinnedLRUCache
>(bbt_opts
.block_cache
);
// PRI0 bytes (indexes/filters) become the high-priority share.
974 int64_t high_pri_bytes
= get_cache_bytes(PriorityCache::Priority::PRI0
);
975 double ratio
= (double) high_pri_bytes
/ total_bytes
;
976 dout(10) << __func__
<< " High Pri Pool Ratio set to " << ratio
<< dendl
;
977 binned_cache
->SetHighPriPoolRatio(ratio
);
// Configured capacity of the block cache, in bytes.
982 int64_t RocksDBStore::get_cache_capacity() {
983 return bbt_opts
.block_cache
->GetCapacity();
// Releases the underlying rocksdb iterator.
986 RocksDBStore::RocksDBWholeSpaceIteratorImpl::~RocksDBWholeSpaceIteratorImpl()
// Position at the very first key in the whole key space.
990 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_first()
992 dbiter
->SeekToFirst();
993 assert(!dbiter
->status().IsIOError());
994 return dbiter
->status().ok() ? 0 : -1;
// Position at the first key at or after 'prefix'.
996 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_first(const string
&prefix
)
998 rocksdb::Slice
slice_prefix(prefix
);
999 dbiter
->Seek(slice_prefix
);
1000 assert(!dbiter
->status().IsIOError());
1001 return dbiter
->status().ok() ? 0 : -1;
// Position at the very last key in the whole key space.
1003 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_last()
1005 dbiter
->SeekToLast();
1006 assert(!dbiter
->status().IsIOError());
1007 return dbiter
->status().ok() ? 0 : -1;
// Position at the last key belonging to 'prefix': seek to the first key
// past the prefix (via past_prefix); if that runs off the end, fall back to
// the database's last key. The step back from the seek position is on lines
// elided in this view.
1009 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_last(const string
&prefix
)
1011 string limit
= past_prefix(prefix
);
1012 rocksdb::Slice
slice_limit(limit
);
1013 dbiter
->Seek(slice_limit
);
1015 if (!dbiter
->Valid()) {
1016 dbiter
->SeekToLast();
1020 return dbiter
->status().ok() ? 0 : -1;
// Position at the first key of 'prefix' strictly after 'after'.
1022 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::upper_bound(const string
&prefix
, const string
&after
)
1024 lower_bound(prefix
, after
);
1026 pair
<string
,string
> key
= raw_key();
// If we landed exactly on (prefix, after), step past it (advance is on a
// line elided in this view).
1027 if (key
.first
== prefix
&& key
.second
== after
)
1030 return dbiter
->status().ok() ? 0 : -1;
// Position at the first key of 'prefix' at or after 'to'.
1032 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::lower_bound(const string
&prefix
, const string
&to
)
1034 string bound
= combine_strings(prefix
, to
);
1035 rocksdb::Slice
slice_bound(bound
);
1036 dbiter
->Seek(slice_bound
);
1037 return dbiter
->status().ok() ? 0 : -1;
// True while the iterator points at a live entry.
1039 bool RocksDBStore::RocksDBWholeSpaceIteratorImpl::valid()
1041 return dbiter
->Valid();
// Advance to the next entry (the movement itself is on lines elided in this
// view); IO errors assert, other errors surface via the return value.
1043 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::next()
1048 assert(!dbiter
->status().IsIOError());
1049 return dbiter
->status().ok() ? 0 : -1;
// Step back to the previous entry (the movement itself is on lines elided
// in this view); IO errors assert, other errors surface via the return.
1051 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::prev()
1056 assert(!dbiter
->status().IsIOError());
1057 return dbiter
->status().ok() ? 0 : -1;
// Return the user-key portion of the current entry (prefix stripped).
1059 string
RocksDBStore::RocksDBWholeSpaceIteratorImpl::key()
1062 split_key(dbiter
->key(), 0, &out_key
);
// Return the current entry split into its (prefix, key) components.
1065 pair
<string
,string
> RocksDBStore::RocksDBWholeSpaceIteratorImpl::raw_key()
1068 split_key(dbiter
->key(), &prefix
, &key
);
1069 return make_pair(prefix
, key
);
// Check whether the current raw key begins with "prefix\0", without
// allocating or splitting the key.
1072 bool RocksDBStore::RocksDBWholeSpaceIteratorImpl::raw_key_is_prefixed(const string
&prefix
) {
1073 // Look for "prefix\0" right in rocksdb::Slice
1074 rocksdb::Slice key
= dbiter
->key();
// Cheap length + separator check first, then compare the prefix bytes.
1075 if ((key
.size() > prefix
.length()) && (key
[prefix
.length()] == '\0')) {
1076 return memcmp(key
.data(), prefix
.c_str(), prefix
.length()) == 0;
// Return the current value as a bufferlist.
1082 bufferlist
RocksDBStore::RocksDBWholeSpaceIteratorImpl::value()
1084 return to_bufferlist(dbiter
->value());
// Size in bytes of the current raw (combined) key.
1087 size_t RocksDBStore::RocksDBWholeSpaceIteratorImpl::key_size()
1089 return dbiter
->key().size();
// Size in bytes of the current value.
1092 size_t RocksDBStore::RocksDBWholeSpaceIteratorImpl::value_size()
1094 return dbiter
->value().size();
// Return the current value wrapped in a bufferptr.
// NOTE(review): whether this bufferptr ctor copies or aliases the slice's
// bytes is not visible here — confirm lifetime expectations in buffer.h.
1097 bufferptr
RocksDBStore::RocksDBWholeSpaceIteratorImpl::value_as_ptr()
1099 rocksdb::Slice val
= dbiter
->value();
1100 return bufferptr(val
.data(), val
.size());
// 0 while the underlying iterator is healthy, -1 on any error.
1103 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::status()
1105 return dbiter
->status().ok() ? 0 : -1;
// Compute a key that sorts after every combined key carrying 'prefix'
// (the completion of 'limit' is on lines elided in this view).
1108 string
RocksDBStore::past_prefix(const string
&prefix
)
1110 string limit
= prefix
;
// Create a whole-space iterator backed by a fresh rocksdb iterator with
// default ReadOptions.
1115 RocksDBStore::WholeSpaceIterator
RocksDBStore::_get_iterator()
1117 return std::make_shared
<RocksDBWholeSpaceIteratorImpl
>(
1118 db
->NewIterator(rocksdb::ReadOptions()));