1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
10 #include <sys/types.h>
13 #include "rocksdb/db.h"
14 #include "rocksdb/table.h"
15 #include "rocksdb/env.h"
16 #include "rocksdb/slice.h"
17 #include "rocksdb/cache.h"
18 #include "rocksdb/filter_policy.h"
19 #include "rocksdb/utilities/convenience.h"
20 #include "rocksdb/merge_operator.h"
22 #include "common/perf_counters.h"
23 #include "common/debug.h"
24 #include "include/str_list.h"
25 #include "include/stringify.h"
26 #include "include/str_map.h"
27 #include "KeyValueDB.h"
28 #include "RocksDBStore.h"
30 #include "common/debug.h"
32 #define dout_context cct
33 #define dout_subsys ceph_subsys_rocksdb
35 #define dout_prefix *_dout << "rocksdb: "
37 static rocksdb::SliceParts
prepare_sliceparts(const bufferlist
&bl
, rocksdb::Slice
*slices
)
40 for (std::list
<buffer::ptr
>::const_iterator p
= bl
.buffers().begin();
41 p
!= bl
.buffers().end(); ++p
, ++n
) {
42 slices
[n
].data_
= p
->c_str();
43 slices
[n
].size_
= p
->length();
45 return rocksdb::SliceParts(slices
, n
);
49 // One of these per rocksdb instance, implements the merge operator prefix stuff
51 class RocksDBStore::MergeOperatorRouter
: public rocksdb::AssociativeMergeOperator
{
54 const char *Name() const override
{
55 // Construct a name that rocksDB will validate against. We want to
56 // do this in a way that doesn't constrain the ordering of calls
57 // to set_merge_operator, so sort the merge operators and then
58 // construct a name from all of those parts.
59 store
.assoc_name
.clear();
60 map
<std::string
,std::string
> names
;
61 for (auto& p
: store
.merge_ops
) names
[p
.first
] = p
.second
->name();
62 for (auto& p
: names
) {
63 store
.assoc_name
+= '.';
64 store
.assoc_name
+= p
.first
;
65 store
.assoc_name
+= ':';
66 store
.assoc_name
+= p
.second
;
68 return store
.assoc_name
.c_str();
71 MergeOperatorRouter(RocksDBStore
&_store
) : store(_store
) {}
73 bool Merge(const rocksdb::Slice
& key
,
74 const rocksdb::Slice
* existing_value
,
75 const rocksdb::Slice
& value
,
76 std::string
* new_value
,
77 rocksdb::Logger
* logger
) const override
{
79 for (auto& p
: store
.merge_ops
) {
80 if (p
.first
.compare(0, p
.first
.length(),
81 key
.data(), p
.first
.length()) == 0 &&
82 key
.data()[p
.first
.length()] == 0) {
84 p
.second
->merge(existing_value
->data(), existing_value
->size(),
85 value
.data(), value
.size(),
88 p
.second
->merge_nonexistent(value
.data(), value
.size(), new_value
);
98 int RocksDBStore::set_merge_operator(
100 std::shared_ptr
<KeyValueDB::MergeOperator
> mop
)
102 // If you fail here, it's because you can't do this on an open database
103 assert(db
== nullptr);
104 merge_ops
.push_back(std::make_pair(prefix
,mop
));
// rocksdb::Logger implementation that writes RocksDB log messages to the
// Ceph log stream (`*_dout << ... << dendl`) for the CephContext supplied at
// construction.
// NOTE(review): the member declarations and the constructor/destructor
// bodies are not visible in this listing.
108 class CephRocksdbLogger
: public rocksdb::Logger
{
// Remembers the context `c` in `cct`.
111 explicit CephRocksdbLogger(CephContext
*c
) : cct(c
) {
114 ~CephRocksdbLogger() override
{
118 // Write an entry to the log file with the specified format.
// Delegates to the level-aware overload using rocksdb::INFO_LEVEL.
119 void Logv(const char* format
, va_list ap
) override
{
120 Logv(rocksdb::INFO_LEVEL
, format
, ap
);
123 // Write an entry to the log file with the specified log level
124 // and format. Any log with level under the internal log level
125 // of *this (see @SetInfoLogLevel and @GetInfoLogLevel) will not be
// (the end of the sentence above is missing from this listing)
127 void Logv(const rocksdb::InfoLogLevel log_level
, const char* format
,
128 va_list ap
) override
{
// v inverts the RocksDB level order: a higher RocksDB log level yields a
// smaller v.  Presumably v is the Ceph debug level used for the message;
// the statement consuming it is not visible here -- TODO confirm.
129 int v
= rocksdb::NUM_INFO_LOG_LEVELS
- log_level
- 1;
// Format into buf; vsnprintf is bounded by sizeof(buf), so longer messages
// are truncated.
132 vsnprintf(buf
, sizeof(buf
), format
, ap
);
133 *_dout
<< buf
<< dendl
;
137 rocksdb::Logger
*create_rocksdb_ceph_logger()
139 return new CephRocksdbLogger(g_ceph_context
);
142 int string2bool(string val
, bool &b_val
)
144 if (strcasecmp(val
.c_str(), "false") == 0) {
147 } else if (strcasecmp(val
.c_str(), "true") == 0) {
152 int b
= strict_strtol(val
.c_str(), 10, &err
);
160 int RocksDBStore::tryInterpret(const string key
, const string val
, rocksdb::Options
&opt
)
162 if (key
== "compaction_threads") {
164 int f
= strict_sistrtoll(val
.c_str(), &err
);
167 //Low priority threadpool is used for compaction
168 opt
.env
->SetBackgroundThreads(f
, rocksdb::Env::Priority::LOW
);
169 } else if (key
== "flusher_threads") {
171 int f
= strict_sistrtoll(val
.c_str(), &err
);
174 //High priority threadpool is used for flusher
175 opt
.env
->SetBackgroundThreads(f
, rocksdb::Env::Priority::HIGH
);
176 } else if (key
== "compact_on_mount") {
177 int ret
= string2bool(val
, compact_on_mount
);
180 } else if (key
== "disableWAL") {
181 int ret
= string2bool(val
, disableWAL
);
185 //unrecognize config options.
191 int RocksDBStore::ParseOptionsFromString(const string opt_str
, rocksdb::Options
&opt
)
193 map
<string
, string
> str_map
;
194 int r
= get_str_map(opt_str
, &str_map
, ",\n;");
197 map
<string
, string
>::iterator it
;
198 for(it
= str_map
.begin(); it
!= str_map
.end(); ++it
) {
199 string this_opt
= it
->first
+ "=" + it
->second
;
200 rocksdb::Status status
= rocksdb::GetOptionsFromString(opt
, this_opt
, &opt
);
202 //unrecognized by rocksdb, try to interpret by ourselves.
203 r
= tryInterpret(it
->first
, it
->second
, opt
);
205 derr
<< status
.ToString() << dendl
;
209 lgeneric_dout(cct
, 0) << " set rocksdb option " << it
->first
210 << " = " << it
->second
<< dendl
;
// Remember the option string for later use and, when it is non-empty,
// parse it once into a scratch rocksdb::Options object.
// NOTE(review): the handling of the parse result `r` and the final return
// are not visible in this listing; presumably a parse failure is reported to
// the caller -- confirm against the full file.
215 int RocksDBStore::init(string _options_str
)
217 options_str
= _options_str
;
218 rocksdb::Options opt
;
220 if (options_str
.length()) {
221 int r
= ParseOptionsFromString(options_str
, opt
);
229 int RocksDBStore::create_and_open(ostream
&out
)
232 unique_ptr
<rocksdb::Directory
> dir
;
233 env
->NewDirectory(path
, &dir
);
235 int r
= ::mkdir(path
.c_str(), 0755);
238 if (r
< 0 && r
!= -EEXIST
) {
239 derr
<< __func__
<< " failed to create " << path
<< ": " << cpp_strerror(r
)
244 return do_open(out
, true);
// Open the RocksDB database at `path`.
//
// Steps visible in this listing, in order:
//  - parse options_str into a rocksdb::Options via ParseOptionsFromString();
//  - when g_conf->rocksdb_perf is set, create DB statistics and attach them;
//  - apply create_if_missing, rocksdb_separate_wal_dir (wal_dir is
//    path + ".wal") and rocksdb_db_paths;
//  - optionally log through CephRocksdbLogger and use the Env stored in `priv`;
//  - split cache_size into a row cache and an "lru" or "clock" block cache,
//    set the block size and an optional bloom filter;
//  - install MergeOperatorRouter and call rocksdb::DB::Open();
//  - register the perf counters, and compact if compact_on_mount is set.
//
// NOTE(review): several lines (error returns, some conditions, closing
// braces) are missing from this listing; confirm the details against the
// full file before relying on this summary.
247 int RocksDBStore::do_open(ostream
&out
, bool create_if_missing
)
249 rocksdb::Options opt
;
250 rocksdb::Status status
;
252 if (options_str
.length()) {
253 int r
= ParseOptionsFromString(options_str
, opt
);
259 if (g_conf
->rocksdb_perf
) {
260 dbstats
= rocksdb::CreateDBStatistics();
261 opt
.statistics
= dbstats
;
264 opt
.create_if_missing
= create_if_missing
;
265 if (g_conf
->rocksdb_separate_wal_dir
) {
266 opt
.wal_dir
= path
+ ".wal";
// rocksdb_db_paths: items are separated by ';', space or tab, and each item
// has the form "<path>,<size>".
268 if (g_conf
->get_val
<std::string
>("rocksdb_db_paths").length()) {
270 get_str_list(g_conf
->get_val
<std::string
>("rocksdb_db_paths"), "; \t", paths
);
271 for (auto& p
: paths
) {
272 size_t pos
= p
.find(',');
273 if (pos
== std::string::npos
) {
274 derr
<< __func__
<< " invalid db path item " << p
<< " in "
275 << g_conf
->get_val
<std::string
>("rocksdb_db_paths") << dendl
;
// This `path` is a loop-local that shadows the store's `path` used above.
278 string path
= p
.substr(0, pos
);
279 string size_str
= p
.substr(pos
+ 1);
// atoll() yields 0 for text that does not start with a number.
280 uint64_t size
= atoll(size_str
.c_str());
282 derr
<< __func__
<< " invalid db path item " << p
<< " in "
283 << g_conf
->get_val
<std::string
>("rocksdb_db_paths") << dendl
;
286 opt
.db_paths
.push_back(rocksdb::DbPath(path
, size
));
287 dout(10) << __func__
<< " db_path " << path
<< " size " << size
<< dendl
;
291 if (g_conf
->rocksdb_log_to_ceph_log
) {
292 opt
.info_log
.reset(new CephRocksdbLogger(g_ceph_context
));
296 dout(10) << __func__
<< " using custom Env " << priv
<< dendl
;
297 opt
.env
= static_cast<rocksdb::Env
*>(priv
);
// NOTE(review): the condition guarding this assignment is not visible here.
302 cache_size
= g_conf
->rocksdb_cache_size
;
// The row cache gets rocksdb_cache_row_ratio of the total; the block cache
// gets the remainder.
304 uint64_t row_cache_size
= cache_size
* g_conf
->rocksdb_cache_row_ratio
;
305 uint64_t block_cache_size
= cache_size
- row_cache_size
;
306 if (g_conf
->rocksdb_cache_type
== "lru") {
307 bbt_opts
.block_cache
= rocksdb::NewLRUCache(
309 g_conf
->rocksdb_cache_shard_bits
);
310 } else if (g_conf
->rocksdb_cache_type
== "clock") {
311 bbt_opts
.block_cache
= rocksdb::NewClockCache(
313 g_conf
->rocksdb_cache_shard_bits
);
315 derr
<< "unrecognized rocksdb_cache_type '" << g_conf
->rocksdb_cache_type
319 bbt_opts
.block_size
= g_conf
->rocksdb_block_size
;
321 opt
.row_cache
= rocksdb::NewLRUCache(row_cache_size
,
322 g_conf
->rocksdb_cache_shard_bits
);
324 if (g_conf
->kstore_rocksdb_bloom_bits_per_key
> 0) {
325 dout(10) << __func__
<< " set bloom filter bits per key to "
326 << g_conf
->kstore_rocksdb_bloom_bits_per_key
<< dendl
;
327 bbt_opts
.filter_policy
.reset(rocksdb::NewBloomFilterPolicy(
328 g_conf
->kstore_rocksdb_bloom_bits_per_key
));
330 opt
.table_factory
.reset(rocksdb::NewBlockBasedTableFactory(bbt_opts
));
331 dout(10) << __func__
<< " block size " << g_conf
->rocksdb_block_size
332 << ", block_cache size " << prettybyte_t(block_cache_size
)
333 << ", row_cache size " << prettybyte_t(row_cache_size
)
335 << (1 << g_conf
->rocksdb_cache_shard_bits
)
336 << ", type " << g_conf
->rocksdb_cache_type
// The router is installed on the options used for this Open() call.
339 opt
.merge_operator
.reset(new MergeOperatorRouter(*this));
340 status
= rocksdb::DB::Open(opt
, path
, &db
);
342 derr
<< status
.ToString() << dendl
;
// Perf counters for this store, registered with the context's collection.
346 PerfCountersBuilder
plb(g_ceph_context
, "rocksdb", l_rocksdb_first
, l_rocksdb_last
);
347 plb
.add_u64_counter(l_rocksdb_gets
, "get", "Gets");
348 plb
.add_u64_counter(l_rocksdb_txns
, "submit_transaction", "Submit transactions");
349 plb
.add_u64_counter(l_rocksdb_txns_sync
, "submit_transaction_sync", "Submit transactions sync");
350 plb
.add_time_avg(l_rocksdb_get_latency
, "get_latency", "Get latency");
351 plb
.add_time_avg(l_rocksdb_submit_latency
, "submit_latency", "Submit Latency");
352 plb
.add_time_avg(l_rocksdb_submit_sync_latency
, "submit_sync_latency", "Submit Sync Latency");
353 plb
.add_u64_counter(l_rocksdb_compact
, "compact", "Compactions");
354 plb
.add_u64_counter(l_rocksdb_compact_range
, "compact_range", "Compactions by range");
355 plb
.add_u64_counter(l_rocksdb_compact_queue_merge
, "compact_queue_merge", "Mergings of ranges in compaction queue");
356 plb
.add_u64(l_rocksdb_compact_queue_len
, "compact_queue_len", "Length of compaction queue");
357 plb
.add_time_avg(l_rocksdb_write_wal_time
, "rocksdb_write_wal_time", "Rocksdb write wal time");
358 plb
.add_time_avg(l_rocksdb_write_memtable_time
, "rocksdb_write_memtable_time", "Rocksdb write memtable time");
359 plb
.add_time_avg(l_rocksdb_write_delay_time
, "rocksdb_write_delay_time", "Rocksdb write delay time");
360 plb
.add_time_avg(l_rocksdb_write_pre_and_post_process_time
,
361 "rocksdb_write_pre_and_post_time", "total time spent on writing a record, excluding write process");
362 logger
= plb
.create_perf_counters();
363 cct
->get_perfcounters_collection()->add(logger
);
// compact_on_mount is one of the options handled by tryInterpret().
365 if (compact_on_mount
) {
366 derr
<< "Compacting rocksdb store..." << dendl
;
368 derr
<< "Finished compacting rocksdb store" << dendl
;
// Check that a RocksDB database can be opened (creating it if missing) in
// `dir`.  Returns 0 when rocksdb::DB::Open succeeds and -EIO otherwise.
// NOTE(review): the declaration of `db` and whatever is done with the opened
// handle before returning are not visible in this listing.
373 int RocksDBStore::_test_init(const string
& dir
)
375 rocksdb::Options options
;
376 options
.create_if_missing
= true;
378 rocksdb::Status status
= rocksdb::DB::Open(options
, dir
, &db
);
381 return status
.ok() ? 0 : -EIO
;
// Destructor.  Only part of the body is visible in this listing: the custom
// Env held in `priv` is deleted, and the comment below records that the
// database handle has to be released before the objects it depends on.
384 RocksDBStore::~RocksDBStore()
389 // Ensure db is destroyed before dependent db_cache and filterpolicy
394 delete static_cast<rocksdb::Env
*>(priv
);
// Stop the background compaction thread (if it was started) and remove the
// perf counters from the context's collection.
//
// The stop flag is set and the condition signalled while holding
// compact_queue_lock; the lock is released before join() so that the
// thread can re-acquire it and exit.
// NOTE(review): any guard in front of the perf-counter removal is not
// visible in this listing.
398 void RocksDBStore::close()
400 // stop compaction thread
401 compact_queue_lock
.Lock();
402 if (compact_thread
.is_started()) {
403 compact_queue_stop
= true;
404 compact_queue_cond
.Signal();
405 compact_queue_lock
.Unlock();
406 compact_thread
.join();
408 compact_queue_lock
.Unlock();
412 cct
->get_perfcounters_collection()->remove(logger
);
415 void RocksDBStore::split_stats(const std::string
&s
, char delim
, std::vector
<std::string
> &elems
) {
416 std::stringstream ss
;
419 while (std::getline(ss
, item
, delim
)) {
420 elems
.push_back(item
);
// Dump RocksDB statistics into formatter `f`.
//
// Nothing is collected unless g_conf->rocksdb_perf is set.  Three groups of
// output are each controlled by their own option:
//   rocksdb_collect_compaction_stats - the "rocksdb.stats" property, one
//                                      entry per line of text;
//   rocksdb_collect_extended_stats   - dbstats->ToString(), one entry per
//                                      line, plus this store's perf counters;
//   rocksdb_collect_memory_stats     - block cache usage, pinned usage and
//                                      the "rocksdb.cur-size-all-mem-tables"
//                                      property.
// NOTE(review): the statements that close the formatter sections and some
// guards are missing from this listing.
424 void RocksDBStore::get_statistics(Formatter
*f
)
426 if (!g_conf
->rocksdb_perf
) {
427 dout(20) << __func__
<< "RocksDB perf is disabled, can't probe for stats"
432 if (g_conf
->rocksdb_collect_compaction_stats
) {
433 std::string stat_str
;
434 bool status
= db
->GetProperty("rocksdb.stats", &stat_str
);
436 f
->open_object_section("rocksdb_statistics");
437 f
->dump_string("rocksdb_compaction_statistics", "");
438 vector
<string
> stats
;
439 split_stats(stat_str
, '\n', stats
);
// Each line of the property text is dumped under an empty name.
440 for (auto st
:stats
) {
441 f
->dump_string("", st
);
446 if (g_conf
->rocksdb_collect_extended_stats
) {
448 f
->open_object_section("rocksdb_extended_statistics");
449 string stat_str
= dbstats
->ToString();
450 vector
<string
> stats
;
451 split_stats(stat_str
, '\n', stats
);
452 f
->dump_string("rocksdb_extended_statistics", "");
453 for (auto st
:stats
) {
454 f
->dump_string(".", st
);
458 f
->open_object_section("rocksdbstore_perf_counters");
459 logger
->dump_formatted(f
,0);
462 if (g_conf
->rocksdb_collect_memory_stats
) {
463 f
->open_object_section("rocksdb_memtable_statistics");
464 std::string
str(stringify(bbt_opts
.block_cache
->GetUsage()));
465 f
->dump_string("block_cache_usage", str
.data());
// NOTE(review): `str` is appended to here; presumably it is cleared on a
// line missing from this listing -- confirm.
467 str
.append(stringify(bbt_opts
.block_cache
->GetPinnedUsage()));
468 f
->dump_string("block_cache_pinned_blocks_usage", str
);
470 db
->GetProperty("rocksdb.cur-size-all-mem-tables", &str
);
471 f
->dump_string("rocksdb_memtable_usage", str
);
476 int RocksDBStore::submit_transaction(KeyValueDB::Transaction t
)
478 utime_t start
= ceph_clock_now();
479 // enable rocksdb breakdown
480 // considering performance overhead, default is disabled
481 if (g_conf
->rocksdb_perf
) {
482 rocksdb::SetPerfLevel(rocksdb::PerfLevel::kEnableTimeExceptForMutex
);
483 rocksdb::perf_context
.Reset();
486 RocksDBTransactionImpl
* _t
=
487 static_cast<RocksDBTransactionImpl
*>(t
.get());
488 rocksdb::WriteOptions woptions
;
489 woptions
.disableWAL
= disableWAL
;
490 lgeneric_subdout(cct
, rocksdb
, 30) << __func__
;
491 RocksWBHandler bat_txc
;
492 _t
->bat
.Iterate(&bat_txc
);
493 *_dout
<< " Rocksdb transaction: " << bat_txc
.seen
<< dendl
;
495 rocksdb::Status s
= db
->Write(woptions
, &_t
->bat
);
497 RocksWBHandler rocks_txc
;
498 _t
->bat
.Iterate(&rocks_txc
);
499 derr
<< __func__
<< " error: " << s
.ToString() << " code = " << s
.code()
500 << " Rocksdb transaction: " << rocks_txc
.seen
<< dendl
;
502 utime_t lat
= ceph_clock_now() - start
;
504 if (g_conf
->rocksdb_perf
) {
505 utime_t write_memtable_time
;
506 utime_t write_delay_time
;
507 utime_t write_wal_time
;
508 utime_t write_pre_and_post_process_time
;
509 write_wal_time
.set_from_double(
510 static_cast<double>(rocksdb::perf_context
.write_wal_time
)/1000000000);
511 write_memtable_time
.set_from_double(
512 static_cast<double>(rocksdb::perf_context
.write_memtable_time
)/1000000000);
513 write_delay_time
.set_from_double(
514 static_cast<double>(rocksdb::perf_context
.write_delay_time
)/1000000000);
515 write_pre_and_post_process_time
.set_from_double(
516 static_cast<double>(rocksdb::perf_context
.write_pre_and_post_process_time
)/1000000000);
517 logger
->tinc(l_rocksdb_write_memtable_time
, write_memtable_time
);
518 logger
->tinc(l_rocksdb_write_delay_time
, write_delay_time
);
519 logger
->tinc(l_rocksdb_write_wal_time
, write_wal_time
);
520 logger
->tinc(l_rocksdb_write_pre_and_post_process_time
, write_pre_and_post_process_time
);
523 logger
->inc(l_rocksdb_txns
);
524 logger
->tinc(l_rocksdb_submit_latency
, lat
);
526 return s
.ok() ? 0 : -1;
529 int RocksDBStore::submit_transaction_sync(KeyValueDB::Transaction t
)
531 utime_t start
= ceph_clock_now();
532 // enable rocksdb breakdown
533 // considering performance overhead, default is disabled
534 if (g_conf
->rocksdb_perf
) {
535 rocksdb::SetPerfLevel(rocksdb::PerfLevel::kEnableTimeExceptForMutex
);
536 rocksdb::perf_context
.Reset();
539 RocksDBTransactionImpl
* _t
=
540 static_cast<RocksDBTransactionImpl
*>(t
.get());
541 rocksdb::WriteOptions woptions
;
542 woptions
.sync
= true;
543 woptions
.disableWAL
= disableWAL
;
544 lgeneric_subdout(cct
, rocksdb
, 30) << __func__
;
545 RocksWBHandler bat_txc
;
546 _t
->bat
.Iterate(&bat_txc
);
547 *_dout
<< " Rocksdb transaction: " << bat_txc
.seen
<< dendl
;
549 rocksdb::Status s
= db
->Write(woptions
, &_t
->bat
);
551 RocksWBHandler rocks_txc
;
552 _t
->bat
.Iterate(&rocks_txc
);
553 derr
<< __func__
<< " error: " << s
.ToString() << " code = " << s
.code()
554 << " Rocksdb transaction: " << rocks_txc
.seen
<< dendl
;
556 utime_t lat
= ceph_clock_now() - start
;
558 if (g_conf
->rocksdb_perf
) {
559 utime_t write_memtable_time
;
560 utime_t write_delay_time
;
561 utime_t write_wal_time
;
562 utime_t write_pre_and_post_process_time
;
563 write_wal_time
.set_from_double(
564 static_cast<double>(rocksdb::perf_context
.write_wal_time
)/1000000000);
565 write_memtable_time
.set_from_double(
566 static_cast<double>(rocksdb::perf_context
.write_memtable_time
)/1000000000);
567 write_delay_time
.set_from_double(
568 static_cast<double>(rocksdb::perf_context
.write_delay_time
)/1000000000);
569 write_pre_and_post_process_time
.set_from_double(
570 static_cast<double>(rocksdb::perf_context
.write_pre_and_post_process_time
)/1000000000);
571 logger
->tinc(l_rocksdb_write_memtable_time
, write_memtable_time
);
572 logger
->tinc(l_rocksdb_write_delay_time
, write_delay_time
);
573 logger
->tinc(l_rocksdb_write_wal_time
, write_wal_time
);
574 logger
->tinc(l_rocksdb_write_pre_and_post_process_time
, write_pre_and_post_process_time
);
577 logger
->inc(l_rocksdb_txns_sync
);
578 logger
->tinc(l_rocksdb_submit_sync_latency
, lat
);
580 return s
.ok() ? 0 : -1;
// Construct a transaction for the store `_db`.
// NOTE(review): the constructor body is not visible in this listing;
// presumably it stores `_db` in the `db` member that the rm* methods read.
583 RocksDBStore::RocksDBTransactionImpl::RocksDBTransactionImpl(RocksDBStore
*_db
)
588 void RocksDBStore::RocksDBTransactionImpl::set(
589 const string
&prefix
,
591 const bufferlist
&to_set_bl
)
593 string key
= combine_strings(prefix
, k
);
595 // bufferlist::c_str() is non-constant, so we can't call c_str()
596 if (to_set_bl
.is_contiguous() && to_set_bl
.length() > 0) {
597 bat
.Put(rocksdb::Slice(key
),
598 rocksdb::Slice(to_set_bl
.buffers().front().c_str(),
599 to_set_bl
.length()));
601 rocksdb::Slice
key_slice(key
);
602 rocksdb::Slice value_slices
[to_set_bl
.buffers().size()];
603 bat
.Put(nullptr, rocksdb::SliceParts(&key_slice
, 1),
604 prepare_sliceparts(to_set_bl
, value_slices
));
608 void RocksDBStore::RocksDBTransactionImpl::set(
609 const string
&prefix
,
610 const char *k
, size_t keylen
,
611 const bufferlist
&to_set_bl
)
614 combine_strings(prefix
, k
, keylen
, &key
);
616 // bufferlist::c_str() is non-constant, so we can't call c_str()
617 if (to_set_bl
.is_contiguous() && to_set_bl
.length() > 0) {
618 bat
.Put(rocksdb::Slice(key
),
619 rocksdb::Slice(to_set_bl
.buffers().front().c_str(),
620 to_set_bl
.length()));
622 rocksdb::Slice
key_slice(key
);
623 rocksdb::Slice value_slices
[to_set_bl
.buffers().size()];
624 bat
.Put(nullptr, rocksdb::SliceParts(&key_slice
, 1),
625 prepare_sliceparts(to_set_bl
, value_slices
));
629 void RocksDBStore::RocksDBTransactionImpl::rmkey(const string
&prefix
,
632 bat
.Delete(combine_strings(prefix
, k
));
635 void RocksDBStore::RocksDBTransactionImpl::rmkey(const string
&prefix
,
640 combine_strings(prefix
, k
, keylen
, &key
);
644 void RocksDBStore::RocksDBTransactionImpl::rm_single_key(const string
&prefix
,
647 bat
.SingleDelete(combine_strings(prefix
, k
));
650 void RocksDBStore::RocksDBTransactionImpl::rmkeys_by_prefix(const string
&prefix
)
652 if (db
->enable_rmrange
) {
653 string endprefix
= prefix
;
654 endprefix
.push_back('\x01');
655 bat
.DeleteRange(combine_strings(prefix
, string()),
656 combine_strings(endprefix
, string()));
658 KeyValueDB::Iterator it
= db
->get_iterator(prefix
);
659 for (it
->seek_to_first();
662 bat
.Delete(combine_strings(prefix
, it
->key()));
667 void RocksDBStore::RocksDBTransactionImpl::rm_range_keys(const string
&prefix
,
671 if (db
->enable_rmrange
) {
672 bat
.DeleteRange(combine_strings(prefix
, start
), combine_strings(prefix
, end
));
674 auto it
= db
->get_iterator(prefix
);
675 it
->lower_bound(start
);
676 while (it
->valid()) {
677 if (it
->key() >= end
) {
680 bat
.Delete(combine_strings(prefix
, it
->key()));
686 void RocksDBStore::RocksDBTransactionImpl::merge(
687 const string
&prefix
,
689 const bufferlist
&to_set_bl
)
691 string key
= combine_strings(prefix
, k
);
693 // bufferlist::c_str() is non-constant, so we can't call c_str()
694 if (to_set_bl
.is_contiguous() && to_set_bl
.length() > 0) {
695 bat
.Merge(rocksdb::Slice(key
),
696 rocksdb::Slice(to_set_bl
.buffers().front().c_str(),
697 to_set_bl
.length()));
700 rocksdb::Slice
key_slice(key
);
701 rocksdb::Slice value_slices
[to_set_bl
.buffers().size()];
702 bat
.Merge(nullptr, rocksdb::SliceParts(&key_slice
, 1),
703 prepare_sliceparts(to_set_bl
, value_slices
));
707 //gets will bypass RocksDB row cache, since it uses iterator
708 int RocksDBStore::get(
709 const string
&prefix
,
710 const std::set
<string
> &keys
,
711 std::map
<string
, bufferlist
> *out
)
713 utime_t start
= ceph_clock_now();
714 for (std::set
<string
>::const_iterator i
= keys
.begin();
715 i
!= keys
.end(); ++i
) {
717 std::string bound
= combine_strings(prefix
, *i
);
718 auto status
= db
->Get(rocksdb::ReadOptions(), rocksdb::Slice(bound
), &value
);
720 (*out
)[*i
].append(value
);
722 utime_t lat
= ceph_clock_now() - start
;
723 logger
->inc(l_rocksdb_gets
);
724 logger
->tinc(l_rocksdb_get_latency
, lat
);
// Point lookup of a single key under `prefix`.
//
// `out` must be non-null and empty on entry (asserted).  The combined key is
// fetched with db->Get(); one "get" is counted and the latency recorded.
// NOTE(review): the remaining parameters, the declarations of k/s/value,
// the handling of the lookup status and the return statement are not
// visible in this listing.
728 int RocksDBStore::get(
729 const string
&prefix
,
733 assert(out
&& (out
->length() == 0));
734 utime_t start
= ceph_clock_now();
738 k
= combine_strings(prefix
, key
);
739 s
= db
->Get(rocksdb::ReadOptions(), rocksdb::Slice(k
), &value
);
745 utime_t lat
= ceph_clock_now() - start
;
746 logger
->inc(l_rocksdb_gets
);
747 logger
->tinc(l_rocksdb_get_latency
, lat
);
// Point lookup of a single key given as a (pointer, length) pair under
// `prefix`.
//
// `out` must be non-null and empty on entry (asserted).  The combined key is
// built with the four-argument combine_strings() and fetched with
// db->Get(); one "get" is counted and the latency recorded.
// NOTE(review): the remaining parameters, the declarations of k/s/value,
// the handling of the lookup status and the return statement are not
// visible in this listing.
751 int RocksDBStore::get(
752 const string
& prefix
,
757 assert(out
&& (out
->length() == 0));
758 utime_t start
= ceph_clock_now();
761 combine_strings(prefix
, key
, keylen
, &k
);
763 s
= db
->Get(rocksdb::ReadOptions(), rocksdb::Slice(k
), &value
);
769 utime_t lat
= ceph_clock_now() - start
;
770 logger
->inc(l_rocksdb_gets
);
771 logger
->tinc(l_rocksdb_get_latency
, lat
);
775 int RocksDBStore::split_key(rocksdb::Slice in
, string
*prefix
, string
*key
)
777 size_t prefix_len
= 0;
779 // Find separator inside Slice
780 char* separator
= (char*) memchr(in
.data(), 0, in
.size());
781 if (separator
== NULL
)
783 prefix_len
= size_t(separator
- in
.data());
784 if (prefix_len
>= in
.size())
787 // Fetch prefix and/or key directly from Slice
789 *prefix
= string(in
.data(), prefix_len
);
791 *key
= string(separator
+1, in
.size()-prefix_len
-1);
795 void RocksDBStore::compact()
797 logger
->inc(l_rocksdb_compact
);
798 rocksdb::CompactRangeOptions options
;
799 db
->CompactRange(options
, nullptr, nullptr);
803 void RocksDBStore::compact_thread_entry()
805 compact_queue_lock
.Lock();
806 while (!compact_queue_stop
) {
807 while (!compact_queue
.empty()) {
808 pair
<string
,string
> range
= compact_queue
.front();
809 compact_queue
.pop_front();
810 logger
->set(l_rocksdb_compact_queue_len
, compact_queue
.size());
811 compact_queue_lock
.Unlock();
812 logger
->inc(l_rocksdb_compact_range
);
813 compact_range(range
.first
, range
.second
);
814 compact_queue_lock
.Lock();
817 compact_queue_cond
.Wait(compact_queue_lock
);
819 compact_queue_lock
.Unlock();
822 void RocksDBStore::compact_range_async(const string
& start
, const string
& end
)
824 Mutex::Locker
l(compact_queue_lock
);
826 // try to merge adjacent ranges. this is O(n), but the queue should
827 // be short. note that we do not cover all overlap cases and merge
828 // opportunities here, but we capture the ones we currently need.
829 list
< pair
<string
,string
> >::iterator p
= compact_queue
.begin();
830 while (p
!= compact_queue
.end()) {
831 if (p
->first
== start
&& p
->second
== end
) {
835 if (p
->first
<= end
&& p
->first
> start
) {
836 // merge with existing range to the right
837 compact_queue
.push_back(make_pair(start
, p
->second
));
838 compact_queue
.erase(p
);
839 logger
->inc(l_rocksdb_compact_queue_merge
);
842 if (p
->second
>= start
&& p
->second
< end
) {
843 // merge with existing range to the left
844 compact_queue
.push_back(make_pair(p
->first
, end
));
845 compact_queue
.erase(p
);
846 logger
->inc(l_rocksdb_compact_queue_merge
);
851 if (p
== compact_queue
.end()) {
852 // no merge, new entry.
853 compact_queue
.push_back(make_pair(start
, end
));
854 logger
->set(l_rocksdb_compact_queue_len
, compact_queue
.size());
856 compact_queue_cond
.Signal();
857 if (!compact_thread
.is_started()) {
858 compact_thread
.create("rstore_compact");
// Try to open (creating it if missing) a RocksDB database in `omap_dir`.
// NOTE(review): the declaration of `db`, any cleanup of the opened handle
// and the return statement are not visible in this listing; presumably the
// result reflects whether Open() succeeded -- confirm.
861 bool RocksDBStore::check_omap_dir(string
&omap_dir
)
863 rocksdb::Options options
;
864 options
.create_if_missing
= true;
866 rocksdb::Status status
= rocksdb::DB::Open(options
, omap_dir
, &db
);
871 void RocksDBStore::compact_range(const string
& start
, const string
& end
)
873 rocksdb::CompactRangeOptions options
;
874 rocksdb::Slice
cstart(start
);
875 rocksdb::Slice
cend(end
);
876 db
->CompactRange(options
, &cstart
, &cend
);
// Destructor of the whole-space iterator.
// NOTE(review): the body is not visible in this listing; presumably it
// releases the wrapped rocksdb iterator (dbiter) -- confirm.
878 RocksDBStore::RocksDBWholeSpaceIteratorImpl::~RocksDBWholeSpaceIteratorImpl()
882 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_first()
884 dbiter
->SeekToFirst();
885 return dbiter
->status().ok() ? 0 : -1;
887 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_first(const string
&prefix
)
889 rocksdb::Slice
slice_prefix(prefix
);
890 dbiter
->Seek(slice_prefix
);
891 return dbiter
->status().ok() ? 0 : -1;
893 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_last()
895 dbiter
->SeekToLast();
896 return dbiter
->status().ok() ? 0 : -1;
// Position the iterator relative to the end of `prefix`.
//
// Seeks to past_prefix(prefix); when that lands beyond the last entry (the
// iterator is invalid) it falls back to SeekToLast().  Returns 0 when the
// underlying iterator status is OK, -1 otherwise.
// NOTE(review): the lines between SeekToLast() and the return are missing
// from this listing; presumably the valid case steps back one entry so the
// iterator ends on the last key of `prefix` -- confirm.
898 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_last(const string
&prefix
)
900 string limit
= past_prefix(prefix
);
901 rocksdb::Slice
slice_limit(limit
);
902 dbiter
->Seek(slice_limit
);
904 if (!dbiter
->Valid()) {
905 dbiter
->SeekToLast();
909 return dbiter
->status().ok() ? 0 : -1;
// Position the iterator using lower_bound(prefix, after) and then inspect
// the current raw key.  Returns 0 when the underlying iterator status is
// OK, -1 otherwise.
// NOTE(review): the statement executed when the current key equals
// (prefix, after) is missing from this listing, as is any validity check
// before raw_key(); presumably the iterator is advanced past an exact
// match -- confirm.
911 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::upper_bound(const string
&prefix
, const string
&after
)
913 lower_bound(prefix
, after
);
915 pair
<string
,string
> key
= raw_key();
916 if (key
.first
== prefix
&& key
.second
== after
)
919 return dbiter
->status().ok() ? 0 : -1;
921 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::lower_bound(const string
&prefix
, const string
&to
)
923 string bound
= combine_strings(prefix
, to
);
924 rocksdb::Slice
slice_bound(bound
);
925 dbiter
->Seek(slice_bound
);
926 return dbiter
->status().ok() ? 0 : -1;
928 bool RocksDBStore::RocksDBWholeSpaceIteratorImpl::valid()
930 return dbiter
->Valid();
// Advance the iterator.  Returns 0 when the underlying iterator status is
// OK, -1 otherwise.
// NOTE(review): the statements that actually move the iterator are missing
// from this listing.
932 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::next()
937 return dbiter
->status().ok() ? 0 : -1;
// Step the iterator backwards.  Returns 0 when the underlying iterator
// status is OK, -1 otherwise.
// NOTE(review): the statements that actually move the iterator are missing
// from this listing.
939 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::prev()
944 return dbiter
->status().ok() ? 0 : -1;
946 string
RocksDBStore::RocksDBWholeSpaceIteratorImpl::key()
949 split_key(dbiter
->key(), 0, &out_key
);
952 pair
<string
,string
> RocksDBStore::RocksDBWholeSpaceIteratorImpl::raw_key()
955 split_key(dbiter
->key(), &prefix
, &key
);
956 return make_pair(prefix
, key
);
959 bool RocksDBStore::RocksDBWholeSpaceIteratorImpl::raw_key_is_prefixed(const string
&prefix
) {
960 // Look for "prefix\0" right in rocksb::Slice
961 rocksdb::Slice key
= dbiter
->key();
962 if ((key
.size() > prefix
.length()) && (key
[prefix
.length()] == '\0')) {
963 return memcmp(key
.data(), prefix
.c_str(), prefix
.length()) == 0;
969 bufferlist
RocksDBStore::RocksDBWholeSpaceIteratorImpl::value()
971 return to_bufferlist(dbiter
->value());
974 size_t RocksDBStore::RocksDBWholeSpaceIteratorImpl::key_size()
976 return dbiter
->key().size();
979 size_t RocksDBStore::RocksDBWholeSpaceIteratorImpl::value_size()
981 return dbiter
->value().size();
984 bufferptr
RocksDBStore::RocksDBWholeSpaceIteratorImpl::value_as_ptr()
986 rocksdb::Slice val
= dbiter
->value();
987 return bufferptr(val
.data(), val
.size());
990 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::status()
992 return dbiter
->status().ok() ? 0 : -1;
// Build a key from `prefix`, starting from a copy of it.
// NOTE(review): the rest of the body is missing from this listing.
// Judging by the name and by seek_to_last(prefix), the result presumably
// sorts after every key stored under `prefix` -- confirm.
995 string
RocksDBStore::past_prefix(const string
&prefix
)
997 string limit
= prefix
;
1002 RocksDBStore::WholeSpaceIterator
RocksDBStore::_get_iterator()
1004 return std::make_shared
<RocksDBWholeSpaceIteratorImpl
>(
1005 db
->NewIterator(rocksdb::ReadOptions()));