1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
10 #include <sys/types.h>
13 #include "rocksdb/db.h"
14 #include "rocksdb/table.h"
15 #include "rocksdb/env.h"
16 #include "rocksdb/slice.h"
17 #include "rocksdb/cache.h"
18 #include "rocksdb/filter_policy.h"
19 #include "rocksdb/utilities/convenience.h"
20 #include "rocksdb/merge_operator.h"
23 #include "common/perf_counters.h"
24 #include "common/PriorityCache.h"
25 #include "include/str_list.h"
26 #include "include/stringify.h"
27 #include "include/str_map.h"
28 #include "KeyValueDB.h"
29 #include "RocksDBStore.h"
31 #include "common/debug.h"
33 #define dout_context cct
34 #define dout_subsys ceph_subsys_rocksdb
36 #define dout_prefix *_dout << "rocksdb: "
38 static bufferlist
to_bufferlist(rocksdb::Slice in
) {
40 bl
.append(bufferptr(in
.data(), in
.size()));
44 static rocksdb::SliceParts
prepare_sliceparts(const bufferlist
&bl
,
45 vector
<rocksdb::Slice
> *slices
)
48 for (auto& buf
: bl
.buffers()) {
49 (*slices
)[n
].data_
= buf
.c_str();
50 (*slices
)[n
].size_
= buf
.length();
53 return rocksdb::SliceParts(slices
->data(), slices
->size());
58 // One of these for the default rocksdb column family, routing each prefix
59 // to the appropriate MergeOperator.
61 class RocksDBStore::MergeOperatorRouter
62 : public rocksdb::AssociativeMergeOperator
66 const char *Name() const override
{
67 // Construct a name that rocksDB will validate against. We want to
68 // do this in a way that doesn't constrain the ordering of calls
69 // to set_merge_operator, so sort the merge operators and then
70 // construct a name from all of those parts.
71 store
.assoc_name
.clear();
72 map
<std::string
,std::string
> names
;
74 for (auto& p
: store
.merge_ops
) {
75 names
[p
.first
] = p
.second
->name();
77 for (auto& p
: store
.cf_handles
) {
80 for (auto& p
: names
) {
81 store
.assoc_name
+= '.';
82 store
.assoc_name
+= p
.first
;
83 store
.assoc_name
+= ':';
84 store
.assoc_name
+= p
.second
;
86 return store
.assoc_name
.c_str();
89 explicit MergeOperatorRouter(RocksDBStore
&_store
) : store(_store
) {}
91 bool Merge(const rocksdb::Slice
& key
,
92 const rocksdb::Slice
* existing_value
,
93 const rocksdb::Slice
& value
,
94 std::string
* new_value
,
95 rocksdb::Logger
* logger
) const override
{
96 // for default column family
97 // extract prefix from key and compare against each registered merge op;
98 // even though merge operator for explicit CF is included in merge_ops,
99 // it won't be picked up, since it won't match.
100 for (auto& p
: store
.merge_ops
) {
101 if (p
.first
.compare(0, p
.first
.length(),
102 key
.data(), p
.first
.length()) == 0 &&
103 key
.data()[p
.first
.length()] == 0) {
104 if (existing_value
) {
105 p
.second
->merge(existing_value
->data(), existing_value
->size(),
106 value
.data(), value
.size(),
109 p
.second
->merge_nonexistent(value
.data(), value
.size(), new_value
);
114 return true; // OK :)
119 // One of these per non-default column family, linked directly to the
120 // merge operator for that CF/prefix (if any).
122 class RocksDBStore::MergeOperatorLinker
123 : public rocksdb::AssociativeMergeOperator
126 std::shared_ptr
<KeyValueDB::MergeOperator
> mop
;
128 explicit MergeOperatorLinker(const std::shared_ptr
<KeyValueDB::MergeOperator
> &o
) : mop(o
) {}
130 const char *Name() const override
{
134 bool Merge(const rocksdb::Slice
& key
,
135 const rocksdb::Slice
* existing_value
,
136 const rocksdb::Slice
& value
,
137 std::string
* new_value
,
138 rocksdb::Logger
* logger
) const override
{
139 if (existing_value
) {
140 mop
->merge(existing_value
->data(), existing_value
->size(),
141 value
.data(), value
.size(),
144 mop
->merge_nonexistent(value
.data(), value
.size(), new_value
);
150 int RocksDBStore::set_merge_operator(
151 const string
& prefix
,
152 std::shared_ptr
<KeyValueDB::MergeOperator
> mop
)
154 // If you fail here, it's because you can't do this on an open database
155 ceph_assert(db
== nullptr);
156 merge_ops
.push_back(std::make_pair(prefix
,mop
));
160 class CephRocksdbLogger
: public rocksdb::Logger
{
163 explicit CephRocksdbLogger(CephContext
*c
) : cct(c
) {
166 ~CephRocksdbLogger() override
{
170 // Write an entry to the log file with the specified format.
171 void Logv(const char* format
, va_list ap
) override
{
172 Logv(rocksdb::INFO_LEVEL
, format
, ap
);
175 // Write an entry to the log file with the specified log level
176 // and format. Any log with level under the internal log level
177 // of *this (see @SetInfoLogLevel and @GetInfoLogLevel) will not be
179 void Logv(const rocksdb::InfoLogLevel log_level
, const char* format
,
180 va_list ap
) override
{
181 int v
= rocksdb::NUM_INFO_LOG_LEVELS
- log_level
- 1;
182 dout(ceph::dout::need_dynamic(v
));
184 vsnprintf(buf
, sizeof(buf
), format
, ap
);
185 *_dout
<< buf
<< dendl
;
189 rocksdb::Logger
*create_rocksdb_ceph_logger()
191 return new CephRocksdbLogger(g_ceph_context
);
194 static int string2bool(const string
&val
, bool &b_val
)
196 if (strcasecmp(val
.c_str(), "false") == 0) {
199 } else if (strcasecmp(val
.c_str(), "true") == 0) {
204 int b
= strict_strtol(val
.c_str(), 10, &err
);
212 int RocksDBStore::tryInterpret(const string
&key
, const string
&val
, rocksdb::Options
&opt
)
214 if (key
== "compaction_threads") {
216 int f
= strict_iecstrtoll(val
.c_str(), &err
);
219 //Low priority threadpool is used for compaction
220 opt
.env
->SetBackgroundThreads(f
, rocksdb::Env::Priority::LOW
);
221 } else if (key
== "flusher_threads") {
223 int f
= strict_iecstrtoll(val
.c_str(), &err
);
226 //High priority threadpool is used for flusher
227 opt
.env
->SetBackgroundThreads(f
, rocksdb::Env::Priority::HIGH
);
228 } else if (key
== "compact_on_mount") {
229 int ret
= string2bool(val
, compact_on_mount
);
232 } else if (key
== "disableWAL") {
233 int ret
= string2bool(val
, disableWAL
);
237 //unrecognize config options.
243 int RocksDBStore::ParseOptionsFromString(const string
&opt_str
, rocksdb::Options
&opt
)
245 map
<string
, string
> str_map
;
246 int r
= get_str_map(opt_str
, &str_map
, ",\n;");
249 map
<string
, string
>::iterator it
;
250 for(it
= str_map
.begin(); it
!= str_map
.end(); ++it
) {
251 string this_opt
= it
->first
+ "=" + it
->second
;
252 rocksdb::Status status
= rocksdb::GetOptionsFromString(opt
, this_opt
, &opt
);
254 //unrecognized by rocksdb, try to interpret by ourselves.
255 r
= tryInterpret(it
->first
, it
->second
, opt
);
257 derr
<< status
.ToString() << dendl
;
261 lgeneric_dout(cct
, 0) << " set rocksdb option " << it
->first
262 << " = " << it
->second
<< dendl
;
267 int RocksDBStore::init(string _options_str
)
269 options_str
= _options_str
;
270 rocksdb::Options opt
;
272 if (options_str
.length()) {
273 int r
= ParseOptionsFromString(options_str
, opt
);
281 int RocksDBStore::create_db_dir()
284 unique_ptr
<rocksdb::Directory
> dir
;
285 env
->NewDirectory(path
, &dir
);
287 int r
= ::mkdir(path
.c_str(), 0755);
290 if (r
< 0 && r
!= -EEXIST
) {
291 derr
<< __func__
<< " failed to create " << path
<< ": " << cpp_strerror(r
)
299 int RocksDBStore::install_cf_mergeop(
300 const string
&cf_name
,
301 rocksdb::ColumnFamilyOptions
*cf_opt
)
303 ceph_assert(cf_opt
!= nullptr);
304 cf_opt
->merge_operator
.reset();
305 for (auto& i
: merge_ops
) {
306 if (i
.first
== cf_name
) {
307 cf_opt
->merge_operator
.reset(new MergeOperatorLinker(i
.second
));
313 int RocksDBStore::create_and_open(ostream
&out
,
314 const vector
<ColumnFamily
>& cfs
)
316 int r
= create_db_dir();
320 return do_open(out
, true, false, nullptr);
322 return do_open(out
, true, false, &cfs
);
326 int RocksDBStore::load_rocksdb_options(bool create_if_missing
, rocksdb::Options
& opt
)
328 rocksdb::Status status
;
330 if (options_str
.length()) {
331 int r
= ParseOptionsFromString(options_str
, opt
);
337 if (g_conf()->rocksdb_perf
) {
338 dbstats
= rocksdb::CreateDBStatistics();
339 opt
.statistics
= dbstats
;
342 opt
.create_if_missing
= create_if_missing
;
343 if (kv_options
.count("separate_wal_dir")) {
344 opt
.wal_dir
= path
+ ".wal";
347 // Since ceph::for_each_substr doesn't return a value and
348 // std::stoull does throw, we may as well just catch everything here.
350 if (kv_options
.count("db_paths")) {
352 get_str_list(kv_options
["db_paths"], "; \t", paths
);
353 for (auto& p
: paths
) {
354 size_t pos
= p
.find(',');
355 if (pos
== std::string::npos
) {
356 derr
<< __func__
<< " invalid db path item " << p
<< " in "
357 << kv_options
["db_paths"] << dendl
;
360 string path
= p
.substr(0, pos
);
361 string size_str
= p
.substr(pos
+ 1);
362 uint64_t size
= atoll(size_str
.c_str());
364 derr
<< __func__
<< " invalid db path item " << p
<< " in "
365 << kv_options
["db_paths"] << dendl
;
368 opt
.db_paths
.push_back(rocksdb::DbPath(path
, size
));
369 dout(10) << __func__
<< " db_path " << path
<< " size " << size
<< dendl
;
372 } catch (const std::system_error
& e
) {
373 return -e
.code().value();
376 if (g_conf()->rocksdb_log_to_ceph_log
) {
377 opt
.info_log
.reset(new CephRocksdbLogger(g_ceph_context
));
381 dout(10) << __func__
<< " using custom Env " << priv
<< dendl
;
382 opt
.env
= static_cast<rocksdb::Env
*>(priv
);
386 if (!set_cache_flag
) {
387 cache_size
= g_conf()->rocksdb_cache_size
;
389 uint64_t row_cache_size
= cache_size
* g_conf()->rocksdb_cache_row_ratio
;
390 uint64_t block_cache_size
= cache_size
- row_cache_size
;
392 if (g_conf()->rocksdb_cache_type
== "binned_lru") {
393 bbt_opts
.block_cache
= rocksdb_cache::NewBinnedLRUCache(
396 g_conf()->rocksdb_cache_shard_bits
);
397 } else if (g_conf()->rocksdb_cache_type
== "lru") {
398 bbt_opts
.block_cache
= rocksdb::NewLRUCache(
400 g_conf()->rocksdb_cache_shard_bits
);
401 } else if (g_conf()->rocksdb_cache_type
== "clock") {
402 bbt_opts
.block_cache
= rocksdb::NewClockCache(
404 g_conf()->rocksdb_cache_shard_bits
);
405 if (!bbt_opts
.block_cache
) {
406 derr
<< "rocksdb_cache_type '" << g_conf()->rocksdb_cache_type
407 << "' chosen, but RocksDB not compiled with LibTBB. "
412 derr
<< "unrecognized rocksdb_cache_type '" << g_conf()->rocksdb_cache_type
416 bbt_opts
.block_size
= g_conf()->rocksdb_block_size
;
418 if (row_cache_size
> 0)
419 opt
.row_cache
= rocksdb::NewLRUCache(row_cache_size
,
420 g_conf()->rocksdb_cache_shard_bits
);
421 uint64_t bloom_bits
= g_conf().get_val
<uint64_t>("rocksdb_bloom_bits_per_key");
422 if (bloom_bits
> 0) {
423 dout(10) << __func__
<< " set bloom filter bits per key to "
424 << bloom_bits
<< dendl
;
425 bbt_opts
.filter_policy
.reset(rocksdb::NewBloomFilterPolicy(bloom_bits
));
427 using std::placeholders::_1
;
428 if (g_conf().with_val
<std::string
>("rocksdb_index_type",
429 std::bind(std::equal_to
<std::string
>(), _1
,
431 bbt_opts
.index_type
= rocksdb::BlockBasedTableOptions::IndexType::kBinarySearch
;
432 if (g_conf().with_val
<std::string
>("rocksdb_index_type",
433 std::bind(std::equal_to
<std::string
>(), _1
,
435 bbt_opts
.index_type
= rocksdb::BlockBasedTableOptions::IndexType::kHashSearch
;
436 if (g_conf().with_val
<std::string
>("rocksdb_index_type",
437 std::bind(std::equal_to
<std::string
>(), _1
,
439 bbt_opts
.index_type
= rocksdb::BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch
;
440 if (!bbt_opts
.no_block_cache
) {
441 bbt_opts
.cache_index_and_filter_blocks
=
442 g_conf().get_val
<bool>("rocksdb_cache_index_and_filter_blocks");
443 bbt_opts
.cache_index_and_filter_blocks_with_high_priority
=
444 g_conf().get_val
<bool>("rocksdb_cache_index_and_filter_blocks_with_high_priority");
445 bbt_opts
.pin_l0_filter_and_index_blocks_in_cache
=
446 g_conf().get_val
<bool>("rocksdb_pin_l0_filter_and_index_blocks_in_cache");
448 bbt_opts
.partition_filters
= g_conf().get_val
<bool>("rocksdb_partition_filters");
449 if (g_conf().get_val
<Option::size_t>("rocksdb_metadata_block_size") > 0)
450 bbt_opts
.metadata_block_size
= g_conf().get_val
<Option::size_t>("rocksdb_metadata_block_size");
452 opt
.table_factory
.reset(rocksdb::NewBlockBasedTableFactory(bbt_opts
));
453 dout(10) << __func__
<< " block size " << g_conf()->rocksdb_block_size
454 << ", block_cache size " << byte_u_t(block_cache_size
)
455 << ", row_cache size " << byte_u_t(row_cache_size
)
457 << (1 << g_conf()->rocksdb_cache_shard_bits
)
458 << ", type " << g_conf()->rocksdb_cache_type
461 opt
.merge_operator
.reset(new MergeOperatorRouter(*this));
466 int RocksDBStore::do_open(ostream
&out
,
467 bool create_if_missing
,
469 const vector
<ColumnFamily
>* cfs
)
471 ceph_assert(!(create_if_missing
&& open_readonly
));
472 rocksdb::Options opt
;
473 int r
= load_rocksdb_options(create_if_missing
, opt
);
475 dout(1) << __func__
<< " load rocksdb options failed" << dendl
;
478 rocksdb::Status status
;
479 if (create_if_missing
) {
480 status
= rocksdb::DB::Open(opt
, path
, &db
);
482 derr
<< status
.ToString() << dendl
;
485 // create and open column families
487 for (auto& p
: *cfs
) {
488 // copy default CF settings, block cache, merge operators as
489 // the base for new CF
490 rocksdb::ColumnFamilyOptions
cf_opt(opt
);
491 // user input options will override the base options
492 status
= rocksdb::GetColumnFamilyOptionsFromString(
493 cf_opt
, p
.option
, &cf_opt
);
495 derr
<< __func__
<< " invalid db column family option string for CF: "
499 install_cf_mergeop(p
.name
, &cf_opt
);
500 rocksdb::ColumnFamilyHandle
*cf
;
501 status
= db
->CreateColumnFamily(cf_opt
, p
.name
, &cf
);
503 derr
<< __func__
<< " Failed to create rocksdb column family: "
507 // store the new CF handle
508 add_column_family(p
.name
, static_cast<void*>(cf
));
511 default_cf
= db
->DefaultColumnFamily();
513 std::vector
<string
> existing_cfs
;
514 status
= rocksdb::DB::ListColumnFamilies(
515 rocksdb::DBOptions(opt
),
518 dout(1) << __func__
<< " column families: " << existing_cfs
<< dendl
;
519 if (existing_cfs
.empty()) {
520 // no column families
522 status
= rocksdb::DB::Open(opt
, path
, &db
);
524 status
= rocksdb::DB::OpenForReadOnly(opt
, path
, &db
);
527 derr
<< status
.ToString() << dendl
;
530 default_cf
= db
->DefaultColumnFamily();
532 // we cannot change column families for a created database. so, map
533 // what options we are given to whatever cf's already exist.
534 std::vector
<rocksdb::ColumnFamilyDescriptor
> column_families
;
535 for (auto& n
: existing_cfs
) {
536 // copy default CF settings, block cache, merge operators as
537 // the base for new CF
538 rocksdb::ColumnFamilyOptions
cf_opt(opt
);
541 for (auto& i
: *cfs
) {
544 status
= rocksdb::GetColumnFamilyOptionsFromString(
545 cf_opt
, i
.option
, &cf_opt
);
547 derr
<< __func__
<< " invalid db column family options for CF '"
548 << i
.name
<< "': " << i
.option
<< dendl
;
554 if (n
!= rocksdb::kDefaultColumnFamilyName
) {
555 install_cf_mergeop(n
, &cf_opt
);
557 column_families
.push_back(rocksdb::ColumnFamilyDescriptor(n
, cf_opt
));
558 if (!found
&& n
!= rocksdb::kDefaultColumnFamilyName
) {
559 dout(1) << __func__
<< " column family '" << n
560 << "' exists but not expected" << dendl
;
563 std::vector
<rocksdb::ColumnFamilyHandle
*> handles
;
565 status
= rocksdb::DB::OpenForReadOnly(rocksdb::DBOptions(opt
),
566 path
, column_families
,
569 status
= rocksdb::DB::Open(rocksdb::DBOptions(opt
),
570 path
, column_families
, &handles
, &db
);
573 derr
<< status
.ToString() << dendl
;
576 for (unsigned i
= 0; i
< existing_cfs
.size(); ++i
) {
577 if (existing_cfs
[i
] == rocksdb::kDefaultColumnFamilyName
) {
578 default_cf
= handles
[i
];
579 must_close_default_cf
= true;
581 add_column_family(existing_cfs
[i
], static_cast<void*>(handles
[i
]));
586 ceph_assert(default_cf
!= nullptr);
588 PerfCountersBuilder
plb(g_ceph_context
, "rocksdb", l_rocksdb_first
, l_rocksdb_last
);
589 plb
.add_u64_counter(l_rocksdb_gets
, "get", "Gets");
590 plb
.add_u64_counter(l_rocksdb_txns
, "submit_transaction", "Submit transactions");
591 plb
.add_u64_counter(l_rocksdb_txns_sync
, "submit_transaction_sync", "Submit transactions sync");
592 plb
.add_time_avg(l_rocksdb_get_latency
, "get_latency", "Get latency");
593 plb
.add_time_avg(l_rocksdb_submit_latency
, "submit_latency", "Submit Latency");
594 plb
.add_time_avg(l_rocksdb_submit_sync_latency
, "submit_sync_latency", "Submit Sync Latency");
595 plb
.add_u64_counter(l_rocksdb_compact
, "compact", "Compactions");
596 plb
.add_u64_counter(l_rocksdb_compact_range
, "compact_range", "Compactions by range");
597 plb
.add_u64_counter(l_rocksdb_compact_queue_merge
, "compact_queue_merge", "Mergings of ranges in compaction queue");
598 plb
.add_u64(l_rocksdb_compact_queue_len
, "compact_queue_len", "Length of compaction queue");
599 plb
.add_time_avg(l_rocksdb_write_wal_time
, "rocksdb_write_wal_time", "Rocksdb write wal time");
600 plb
.add_time_avg(l_rocksdb_write_memtable_time
, "rocksdb_write_memtable_time", "Rocksdb write memtable time");
601 plb
.add_time_avg(l_rocksdb_write_delay_time
, "rocksdb_write_delay_time", "Rocksdb write delay time");
602 plb
.add_time_avg(l_rocksdb_write_pre_and_post_process_time
,
603 "rocksdb_write_pre_and_post_time", "total time spent on writing a record, excluding write process");
604 logger
= plb
.create_perf_counters();
605 cct
->get_perfcounters_collection()->add(logger
);
607 if (compact_on_mount
) {
608 derr
<< "Compacting rocksdb store..." << dendl
;
610 derr
<< "Finished compacting rocksdb store" << dendl
;
615 int RocksDBStore::_test_init(const string
& dir
)
617 rocksdb::Options options
;
618 options
.create_if_missing
= true;
620 rocksdb::Status status
= rocksdb::DB::Open(options
, dir
, &db
);
623 return status
.ok() ? 0 : -EIO
;
626 RocksDBStore::~RocksDBStore()
631 // Ensure db is destroyed before dependent db_cache and filterpolicy
632 for (auto& p
: cf_handles
) {
633 db
->DestroyColumnFamilyHandle(
634 static_cast<rocksdb::ColumnFamilyHandle
*>(p
.second
));
637 if (must_close_default_cf
) {
638 db
->DestroyColumnFamilyHandle(default_cf
);
639 must_close_default_cf
= false;
641 default_cf
= nullptr;
646 delete static_cast<rocksdb::Env
*>(priv
);
650 void RocksDBStore::close()
652 // stop compaction thread
653 compact_queue_lock
.Lock();
654 if (compact_thread
.is_started()) {
655 compact_queue_stop
= true;
656 compact_queue_cond
.Signal();
657 compact_queue_lock
.Unlock();
658 compact_thread
.join();
660 compact_queue_lock
.Unlock();
664 cct
->get_perfcounters_collection()->remove(logger
);
667 int RocksDBStore::repair(std::ostream
&out
)
669 rocksdb::Options opt
;
670 int r
= load_rocksdb_options(false, opt
);
672 dout(1) << __func__
<< " load rocksdb options failed" << dendl
;
673 out
<< "load rocksdb options failed" << std::endl
;
676 rocksdb::Status status
= rocksdb::RepairDB(path
, opt
);
680 out
<< "repair rocksdb failed : " << status
.ToString() << std::endl
;
685 void RocksDBStore::split_stats(const std::string
&s
, char delim
, std::vector
<std::string
> &elems
) {
686 std::stringstream ss
;
689 while (std::getline(ss
, item
, delim
)) {
690 elems
.push_back(item
);
694 int64_t RocksDBStore::estimate_prefix_size(const string
& prefix
)
696 auto cf
= get_cf_handle(prefix
);
699 //rocksdb::DB::INCLUDE_MEMTABLES | // do not include memtables...
700 rocksdb::DB::INCLUDE_FILES
;
702 string
start(1, '\x00');
703 string
limit("\xff\xff\xff\xff");
704 rocksdb::Range
r(start
, limit
);
705 db
->GetApproximateSizes(cf
, &r
, 1, &size
, flags
);
707 string limit
= prefix
+ "\xff\xff\xff\xff";
708 rocksdb::Range
r(prefix
, limit
);
709 db
->GetApproximateSizes(default_cf
,
710 &r
, 1, &size
, flags
);
715 void RocksDBStore::get_statistics(Formatter
*f
)
717 if (!g_conf()->rocksdb_perf
) {
718 dout(20) << __func__
<< " RocksDB perf is disabled, can't probe for stats"
723 if (g_conf()->rocksdb_collect_compaction_stats
) {
724 std::string stat_str
;
725 bool status
= db
->GetProperty("rocksdb.stats", &stat_str
);
727 f
->open_object_section("rocksdb_statistics");
728 f
->dump_string("rocksdb_compaction_statistics", "");
729 vector
<string
> stats
;
730 split_stats(stat_str
, '\n', stats
);
731 for (auto st
:stats
) {
732 f
->dump_string("", st
);
737 if (g_conf()->rocksdb_collect_extended_stats
) {
739 f
->open_object_section("rocksdb_extended_statistics");
740 string stat_str
= dbstats
->ToString();
741 vector
<string
> stats
;
742 split_stats(stat_str
, '\n', stats
);
743 f
->dump_string("rocksdb_extended_statistics", "");
744 for (auto st
:stats
) {
745 f
->dump_string(".", st
);
749 f
->open_object_section("rocksdbstore_perf_counters");
750 logger
->dump_formatted(f
,0);
753 if (g_conf()->rocksdb_collect_memory_stats
) {
754 f
->open_object_section("rocksdb_memtable_statistics");
756 if (!bbt_opts
.no_block_cache
) {
757 str
.append(stringify(bbt_opts
.block_cache
->GetUsage()));
758 f
->dump_string("block_cache_usage", str
.data());
760 str
.append(stringify(bbt_opts
.block_cache
->GetPinnedUsage()));
761 f
->dump_string("block_cache_pinned_blocks_usage", str
);
764 db
->GetProperty("rocksdb.cur-size-all-mem-tables", &str
);
765 f
->dump_string("rocksdb_memtable_usage", str
);
767 db
->GetProperty("rocksdb.estimate-table-readers-mem", &str
);
768 f
->dump_string("rocksdb_index_filter_blocks_usage", str
);
773 int RocksDBStore::submit_common(rocksdb::WriteOptions
& woptions
, KeyValueDB::Transaction t
)
775 // enable rocksdb breakdown
776 // considering performance overhead, default is disabled
777 if (g_conf()->rocksdb_perf
) {
778 rocksdb::SetPerfLevel(rocksdb::PerfLevel::kEnableTimeExceptForMutex
);
779 rocksdb::get_perf_context()->Reset();
782 RocksDBTransactionImpl
* _t
=
783 static_cast<RocksDBTransactionImpl
*>(t
.get());
784 woptions
.disableWAL
= disableWAL
;
785 lgeneric_subdout(cct
, rocksdb
, 30) << __func__
;
786 RocksWBHandler bat_txc
;
787 _t
->bat
.Iterate(&bat_txc
);
788 *_dout
<< " Rocksdb transaction: " << bat_txc
.seen
<< dendl
;
790 rocksdb::Status s
= db
->Write(woptions
, &_t
->bat
);
792 RocksWBHandler rocks_txc
;
793 _t
->bat
.Iterate(&rocks_txc
);
794 derr
<< __func__
<< " error: " << s
.ToString() << " code = " << s
.code()
795 << " Rocksdb transaction: " << rocks_txc
.seen
<< dendl
;
798 if (g_conf()->rocksdb_perf
) {
799 utime_t write_memtable_time
;
800 utime_t write_delay_time
;
801 utime_t write_wal_time
;
802 utime_t write_pre_and_post_process_time
;
803 write_wal_time
.set_from_double(
804 static_cast<double>(rocksdb::get_perf_context()->write_wal_time
)/1000000000);
805 write_memtable_time
.set_from_double(
806 static_cast<double>(rocksdb::get_perf_context()->write_memtable_time
)/1000000000);
807 write_delay_time
.set_from_double(
808 static_cast<double>(rocksdb::get_perf_context()->write_delay_time
)/1000000000);
809 write_pre_and_post_process_time
.set_from_double(
810 static_cast<double>(rocksdb::get_perf_context()->write_pre_and_post_process_time
)/1000000000);
811 logger
->tinc(l_rocksdb_write_memtable_time
, write_memtable_time
);
812 logger
->tinc(l_rocksdb_write_delay_time
, write_delay_time
);
813 logger
->tinc(l_rocksdb_write_wal_time
, write_wal_time
);
814 logger
->tinc(l_rocksdb_write_pre_and_post_process_time
, write_pre_and_post_process_time
);
817 return s
.ok() ? 0 : -1;
820 int RocksDBStore::submit_transaction(KeyValueDB::Transaction t
)
822 utime_t start
= ceph_clock_now();
823 rocksdb::WriteOptions woptions
;
824 woptions
.sync
= false;
826 int result
= submit_common(woptions
, t
);
828 utime_t lat
= ceph_clock_now() - start
;
829 logger
->inc(l_rocksdb_txns
);
830 logger
->tinc(l_rocksdb_submit_latency
, lat
);
835 int RocksDBStore::submit_transaction_sync(KeyValueDB::Transaction t
)
837 utime_t start
= ceph_clock_now();
838 rocksdb::WriteOptions woptions
;
839 // if disableWAL, sync can't set
840 woptions
.sync
= !disableWAL
;
842 int result
= submit_common(woptions
, t
);
844 utime_t lat
= ceph_clock_now() - start
;
845 logger
->inc(l_rocksdb_txns_sync
);
846 logger
->tinc(l_rocksdb_submit_sync_latency
, lat
);
851 RocksDBStore::RocksDBTransactionImpl::RocksDBTransactionImpl(RocksDBStore
*_db
)
856 void RocksDBStore::RocksDBTransactionImpl::put_bat(
857 rocksdb::WriteBatch
& bat
,
858 rocksdb::ColumnFamilyHandle
*cf
,
860 const bufferlist
&to_set_bl
)
862 // bufferlist::c_str() is non-constant, so we can't call c_str()
863 if (to_set_bl
.is_contiguous() && to_set_bl
.length() > 0) {
866 rocksdb::Slice(to_set_bl
.buffers().front().c_str(),
867 to_set_bl
.length()));
869 rocksdb::Slice
key_slice(key
);
870 vector
<rocksdb::Slice
> value_slices(to_set_bl
.buffers().size());
872 rocksdb::SliceParts(&key_slice
, 1),
873 prepare_sliceparts(to_set_bl
, &value_slices
));
877 void RocksDBStore::RocksDBTransactionImpl::set(
878 const string
&prefix
,
880 const bufferlist
&to_set_bl
)
882 auto cf
= db
->get_cf_handle(prefix
);
884 put_bat(bat
, cf
, k
, to_set_bl
);
886 string key
= combine_strings(prefix
, k
);
887 put_bat(bat
, db
->default_cf
, key
, to_set_bl
);
891 void RocksDBStore::RocksDBTransactionImpl::set(
892 const string
&prefix
,
893 const char *k
, size_t keylen
,
894 const bufferlist
&to_set_bl
)
896 auto cf
= db
->get_cf_handle(prefix
);
898 string
key(k
, keylen
); // fixme?
899 put_bat(bat
, cf
, key
, to_set_bl
);
902 combine_strings(prefix
, k
, keylen
, &key
);
903 put_bat(bat
, cf
, key
, to_set_bl
);
907 void RocksDBStore::RocksDBTransactionImpl::rmkey(const string
&prefix
,
910 auto cf
= db
->get_cf_handle(prefix
);
912 bat
.Delete(cf
, rocksdb::Slice(k
));
914 bat
.Delete(db
->default_cf
, combine_strings(prefix
, k
));
918 void RocksDBStore::RocksDBTransactionImpl::rmkey(const string
&prefix
,
922 auto cf
= db
->get_cf_handle(prefix
);
924 bat
.Delete(cf
, rocksdb::Slice(k
, keylen
));
927 combine_strings(prefix
, k
, keylen
, &key
);
928 bat
.Delete(db
->default_cf
, rocksdb::Slice(key
));
932 void RocksDBStore::RocksDBTransactionImpl::rm_single_key(const string
&prefix
,
935 auto cf
= db
->get_cf_handle(prefix
);
937 bat
.SingleDelete(cf
, k
);
939 bat
.SingleDelete(db
->default_cf
, combine_strings(prefix
, k
));
943 void RocksDBStore::RocksDBTransactionImpl::rmkeys_by_prefix(const string
&prefix
)
945 auto cf
= db
->get_cf_handle(prefix
);
947 if (db
->enable_rmrange
) {
948 string
endprefix("\xff\xff\xff\xff"); // FIXME: this is cheating...
949 if (db
->max_items_rmrange
) {
950 uint64_t cnt
= db
->max_items_rmrange
;
952 auto it
= db
->get_iterator(prefix
);
953 for (it
->seek_to_first();
957 bat
.RollbackToSavePoint();
958 bat
.DeleteRange(cf
, string(), endprefix
);
961 bat
.Delete(cf
, rocksdb::Slice(it
->key()));
966 bat
.DeleteRange(cf
, string(), endprefix
);
969 auto it
= db
->get_iterator(prefix
);
970 for (it
->seek_to_first();
973 bat
.Delete(cf
, rocksdb::Slice(it
->key()));
977 if (db
->enable_rmrange
) {
978 string endprefix
= prefix
;
979 endprefix
.push_back('\x01');
980 if (db
->max_items_rmrange
) {
981 uint64_t cnt
= db
->max_items_rmrange
;
983 auto it
= db
->get_iterator(prefix
);
984 for (it
->seek_to_first();
988 bat
.RollbackToSavePoint();
989 bat
.DeleteRange(db
->default_cf
,
990 combine_strings(prefix
, string()),
991 combine_strings(endprefix
, string()));
994 bat
.Delete(db
->default_cf
, combine_strings(prefix
, it
->key()));
999 bat
.DeleteRange(db
->default_cf
,
1000 combine_strings(prefix
, string()),
1001 combine_strings(endprefix
, string()));
1004 auto it
= db
->get_iterator(prefix
);
1005 for (it
->seek_to_first();
1008 bat
.Delete(db
->default_cf
, combine_strings(prefix
, it
->key()));
1014 void RocksDBStore::RocksDBTransactionImpl::rm_range_keys(const string
&prefix
,
1015 const string
&start
,
1018 auto cf
= db
->get_cf_handle(prefix
);
1020 if (db
->enable_rmrange
) {
1021 if (db
->max_items_rmrange
) {
1022 uint64_t cnt
= db
->max_items_rmrange
;
1023 auto it
= db
->get_iterator(prefix
);
1025 it
->lower_bound(start
);
1026 while (it
->valid()) {
1027 if (it
->key() >= end
) {
1031 bat
.RollbackToSavePoint();
1032 bat
.DeleteRange(cf
, rocksdb::Slice(start
), rocksdb::Slice(end
));
1035 bat
.Delete(cf
, rocksdb::Slice(it
->key()));
1041 bat
.DeleteRange(cf
, rocksdb::Slice(start
), rocksdb::Slice(end
));
1044 auto it
= db
->get_iterator(prefix
);
1045 it
->lower_bound(start
);
1046 while (it
->valid()) {
1047 if (it
->key() >= end
) {
1050 bat
.Delete(cf
, rocksdb::Slice(it
->key()));
1055 if (db
->enable_rmrange
) {
1056 if (db
->max_items_rmrange
) {
1057 uint64_t cnt
= db
->max_items_rmrange
;
1058 auto it
= db
->get_iterator(prefix
);
1060 it
->lower_bound(start
);
1061 while (it
->valid()) {
1062 if (it
->key() >= end
) {
1066 bat
.RollbackToSavePoint();
1069 rocksdb::Slice(combine_strings(prefix
, start
)),
1070 rocksdb::Slice(combine_strings(prefix
, end
)));
1073 bat
.Delete(db
->default_cf
,
1074 combine_strings(prefix
, it
->key()));
1082 rocksdb::Slice(combine_strings(prefix
, start
)),
1083 rocksdb::Slice(combine_strings(prefix
, end
)));
1086 auto it
= db
->get_iterator(prefix
);
1087 it
->lower_bound(start
);
1088 while (it
->valid()) {
1089 if (it
->key() >= end
) {
1092 bat
.Delete(db
->default_cf
,
1093 combine_strings(prefix
, it
->key()));
1100 void RocksDBStore::RocksDBTransactionImpl::merge(
1101 const string
&prefix
,
1103 const bufferlist
&to_set_bl
)
1105 auto cf
= db
->get_cf_handle(prefix
);
1107 // bufferlist::c_str() is non-constant, so we can't call c_str()
1108 if (to_set_bl
.is_contiguous() && to_set_bl
.length() > 0) {
1112 rocksdb::Slice(to_set_bl
.buffers().front().c_str(), to_set_bl
.length()));
1115 rocksdb::Slice
key_slice(k
);
1116 vector
<rocksdb::Slice
> value_slices(to_set_bl
.buffers().size());
1117 bat
.Merge(cf
, rocksdb::SliceParts(&key_slice
, 1),
1118 prepare_sliceparts(to_set_bl
, &value_slices
));
1121 string key
= combine_strings(prefix
, k
);
1122 // bufferlist::c_str() is non-constant, so we can't call c_str()
1123 if (to_set_bl
.is_contiguous() && to_set_bl
.length() > 0) {
1126 rocksdb::Slice(key
),
1127 rocksdb::Slice(to_set_bl
.buffers().front().c_str(), to_set_bl
.length()));
1130 rocksdb::Slice
key_slice(key
);
1131 vector
<rocksdb::Slice
> value_slices(to_set_bl
.buffers().size());
1134 rocksdb::SliceParts(&key_slice
, 1),
1135 prepare_sliceparts(to_set_bl
, &value_slices
));
1140 int RocksDBStore::get(
1141 const string
&prefix
,
1142 const std::set
<string
> &keys
,
1143 std::map
<string
, bufferlist
> *out
)
1145 utime_t start
= ceph_clock_now();
1146 auto cf
= get_cf_handle(prefix
);
1148 for (auto& key
: keys
) {
1150 auto status
= db
->Get(rocksdb::ReadOptions(),
1152 rocksdb::Slice(key
),
1155 (*out
)[key
].append(value
);
1156 } else if (status
.IsIOError()) {
1157 ceph_abort_msg(status
.getState());
1161 for (auto& key
: keys
) {
1163 string k
= combine_strings(prefix
, key
);
1164 auto status
= db
->Get(rocksdb::ReadOptions(),
1169 (*out
)[key
].append(value
);
1170 } else if (status
.IsIOError()) {
1171 ceph_abort_msg(status
.getState());
1175 utime_t lat
= ceph_clock_now() - start
;
1176 logger
->inc(l_rocksdb_gets
);
1177 logger
->tinc(l_rocksdb_get_latency
, lat
);
1181 int RocksDBStore::get(
1182 const string
&prefix
,
1186 ceph_assert(out
&& (out
->length() == 0));
1187 utime_t start
= ceph_clock_now();
1191 auto cf
= get_cf_handle(prefix
);
1193 s
= db
->Get(rocksdb::ReadOptions(),
1195 rocksdb::Slice(key
),
1198 string k
= combine_strings(prefix
, key
);
1199 s
= db
->Get(rocksdb::ReadOptions(),
1206 } else if (s
.IsNotFound()) {
1209 ceph_abort_msg(s
.getState());
1211 utime_t lat
= ceph_clock_now() - start
;
1212 logger
->inc(l_rocksdb_gets
);
1213 logger
->tinc(l_rocksdb_get_latency
, lat
);
// Single-key lookup under `prefix` where the key is given as a pointer plus
// length (`key`, `keylen`) rather than as a string.
//
// `out` must be non-null and empty on entry (asserted). Two db->Get() calls
// are visible: one on rocksdb::Slice(key, keylen), one after
// combine_strings(prefix, key, keylen, &k) has filled `k`.
// NOTE(review): the branch choosing between them is not shown; presumably it
// depends on the handle returned by get_cf_handle(prefix) -- confirm.
//
// The resulting status is tested for IsNotFound(), and there is a path that
// aborts via ceph_abort_msg(s.getState()). The call is counted in
// l_rocksdb_gets and its duration is added to l_rocksdb_get_latency.
1217 int RocksDBStore::get(
1218 const string
& prefix
,
// Callers must pass a valid, empty output buffer.
1223 ceph_assert(out
&& (out
->length() == 0));
1224 utime_t start
= ceph_clock_now();
1228 auto cf
= get_cf_handle(prefix
);
// Lookup using the raw key bytes.
1230 s
= db
->Get(rocksdb::ReadOptions(),
1232 rocksdb::Slice(key
, keylen
),
// Lookup using the combined prefix+key string, built into `k`.
1236 combine_strings(prefix
, key
, keylen
, &k
);
1237 s
= db
->Get(rocksdb::ReadOptions(),
1244 } else if (s
.IsNotFound()) {
1247 ceph_abort_msg(s
.getState());
// Account for the call in the perf counters.
1249 utime_t lat
= ceph_clock_now() - start
;
1250 logger
->inc(l_rocksdb_gets
);
1251 logger
->tinc(l_rocksdb_get_latency
, lat
);
// Split a raw key of the form "<prefix>\0<key>" into its two parts.
//
// The first NUL byte in `in` is the separator: the bytes before it are
// copied to *prefix and the bytes after it to *key.
// NOTE(review): callers in this file pass 0 for `prefix`, so the output
// pointers are presumably optional; the guards and the return values for the
// failure checks below are not shown -- confirm.
1255 int RocksDBStore::split_key(rocksdb::Slice in
, string
*prefix
, string
*key
)
1257 size_t prefix_len
= 0;
1259 // Find separator inside Slice
1260 char* separator
= (char*) memchr(in
.data(), 0, in
.size());
// No NUL byte at all: the input is not a combined key.
1261 if (separator
== NULL
)
1263 prefix_len
= size_t(separator
- in
.data());
// Sanity check that the separator offset lies inside the slice.
1264 if (prefix_len
>= in
.size())
1267 // Fetch prefix and/or key directly from Slice
1269 *prefix
= string(in
.data(), prefix_len
);
// The key is everything after the separator byte.
1271 *key
= string(separator
+1, in
.size()-prefix_len
-1);
// Run a full compaction: bump l_rocksdb_compact, compact the whole key range
// (nullptr begin/end) of default_cf, then walk every entry in cf_handles.
// NOTE(review): the per-handle call inside the loop is not shown; presumably
// it is the same CompactRange() on each column family -- confirm.
1275 void RocksDBStore::compact()
1277 logger
->inc(l_rocksdb_compact
);
1278 rocksdb::CompactRangeOptions options
;
// nullptr/nullptr selects the entire key range.
1279 db
->CompactRange(options
, default_cf
, nullptr, nullptr);
1280 for (auto cf
: cf_handles
) {
// cf.second is cast to a rocksdb::ColumnFamilyHandle pointer.
1283 static_cast<rocksdb::ColumnFamilyHandle
*>(cf
.second
),
// Body of the background compaction thread.
//
// Runs until compact_queue_stop is set. While holding compact_queue_lock it
// pops ranges off the front of compact_queue; the lock is released around the
// compaction work and re-taken afterwards. When the queue is empty the thread
// waits on compact_queue_cond.
1289 void RocksDBStore::compact_thread_entry()
1291 compact_queue_lock
.Lock();
1292 while (!compact_queue_stop
) {
1293 while (!compact_queue
.empty()) {
// Copy the range out of the queue before dropping the lock.
1294 pair
<string
,string
> range
= compact_queue
.front();
1295 compact_queue
.pop_front();
1296 logger
->set(l_rocksdb_compact_queue_len
, compact_queue
.size());
// Compaction runs without the queue lock held.
1297 compact_queue_lock
.Unlock();
1298 logger
->inc(l_rocksdb_compact_range
);
// An entry with both bounds empty is handled separately from a bounded range
// (that branch's body is not shown).
1299 if (range
.first
.empty() && range
.second
.empty()) {
1302 compact_range(range
.first
, range
.second
);
1304 compact_queue_lock
.Lock();
// Queue drained: sleep until signalled.
1307 compact_queue_cond
.Wait(compact_queue_lock
);
1309 compact_queue_lock
.Unlock();
1312 void RocksDBStore::compact_range_async(const string
& start
, const string
& end
)
1314 std::lock_guard
l(compact_queue_lock
);
1316 // try to merge adjacent ranges. this is O(n), but the queue should
1317 // be short. note that we do not cover all overlap cases and merge
1318 // opportunities here, but we capture the ones we currently need.
1319 list
< pair
<string
,string
> >::iterator p
= compact_queue
.begin();
1320 while (p
!= compact_queue
.end()) {
1321 if (p
->first
== start
&& p
->second
== end
) {
1325 if (p
->first
<= end
&& p
->first
> start
) {
1326 // merge with existing range to the right
1327 compact_queue
.push_back(make_pair(start
, p
->second
));
1328 compact_queue
.erase(p
);
1329 logger
->inc(l_rocksdb_compact_queue_merge
);
1332 if (p
->second
>= start
&& p
->second
< end
) {
1333 // merge with existing range to the left
1334 compact_queue
.push_back(make_pair(p
->first
, end
));
1335 compact_queue
.erase(p
);
1336 logger
->inc(l_rocksdb_compact_queue_merge
);
1341 if (p
== compact_queue
.end()) {
1342 // no merge, new entry.
1343 compact_queue
.push_back(make_pair(start
, end
));
1344 logger
->set(l_rocksdb_compact_queue_len
, compact_queue
.size());
1346 compact_queue_cond
.Signal();
1347 if (!compact_thread
.is_started()) {
1348 compact_thread
.create("rstore_compact");
// Try to open a RocksDB database at `omap_dir`, creating it if it does not
// exist (create_if_missing is set). The handle is written to the member `db`.
// NOTE(review): how `status` becomes the boolean result, and whether `db` is
// released afterwards, is not shown -- confirm.
1351 bool RocksDBStore::check_omap_dir(string
&omap_dir
)
1353 rocksdb::Options options
;
1354 options
.create_if_missing
= true;
1356 rocksdb::Status status
= rocksdb::DB::Open(options
, omap_dir
, &db
);
1361 void RocksDBStore::compact_range(const string
& start
, const string
& end
)
1363 rocksdb::CompactRangeOptions options
;
1364 rocksdb::Slice
cstart(start
);
1365 rocksdb::Slice
cend(end
);
1366 db
->CompactRange(options
, &cstart
, &cend
);
// Destructor of the whole-keyspace iterator implementation.
1369 RocksDBStore::RocksDBWholeSpaceIteratorImpl::~RocksDBWholeSpaceIteratorImpl()
1373 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_first()
1375 dbiter
->SeekToFirst();
1376 ceph_assert(!dbiter
->status().IsIOError());
1377 return dbiter
->status().ok() ? 0 : -1;
1379 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_first(const string
&prefix
)
1381 rocksdb::Slice
slice_prefix(prefix
);
1382 dbiter
->Seek(slice_prefix
);
1383 ceph_assert(!dbiter
->status().IsIOError());
1384 return dbiter
->status().ok() ? 0 : -1;
1386 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_last()
1388 dbiter
->SeekToLast();
1389 ceph_assert(!dbiter
->status().IsIOError());
1390 return dbiter
->status().ok() ? 0 : -1;
// Position the iterator relative to the end of `prefix`'s keys.
//
// Seeks to past_prefix(prefix); if that leaves the iterator invalid, it falls
// back to SeekToLast().
// NOTE(review): the handling of the case where the seek lands on a valid
// entry is not shown -- confirm.
// Returns 0 when the iterator status is ok and -1 otherwise.
1392 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_last(const string
&prefix
)
1394 string limit
= past_prefix(prefix
);
1395 rocksdb::Slice
slice_limit(limit
);
1396 dbiter
->Seek(slice_limit
);
// Nothing at or after the limit: fall back to the last entry overall.
1398 if (!dbiter
->Valid()) {
1399 dbiter
->SeekToLast();
1403 return dbiter
->status().ok() ? 0 : -1;
// Position the iterator for an upper-bound query on (prefix, after).
//
// Starts from lower_bound(prefix, after) and then checks whether the iterator
// sits exactly on (prefix, after).
// NOTE(review): the action taken on an exact match is not shown; presumably
// the iterator is advanced past it -- confirm.
// Returns 0 when the iterator status is ok and -1 otherwise.
1405 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::upper_bound(const string
&prefix
, const string
&after
)
1407 lower_bound(prefix
, after
);
1409 pair
<string
,string
> key
= raw_key();
// Exact match on both the prefix and the key part.
1410 if (key
.first
== prefix
&& key
.second
== after
)
1413 return dbiter
->status().ok() ? 0 : -1;
1415 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::lower_bound(const string
&prefix
, const string
&to
)
1417 string bound
= combine_strings(prefix
, to
);
1418 rocksdb::Slice
slice_bound(bound
);
1419 dbiter
->Seek(slice_bound
);
1420 return dbiter
->status().ok() ? 0 : -1;
// True while the underlying rocksdb iterator reports Valid().
1422 bool RocksDBStore::RocksDBWholeSpaceIteratorImpl::valid()
1424 return dbiter
->Valid();
// Step the iterator forward (the stepping call itself is not shown).
// Asserts that the iterator did not hit an I/O error; returns 0 when the
// iterator status is ok and -1 otherwise.
1426 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::next()
1431 ceph_assert(!dbiter
->status().IsIOError());
1432 return dbiter
->status().ok() ? 0 : -1;
// Step the iterator backward (the stepping call itself is not shown).
// Asserts that the iterator did not hit an I/O error; returns 0 when the
// iterator status is ok and -1 otherwise.
1434 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::prev()
1439 ceph_assert(!dbiter
->status().IsIOError());
1440 return dbiter
->status().ok() ? 0 : -1;
// Key part of the current entry. The raw key is split with split_key(),
// passing 0 for the prefix output so that only the key part is extracted.
1442 string
RocksDBStore::RocksDBWholeSpaceIteratorImpl::key()
1445 split_key(dbiter
->key(), 0, &out_key
);
// Current entry's raw key, split by split_key() and returned as a
// (prefix, key) pair.
1448 pair
<string
,string
> RocksDBStore::RocksDBWholeSpaceIteratorImpl::raw_key()
1451 split_key(dbiter
->key(), &prefix
, &key
);
1452 return make_pair(prefix
, key
);
// Check whether the current raw key belongs to `prefix` without copying it:
// the key must be longer than the prefix, have a NUL byte right after the
// prefix-length position, and start with the prefix bytes.
// NOTE(review): the result when the length/NUL test fails is not shown;
// presumably false -- confirm.
1455 bool RocksDBStore::RocksDBWholeSpaceIteratorImpl::raw_key_is_prefixed(const string
&prefix
) {
1456 // Look for "prefix\0" right in rocksdb::Slice
1457 rocksdb::Slice key
= dbiter
->key();
// The size check comes first so the indexed access below stays in bounds.
1458 if ((key
.size() > prefix
.length()) && (key
[prefix
.length()] == '\0')) {
1459 return memcmp(key
.data(), prefix
.c_str(), prefix
.length()) == 0;
// Value of the current entry, converted to a bufferlist by to_bufferlist().
1465 bufferlist
RocksDBStore::RocksDBWholeSpaceIteratorImpl::value()
1467 return to_bufferlist(dbiter
->value());
// Size in bytes of the current entry's raw key as stored in the iterator
// (the whole dbiter->key() slice, not just the key part).
1470 size_t RocksDBStore::RocksDBWholeSpaceIteratorImpl::key_size()
1472 return dbiter
->key().size();
// Size in bytes of the current entry's value.
1475 size_t RocksDBStore::RocksDBWholeSpaceIteratorImpl::value_size()
1477 return dbiter
->value().size();
1480 bufferptr
RocksDBStore::RocksDBWholeSpaceIteratorImpl::value_as_ptr()
1482 rocksdb::Slice val
= dbiter
->value();
1483 return bufferptr(val
.data(), val
.size());
// 0 when the underlying iterator's status is ok, -1 otherwise.
1486 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::status()
1488 return dbiter
->status().ok() ? 0 : -1;
// Build the seek target that seek_to_last(prefix) uses as the limit for
// `prefix`. The result starts out as a copy of `prefix`.
// NOTE(review): how the copy is modified before being returned is not shown
// -- confirm.
1491 string
RocksDBStore::past_prefix(const string
&prefix
)
1493 string limit
= prefix
;
1498 RocksDBStore::WholeSpaceIterator
RocksDBStore::get_wholespace_iterator()
1500 return std::make_shared
<RocksDBWholeSpaceIteratorImpl
>(
1501 db
->NewIterator(rocksdb::ReadOptions(), default_cf
));
// KeyValueDB::IteratorImpl backed by a single rocksdb::Iterator.
//
// Keys are used exactly as the underlying iterator stores them: key() returns
// dbiter->key().ToString() and lower_bound() seeks to the given string with
// no prefix combined in. The prefix passed at construction is only used to
// build raw_key(). Every int-returning method reports 0 when the iterator
// status is ok and -1 otherwise.
1504 class CFIteratorImpl
: public KeyValueDB::IteratorImpl
{
1507 rocksdb::Iterator
*dbiter
;
// `p` is the prefix reported by raw_key(); `iter` is the iterator to wrap.
1509 explicit CFIteratorImpl(const std::string
& p
,
1510 rocksdb::Iterator
*iter
)
1511 : prefix(p
), dbiter(iter
) { }
// Move to the first entry of the underlying iterator.
1516 int seek_to_first() override
{
1517 dbiter
->SeekToFirst();
1518 return dbiter
->status().ok() ? 0 : -1;
// Move to the last entry of the underlying iterator.
1520 int seek_to_last() override
{
1521 dbiter
->SeekToLast();
1522 return dbiter
->status().ok() ? 0 : -1;
// Upper-bound positioning: tests whether the iterator is valid and sitting
// exactly on `after` (the surrounding seek/advance calls are not shown).
1524 int upper_bound(const string
&after
) override
{
1526 if (valid() && (key() == after
)) {
1529 return dbiter
->status().ok() ? 0 : -1;
// Seek to `to`.
1531 int lower_bound(const string
&to
) override
{
1532 rocksdb::Slice
slice_bound(to
);
1533 dbiter
->Seek(slice_bound
);
1534 return dbiter
->status().ok() ? 0 : -1;
// Step forward (the stepping call itself is not shown).
1536 int next() override
{
1540 return dbiter
->status().ok() ? 0 : -1;
// Step backward (the stepping call itself is not shown).
1542 int prev() override
{
1546 return dbiter
->status().ok() ? 0 : -1;
// True while the underlying iterator reports Valid().
1548 bool valid() override
{
1549 return dbiter
->Valid();
// Current key, copied out of the iterator's slice.
1551 string
key() override
{
1552 return dbiter
->key().ToString();
// (prefix given at construction, current key).
1554 std::pair
<std::string
, std::string
> raw_key() override
{
1555 return make_pair(prefix
, key());
// Current value as a bufferlist.
1557 bufferlist
value() override
{
1558 return to_bufferlist(dbiter
->value());
// Current value as a bufferptr built from the slice's data and size.
1560 bufferptr
value_as_ptr() override
{
1561 rocksdb::Slice val
= dbiter
->value();
1562 return bufferptr(val
.data(), val
.size());
// 0 when the iterator status is ok, -1 otherwise.
1564 int status() override
{
1565 return dbiter
->status().ok() ? 0 : -1;
// Return an iterator over the keys of `prefix`.
//
// Two return paths are visible: one wraps db->NewIterator() on the handle
// from get_cf_handle(prefix) in a CFIteratorImpl, the other defers to the
// base-class KeyValueDB::get_iterator(prefix).
// NOTE(review): the condition choosing between them is not shown; presumably
// it is whether a column-family handle exists for the prefix -- confirm.
1569 KeyValueDB::Iterator
RocksDBStore::get_iterator(const std::string
& prefix
)
1571 rocksdb::ColumnFamilyHandle
*cf_handle
=
1572 static_cast<rocksdb::ColumnFamilyHandle
*>(get_cf_handle(prefix
));
1574 return std::make_shared
<CFIteratorImpl
>(
1576 db
->NewIterator(rocksdb::ReadOptions(), cf_handle
));
1578 return KeyValueDB::get_iterator(prefix
);