]> git.proxmox.com Git - ceph.git/blame - ceph/src/kv/RocksDBStore.cc
update sources to v12.1.1
[ceph.git] / ceph / src / kv / RocksDBStore.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
#include <algorithm>
#include <limits>
#include <map>
#include <memory>
#include <set>
#include <string>
#include <vector>
#include <errno.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
12
13#include "rocksdb/db.h"
14#include "rocksdb/table.h"
15#include "rocksdb/env.h"
16#include "rocksdb/slice.h"
17#include "rocksdb/cache.h"
18#include "rocksdb/filter_policy.h"
19#include "rocksdb/utilities/convenience.h"
20#include "rocksdb/merge_operator.h"
21using std::string;
22#include "common/perf_counters.h"
23#include "common/debug.h"
24#include "include/str_list.h"
25#include "include/stringify.h"
26#include "include/str_map.h"
27#include "KeyValueDB.h"
28#include "RocksDBStore.h"
29
30#include "common/debug.h"
31
32#define dout_context cct
33#define dout_subsys ceph_subsys_rocksdb
34#undef dout_prefix
35#define dout_prefix *_dout << "rocksdb: "
36
31f18b77
FG
37static rocksdb::SliceParts prepare_sliceparts(const bufferlist &bl, rocksdb::Slice *slices)
38{
39 unsigned n = 0;
40 for (std::list<buffer::ptr>::const_iterator p = bl.buffers().begin();
41 p != bl.buffers().end(); ++p, ++n) {
42 slices[n].data_ = p->c_str();
43 slices[n].size_ = p->length();
44 }
45 return rocksdb::SliceParts(slices, n);
46}
47
7c673cae
FG
48//
49// One of these per rocksdb instance, implements the merge operator prefix stuff
50//
51class RocksDBStore::MergeOperatorRouter : public rocksdb::AssociativeMergeOperator {
52 RocksDBStore& store;
53 public:
54 const char *Name() const override {
55 // Construct a name that rocksDB will validate against. We want to
56 // do this in a way that doesn't constrain the ordering of calls
57 // to set_merge_operator, so sort the merge operators and then
58 // construct a name from all of those parts.
59 store.assoc_name.clear();
60 map<std::string,std::string> names;
61 for (auto& p : store.merge_ops) names[p.first] = p.second->name();
62 for (auto& p : names) {
63 store.assoc_name += '.';
64 store.assoc_name += p.first;
65 store.assoc_name += ':';
66 store.assoc_name += p.second;
67 }
68 return store.assoc_name.c_str();
69 }
70
71 MergeOperatorRouter(RocksDBStore &_store) : store(_store) {}
72
73 bool Merge(const rocksdb::Slice& key,
74 const rocksdb::Slice* existing_value,
75 const rocksdb::Slice& value,
76 std::string* new_value,
77 rocksdb::Logger* logger) const override {
78 // Check each prefix
79 for (auto& p : store.merge_ops) {
80 if (p.first.compare(0, p.first.length(),
81 key.data(), p.first.length()) == 0 &&
82 key.data()[p.first.length()] == 0) {
83 if (existing_value) {
84 p.second->merge(existing_value->data(), existing_value->size(),
85 value.data(), value.size(),
86 new_value);
87 } else {
88 p.second->merge_nonexistent(value.data(), value.size(), new_value);
89 }
90 break;
91 }
92 }
93 return true; // OK :)
94 }
95
96};
97
98int RocksDBStore::set_merge_operator(
99 const string& prefix,
100 std::shared_ptr<KeyValueDB::MergeOperator> mop)
101{
102 // If you fail here, it's because you can't do this on an open database
103 assert(db == nullptr);
104 merge_ops.push_back(std::make_pair(prefix,mop));
105 return 0;
106}
107
108class CephRocksdbLogger : public rocksdb::Logger {
109 CephContext *cct;
110public:
111 explicit CephRocksdbLogger(CephContext *c) : cct(c) {
112 cct->get();
113 }
114 ~CephRocksdbLogger() override {
115 cct->put();
116 }
117
118 // Write an entry to the log file with the specified format.
119 void Logv(const char* format, va_list ap) override {
120 Logv(rocksdb::INFO_LEVEL, format, ap);
121 }
122
123 // Write an entry to the log file with the specified log level
124 // and format. Any log with level under the internal log level
125 // of *this (see @SetInfoLogLevel and @GetInfoLogLevel) will not be
126 // printed.
127 void Logv(const rocksdb::InfoLogLevel log_level, const char* format,
128 va_list ap) override {
129 int v = rocksdb::NUM_INFO_LOG_LEVELS - log_level - 1;
130 dout(v);
131 char buf[65536];
132 vsnprintf(buf, sizeof(buf), format, ap);
133 *_dout << buf << dendl;
134 }
135};
136
137rocksdb::Logger *create_rocksdb_ceph_logger()
138{
139 return new CephRocksdbLogger(g_ceph_context);
140}
141
142int string2bool(string val, bool &b_val)
143{
144 if (strcasecmp(val.c_str(), "false") == 0) {
145 b_val = false;
146 return 0;
147 } else if (strcasecmp(val.c_str(), "true") == 0) {
148 b_val = true;
149 return 0;
150 } else {
151 std::string err;
152 int b = strict_strtol(val.c_str(), 10, &err);
153 if (!err.empty())
154 return -EINVAL;
155 b_val = !!b;
156 return 0;
157 }
158}
159
160int RocksDBStore::tryInterpret(const string key, const string val, rocksdb::Options &opt)
161{
162 if (key == "compaction_threads") {
163 std::string err;
164 int f = strict_sistrtoll(val.c_str(), &err);
165 if (!err.empty())
166 return -EINVAL;
167 //Low priority threadpool is used for compaction
168 opt.env->SetBackgroundThreads(f, rocksdb::Env::Priority::LOW);
169 } else if (key == "flusher_threads") {
170 std::string err;
171 int f = strict_sistrtoll(val.c_str(), &err);
172 if (!err.empty())
173 return -EINVAL;
174 //High priority threadpool is used for flusher
175 opt.env->SetBackgroundThreads(f, rocksdb::Env::Priority::HIGH);
176 } else if (key == "compact_on_mount") {
177 int ret = string2bool(val, compact_on_mount);
178 if (ret != 0)
179 return ret;
180 } else if (key == "disableWAL") {
181 int ret = string2bool(val, disableWAL);
182 if (ret != 0)
183 return ret;
184 } else {
185 //unrecognize config options.
186 return -EINVAL;
187 }
188 return 0;
189}
190
191int RocksDBStore::ParseOptionsFromString(const string opt_str, rocksdb::Options &opt)
192{
193 map<string, string> str_map;
194 int r = get_str_map(opt_str, &str_map, ",\n;");
195 if (r < 0)
196 return r;
197 map<string, string>::iterator it;
198 for(it = str_map.begin(); it != str_map.end(); ++it) {
199 string this_opt = it->first + "=" + it->second;
200 rocksdb::Status status = rocksdb::GetOptionsFromString(opt, this_opt , &opt);
201 if (!status.ok()) {
202 //unrecognized by rocksdb, try to interpret by ourselves.
203 r = tryInterpret(it->first, it->second, opt);
204 if (r < 0) {
205 derr << status.ToString() << dendl;
206 return -EINVAL;
207 }
208 }
209 lgeneric_dout(cct, 0) << " set rocksdb option " << it->first
210 << " = " << it->second << dendl;
211 }
212 return 0;
213}
214
215int RocksDBStore::init(string _options_str)
216{
217 options_str = _options_str;
218 rocksdb::Options opt;
219 //try parse options
220 if (options_str.length()) {
221 int r = ParseOptionsFromString(options_str, opt);
222 if (r != 0) {
223 return -EINVAL;
224 }
225 }
226 return 0;
227}
228
229int RocksDBStore::create_and_open(ostream &out)
230{
231 if (env) {
232 unique_ptr<rocksdb::Directory> dir;
233 env->NewDirectory(path, &dir);
234 } else {
235 int r = ::mkdir(path.c_str(), 0755);
236 if (r < 0)
237 r = -errno;
238 if (r < 0 && r != -EEXIST) {
239 derr << __func__ << " failed to create " << path << ": " << cpp_strerror(r)
240 << dendl;
241 return r;
242 }
243 }
244 return do_open(out, true);
245}
246
247int RocksDBStore::do_open(ostream &out, bool create_if_missing)
248{
249 rocksdb::Options opt;
250 rocksdb::Status status;
251
252 if (options_str.length()) {
253 int r = ParseOptionsFromString(options_str, opt);
254 if (r != 0) {
255 return -EINVAL;
256 }
257 }
258
259 if (g_conf->rocksdb_perf) {
260 dbstats = rocksdb::CreateDBStatistics();
261 opt.statistics = dbstats;
262 }
263
264 opt.create_if_missing = create_if_missing;
265 if (g_conf->rocksdb_separate_wal_dir) {
266 opt.wal_dir = path + ".wal";
267 }
268 if (g_conf->get_val<std::string>("rocksdb_db_paths").length()) {
269 list<string> paths;
270 get_str_list(g_conf->get_val<std::string>("rocksdb_db_paths"), "; \t", paths);
271 for (auto& p : paths) {
272 size_t pos = p.find(',');
273 if (pos == std::string::npos) {
274 derr << __func__ << " invalid db path item " << p << " in "
275 << g_conf->get_val<std::string>("rocksdb_db_paths") << dendl;
276 return -EINVAL;
277 }
278 string path = p.substr(0, pos);
279 string size_str = p.substr(pos + 1);
280 uint64_t size = atoll(size_str.c_str());
281 if (!size) {
282 derr << __func__ << " invalid db path item " << p << " in "
283 << g_conf->get_val<std::string>("rocksdb_db_paths") << dendl;
284 return -EINVAL;
285 }
286 opt.db_paths.push_back(rocksdb::DbPath(path, size));
287 dout(10) << __func__ << " db_path " << path << " size " << size << dendl;
288 }
289 }
290
291 if (g_conf->rocksdb_log_to_ceph_log) {
292 opt.info_log.reset(new CephRocksdbLogger(g_ceph_context));
293 }
294
295 if (priv) {
296 dout(10) << __func__ << " using custom Env " << priv << dendl;
297 opt.env = static_cast<rocksdb::Env*>(priv);
298 }
299
31f18b77 300 // caches
224ce89b 301 if (!set_cache_flag) {
31f18b77
FG
302 cache_size = g_conf->rocksdb_cache_size;
303 }
304 uint64_t row_cache_size = cache_size * g_conf->rocksdb_cache_row_ratio;
305 uint64_t block_cache_size = cache_size - row_cache_size;
224ce89b
WB
306
307 if (block_cache_size == 0) {
308 // disable block cache
309 dout(10) << __func__ << " block_cache_size " << block_cache_size
310 << ", setting no_block_cache " << dendl;
311 bbt_opts.no_block_cache = true;
31f18b77 312 } else {
224ce89b
WB
313 if (g_conf->rocksdb_cache_type == "lru") {
314 bbt_opts.block_cache = rocksdb::NewLRUCache(
315 block_cache_size,
316 g_conf->rocksdb_cache_shard_bits);
317 } else if (g_conf->rocksdb_cache_type == "clock") {
318 bbt_opts.block_cache = rocksdb::NewClockCache(
319 block_cache_size,
320 g_conf->rocksdb_cache_shard_bits);
321 } else {
322 derr << "unrecognized rocksdb_cache_type '" << g_conf->rocksdb_cache_type
323 << "'" << dendl;
324 return -EINVAL;
325 }
31f18b77 326 }
7c673cae 327 bbt_opts.block_size = g_conf->rocksdb_block_size;
31f18b77 328
224ce89b
WB
329 if (row_cache_size > 0)
330 opt.row_cache = rocksdb::NewLRUCache(row_cache_size,
31f18b77
FG
331 g_conf->rocksdb_cache_shard_bits);
332
7c673cae
FG
333 if (g_conf->kstore_rocksdb_bloom_bits_per_key > 0) {
334 dout(10) << __func__ << " set bloom filter bits per key to "
335 << g_conf->kstore_rocksdb_bloom_bits_per_key << dendl;
31f18b77
FG
336 bbt_opts.filter_policy.reset(rocksdb::NewBloomFilterPolicy(
337 g_conf->kstore_rocksdb_bloom_bits_per_key));
7c673cae
FG
338 }
339 opt.table_factory.reset(rocksdb::NewBlockBasedTableFactory(bbt_opts));
31f18b77
FG
340 dout(10) << __func__ << " block size " << g_conf->rocksdb_block_size
341 << ", block_cache size " << prettybyte_t(block_cache_size)
342 << ", row_cache size " << prettybyte_t(row_cache_size)
343 << "; shards "
344 << (1 << g_conf->rocksdb_cache_shard_bits)
345 << ", type " << g_conf->rocksdb_cache_type
346 << dendl;
7c673cae
FG
347
348 opt.merge_operator.reset(new MergeOperatorRouter(*this));
349 status = rocksdb::DB::Open(opt, path, &db);
350 if (!status.ok()) {
351 derr << status.ToString() << dendl;
352 return -EINVAL;
353 }
354
355 PerfCountersBuilder plb(g_ceph_context, "rocksdb", l_rocksdb_first, l_rocksdb_last);
356 plb.add_u64_counter(l_rocksdb_gets, "get", "Gets");
357 plb.add_u64_counter(l_rocksdb_txns, "submit_transaction", "Submit transactions");
358 plb.add_u64_counter(l_rocksdb_txns_sync, "submit_transaction_sync", "Submit transactions sync");
359 plb.add_time_avg(l_rocksdb_get_latency, "get_latency", "Get latency");
360 plb.add_time_avg(l_rocksdb_submit_latency, "submit_latency", "Submit Latency");
361 plb.add_time_avg(l_rocksdb_submit_sync_latency, "submit_sync_latency", "Submit Sync Latency");
362 plb.add_u64_counter(l_rocksdb_compact, "compact", "Compactions");
363 plb.add_u64_counter(l_rocksdb_compact_range, "compact_range", "Compactions by range");
364 plb.add_u64_counter(l_rocksdb_compact_queue_merge, "compact_queue_merge", "Mergings of ranges in compaction queue");
365 plb.add_u64(l_rocksdb_compact_queue_len, "compact_queue_len", "Length of compaction queue");
366 plb.add_time_avg(l_rocksdb_write_wal_time, "rocksdb_write_wal_time", "Rocksdb write wal time");
367 plb.add_time_avg(l_rocksdb_write_memtable_time, "rocksdb_write_memtable_time", "Rocksdb write memtable time");
368 plb.add_time_avg(l_rocksdb_write_delay_time, "rocksdb_write_delay_time", "Rocksdb write delay time");
369 plb.add_time_avg(l_rocksdb_write_pre_and_post_process_time,
370 "rocksdb_write_pre_and_post_time", "total time spent on writing a record, excluding write process");
371 logger = plb.create_perf_counters();
372 cct->get_perfcounters_collection()->add(logger);
373
374 if (compact_on_mount) {
375 derr << "Compacting rocksdb store..." << dendl;
376 compact();
377 derr << "Finished compacting rocksdb store" << dendl;
378 }
379 return 0;
380}
381
382int RocksDBStore::_test_init(const string& dir)
383{
384 rocksdb::Options options;
385 options.create_if_missing = true;
386 rocksdb::DB *db;
387 rocksdb::Status status = rocksdb::DB::Open(options, dir, &db);
388 delete db;
389 db = nullptr;
390 return status.ok() ? 0 : -EIO;
391}
392
393RocksDBStore::~RocksDBStore()
394{
395 close();
396 delete logger;
397
398 // Ensure db is destroyed before dependent db_cache and filterpolicy
399 delete db;
400 db = nullptr;
401
402 if (priv) {
403 delete static_cast<rocksdb::Env*>(priv);
404 }
405}
406
407void RocksDBStore::close()
408{
409 // stop compaction thread
410 compact_queue_lock.Lock();
411 if (compact_thread.is_started()) {
412 compact_queue_stop = true;
413 compact_queue_cond.Signal();
414 compact_queue_lock.Unlock();
415 compact_thread.join();
416 } else {
417 compact_queue_lock.Unlock();
418 }
419
420 if (logger)
421 cct->get_perfcounters_collection()->remove(logger);
422}
423
424void RocksDBStore::split_stats(const std::string &s, char delim, std::vector<std::string> &elems) {
425 std::stringstream ss;
426 ss.str(s);
427 std::string item;
428 while (std::getline(ss, item, delim)) {
429 elems.push_back(item);
430 }
431}
432
433void RocksDBStore::get_statistics(Formatter *f)
434{
435 if (!g_conf->rocksdb_perf) {
436 dout(20) << __func__ << "RocksDB perf is disabled, can't probe for stats"
437 << dendl;
438 return;
439 }
440
441 if (g_conf->rocksdb_collect_compaction_stats) {
442 std::string stat_str;
443 bool status = db->GetProperty("rocksdb.stats", &stat_str);
444 if (status) {
445 f->open_object_section("rocksdb_statistics");
446 f->dump_string("rocksdb_compaction_statistics", "");
447 vector<string> stats;
448 split_stats(stat_str, '\n', stats);
449 for (auto st :stats) {
450 f->dump_string("", st);
451 }
452 f->close_section();
453 }
454 }
455 if (g_conf->rocksdb_collect_extended_stats) {
456 if (dbstats) {
457 f->open_object_section("rocksdb_extended_statistics");
458 string stat_str = dbstats->ToString();
459 vector<string> stats;
460 split_stats(stat_str, '\n', stats);
461 f->dump_string("rocksdb_extended_statistics", "");
462 for (auto st :stats) {
463 f->dump_string(".", st);
464 }
465 f->close_section();
466 }
467 f->open_object_section("rocksdbstore_perf_counters");
468 logger->dump_formatted(f,0);
469 f->close_section();
470 }
471 if (g_conf->rocksdb_collect_memory_stats) {
472 f->open_object_section("rocksdb_memtable_statistics");
473 std::string str(stringify(bbt_opts.block_cache->GetUsage()));
474 f->dump_string("block_cache_usage", str.data());
475 str.clear();
476 str.append(stringify(bbt_opts.block_cache->GetPinnedUsage()));
477 f->dump_string("block_cache_pinned_blocks_usage", str);
478 str.clear();
479 db->GetProperty("rocksdb.cur-size-all-mem-tables", &str);
480 f->dump_string("rocksdb_memtable_usage", str);
481 f->close_section();
482 }
483}
484
485int RocksDBStore::submit_transaction(KeyValueDB::Transaction t)
486{
487 utime_t start = ceph_clock_now();
488 // enable rocksdb breakdown
489 // considering performance overhead, default is disabled
490 if (g_conf->rocksdb_perf) {
491 rocksdb::SetPerfLevel(rocksdb::PerfLevel::kEnableTimeExceptForMutex);
492 rocksdb::perf_context.Reset();
493 }
494
495 RocksDBTransactionImpl * _t =
496 static_cast<RocksDBTransactionImpl *>(t.get());
497 rocksdb::WriteOptions woptions;
498 woptions.disableWAL = disableWAL;
499 lgeneric_subdout(cct, rocksdb, 30) << __func__;
500 RocksWBHandler bat_txc;
501 _t->bat.Iterate(&bat_txc);
502 *_dout << " Rocksdb transaction: " << bat_txc.seen << dendl;
503
504 rocksdb::Status s = db->Write(woptions, &_t->bat);
505 if (!s.ok()) {
506 RocksWBHandler rocks_txc;
507 _t->bat.Iterate(&rocks_txc);
508 derr << __func__ << " error: " << s.ToString() << " code = " << s.code()
509 << " Rocksdb transaction: " << rocks_txc.seen << dendl;
510 }
511 utime_t lat = ceph_clock_now() - start;
512
513 if (g_conf->rocksdb_perf) {
514 utime_t write_memtable_time;
515 utime_t write_delay_time;
516 utime_t write_wal_time;
517 utime_t write_pre_and_post_process_time;
518 write_wal_time.set_from_double(
519 static_cast<double>(rocksdb::perf_context.write_wal_time)/1000000000);
520 write_memtable_time.set_from_double(
521 static_cast<double>(rocksdb::perf_context.write_memtable_time)/1000000000);
522 write_delay_time.set_from_double(
523 static_cast<double>(rocksdb::perf_context.write_delay_time)/1000000000);
524 write_pre_and_post_process_time.set_from_double(
525 static_cast<double>(rocksdb::perf_context.write_pre_and_post_process_time)/1000000000);
526 logger->tinc(l_rocksdb_write_memtable_time, write_memtable_time);
527 logger->tinc(l_rocksdb_write_delay_time, write_delay_time);
528 logger->tinc(l_rocksdb_write_wal_time, write_wal_time);
529 logger->tinc(l_rocksdb_write_pre_and_post_process_time, write_pre_and_post_process_time);
530 }
531
532 logger->inc(l_rocksdb_txns);
533 logger->tinc(l_rocksdb_submit_latency, lat);
534
535 return s.ok() ? 0 : -1;
536}
537
538int RocksDBStore::submit_transaction_sync(KeyValueDB::Transaction t)
539{
540 utime_t start = ceph_clock_now();
541 // enable rocksdb breakdown
542 // considering performance overhead, default is disabled
543 if (g_conf->rocksdb_perf) {
544 rocksdb::SetPerfLevel(rocksdb::PerfLevel::kEnableTimeExceptForMutex);
545 rocksdb::perf_context.Reset();
546 }
547
548 RocksDBTransactionImpl * _t =
549 static_cast<RocksDBTransactionImpl *>(t.get());
550 rocksdb::WriteOptions woptions;
551 woptions.sync = true;
552 woptions.disableWAL = disableWAL;
553 lgeneric_subdout(cct, rocksdb, 30) << __func__;
554 RocksWBHandler bat_txc;
555 _t->bat.Iterate(&bat_txc);
556 *_dout << " Rocksdb transaction: " << bat_txc.seen << dendl;
557
558 rocksdb::Status s = db->Write(woptions, &_t->bat);
559 if (!s.ok()) {
560 RocksWBHandler rocks_txc;
561 _t->bat.Iterate(&rocks_txc);
562 derr << __func__ << " error: " << s.ToString() << " code = " << s.code()
563 << " Rocksdb transaction: " << rocks_txc.seen << dendl;
564 }
565 utime_t lat = ceph_clock_now() - start;
566
567 if (g_conf->rocksdb_perf) {
568 utime_t write_memtable_time;
569 utime_t write_delay_time;
570 utime_t write_wal_time;
571 utime_t write_pre_and_post_process_time;
572 write_wal_time.set_from_double(
573 static_cast<double>(rocksdb::perf_context.write_wal_time)/1000000000);
574 write_memtable_time.set_from_double(
575 static_cast<double>(rocksdb::perf_context.write_memtable_time)/1000000000);
576 write_delay_time.set_from_double(
577 static_cast<double>(rocksdb::perf_context.write_delay_time)/1000000000);
578 write_pre_and_post_process_time.set_from_double(
579 static_cast<double>(rocksdb::perf_context.write_pre_and_post_process_time)/1000000000);
580 logger->tinc(l_rocksdb_write_memtable_time, write_memtable_time);
581 logger->tinc(l_rocksdb_write_delay_time, write_delay_time);
582 logger->tinc(l_rocksdb_write_wal_time, write_wal_time);
583 logger->tinc(l_rocksdb_write_pre_and_post_process_time, write_pre_and_post_process_time);
584 }
585
586 logger->inc(l_rocksdb_txns_sync);
587 logger->tinc(l_rocksdb_submit_sync_latency, lat);
588
589 return s.ok() ? 0 : -1;
590}
591
592RocksDBStore::RocksDBTransactionImpl::RocksDBTransactionImpl(RocksDBStore *_db)
593{
594 db = _db;
595}
596
597void RocksDBStore::RocksDBTransactionImpl::set(
598 const string &prefix,
599 const string &k,
600 const bufferlist &to_set_bl)
601{
602 string key = combine_strings(prefix, k);
603
604 // bufferlist::c_str() is non-constant, so we can't call c_str()
605 if (to_set_bl.is_contiguous() && to_set_bl.length() > 0) {
606 bat.Put(rocksdb::Slice(key),
607 rocksdb::Slice(to_set_bl.buffers().front().c_str(),
608 to_set_bl.length()));
609 } else {
31f18b77
FG
610 rocksdb::Slice key_slice(key);
611 rocksdb::Slice value_slices[to_set_bl.buffers().size()];
612 bat.Put(nullptr, rocksdb::SliceParts(&key_slice, 1),
613 prepare_sliceparts(to_set_bl, value_slices));
7c673cae
FG
614 }
615}
616
617void RocksDBStore::RocksDBTransactionImpl::set(
618 const string &prefix,
619 const char *k, size_t keylen,
620 const bufferlist &to_set_bl)
621{
622 string key;
623 combine_strings(prefix, k, keylen, &key);
624
625 // bufferlist::c_str() is non-constant, so we can't call c_str()
626 if (to_set_bl.is_contiguous() && to_set_bl.length() > 0) {
627 bat.Put(rocksdb::Slice(key),
628 rocksdb::Slice(to_set_bl.buffers().front().c_str(),
629 to_set_bl.length()));
630 } else {
31f18b77
FG
631 rocksdb::Slice key_slice(key);
632 rocksdb::Slice value_slices[to_set_bl.buffers().size()];
633 bat.Put(nullptr, rocksdb::SliceParts(&key_slice, 1),
634 prepare_sliceparts(to_set_bl, value_slices));
7c673cae
FG
635 }
636}
637
638void RocksDBStore::RocksDBTransactionImpl::rmkey(const string &prefix,
639 const string &k)
640{
641 bat.Delete(combine_strings(prefix, k));
642}
643
644void RocksDBStore::RocksDBTransactionImpl::rmkey(const string &prefix,
645 const char *k,
646 size_t keylen)
647{
648 string key;
649 combine_strings(prefix, k, keylen, &key);
650 bat.Delete(key);
651}
652
653void RocksDBStore::RocksDBTransactionImpl::rm_single_key(const string &prefix,
654 const string &k)
655{
656 bat.SingleDelete(combine_strings(prefix, k));
657}
658
659void RocksDBStore::RocksDBTransactionImpl::rmkeys_by_prefix(const string &prefix)
660{
31f18b77
FG
661 if (db->enable_rmrange) {
662 string endprefix = prefix;
663 endprefix.push_back('\x01');
664 bat.DeleteRange(combine_strings(prefix, string()),
665 combine_strings(endprefix, string()));
666 } else {
667 KeyValueDB::Iterator it = db->get_iterator(prefix);
668 for (it->seek_to_first();
669 it->valid();
670 it->next()) {
671 bat.Delete(combine_strings(prefix, it->key()));
672 }
7c673cae
FG
673 }
674}
675
676void RocksDBStore::RocksDBTransactionImpl::rm_range_keys(const string &prefix,
677 const string &start,
678 const string &end)
679{
680 if (db->enable_rmrange) {
681 bat.DeleteRange(combine_strings(prefix, start), combine_strings(prefix, end));
682 } else {
683 auto it = db->get_iterator(prefix);
684 it->lower_bound(start);
685 while (it->valid()) {
686 if (it->key() >= end) {
687 break;
688 }
689 bat.Delete(combine_strings(prefix, it->key()));
690 it->next();
691 }
692 }
693}
694
695void RocksDBStore::RocksDBTransactionImpl::merge(
696 const string &prefix,
697 const string &k,
698 const bufferlist &to_set_bl)
699{
700 string key = combine_strings(prefix, k);
701
702 // bufferlist::c_str() is non-constant, so we can't call c_str()
703 if (to_set_bl.is_contiguous() && to_set_bl.length() > 0) {
704 bat.Merge(rocksdb::Slice(key),
705 rocksdb::Slice(to_set_bl.buffers().front().c_str(),
706 to_set_bl.length()));
707 } else {
708 // make a copy
31f18b77
FG
709 rocksdb::Slice key_slice(key);
710 rocksdb::Slice value_slices[to_set_bl.buffers().size()];
711 bat.Merge(nullptr, rocksdb::SliceParts(&key_slice, 1),
712 prepare_sliceparts(to_set_bl, value_slices));
7c673cae
FG
713 }
714}
715
716//gets will bypass RocksDB row cache, since it uses iterator
717int RocksDBStore::get(
718 const string &prefix,
719 const std::set<string> &keys,
720 std::map<string, bufferlist> *out)
721{
722 utime_t start = ceph_clock_now();
723 for (std::set<string>::const_iterator i = keys.begin();
724 i != keys.end(); ++i) {
725 std::string value;
726 std::string bound = combine_strings(prefix, *i);
727 auto status = db->Get(rocksdb::ReadOptions(), rocksdb::Slice(bound), &value);
224ce89b 728 if (status.ok()) {
7c673cae 729 (*out)[*i].append(value);
224ce89b
WB
730 } else if (status.IsIOError()) {
731 ceph_abort_msg(cct, status.ToString());
732 }
733
7c673cae
FG
734 }
735 utime_t lat = ceph_clock_now() - start;
736 logger->inc(l_rocksdb_gets);
737 logger->tinc(l_rocksdb_get_latency, lat);
738 return 0;
739}
740
741int RocksDBStore::get(
742 const string &prefix,
743 const string &key,
744 bufferlist *out)
745{
746 assert(out && (out->length() == 0));
747 utime_t start = ceph_clock_now();
748 int r = 0;
749 string value, k;
750 rocksdb::Status s;
751 k = combine_strings(prefix, key);
752 s = db->Get(rocksdb::ReadOptions(), rocksdb::Slice(k), &value);
753 if (s.ok()) {
754 out->append(value);
224ce89b 755 } else if (s.IsNotFound()) {
7c673cae 756 r = -ENOENT;
224ce89b
WB
757 } else {
758 ceph_abort_msg(cct, s.ToString());
7c673cae
FG
759 }
760 utime_t lat = ceph_clock_now() - start;
761 logger->inc(l_rocksdb_gets);
762 logger->tinc(l_rocksdb_get_latency, lat);
763 return r;
764}
765
766int RocksDBStore::get(
767 const string& prefix,
768 const char *key,
769 size_t keylen,
770 bufferlist *out)
771{
772 assert(out && (out->length() == 0));
773 utime_t start = ceph_clock_now();
774 int r = 0;
775 string value, k;
776 combine_strings(prefix, key, keylen, &k);
777 rocksdb::Status s;
778 s = db->Get(rocksdb::ReadOptions(), rocksdb::Slice(k), &value);
779 if (s.ok()) {
780 out->append(value);
224ce89b 781 } else if (s.IsNotFound()) {
7c673cae 782 r = -ENOENT;
224ce89b
WB
783 } else {
784 ceph_abort_msg(cct, s.ToString());
7c673cae
FG
785 }
786 utime_t lat = ceph_clock_now() - start;
787 logger->inc(l_rocksdb_gets);
788 logger->tinc(l_rocksdb_get_latency, lat);
789 return r;
790}
791
792int RocksDBStore::split_key(rocksdb::Slice in, string *prefix, string *key)
793{
794 size_t prefix_len = 0;
795
796 // Find separator inside Slice
797 char* separator = (char*) memchr(in.data(), 0, in.size());
798 if (separator == NULL)
799 return -EINVAL;
800 prefix_len = size_t(separator - in.data());
801 if (prefix_len >= in.size())
802 return -EINVAL;
803
804 // Fetch prefix and/or key directly from Slice
805 if (prefix)
806 *prefix = string(in.data(), prefix_len);
807 if (key)
808 *key = string(separator+1, in.size()-prefix_len-1);
809 return 0;
810}
811
812void RocksDBStore::compact()
813{
814 logger->inc(l_rocksdb_compact);
815 rocksdb::CompactRangeOptions options;
816 db->CompactRange(options, nullptr, nullptr);
817}
818
819
820void RocksDBStore::compact_thread_entry()
821{
822 compact_queue_lock.Lock();
823 while (!compact_queue_stop) {
824 while (!compact_queue.empty()) {
825 pair<string,string> range = compact_queue.front();
826 compact_queue.pop_front();
827 logger->set(l_rocksdb_compact_queue_len, compact_queue.size());
828 compact_queue_lock.Unlock();
829 logger->inc(l_rocksdb_compact_range);
830 compact_range(range.first, range.second);
831 compact_queue_lock.Lock();
832 continue;
833 }
834 compact_queue_cond.Wait(compact_queue_lock);
835 }
836 compact_queue_lock.Unlock();
837}
838
839void RocksDBStore::compact_range_async(const string& start, const string& end)
840{
841 Mutex::Locker l(compact_queue_lock);
842
843 // try to merge adjacent ranges. this is O(n), but the queue should
844 // be short. note that we do not cover all overlap cases and merge
845 // opportunities here, but we capture the ones we currently need.
846 list< pair<string,string> >::iterator p = compact_queue.begin();
847 while (p != compact_queue.end()) {
848 if (p->first == start && p->second == end) {
849 // dup; no-op
850 return;
851 }
852 if (p->first <= end && p->first > start) {
853 // merge with existing range to the right
854 compact_queue.push_back(make_pair(start, p->second));
855 compact_queue.erase(p);
856 logger->inc(l_rocksdb_compact_queue_merge);
857 break;
858 }
859 if (p->second >= start && p->second < end) {
860 // merge with existing range to the left
861 compact_queue.push_back(make_pair(p->first, end));
862 compact_queue.erase(p);
863 logger->inc(l_rocksdb_compact_queue_merge);
864 break;
865 }
866 ++p;
867 }
868 if (p == compact_queue.end()) {
869 // no merge, new entry.
870 compact_queue.push_back(make_pair(start, end));
871 logger->set(l_rocksdb_compact_queue_len, compact_queue.size());
872 }
873 compact_queue_cond.Signal();
874 if (!compact_thread.is_started()) {
875 compact_thread.create("rstore_compact");
876 }
877}
878bool RocksDBStore::check_omap_dir(string &omap_dir)
879{
880 rocksdb::Options options;
881 options.create_if_missing = true;
882 rocksdb::DB *db;
883 rocksdb::Status status = rocksdb::DB::Open(options, omap_dir, &db);
884 delete db;
885 db = nullptr;
886 return status.ok();
887}
888void RocksDBStore::compact_range(const string& start, const string& end)
889{
890 rocksdb::CompactRangeOptions options;
891 rocksdb::Slice cstart(start);
892 rocksdb::Slice cend(end);
893 db->CompactRange(options, &cstart, &cend);
894}
895RocksDBStore::RocksDBWholeSpaceIteratorImpl::~RocksDBWholeSpaceIteratorImpl()
896{
897 delete dbiter;
898}
899int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_first()
900{
901 dbiter->SeekToFirst();
224ce89b 902 assert(!dbiter->status().IsIOError());
7c673cae
FG
903 return dbiter->status().ok() ? 0 : -1;
904}
905int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_first(const string &prefix)
906{
907 rocksdb::Slice slice_prefix(prefix);
908 dbiter->Seek(slice_prefix);
224ce89b 909 assert(!dbiter->status().IsIOError());
7c673cae
FG
910 return dbiter->status().ok() ? 0 : -1;
911}
912int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_last()
913{
914 dbiter->SeekToLast();
224ce89b 915 assert(!dbiter->status().IsIOError());
7c673cae
FG
916 return dbiter->status().ok() ? 0 : -1;
917}
918int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_last(const string &prefix)
919{
920 string limit = past_prefix(prefix);
921 rocksdb::Slice slice_limit(limit);
922 dbiter->Seek(slice_limit);
923
924 if (!dbiter->Valid()) {
925 dbiter->SeekToLast();
926 } else {
927 dbiter->Prev();
928 }
929 return dbiter->status().ok() ? 0 : -1;
930}
931int RocksDBStore::RocksDBWholeSpaceIteratorImpl::upper_bound(const string &prefix, const string &after)
932{
933 lower_bound(prefix, after);
934 if (valid()) {
935 pair<string,string> key = raw_key();
936 if (key.first == prefix && key.second == after)
937 next();
938 }
939 return dbiter->status().ok() ? 0 : -1;
940}
941int RocksDBStore::RocksDBWholeSpaceIteratorImpl::lower_bound(const string &prefix, const string &to)
942{
943 string bound = combine_strings(prefix, to);
944 rocksdb::Slice slice_bound(bound);
945 dbiter->Seek(slice_bound);
946 return dbiter->status().ok() ? 0 : -1;
947}
948bool RocksDBStore::RocksDBWholeSpaceIteratorImpl::valid()
949{
950 return dbiter->Valid();
951}
952int RocksDBStore::RocksDBWholeSpaceIteratorImpl::next()
953{
954 if (valid()) {
955 dbiter->Next();
956 }
224ce89b 957 assert(!dbiter->status().IsIOError());
7c673cae
FG
958 return dbiter->status().ok() ? 0 : -1;
959}
960int RocksDBStore::RocksDBWholeSpaceIteratorImpl::prev()
961{
962 if (valid()) {
963 dbiter->Prev();
964 }
224ce89b 965 assert(!dbiter->status().IsIOError());
7c673cae
FG
966 return dbiter->status().ok() ? 0 : -1;
967}
968string RocksDBStore::RocksDBWholeSpaceIteratorImpl::key()
969{
970 string out_key;
971 split_key(dbiter->key(), 0, &out_key);
972 return out_key;
973}
974pair<string,string> RocksDBStore::RocksDBWholeSpaceIteratorImpl::raw_key()
975{
976 string prefix, key;
977 split_key(dbiter->key(), &prefix, &key);
978 return make_pair(prefix, key);
979}
980
981bool RocksDBStore::RocksDBWholeSpaceIteratorImpl::raw_key_is_prefixed(const string &prefix) {
982 // Look for "prefix\0" right in rocksb::Slice
983 rocksdb::Slice key = dbiter->key();
984 if ((key.size() > prefix.length()) && (key[prefix.length()] == '\0')) {
985 return memcmp(key.data(), prefix.c_str(), prefix.length()) == 0;
986 } else {
987 return false;
988 }
989}
990
991bufferlist RocksDBStore::RocksDBWholeSpaceIteratorImpl::value()
992{
993 return to_bufferlist(dbiter->value());
994}
995
996size_t RocksDBStore::RocksDBWholeSpaceIteratorImpl::key_size()
997{
998 return dbiter->key().size();
999}
1000
1001size_t RocksDBStore::RocksDBWholeSpaceIteratorImpl::value_size()
1002{
1003 return dbiter->value().size();
1004}
1005
1006bufferptr RocksDBStore::RocksDBWholeSpaceIteratorImpl::value_as_ptr()
1007{
1008 rocksdb::Slice val = dbiter->value();
1009 return bufferptr(val.data(), val.size());
1010}
1011
1012int RocksDBStore::RocksDBWholeSpaceIteratorImpl::status()
1013{
1014 return dbiter->status().ok() ? 0 : -1;
1015}
1016
1017string RocksDBStore::past_prefix(const string &prefix)
1018{
1019 string limit = prefix;
1020 limit.push_back(1);
1021 return limit;
1022}
1023
1024RocksDBStore::WholeSpaceIterator RocksDBStore::_get_iterator()
1025{
1026 return std::make_shared<RocksDBWholeSpaceIteratorImpl>(
1027 db->NewIterator(rocksdb::ReadOptions()));
1028}
1029