1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
10 #include <sys/types.h>
13 #include "rocksdb/db.h"
14 #include "rocksdb/table.h"
15 #include "rocksdb/env.h"
16 #include "rocksdb/slice.h"
17 #include "rocksdb/cache.h"
18 #include "rocksdb/filter_policy.h"
19 #include "rocksdb/utilities/convenience.h"
20 #include "rocksdb/merge_operator.h"
22 #include "common/perf_counters.h"
23 #include "common/debug.h"
24 #include "include/str_list.h"
25 #include "include/stringify.h"
26 #include "include/str_map.h"
27 #include "KeyValueDB.h"
28 #include "RocksDBStore.h"
30 #include "common/debug.h"
// Logging setup for the dout()/derr macros used throughout this file:
// the log context is `cct`, messages go to the rocksdb debug subsystem,
// and every line is prefixed with "rocksdb: ".
32 #define dout_context cct
33 #define dout_subsys ceph_subsys_rocksdb
35 #define dout_prefix *_dout << "rocksdb: "
// Build a rocksdb::SliceParts view over the segments of `bl` without copying
// any bytes: each Slice in *slices is pointed at one bufferlist segment.
//
// @param bl      source bufferlist; its segments must outlive the result,
//                since the Slices only borrow their bytes.
// @param slices  caller-provided vector with one Slice per segment (callers in
//                this file size it with bl.buffers().size()); the returned
//                SliceParts points into this vector, so it must outlive it too.
// @return SliceParts covering every element of *slices.
37 static rocksdb::SliceParts
prepare_sliceparts(const bufferlist
&bl
,
38 vector
<rocksdb::Slice
> *slices
)
41 for (auto& buf
: bl
.buffers()) {
// Borrow (not copy) this segment's bytes into slot n.
42 (*slices
)[n
].data_
= buf
.c_str();
43 (*slices
)[n
].size_
= buf
.length();
46 return rocksdb::SliceParts(slices
->data(), slices
->size());
50 // One of these per rocksdb instance, implements the merge operator prefix stuff
// RocksDB-side merge operator, installed as opt.merge_operator when the
// store is opened. It routes every merge to the KeyValueDB::MergeOperator
// registered via set_merge_operator() whose prefix matches the key.
52 class RocksDBStore::MergeOperatorRouter
: public rocksdb::AssociativeMergeOperator
{
// Returns a name built from every registered (prefix, operator name) pair,
// one ".<prefix>:<name>" entry per pair in sorted prefix order (std::map
// ordering), so the result does not depend on registration order.
// NOTE(review): the returned pointer refers to store.assoc_name, which is
// cleared and rebuilt on every call. It is invalidated by the next call, and
// concurrent calls would race on it. Confirm RocksDB's usage is safe.
55 const char *Name() const override
{
56 // Construct a name that rocksDB will validate against. We want to
57 // do this in a way that doesn't constrain the ordering of calls
58 // to set_merge_operator, so sort the merge operators and then
59 // construct a name from all of those parts.
60 store
.assoc_name
.clear();
61 map
<std::string
,std::string
> names
;
62 for (auto& p
: store
.merge_ops
) names
[p
.first
] = p
.second
->name();
63 for (auto& p
: names
) {
64 store
.assoc_name
+= '.';
65 store
.assoc_name
+= p
.first
;
66 store
.assoc_name
+= ':';
67 store
.assoc_name
+= p
.second
;
69 return store
.assoc_name
.c_str();
// The router keeps a reference to its owning store (for merge_ops and
// assoc_name); the store must outlive it.
72 MergeOperatorRouter(RocksDBStore
&_store
) : store(_store
) {}
// Dispatches one associative merge to the operator whose prefix matches.
// A key matches when it starts with the prefix bytes immediately followed by
// a NUL separator (the prefix\0key layout that split_key() also parses).
// @param key             full RocksDB key (prefix, NUL, user key).
// @param existing_value  current value; when present it is combined with
//                        `value` via the operator's merge().
// @param value           operand being merged in.
// @param new_value       receives the merged result from the operator.
74 bool Merge(const rocksdb::Slice
& key
,
75 const rocksdb::Slice
* existing_value
,
76 const rocksdb::Slice
& value
,
77 std::string
* new_value
,
78 rocksdb::Logger
* logger
) const override
{
80 for (auto& p
: store
.merge_ops
) {
// NOTE(review): both compare() and key.data()[p.first.length()] read bytes
// up to index p.first.length() without first checking
// key.size() > p.first.length(). A key shorter than prefix+separator would
// be read out of bounds. raw_key_is_prefixed() in this file does perform
// that size check; consider the same guard here.
81 if (p
.first
.compare(0, p
.first
.length(),
82 key
.data(), p
.first
.length()) == 0 &&
83 key
.data()[p
.first
.length()] == 0) {
85 p
.second
->merge(existing_value
->data(), existing_value
->size(),
86 value
.data(), value
.size(),
// merge_nonexistent(): the operator's path for merging `value` when there
// is no existing value.
89 p
.second
->merge_nonexistent(value
.data(), value
.size(), new_value
);
// Register `mop` as the merge operator for keys under `prefix`.
// Must be called before the database is opened (asserted below). The
// RocksDB merge-operator name (MergeOperatorRouter::Name()) and the
// per-key routing are both derived from merge_ops.
// NOTE(review): registrations are only appended; registering the same prefix
// twice is not rejected here.
99 int RocksDBStore::set_merge_operator(
100 const string
& prefix
,
101 std::shared_ptr
<KeyValueDB::MergeOperator
> mop
)
103 // If you fail here, it's because you can't do this on an open database
104 assert(db
== nullptr);
105 merge_ops
.push_back(std::make_pair(prefix
,mop
));
// rocksdb::Logger adapter that forwards RocksDB's info log into the Ceph
// debug log, using the CephContext passed to the constructor.
109 class CephRocksdbLogger
: public rocksdb::Logger
{
112 explicit CephRocksdbLogger(CephContext
*c
) : cct(c
) {
115 ~CephRocksdbLogger() override
{
119 // Write an entry to the log file with the specified format.
// Level-less entries are treated as INFO_LEVEL.
120 void Logv(const char* format
, va_list ap
) override
{
121 Logv(rocksdb::INFO_LEVEL
, format
, ap
);
124 // Write an entry to the log file with the specified log level
125 // and format. Any log with level under the internal log level
126 // of *this (see @SetInfoLogLevel and @GetInfoLogLevel) will not be
128 void Logv(const rocksdb::InfoLogLevel log_level
, const char* format
,
129 va_list ap
) override
{
// Map the RocksDB level to a Ceph debug level v: a larger InfoLogLevel
// value yields a smaller v, so it is logged at a more visible Ceph level
// (assumes RocksDB orders InfoLogLevel from least to most severe).
130 int v
= rocksdb::NUM_INFO_LOG_LEVELS
- log_level
- 1;
// Format into the fixed-size `buf`; vsnprintf truncates anything longer
// than sizeof(buf).
133 vsnprintf(buf
, sizeof(buf
), format
, ap
);
134 *_dout
<< buf
<< dendl
;
// Factory for a CephRocksdbLogger bound to the global g_ceph_context.
// Returns a heap-allocated object; the caller takes ownership and must
// delete it (or hand it to a smart pointer).
138 rocksdb::Logger
*create_rocksdb_ceph_logger()
140 return new CephRocksdbLogger(g_ceph_context
);
// Parse a boolean option value into b_val.
// Accepts "false" and "true" case-insensitively. Any other text is parsed as
// a base-10 integer with strict_strtol, which reports problems through `err`.
// Presumably returns 0 on success and a negative value on a parse error;
// TODO confirm.
143 static int string2bool(const string
&val
, bool &b_val
)
145 if (strcasecmp(val
.c_str(), "false") == 0) {
148 } else if (strcasecmp(val
.c_str(), "true") == 0) {
// Neither literal matched: fall back to a numeric interpretation.
153 int b
= strict_strtol(val
.c_str(), 10, &err
);
// Apply a Ceph-specific option that rocksdb::GetOptionsFromString() did not
// recognize. Handled keys:
//   compaction_threads - size of the LOW-priority Env pool (compactions)
//   flusher_threads    - size of the HIGH-priority Env pool (flushes)
//   compact_on_mount   - boolean, stored in the compact_on_mount member
//   disableWAL         - boolean, stored in the disableWAL member
// Numeric values are parsed with strict_sistrtoll (errors reported via `err`).
// NOTE(review): SetBackgroundThreads() acts on opt.env. If that Env is shared
// (e.g. a process-wide default), this resizes a shared pool. Confirm this is
// intended.
161 int RocksDBStore::tryInterpret(const string
&key
, const string
&val
, rocksdb::Options
&opt
)
163 if (key
== "compaction_threads") {
165 int f
= strict_sistrtoll(val
.c_str(), &err
);
168 //Low priority threadpool is used for compaction
169 opt
.env
->SetBackgroundThreads(f
, rocksdb::Env::Priority::LOW
);
170 } else if (key
== "flusher_threads") {
172 int f
= strict_sistrtoll(val
.c_str(), &err
);
175 //High priority threadpool is used for flusher
176 opt
.env
->SetBackgroundThreads(f
, rocksdb::Env::Priority::HIGH
);
177 } else if (key
== "compact_on_mount") {
178 int ret
= string2bool(val
, compact_on_mount
);
181 } else if (key
== "disableWAL") {
182 int ret
= string2bool(val
, disableWAL
);
186 //unrecognized config option.
// Parse a Ceph-style RocksDB option string into `opt`.
// get_str_map() splits opt_str into key/value pairs, using ',', '\n' and ';'
// as separators. Each pair is applied as "key=value" via
// rocksdb::GetOptionsFromString(). Pairs RocksDB rejects fall back to
// tryInterpret() (see the comment below), and the RocksDB status is written
// to derr on the error path. Every applied option is logged at debug level 0.
// NOTE(review): `opt` is passed as both the base and the output of
// GetOptionsFromString(). This relies on that call being alias-safe; confirm.
192 int RocksDBStore::ParseOptionsFromString(const string
&opt_str
, rocksdb::Options
&opt
)
194 map
<string
, string
> str_map
;
195 int r
= get_str_map(opt_str
, &str_map
, ",\n;");
198 map
<string
, string
>::iterator it
;
199 for(it
= str_map
.begin(); it
!= str_map
.end(); ++it
) {
200 string this_opt
= it
->first
+ "=" + it
->second
;
201 rocksdb::Status status
= rocksdb::GetOptionsFromString(opt
, this_opt
, &opt
);
203 //unrecognized by rocksdb, try to interpret by ourselves.
204 r
= tryInterpret(it
->first
, it
->second
, opt
);
206 derr
<< status
.ToString() << dendl
;
210 lgeneric_dout(cct
, 0) << " set rocksdb option " << it
->first
211 << " = " << it
->second
<< dendl
;
// Remember the option string and validate it up front by parsing it into a
// throw-away rocksdb::Options. do_open() re-parses options_str into the
// Options it actually opens the database with.
216 int RocksDBStore::init(string _options_str
)
218 options_str
= _options_str
;
219 rocksdb::Options opt
;
221 if (options_str
.length()) {
222 int r
= ParseOptionsFromString(options_str
, opt
);
// Create the database directory if needed, then open with
// create_if_missing = true.
// The directory is created either through env->NewDirectory() (presumably
// when a custom Env is configured) or with ::mkdir(path, 0755). For mkdir,
// an already-existing directory (-EEXIST) is not treated as an error.
// NOTE(review): the rocksdb::Status returned by env->NewDirectory() is
// discarded, so a failure there only surfaces later inside do_open().
230 int RocksDBStore::create_and_open(ostream
&out
)
233 unique_ptr
<rocksdb::Directory
> dir
;
234 env
->NewDirectory(path
, &dir
);
// r is compared against -EEXIST below, so it is presumably normalized to
// -errno in between.
236 int r
= ::mkdir(path
.c_str(), 0755);
239 if (r
< 0 && r
!= -EEXIST
) {
240 derr
<< __func__
<< " failed to create " << path
<< ": " << cpp_strerror(r
)
245 return do_open(out
, true);
// Build rocksdb::Options and open the database at `path`.
// Configuration is applied in this order:
//   1. options_str (via ParseOptionsFromString);
//   2. g_conf settings for statistics, the WAL directory, extra db_paths,
//      logging, the custom Env, caches, the bloom filter and block-based
//      table options.
// After the open this registers the "rocksdb" perf counters and, if
// compact_on_mount is set, compacts the store.
// @param create_if_missing  forwarded to rocksdb::Options::create_if_missing.
248 int RocksDBStore::do_open(ostream
&out
, bool create_if_missing
)
250 rocksdb::Options opt
;
251 rocksdb::Status status
;
253 if (options_str
.length()) {
254 int r
= ParseOptionsFromString(options_str
, opt
);
// Collect RocksDB statistics only when rocksdb_perf is on; get_statistics()
// reads dbstats later.
260 if (g_conf
->rocksdb_perf
) {
261 dbstats
= rocksdb::CreateDBStatistics();
262 opt
.statistics
= dbstats
;
265 opt
.create_if_missing
= create_if_missing
;
// Optionally keep the WAL in a sibling "<path>.wal" directory.
266 if (g_conf
->rocksdb_separate_wal_dir
) {
267 opt
.wal_dir
= path
+ ".wal";
// rocksdb_db_paths: items separated by ';', ' ' or '\t', each "<path>,<size>".
269 if (g_conf
->get_val
<std::string
>("rocksdb_db_paths").length()) {
271 get_str_list(g_conf
->get_val
<std::string
>("rocksdb_db_paths"), "; \t", paths
);
272 for (auto& p
: paths
) {
273 size_t pos
= p
.find(',');
274 if (pos
== std::string::npos
) {
275 derr
<< __func__
<< " invalid db path item " << p
<< " in "
276 << g_conf
->get_val
<std::string
>("rocksdb_db_paths") << dendl
;
// NOTE(review): this local `path` shadows the member `path` that the
// database itself is opened at; harmless here but easy to misread.
279 string path
= p
.substr(0, pos
);
280 string size_str
= p
.substr(pos
+ 1);
// NOTE(review): atoll() cannot report malformed input; a non-numeric size
// is indistinguishable from "0" (or a numeric prefix of the text).
281 uint64_t size
= atoll(size_str
.c_str());
283 derr
<< __func__
<< " invalid db path item " << p
<< " in "
284 << g_conf
->get_val
<std::string
>("rocksdb_db_paths") << dendl
;
287 opt
.db_paths
.push_back(rocksdb::DbPath(path
, size
));
288 dout(10) << __func__
<< " db_path " << path
<< " size " << size
<< dendl
;
// Route RocksDB's info log into the Ceph log when requested.
292 if (g_conf
->rocksdb_log_to_ceph_log
) {
293 opt
.info_log
.reset(new CephRocksdbLogger(g_ceph_context
));
// Use the custom rocksdb::Env stored in `priv`.
297 dout(10) << __func__
<< " using custom Env " << priv
<< dendl
;
298 opt
.env
= static_cast<rocksdb::Env
*>(priv
);
// Unless set_cache_flag is set, cache_size comes from g_conf. The row cache
// then gets cache_size * rocksdb_cache_row_ratio, and the block cache gets
// the remainder.
302 if (!set_cache_flag
) {
303 cache_size
= g_conf
->rocksdb_cache_size
;
305 uint64_t row_cache_size
= cache_size
* g_conf
->rocksdb_cache_row_ratio
;
306 uint64_t block_cache_size
= cache_size
- row_cache_size
;
// A zero-sized block cache disables it entirely; otherwise build an LRU or
// clock cache as configured.
// NOTE(review): on the no_block_cache path bbt_opts.block_cache is not
// assigned here, yet get_statistics() dereferences it when
// rocksdb_collect_memory_stats is enabled.
308 if (block_cache_size
== 0) {
309 // disable block cache
310 dout(10) << __func__
<< " block_cache_size " << block_cache_size
311 << ", setting no_block_cache " << dendl
;
312 bbt_opts
.no_block_cache
= true;
314 if (g_conf
->rocksdb_cache_type
== "lru") {
315 bbt_opts
.block_cache
= rocksdb::NewLRUCache(
317 g_conf
->rocksdb_cache_shard_bits
);
318 } else if (g_conf
->rocksdb_cache_type
== "clock") {
319 bbt_opts
.block_cache
= rocksdb::NewClockCache(
321 g_conf
->rocksdb_cache_shard_bits
);
// Any other rocksdb_cache_type value is reported as an error.
323 derr
<< "unrecognized rocksdb_cache_type '" << g_conf
->rocksdb_cache_type
328 bbt_opts
.block_size
= g_conf
->rocksdb_block_size
;
// The row cache, when non-empty, is always an LRU cache.
330 if (row_cache_size
> 0)
331 opt
.row_cache
= rocksdb::NewLRUCache(row_cache_size
,
332 g_conf
->rocksdb_cache_shard_bits
);
// A bloom filter is installed only for a positive bits-per-key setting.
333 uint64_t bloom_bits
= g_conf
->get_val
<uint64_t>("rocksdb_bloom_bits_per_key");
334 if (bloom_bits
> 0) {
335 dout(10) << __func__
<< " set bloom filter bits per key to "
336 << bloom_bits
<< dendl
;
337 bbt_opts
.filter_policy
.reset(rocksdb::NewBloomFilterPolicy(bloom_bits
));
// rocksdb_index_type selects the table index. Unrecognized values leave
// bbt_opts.index_type unchanged.
339 if (g_conf
->get_val
<std::string
>("rocksdb_index_type") == "binary_search")
340 bbt_opts
.index_type
= rocksdb::BlockBasedTableOptions::IndexType::kBinarySearch
;
341 if (g_conf
->get_val
<std::string
>("rocksdb_index_type") == "hash_search")
342 bbt_opts
.index_type
= rocksdb::BlockBasedTableOptions::IndexType::kHashSearch
;
343 if (g_conf
->get_val
<std::string
>("rocksdb_index_type") == "two_level")
344 bbt_opts
.index_type
= rocksdb::BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch
;
345 bbt_opts
.cache_index_and_filter_blocks
=
346 g_conf
->get_val
<bool>("rocksdb_cache_index_and_filter_blocks");
347 bbt_opts
.cache_index_and_filter_blocks_with_high_priority
=
348 g_conf
->get_val
<bool>("rocksdb_cache_index_and_filter_blocks_with_high_priority");
349 bbt_opts
.partition_filters
= g_conf
->get_val
<bool>("rocksdb_partition_filters");
// A zero rocksdb_metadata_block_size keeps RocksDB's own default.
350 if (g_conf
->get_val
<uint64_t>("rocksdb_metadata_block_size") > 0)
351 bbt_opts
.metadata_block_size
= g_conf
->get_val
<uint64_t>("rocksdb_metadata_block_size");
352 bbt_opts
.pin_l0_filter_and_index_blocks_in_cache
=
353 g_conf
->get_val
<bool>("rocksdb_pin_l0_filter_and_index_blocks_in_cache");
355 opt
.table_factory
.reset(rocksdb::NewBlockBasedTableFactory(bbt_opts
));
356 dout(10) << __func__
<< " block size " << g_conf
->rocksdb_block_size
357 << ", block_cache size " << prettybyte_t(block_cache_size
)
358 << ", row_cache size " << prettybyte_t(row_cache_size
)
360 << (1 << g_conf
->rocksdb_cache_shard_bits
)
361 << ", type " << g_conf
->rocksdb_cache_type
// Route merges to the per-prefix operators registered via
// set_merge_operator().
364 opt
.merge_operator
.reset(new MergeOperatorRouter(*this));
365 status
= rocksdb::DB::Open(opt
, path
, &db
);
367 derr
<< status
.ToString() << dendl
;
// Register this store's perf counters (l_rocksdb_first..l_rocksdb_last);
// close() removes them again.
371 PerfCountersBuilder
plb(g_ceph_context
, "rocksdb", l_rocksdb_first
, l_rocksdb_last
);
372 plb
.add_u64_counter(l_rocksdb_gets
, "get", "Gets");
373 plb
.add_u64_counter(l_rocksdb_txns
, "submit_transaction", "Submit transactions");
374 plb
.add_u64_counter(l_rocksdb_txns_sync
, "submit_transaction_sync", "Submit transactions sync");
375 plb
.add_time_avg(l_rocksdb_get_latency
, "get_latency", "Get latency");
376 plb
.add_time_avg(l_rocksdb_submit_latency
, "submit_latency", "Submit Latency");
377 plb
.add_time_avg(l_rocksdb_submit_sync_latency
, "submit_sync_latency", "Submit Sync Latency");
378 plb
.add_u64_counter(l_rocksdb_compact
, "compact", "Compactions");
379 plb
.add_u64_counter(l_rocksdb_compact_range
, "compact_range", "Compactions by range");
380 plb
.add_u64_counter(l_rocksdb_compact_queue_merge
, "compact_queue_merge", "Mergings of ranges in compaction queue");
381 plb
.add_u64(l_rocksdb_compact_queue_len
, "compact_queue_len", "Length of compaction queue");
382 plb
.add_time_avg(l_rocksdb_write_wal_time
, "rocksdb_write_wal_time", "Rocksdb write wal time");
383 plb
.add_time_avg(l_rocksdb_write_memtable_time
, "rocksdb_write_memtable_time", "Rocksdb write memtable time");
384 plb
.add_time_avg(l_rocksdb_write_delay_time
, "rocksdb_write_delay_time", "Rocksdb write delay time");
385 plb
.add_time_avg(l_rocksdb_write_pre_and_post_process_time
,
386 "rocksdb_write_pre_and_post_time", "total time spent on writing a record, excluding write process");
387 logger
= plb
.create_perf_counters();
388 cct
->get_perfcounters_collection()->add(logger
);
// Optionally compact the whole store right after mounting.
390 if (compact_on_mount
) {
391 derr
<< "Compacting rocksdb store..." << dendl
;
393 derr
<< "Finished compacting rocksdb store" << dendl
;
// Test helper: try to open (creating if missing) a RocksDB database at `dir`
// using default options plus create_if_missing. None of do_open()'s tuning
// is applied. Returns 0 if the open succeeded and -EIO otherwise.
398 int RocksDBStore::_test_init(const string
& dir
)
400 rocksdb::Options options
;
401 options
.create_if_missing
= true;
403 rocksdb::Status status
= rocksdb::DB::Open(options
, dir
, &db
);
406 return status
.ok() ? 0 : -EIO
;
// Destructor. Per the note below, the DB handle must be torn down before the
// cache and filter policy it depends on. Afterwards the custom rocksdb::Env
// held in `priv` is deleted.
409 RocksDBStore::~RocksDBStore()
414 // Ensure db is destroyed before dependent db_cache and filterpolicy
419 delete static_cast<rocksdb::Env
*>(priv
);
// Shut down background work and unregister this store's perf counters.
// If the compaction thread was started, it is told to stop
// (compact_queue_stop + a signal on compact_queue_cond) and joined.
423 void RocksDBStore::close()
425 // stop compaction thread
426 compact_queue_lock
.Lock();
427 if (compact_thread
.is_started()) {
428 compact_queue_stop
= true;
429 compact_queue_cond
.Signal();
// Drop the lock before join(): compact_thread_entry() must reacquire
// compact_queue_lock to observe compact_queue_stop and exit.
430 compact_queue_lock
.Unlock();
431 compact_thread
.join();
433 compact_queue_lock
.Unlock();
// Undo the perf-counter registration done in do_open().
437 cct
->get_perfcounters_collection()->remove(logger
);
// Split the text `s` into fields separated by `delim` and append them to
// `elems`, using std::getline over a stringstream.
440 void RocksDBStore::split_stats(const std::string
&s
, char delim
, std::vector
<std::string
> &elems
) {
441 std::stringstream ss
;
444 while (std::getline(ss
, item
, delim
)) {
445 elems
.push_back(item
);
// Dump RocksDB and store statistics into `f`.
// Nothing is probed when rocksdb_perf is off (see the dout below). Otherwise,
// depending on g_conf:
//   - compaction stats: the "rocksdb.stats" property, one line per entry;
//   - extended stats:   dbstats->ToString(), one line per entry;
//   - this store's perf counters;
//   - memory stats:     block-cache usage / pinned usage and memtable size.
449 void RocksDBStore::get_statistics(Formatter
*f
)
451 if (!g_conf
->rocksdb_perf
) {
452 dout(20) << __func__
<< "RocksDB perf is disabled, can't probe for stats"
457 if (g_conf
->rocksdb_collect_compaction_stats
) {
458 std::string stat_str
;
459 bool status
= db
->GetProperty("rocksdb.stats", &stat_str
);
461 f
->open_object_section("rocksdb_statistics");
462 f
->dump_string("rocksdb_compaction_statistics", "");
463 vector
<string
> stats
;
464 split_stats(stat_str
, '\n', stats
);
// NOTE(review): `auto st` copies each line; `const auto& st` would avoid it.
465 for (auto st
:stats
) {
466 f
->dump_string("", st
);
471 if (g_conf
->rocksdb_collect_extended_stats
) {
473 f
->open_object_section("rocksdb_extended_statistics");
// dbstats is created in do_open() only when rocksdb_perf is on, which is
// the same setting checked at the top of this function.
474 string stat_str
= dbstats
->ToString();
475 vector
<string
> stats
;
476 split_stats(stat_str
, '\n', stats
);
477 f
->dump_string("rocksdb_extended_statistics", "");
478 for (auto st
:stats
) {
479 f
->dump_string(".", st
);
483 f
->open_object_section("rocksdbstore_perf_counters");
484 logger
->dump_formatted(f
,0);
// NOTE(review): bbt_opts.block_cache is dereferenced unconditionally here,
// but do_open() leaves it unassigned on its no_block_cache path
// (block_cache_size == 0). Guard against a null cache before calling
// GetUsage()/GetPinnedUsage().
487 if (g_conf
->rocksdb_collect_memory_stats
) {
488 f
->open_object_section("rocksdb_memtable_statistics");
489 std::string
str(stringify(bbt_opts
.block_cache
->GetUsage()));
490 f
->dump_string("block_cache_usage", str
.data());
492 str
.append(stringify(bbt_opts
.block_cache
->GetPinnedUsage()));
493 f
->dump_string("block_cache_pinned_blocks_usage", str
);
495 db
->GetProperty("rocksdb.cur-size-all-mem-tables", &str
);
496 f
->dump_string("rocksdb_memtable_usage", str
);
// Write the transaction's WriteBatch without requesting a sync.
// disableWAL is honored. The batch contents are dumped at rocksdb debug
// level 30, and dumped again to derr on the error path. When rocksdb_perf is
// on, RocksDB's per-thread write breakdown is recorded in the perf counters.
// Returns 0 on success and -1 on any RocksDB error.
// NOTE(review): this body duplicates submit_transaction_sync() except for
// woptions.sync and the counters it updates; a shared helper would keep them
// in step.
501 int RocksDBStore::submit_transaction(KeyValueDB::Transaction t
)
503 utime_t start
= ceph_clock_now();
504 // enable rocksdb breakdown
505 // considering performance overhead, default is disabled
506 if (g_conf
->rocksdb_perf
) {
507 rocksdb::SetPerfLevel(rocksdb::PerfLevel::kEnableTimeExceptForMutex
);
508 rocksdb::perf_context
.Reset();
511 RocksDBTransactionImpl
* _t
=
512 static_cast<RocksDBTransactionImpl
*>(t
.get());
513 rocksdb::WriteOptions woptions
;
514 woptions
.disableWAL
= disableWAL
;
// Debug dump of the batch: RocksWBHandler collects a description in `seen`.
515 lgeneric_subdout(cct
, rocksdb
, 30) << __func__
;
516 RocksWBHandler bat_txc
;
517 _t
->bat
.Iterate(&bat_txc
);
518 *_dout
<< " Rocksdb transaction: " << bat_txc
.seen
<< dendl
;
520 rocksdb::Status s
= db
->Write(woptions
, &_t
->bat
);
// Error path: describe the failed batch alongside the RocksDB status.
522 RocksWBHandler rocks_txc
;
523 _t
->bat
.Iterate(&rocks_txc
);
524 derr
<< __func__
<< " error: " << s
.ToString() << " code = " << s
.code()
525 << " Rocksdb transaction: " << rocks_txc
.seen
<< dendl
;
527 utime_t lat
= ceph_clock_now() - start
;
// perf_context times are integer nanoseconds; divide by 1e9 for seconds.
529 if (g_conf
->rocksdb_perf
) {
530 utime_t write_memtable_time
;
531 utime_t write_delay_time
;
532 utime_t write_wal_time
;
533 utime_t write_pre_and_post_process_time
;
534 write_wal_time
.set_from_double(
535 static_cast<double>(rocksdb::perf_context
.write_wal_time
)/1000000000);
536 write_memtable_time
.set_from_double(
537 static_cast<double>(rocksdb::perf_context
.write_memtable_time
)/1000000000);
538 write_delay_time
.set_from_double(
539 static_cast<double>(rocksdb::perf_context
.write_delay_time
)/1000000000);
540 write_pre_and_post_process_time
.set_from_double(
541 static_cast<double>(rocksdb::perf_context
.write_pre_and_post_process_time
)/1000000000);
542 logger
->tinc(l_rocksdb_write_memtable_time
, write_memtable_time
);
543 logger
->tinc(l_rocksdb_write_delay_time
, write_delay_time
);
544 logger
->tinc(l_rocksdb_write_wal_time
, write_wal_time
);
545 logger
->tinc(l_rocksdb_write_pre_and_post_process_time
, write_pre_and_post_process_time
);
548 logger
->inc(l_rocksdb_txns
);
549 logger
->tinc(l_rocksdb_submit_latency
, lat
);
551 return s
.ok() ? 0 : -1;
// Synchronous variant of submit_transaction(): identical except that it sets
// woptions.sync = true and updates the *_sync counters
// (l_rocksdb_txns_sync, l_rocksdb_submit_sync_latency).
// disableWAL is still honored. Returns 0 on success and -1 on any RocksDB
// error.
554 int RocksDBStore::submit_transaction_sync(KeyValueDB::Transaction t
)
556 utime_t start
= ceph_clock_now();
557 // enable rocksdb breakdown
558 // considering performance overhead, default is disabled
559 if (g_conf
->rocksdb_perf
) {
560 rocksdb::SetPerfLevel(rocksdb::PerfLevel::kEnableTimeExceptForMutex
);
561 rocksdb::perf_context
.Reset();
564 RocksDBTransactionImpl
* _t
=
565 static_cast<RocksDBTransactionImpl
*>(t
.get());
566 rocksdb::WriteOptions woptions
;
567 woptions
.sync
= true;
568 woptions
.disableWAL
= disableWAL
;
// Debug dump of the batch: RocksWBHandler collects a description in `seen`.
569 lgeneric_subdout(cct
, rocksdb
, 30) << __func__
;
570 RocksWBHandler bat_txc
;
571 _t
->bat
.Iterate(&bat_txc
);
572 *_dout
<< " Rocksdb transaction: " << bat_txc
.seen
<< dendl
;
574 rocksdb::Status s
= db
->Write(woptions
, &_t
->bat
);
// Error path: describe the failed batch alongside the RocksDB status.
576 RocksWBHandler rocks_txc
;
577 _t
->bat
.Iterate(&rocks_txc
);
578 derr
<< __func__
<< " error: " << s
.ToString() << " code = " << s
.code()
579 << " Rocksdb transaction: " << rocks_txc
.seen
<< dendl
;
581 utime_t lat
= ceph_clock_now() - start
;
// perf_context times are integer nanoseconds; divide by 1e9 for seconds.
583 if (g_conf
->rocksdb_perf
) {
584 utime_t write_memtable_time
;
585 utime_t write_delay_time
;
586 utime_t write_wal_time
;
587 utime_t write_pre_and_post_process_time
;
588 write_wal_time
.set_from_double(
589 static_cast<double>(rocksdb::perf_context
.write_wal_time
)/1000000000);
590 write_memtable_time
.set_from_double(
591 static_cast<double>(rocksdb::perf_context
.write_memtable_time
)/1000000000);
592 write_delay_time
.set_from_double(
593 static_cast<double>(rocksdb::perf_context
.write_delay_time
)/1000000000);
594 write_pre_and_post_process_time
.set_from_double(
595 static_cast<double>(rocksdb::perf_context
.write_pre_and_post_process_time
)/1000000000);
596 logger
->tinc(l_rocksdb_write_memtable_time
, write_memtable_time
);
597 logger
->tinc(l_rocksdb_write_delay_time
, write_delay_time
);
598 logger
->tinc(l_rocksdb_write_wal_time
, write_wal_time
);
599 logger
->tinc(l_rocksdb_write_pre_and_post_process_time
, write_pre_and_post_process_time
);
602 logger
->inc(l_rocksdb_txns_sync
);
603 logger
->tinc(l_rocksdb_submit_sync_latency
, lat
);
605 return s
.ok() ? 0 : -1;
// Construct a transaction against the store `_db`. Mutations are buffered in
// the member WriteBatch `bat`, which submit_transaction() and
// submit_transaction_sync() pass to db->Write().
608 RocksDBStore::RocksDBTransactionImpl::RocksDBTransactionImpl(RocksDBStore
*_db
)
// Queue a Put of combine_strings(prefix, k) -> to_set_bl in the batch.
// A contiguous, non-empty value is passed as a single Slice. Otherwise
// (fragmented or empty) each segment is passed through SliceParts, so the
// bufferlist is never flattened.
613 void RocksDBStore::RocksDBTransactionImpl::set(
614 const string
&prefix
,
616 const bufferlist
&to_set_bl
)
618 string key
= combine_strings(prefix
, k
);
620 // bufferlist::c_str() is non-constant, so we can't call c_str()
// on the const to_set_bl; read the single segment via buffers().front().
621 if (to_set_bl
.is_contiguous() && to_set_bl
.length() > 0) {
622 bat
.Put(rocksdb::Slice(key
),
623 rocksdb::Slice(to_set_bl
.buffers().front().c_str(),
624 to_set_bl
.length()));
// One Slice per segment. value_slices must stay alive until Put() returns,
// because the SliceParts points into it.
626 rocksdb::Slice
key_slice(key
);
627 vector
<rocksdb::Slice
> value_slices(to_set_bl
.buffers().size());
628 bat
.Put(nullptr, rocksdb::SliceParts(&key_slice
, 1),
629 prepare_sliceparts(to_set_bl
, &value_slices
));
// Overload of set() taking the key as a (k, keylen) byte range. The full key
// is built into `key` via combine_strings(prefix, k, keylen, &key); value
// handling is the same as the std::string-key overload.
633 void RocksDBStore::RocksDBTransactionImpl::set(
634 const string
&prefix
,
635 const char *k
, size_t keylen
,
636 const bufferlist
&to_set_bl
)
639 combine_strings(prefix
, k
, keylen
, &key
);
641 // bufferlist::c_str() is non-constant, so we can't call c_str()
// on the const to_set_bl; read the single segment via buffers().front().
642 if (to_set_bl
.is_contiguous() && to_set_bl
.length() > 0) {
643 bat
.Put(rocksdb::Slice(key
),
644 rocksdb::Slice(to_set_bl
.buffers().front().c_str(),
645 to_set_bl
.length()));
// One Slice per segment. value_slices must stay alive until Put() returns.
647 rocksdb::Slice
key_slice(key
);
648 vector
<rocksdb::Slice
> value_slices(to_set_bl
.buffers().size());
649 bat
.Put(nullptr, rocksdb::SliceParts(&key_slice
, 1),
650 prepare_sliceparts(to_set_bl
, &value_slices
));
// Queue a Delete of combine_strings(prefix, k) in the batch.
654 void RocksDBStore::RocksDBTransactionImpl::rmkey(const string
&prefix
,
657 bat
.Delete(combine_strings(prefix
, k
));
// Overload of rmkey() taking the key as a (k, keylen) byte range; the full
// key is built into `key` before being deleted.
660 void RocksDBStore::RocksDBTransactionImpl::rmkey(const string
&prefix
,
665 combine_strings(prefix
, k
, keylen
, &key
);
// Queue a RocksDB SingleDelete of combine_strings(prefix, k).
// Per RocksDB's documentation, SingleDelete is only valid for keys written
// once (not overwritten or merged) since their last deletion.
669 void RocksDBStore::RocksDBTransactionImpl::rm_single_key(const string
&prefix
,
672 bat
.SingleDelete(combine_strings(prefix
, k
));
// Queue removal of every key under `prefix`.
// With db->enable_rmrange this is a single DeleteRange from
// combine_strings(prefix, "") up to combine_strings(prefix + '\x01', "").
// Assuming combine_strings() joins prefix and key with a NUL separator (the
// layout split_key() parses), that exclusive end bound sorts after every
// "prefix\0..." key. Otherwise the keys are enumerated through an iterator
// and deleted one by one.
675 void RocksDBStore::RocksDBTransactionImpl::rmkeys_by_prefix(const string
&prefix
)
677 if (db
->enable_rmrange
) {
678 string endprefix
= prefix
;
679 endprefix
.push_back('\x01');
680 bat
.DeleteRange(combine_strings(prefix
, string()),
681 combine_strings(endprefix
, string()));
683 KeyValueDB::Iterator it
= db
->get_iterator(prefix
);
684 for (it
->seek_to_first();
687 bat
.Delete(combine_strings(prefix
, it
->key()));
// Queue removal of keys in [start, end) under `prefix`.
// With db->enable_rmrange this is one DeleteRange (end bound exclusive).
// Otherwise the keys are enumerated from lower_bound(start) until the first
// key >= end, and each is deleted individually.
// NOTE(review): the iterator fallback reads the database (db->get_iterator),
// so it presumably does not see keys added earlier in this same,
// not-yet-submitted batch. Confirm callers do not rely on that.
692 void RocksDBStore::RocksDBTransactionImpl::rm_range_keys(const string
&prefix
,
696 if (db
->enable_rmrange
) {
697 bat
.DeleteRange(combine_strings(prefix
, start
), combine_strings(prefix
, end
));
699 auto it
= db
->get_iterator(prefix
);
700 it
->lower_bound(start
);
701 while (it
->valid()) {
702 if (it
->key() >= end
) {
705 bat
.Delete(combine_strings(prefix
, it
->key()));
// Queue a RocksDB Merge of to_set_bl into combine_strings(prefix, k).
// At compaction/read time MergeOperatorRouter dispatches it to the operator
// registered for `prefix`. Values are passed as in set(): one Slice when
// contiguous and non-empty, otherwise one Slice per segment via SliceParts.
711 void RocksDBStore::RocksDBTransactionImpl::merge(
712 const string
&prefix
,
714 const bufferlist
&to_set_bl
)
716 string key
= combine_strings(prefix
, k
);
718 // bufferlist::c_str() is non-constant, so we can't call c_str()
// on the const to_set_bl; read the single segment via buffers().front().
719 if (to_set_bl
.is_contiguous() && to_set_bl
.length() > 0) {
720 bat
.Merge(rocksdb::Slice(key
),
721 rocksdb::Slice(to_set_bl
.buffers().front().c_str(),
722 to_set_bl
.length()));
// One Slice per segment. value_slices must stay alive until Merge() returns.
725 rocksdb::Slice
key_slice(key
);
726 vector
<rocksdb::Slice
> value_slices(to_set_bl
.buffers().size());
727 bat
.Merge(nullptr, rocksdb::SliceParts(&key_slice
, 1),
728 prepare_sliceparts(to_set_bl
, &value_slices
));
732 //gets will bypass RocksDB row cache, since it uses iterator
// NOTE(review): the body below uses db->Get(), not an iterator, so the note
// above may be stale. Confirm whether the row-cache remark still applies.
//
// Batch point lookup. For every key in `keys`, read
// combine_strings(prefix, key) with db->Get(); values that are read are
// appended to (*out)[key]. Keys that are not found presumably get no entry
// in *out. An I/O error aborts via ceph_abort_msg. The whole batch counts as
// one l_rocksdb_gets event with one latency sample.
733 int RocksDBStore::get(
734 const string
&prefix
,
735 const std::set
<string
> &keys
,
736 std::map
<string
, bufferlist
> *out
)
738 utime_t start
= ceph_clock_now();
739 for (std::set
<string
>::const_iterator i
= keys
.begin();
740 i
!= keys
.end(); ++i
) {
742 std::string bound
= combine_strings(prefix
, *i
);
743 auto status
= db
->Get(rocksdb::ReadOptions(), rocksdb::Slice(bound
), &value
);
745 (*out
)[*i
].append(value
);
746 } else if (status
.IsIOError()) {
747 ceph_abort_msg(cct
, status
.ToString());
751 utime_t lat
= ceph_clock_now() - start
;
752 logger
->inc(l_rocksdb_gets
);
753 logger
->tinc(l_rocksdb_get_latency
, lat
);
// Single-key lookup of combine_strings(prefix, key) into *out, which must be
// non-null and empty (asserted). A NotFound status is handled separately
// from other failures; any other non-OK status aborts via ceph_abort_msg.
// Each call counts as one l_rocksdb_gets event with one latency sample.
757 int RocksDBStore::get(
758 const string
&prefix
,
762 assert(out
&& (out
->length() == 0));
763 utime_t start
= ceph_clock_now();
767 k
= combine_strings(prefix
, key
);
768 s
= db
->Get(rocksdb::ReadOptions(), rocksdb::Slice(k
), &value
);
771 } else if (s
.IsNotFound()) {
774 ceph_abort_msg(cct
, s
.ToString());
776 utime_t lat
= ceph_clock_now() - start
;
777 logger
->inc(l_rocksdb_gets
);
778 logger
->tinc(l_rocksdb_get_latency
, lat
);
// Overload taking the key as a (key, keylen) byte range. The full key is
// built into `k` via combine_strings(prefix, key, keylen, &k); otherwise it
// behaves like the overload above.
782 int RocksDBStore::get(
783 const string
& prefix
,
788 assert(out
&& (out
->length() == 0));
789 utime_t start
= ceph_clock_now();
792 combine_strings(prefix
, key
, keylen
, &k
);
794 s
= db
->Get(rocksdb::ReadOptions(), rocksdb::Slice(k
), &value
);
797 } else if (s
.IsNotFound()) {
800 ceph_abort_msg(cct
, s
.ToString());
802 utime_t lat
= ceph_clock_now() - start
;
803 logger
->inc(l_rocksdb_gets
);
804 logger
->tinc(l_rocksdb_get_latency
, lat
);
// Split a raw RocksDB key of the form "<prefix>\0<key>" at its first NUL byte.
// Either output pointer may be null when the caller does not need that part
// (key() passes 0 for `prefix`).
// @param in      raw key bytes.
// @param prefix  receives the bytes before the separator.
// @param key     receives the bytes after the separator.
// A key without a separator is rejected; see the returns following the checks.
808 int RocksDBStore::split_key(rocksdb::Slice in
, string
*prefix
, string
*key
)
810 size_t prefix_len
= 0;
812 // Find separator inside Slice
// NOTE(review): the C-style cast drops the const of in.data(); a
// `const char*` would suffice, since the bytes are only read.
813 char* separator
= (char*) memchr(in
.data(), 0, in
.size());
814 if (separator
== NULL
)
816 prefix_len
= size_t(separator
- in
.data());
// NOTE(review): memchr only searched in.size() bytes, so a found separator
// always gives prefix_len < in.size(); this check appears unreachable.
817 if (prefix_len
>= in
.size())
820 // Fetch prefix and/or key directly from Slice
822 *prefix
= string(in
.data(), prefix_len
);
824 *key
= string(separator
+1, in
.size()-prefix_len
-1);
// Synchronously compact the entire key space (nullptr begin/end bounds) and
// bump the l_rocksdb_compact counter.
// NOTE(review): the rocksdb::Status returned by CompactRange() is ignored.
828 void RocksDBStore::compact()
830 logger
->inc(l_rocksdb_compact
);
831 rocksdb::CompactRangeOptions options
;
832 db
->CompactRange(options
, nullptr, nullptr);
// Body of the background compaction thread ("rstore_compact").
// Until compact_queue_stop is set, it drains compact_queue. Each range is
// popped under compact_queue_lock, and the lock is released while
// compact_range() runs so producers can keep queueing. When the queue is
// empty it waits on compact_queue_cond.
// NOTE(review): the inner drain loop does not re-check compact_queue_stop,
// so close() waits until all queued ranges have been compacted.
836 void RocksDBStore::compact_thread_entry()
838 compact_queue_lock
.Lock();
839 while (!compact_queue_stop
) {
840 while (!compact_queue
.empty()) {
841 pair
<string
,string
> range
= compact_queue
.front();
842 compact_queue
.pop_front();
843 logger
->set(l_rocksdb_compact_queue_len
, compact_queue
.size());
// Compact without holding the queue lock.
844 compact_queue_lock
.Unlock();
845 logger
->inc(l_rocksdb_compact_range
);
846 compact_range(range
.first
, range
.second
);
847 compact_queue_lock
.Lock();
850 compact_queue_cond
.Wait(compact_queue_lock
);
852 compact_queue_lock
.Unlock();
// Queue [start, end] for background compaction and wake the compaction
// thread, creating it ("rstore_compact") on first use. An identical queued
// range, or one adjacent/overlapping on either side, is merged instead of
// adding a new entry. The whole operation runs under compact_queue_lock.
855 void RocksDBStore::compact_range_async(const string
& start
, const string
& end
)
857 Mutex::Locker
l(compact_queue_lock
);
859 // try to merge adjacent ranges. this is O(n), but the queue should
860 // be short. note that we do not cover all overlap cases and merge
861 // opportunities here, but we capture the ones we currently need.
862 list
< pair
<string
,string
> >::iterator p
= compact_queue
.begin();
863 while (p
!= compact_queue
.end()) {
864 if (p
->first
== start
&& p
->second
== end
) {
// NOTE(review): when the queued range lies strictly inside [start, end]
// (p->first > start and p->second < end), the merged entry
// (start, p->second) drops (p->second, end] from the request. Using
// std::max(end, p->second) as the upper bound would keep it.
868 if (p
->first
<= end
&& p
->first
> start
) {
869 // merge with existing range to the right
870 compact_queue
.push_back(make_pair(start
, p
->second
));
// NOTE(review): erase(p) invalidates p. If control later reaches the
// `p == compact_queue.end()` comparison below after this erase, that
// compares an invalidated iterator (undefined behavior). Consider a
// separate "merged" flag instead.
871 compact_queue
.erase(p
);
872 logger
->inc(l_rocksdb_compact_queue_merge
);
875 if (p
->second
>= start
&& p
->second
< end
) {
876 // merge with existing range to the left
877 compact_queue
.push_back(make_pair(p
->first
, end
));
// NOTE(review): same invalidated-iterator concern as above.
878 compact_queue
.erase(p
);
879 logger
->inc(l_rocksdb_compact_queue_merge
);
884 if (p
== compact_queue
.end()) {
885 // no merge, new entry.
886 compact_queue
.push_back(make_pair(start
, end
));
887 logger
->set(l_rocksdb_compact_queue_len
, compact_queue
.size());
889 compact_queue_cond
.Signal();
890 if (!compact_thread
.is_started()) {
891 compact_thread
.create("rstore_compact");
// Probe whether `omap_dir` can be opened as a RocksDB database (created if
// missing), using default options plus create_if_missing. Presumably returns
// status.ok(); TODO confirm.
894 bool RocksDBStore::check_omap_dir(string
&omap_dir
)
896 rocksdb::Options options
;
897 options
.create_if_missing
= true;
899 rocksdb::Status status
= rocksdb::DB::Open(options
, omap_dir
, &db
);
// Synchronously compact the raw key range [start, end]. Callers pass full
// RocksDB keys; no prefix is combined here. compact_thread_entry() calls this
// for queued ranges.
// NOTE(review): the rocksdb::Status returned by CompactRange() is ignored.
904 void RocksDBStore::compact_range(const string
& start
, const string
& end
)
906 rocksdb::CompactRangeOptions options
;
907 rocksdb::Slice
cstart(start
);
908 rocksdb::Slice
cend(end
);
909 db
->CompactRange(options
, &cstart
, &cend
);
// Iterator over the whole RocksDB key space (all prefixes), wrapping the
// rocksdb::Iterator `dbiter`.
911 RocksDBStore::RocksDBWholeSpaceIteratorImpl::~RocksDBWholeSpaceIteratorImpl()
// Position on the very first key. Returns 0 if the iterator status is OK and
// -1 otherwise; I/O errors trip the assert.
915 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_first()
917 dbiter
->SeekToFirst();
918 assert(!dbiter
->status().IsIOError());
919 return dbiter
->status().ok() ? 0 : -1;
// Position on the first key >= the raw `prefix` bytes. No separator is
// appended, so the result may belong to a different prefix; check it with
// raw_key_is_prefixed() if that matters. Same return/assert convention as
// seek_to_first().
921 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_first(const string
&prefix
)
923 rocksdb::Slice
slice_prefix(prefix
);
924 dbiter
->Seek(slice_prefix
);
925 assert(!dbiter
->status().IsIOError());
926 return dbiter
->status().ok() ? 0 : -1;
// Position on the very last key. Same return/assert convention as
// seek_to_first().
928 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_last()
930 dbiter
->SeekToLast();
931 assert(!dbiter
->status().IsIOError());
932 return dbiter
->status().ok() ? 0 : -1;
// Position near the last key under `prefix`. It seeks to past_prefix(prefix),
// the exclusive upper bound for that prefix. If nothing lies at or beyond
// that bound, it falls back to the last key in the database; otherwise it
// presumably steps back one entry. Returns 0 if the status is OK, -1 otherwise.
934 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_last(const string
&prefix
)
936 string limit
= past_prefix(prefix
);
937 rocksdb::Slice
slice_limit(limit
);
938 dbiter
->Seek(slice_limit
);
940 if (!dbiter
->Valid()) {
941 dbiter
->SeekToLast();
945 return dbiter
->status().ok() ? 0 : -1;
// Position on the first entry strictly after (prefix, after): seek with
// lower_bound(), then skip an exact match on (prefix, after).
// NOTE(review): raw_key() reads dbiter->key(), which requires a valid
// iterator; this is presumably guarded by a valid() check. Confirm.
947 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::upper_bound(const string
&prefix
, const string
&after
)
949 lower_bound(prefix
, after
);
951 pair
<string
,string
> key
= raw_key();
952 if (key
.first
== prefix
&& key
.second
== after
)
955 return dbiter
->status().ok() ? 0 : -1;
// Position on the first entry >= combine_strings(prefix, to).
// Returns 0 if the status is OK and -1 otherwise.
// NOTE(review): unlike the seek_to_* methods, this does not assert on
// IOError; make it consistent with its siblings if that is intended.
957 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::lower_bound(const string
&prefix
, const string
&to
)
959 string bound
= combine_strings(prefix
, to
);
960 rocksdb::Slice
slice_bound(bound
);
961 dbiter
->Seek(slice_bound
);
962 return dbiter
->status().ok() ? 0 : -1;
// True while the iterator is positioned on an entry.
964 bool RocksDBStore::RocksDBWholeSpaceIteratorImpl::valid()
966 return dbiter
->Valid();
// Advance one entry. Returns 0 if the status is OK and -1 otherwise; I/O
// errors trip the assert.
968 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::next()
973 assert(!dbiter
->status().IsIOError());
974 return dbiter
->status().ok() ? 0 : -1;
// Step back one entry. Same return/assert convention as next().
976 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::prev()
981 assert(!dbiter
->status().IsIOError());
982 return dbiter
->status().ok() ? 0 : -1;
// The current entry's key with its "<prefix>\0" part stripped. A null prefix
// pointer is passed to split_key(), so only the key part is extracted.
984 string
RocksDBStore::RocksDBWholeSpaceIteratorImpl::key()
987 split_key(dbiter
->key(), 0, &out_key
);
// The current entry's key as a (prefix, key) pair, as split by split_key().
990 pair
<string
,string
> RocksDBStore::RocksDBWholeSpaceIteratorImpl::raw_key()
993 split_key(dbiter
->key(), &prefix
, &key
);
994 return make_pair(prefix
, key
);
// True if the current raw key begins with `prefix` immediately followed by
// the NUL separator. It checks the length first, so short keys are never
// over-read.
997 bool RocksDBStore::RocksDBWholeSpaceIteratorImpl::raw_key_is_prefixed(const string
&prefix
) {
998 // Look for "prefix\0" right in rocksdb::Slice
999 rocksdb::Slice key
= dbiter
->key();
1000 if ((key
.size() > prefix
.length()) && (key
[prefix
.length()] == '\0')) {
1001 return memcmp(key
.data(), prefix
.c_str(), prefix
.length()) == 0;
// The current value converted with to_bufferlist().
1007 bufferlist
RocksDBStore::RocksDBWholeSpaceIteratorImpl::value()
1009 return to_bufferlist(dbiter
->value());
// Size in bytes of the full raw key (prefix, separator and key).
1012 size_t RocksDBStore::RocksDBWholeSpaceIteratorImpl::key_size()
1014 return dbiter
->key().size();
// Size in bytes of the current value.
1017 size_t RocksDBStore::RocksDBWholeSpaceIteratorImpl::value_size()
1019 return dbiter
->value().size();
// The current value as a bufferptr built from the Slice's (data, size).
// Presumably this copies the bytes, so the result does not depend on the
// iterator position; TODO confirm against bufferptr's constructor.
1022 bufferptr
RocksDBStore::RocksDBWholeSpaceIteratorImpl::value_as_ptr()
1024 rocksdb::Slice val
= dbiter
->value();
1025 return bufferptr(val
.data(), val
.size());
// 0 if the underlying iterator status is OK, -1 otherwise.
1028 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::status()
1030 return dbiter
->status().ok() ? 0 : -1;
// Exclusive upper bound for keys under `prefix`, derived from a copy of the
// prefix. seek_to_last(prefix) seeks to this bound and backs up from it.
1033 string
RocksDBStore::past_prefix(const string
&prefix
)
1035 string limit
= prefix
;
// Create a whole-space iterator over a fresh rocksdb::Iterator with default
// ReadOptions. The wrapper receives the raw rocksdb::Iterator pointer,
// presumably taking ownership of it.
1040 RocksDBStore::WholeSpaceIterator
RocksDBStore::_get_iterator()
1042 return std::make_shared
<RocksDBWholeSpaceIteratorImpl
>(
1043 db
->NewIterator(rocksdb::ReadOptions()));