// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include <set>
#include <map>
#include <string>
#include <memory>
#include <errno.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>

#include "rocksdb/db.h"
#include "rocksdb/table.h"
#include "rocksdb/env.h"
#include "rocksdb/slice.h"
#include "rocksdb/cache.h"
#include "rocksdb/filter_policy.h"
#include "rocksdb/utilities/convenience.h"
#include "rocksdb/merge_operator.h"
using std::string;
#include "common/perf_counters.h"
#include "common/debug.h"
#include "include/str_list.h"
#include "include/stringify.h"
#include "include/str_map.h"
#include "KeyValueDB.h"
#include "RocksDBStore.h"

#include "common/debug.h"

#define dout_context cct
#define dout_subsys ceph_subsys_rocksdb
#undef dout_prefix
#define dout_prefix *_dout << "rocksdb: "

static rocksdb::SliceParts prepare_sliceparts(const bufferlist &bl, rocksdb::Slice *slices)
{
  unsigned n = 0;
  for (std::list<buffer::ptr>::const_iterator p = bl.buffers().begin();
       p != bl.buffers().end(); ++p, ++n) {
    slices[n].data_ = p->c_str();
    slices[n].size_ = p->length();
  }
  return rocksdb::SliceParts(slices, n);
}
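
// Illustrative note: for a bufferlist made of two fragments, say "foo" and
// "bar", the caller passes a two-element rocksdb::Slice array and
// prepare_sliceparts() fills it so that RocksDB can write "foobar" as a
// gather write, without first flattening the bufferlist into one contiguous
// buffer.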

//
// One of these per rocksdb instance; it routes Merge() calls to the merge
// operator registered for the key's prefix.
//
class RocksDBStore::MergeOperatorRouter : public rocksdb::AssociativeMergeOperator {
  RocksDBStore& store;
public:
  const char *Name() const override {
    // Construct a name that RocksDB will validate against. We want to
    // do this in a way that doesn't constrain the ordering of calls
    // to set_merge_operator, so sort the merge operators and then
    // construct a name from all of those parts.
    store.assoc_name.clear();
    map<std::string,std::string> names;
    for (auto& p : store.merge_ops) names[p.first] = p.second->name();
    for (auto& p : names) {
      store.assoc_name += '.';
      store.assoc_name += p.first;
      store.assoc_name += ':';
      store.assoc_name += p.second;
    }
    return store.assoc_name.c_str();
  }

  MergeOperatorRouter(RocksDBStore &_store) : store(_store) {}

  bool Merge(const rocksdb::Slice& key,
             const rocksdb::Slice* existing_value,
             const rocksdb::Slice& value,
             std::string* new_value,
             rocksdb::Logger* logger) const override {
    // Check each prefix
    for (auto& p : store.merge_ops) {
      if (p.first.compare(0, p.first.length(),
                          key.data(), p.first.length()) == 0 &&
          key.data()[p.first.length()] == 0) {
        if (existing_value) {
          p.second->merge(existing_value->data(), existing_value->size(),
                          value.data(), value.size(),
                          new_value);
        } else {
          p.second->merge_nonexistent(value.data(), value.size(), new_value);
        }
        break;
      }
    }
    return true; // OK :)
  }

};

int RocksDBStore::set_merge_operator(
  const string& prefix,
  std::shared_ptr<KeyValueDB::MergeOperator> mop)
{
  // If you fail here, it's because you can't do this on an open database
  assert(db == nullptr);
  merge_ops.push_back(std::make_pair(prefix, mop));
  return 0;
}
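
// Illustrative usage (a sketch, not code from this file): merge operators are
// registered per prefix before the database is opened, e.g.
//
//   store->set_merge_operator("b", some_op);  // some_op is a hypothetical
//                                             // shared_ptr<KeyValueDB::MergeOperator>
//   store->create_and_open(out);              // open only after registration,
//                                             // see the assert above
//
// With operators registered for prefixes "b" and "p", MergeOperatorRouter::Name()
// would return ".b:<name of b's op>.p:<name of p's op>", independent of
// registration order, because the parts are sorted first.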

class CephRocksdbLogger : public rocksdb::Logger {
  CephContext *cct;
public:
  explicit CephRocksdbLogger(CephContext *c) : cct(c) {
    cct->get();
  }
  ~CephRocksdbLogger() override {
    cct->put();
  }

  // Write an entry to the log file with the specified format.
  void Logv(const char* format, va_list ap) override {
    Logv(rocksdb::INFO_LEVEL, format, ap);
  }

  // Write an entry to the log file with the specified log level
  // and format. Any log with level under the internal log level
  // of *this (see @SetInfoLogLevel and @GetInfoLogLevel) will not be
  // printed.
  void Logv(const rocksdb::InfoLogLevel log_level, const char* format,
            va_list ap) override {
    int v = rocksdb::NUM_INFO_LOG_LEVELS - log_level - 1;
    dout(v);
    char buf[65536];
    vsnprintf(buf, sizeof(buf), format, ap);
    *_dout << buf << dendl;
  }
};

rocksdb::Logger *create_rocksdb_ceph_logger()
{
  return new CephRocksdbLogger(g_ceph_context);
}

int string2bool(string val, bool &b_val)
{
  if (strcasecmp(val.c_str(), "false") == 0) {
    b_val = false;
    return 0;
  } else if (strcasecmp(val.c_str(), "true") == 0) {
    b_val = true;
    return 0;
  } else {
    std::string err;
    int b = strict_strtol(val.c_str(), 10, &err);
    if (!err.empty())
      return -EINVAL;
    b_val = !!b;
    return 0;
  }
}

int RocksDBStore::tryInterpret(const string key, const string val, rocksdb::Options &opt)
{
  if (key == "compaction_threads") {
    std::string err;
    int f = strict_sistrtoll(val.c_str(), &err);
    if (!err.empty())
      return -EINVAL;
    // the low-priority thread pool is used for compaction
    opt.env->SetBackgroundThreads(f, rocksdb::Env::Priority::LOW);
  } else if (key == "flusher_threads") {
    std::string err;
    int f = strict_sistrtoll(val.c_str(), &err);
    if (!err.empty())
      return -EINVAL;
    // the high-priority thread pool is used for flushing
    opt.env->SetBackgroundThreads(f, rocksdb::Env::Priority::HIGH);
  } else if (key == "compact_on_mount") {
    int ret = string2bool(val, compact_on_mount);
    if (ret != 0)
      return ret;
  } else if (key == "disableWAL") {
    int ret = string2bool(val, disableWAL);
    if (ret != 0)
      return ret;
  } else {
    // unrecognized config option
    return -EINVAL;
  }
  return 0;
}

int RocksDBStore::ParseOptionsFromString(const string opt_str, rocksdb::Options &opt)
{
  map<string, string> str_map;
  int r = get_str_map(opt_str, &str_map, ",\n;");
  if (r < 0)
    return r;
  map<string, string>::iterator it;
  for (it = str_map.begin(); it != str_map.end(); ++it) {
    string this_opt = it->first + "=" + it->second;
    rocksdb::Status status = rocksdb::GetOptionsFromString(opt, this_opt, &opt);
    if (!status.ok()) {
      // unrecognized by rocksdb, try to interpret it ourselves
      r = tryInterpret(it->first, it->second, opt);
      if (r < 0) {
        derr << status.ToString() << dendl;
        return -EINVAL;
      }
    }
    lgeneric_dout(cct, 0) << " set rocksdb option " << it->first
                          << " = " << it->second << dendl;
  }
  return 0;
}

int RocksDBStore::init(string _options_str)
{
  options_str = _options_str;
  rocksdb::Options opt;
  // try to parse the options
  if (options_str.length()) {
    int r = ParseOptionsFromString(options_str, opt);
    if (r != 0) {
      return -EINVAL;
    }
  }
  return 0;
}
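
// Illustrative example only (the option names are plausible RocksDB/Ceph
// names, not a list this code promises to accept): an options string such as
//
//   "compression=kNoCompression;max_write_buffer_number=4,compaction_threads=4"
//
// is split by get_str_map() on ',', '\n' and ';' into key=value pairs; the
// first two are recognized by rocksdb::GetOptionsFromString(), while
// compaction_threads is not a RocksDB option name and therefore falls through
// to tryInterpret() above.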

int RocksDBStore::create_and_open(ostream &out)
{
  if (env) {
    unique_ptr<rocksdb::Directory> dir;
    env->NewDirectory(path, &dir);
  } else {
    int r = ::mkdir(path.c_str(), 0755);
    if (r < 0)
      r = -errno;
    if (r < 0 && r != -EEXIST) {
      derr << __func__ << " failed to create " << path << ": " << cpp_strerror(r)
           << dendl;
      return r;
    }
  }
  return do_open(out, true);
}

int RocksDBStore::do_open(ostream &out, bool create_if_missing)
{
  rocksdb::Options opt;
  rocksdb::Status status;

  if (options_str.length()) {
    int r = ParseOptionsFromString(options_str, opt);
    if (r != 0) {
      return -EINVAL;
    }
  }

  if (g_conf->rocksdb_perf) {
    dbstats = rocksdb::CreateDBStatistics();
    opt.statistics = dbstats;
  }

  opt.create_if_missing = create_if_missing;
  if (g_conf->rocksdb_separate_wal_dir) {
    opt.wal_dir = path + ".wal";
  }
  if (g_conf->get_val<std::string>("rocksdb_db_paths").length()) {
    list<string> paths;
    get_str_list(g_conf->get_val<std::string>("rocksdb_db_paths"), "; \t", paths);
    for (auto& p : paths) {
      size_t pos = p.find(',');
      if (pos == std::string::npos) {
        derr << __func__ << " invalid db path item " << p << " in "
             << g_conf->get_val<std::string>("rocksdb_db_paths") << dendl;
        return -EINVAL;
      }
      string path = p.substr(0, pos);
      string size_str = p.substr(pos + 1);
      uint64_t size = atoll(size_str.c_str());
      if (!size) {
        derr << __func__ << " invalid db path item " << p << " in "
             << g_conf->get_val<std::string>("rocksdb_db_paths") << dendl;
        return -EINVAL;
      }
      opt.db_paths.push_back(rocksdb::DbPath(path, size));
      dout(10) << __func__ << " db_path " << path << " size " << size << dendl;
    }
  }
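
  // Illustrative value (hypothetical): rocksdb_db_paths = "/db,300000000000 /db.slow,3000000000000"
  // parses into two rocksdb::DbPath entries, steering roughly the first
  // 300 GB of SST files to /db and the overflow to /db.slow (the precise
  // placement policy is RocksDB's, not this code's).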

  if (g_conf->rocksdb_log_to_ceph_log) {
    opt.info_log.reset(new CephRocksdbLogger(g_ceph_context));
  }

  if (priv) {
    dout(10) << __func__ << " using custom Env " << priv << dendl;
    opt.env = static_cast<rocksdb::Env*>(priv);
  }

  // caches
  if (!cache_size) {
    cache_size = g_conf->rocksdb_cache_size;
  }
  uint64_t row_cache_size = cache_size * g_conf->rocksdb_cache_row_ratio;
  uint64_t block_cache_size = cache_size - row_cache_size;
  if (g_conf->rocksdb_cache_type == "lru") {
    bbt_opts.block_cache = rocksdb::NewLRUCache(
      block_cache_size,
      g_conf->rocksdb_cache_shard_bits);
  } else if (g_conf->rocksdb_cache_type == "clock") {
    bbt_opts.block_cache = rocksdb::NewClockCache(
      block_cache_size,
      g_conf->rocksdb_cache_shard_bits);
  } else {
    derr << "unrecognized rocksdb_cache_type '" << g_conf->rocksdb_cache_type
         << "'" << dendl;
    return -EINVAL;
  }
  bbt_opts.block_size = g_conf->rocksdb_block_size;

  opt.row_cache = rocksdb::NewLRUCache(row_cache_size,
                                       g_conf->rocksdb_cache_shard_bits);
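
  // For example (illustrative numbers, not defaults this code guarantees):
  // with cache_size = 512 MiB and rocksdb_cache_row_ratio = 0.25, the row
  // cache gets 128 MiB and the block cache the remaining 384 MiB.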

  if (g_conf->kstore_rocksdb_bloom_bits_per_key > 0) {
    dout(10) << __func__ << " set bloom filter bits per key to "
             << g_conf->kstore_rocksdb_bloom_bits_per_key << dendl;
    bbt_opts.filter_policy.reset(rocksdb::NewBloomFilterPolicy(
      g_conf->kstore_rocksdb_bloom_bits_per_key));
  }
  opt.table_factory.reset(rocksdb::NewBlockBasedTableFactory(bbt_opts));
  dout(10) << __func__ << " block size " << g_conf->rocksdb_block_size
           << ", block_cache size " << prettybyte_t(block_cache_size)
           << ", row_cache size " << prettybyte_t(row_cache_size)
           << "; shards "
           << (1 << g_conf->rocksdb_cache_shard_bits)
           << ", type " << g_conf->rocksdb_cache_type
           << dendl;

  opt.merge_operator.reset(new MergeOperatorRouter(*this));
  status = rocksdb::DB::Open(opt, path, &db);
  if (!status.ok()) {
    derr << status.ToString() << dendl;
    return -EINVAL;
  }

  PerfCountersBuilder plb(g_ceph_context, "rocksdb", l_rocksdb_first, l_rocksdb_last);
  plb.add_u64_counter(l_rocksdb_gets, "get", "Gets");
  plb.add_u64_counter(l_rocksdb_txns, "submit_transaction", "Submit transactions");
  plb.add_u64_counter(l_rocksdb_txns_sync, "submit_transaction_sync", "Submit transactions sync");
  plb.add_time_avg(l_rocksdb_get_latency, "get_latency", "Get latency");
  plb.add_time_avg(l_rocksdb_submit_latency, "submit_latency", "Submit Latency");
  plb.add_time_avg(l_rocksdb_submit_sync_latency, "submit_sync_latency", "Submit Sync Latency");
  plb.add_u64_counter(l_rocksdb_compact, "compact", "Compactions");
  plb.add_u64_counter(l_rocksdb_compact_range, "compact_range", "Compactions by range");
  plb.add_u64_counter(l_rocksdb_compact_queue_merge, "compact_queue_merge", "Mergings of ranges in compaction queue");
  plb.add_u64(l_rocksdb_compact_queue_len, "compact_queue_len", "Length of compaction queue");
  plb.add_time_avg(l_rocksdb_write_wal_time, "rocksdb_write_wal_time", "Rocksdb write wal time");
  plb.add_time_avg(l_rocksdb_write_memtable_time, "rocksdb_write_memtable_time", "Rocksdb write memtable time");
  plb.add_time_avg(l_rocksdb_write_delay_time, "rocksdb_write_delay_time", "Rocksdb write delay time");
  plb.add_time_avg(l_rocksdb_write_pre_and_post_process_time,
      "rocksdb_write_pre_and_post_time", "total time spent on writing a record, excluding write process");
  logger = plb.create_perf_counters();
  cct->get_perfcounters_collection()->add(logger);

  if (compact_on_mount) {
    derr << "Compacting rocksdb store..." << dendl;
    compact();
    derr << "Finished compacting rocksdb store" << dendl;
  }
  return 0;
}

int RocksDBStore::_test_init(const string& dir)
{
  rocksdb::Options options;
  options.create_if_missing = true;
  rocksdb::DB *db;
  rocksdb::Status status = rocksdb::DB::Open(options, dir, &db);
  delete db;
  db = nullptr;
  return status.ok() ? 0 : -EIO;
}

RocksDBStore::~RocksDBStore()
{
  close();
  delete logger;

  // Ensure db is destroyed before dependent db_cache and filterpolicy
  delete db;
  db = nullptr;

  if (priv) {
    delete static_cast<rocksdb::Env*>(priv);
  }
}

void RocksDBStore::close()
{
  // stop compaction thread
  compact_queue_lock.Lock();
  if (compact_thread.is_started()) {
    compact_queue_stop = true;
    compact_queue_cond.Signal();
    compact_queue_lock.Unlock();
    compact_thread.join();
  } else {
    compact_queue_lock.Unlock();
  }

  if (logger)
    cct->get_perfcounters_collection()->remove(logger);
}

void RocksDBStore::split_stats(const std::string &s, char delim, std::vector<std::string> &elems) {
  std::stringstream ss;
  ss.str(s);
  std::string item;
  while (std::getline(ss, item, delim)) {
    elems.push_back(item);
  }
}

void RocksDBStore::get_statistics(Formatter *f)
{
  if (!g_conf->rocksdb_perf) {
    dout(20) << __func__ << " RocksDB perf is disabled, can't probe for stats"
             << dendl;
    return;
  }

  if (g_conf->rocksdb_collect_compaction_stats) {
    std::string stat_str;
    bool status = db->GetProperty("rocksdb.stats", &stat_str);
    if (status) {
      f->open_object_section("rocksdb_statistics");
      f->dump_string("rocksdb_compaction_statistics", "");
      vector<string> stats;
      split_stats(stat_str, '\n', stats);
      for (auto st : stats) {
        f->dump_string("", st);
      }
      f->close_section();
    }
  }
  if (g_conf->rocksdb_collect_extended_stats) {
    if (dbstats) {
      f->open_object_section("rocksdb_extended_statistics");
      string stat_str = dbstats->ToString();
      vector<string> stats;
      split_stats(stat_str, '\n', stats);
      f->dump_string("rocksdb_extended_statistics", "");
      for (auto st : stats) {
        f->dump_string(".", st);
      }
      f->close_section();
    }
    f->open_object_section("rocksdbstore_perf_counters");
    logger->dump_formatted(f, 0);
    f->close_section();
  }
  if (g_conf->rocksdb_collect_memory_stats) {
    f->open_object_section("rocksdb_memtable_statistics");
    std::string str(stringify(bbt_opts.block_cache->GetUsage()));
    f->dump_string("block_cache_usage", str.data());
    str.clear();
    str.append(stringify(bbt_opts.block_cache->GetPinnedUsage()));
    f->dump_string("block_cache_pinned_blocks_usage", str);
    str.clear();
    db->GetProperty("rocksdb.cur-size-all-mem-tables", &str);
    f->dump_string("rocksdb_memtable_usage", str);
    f->close_section();
  }
}

int RocksDBStore::submit_transaction(KeyValueDB::Transaction t)
{
  utime_t start = ceph_clock_now();
  // enable the rocksdb perf breakdown; because of its performance
  // overhead it is disabled by default
  if (g_conf->rocksdb_perf) {
    rocksdb::SetPerfLevel(rocksdb::PerfLevel::kEnableTimeExceptForMutex);
    rocksdb::perf_context.Reset();
  }

  RocksDBTransactionImpl * _t =
    static_cast<RocksDBTransactionImpl *>(t.get());
  rocksdb::WriteOptions woptions;
  woptions.disableWAL = disableWAL;
  lgeneric_subdout(cct, rocksdb, 30) << __func__;
  RocksWBHandler bat_txc;
  _t->bat.Iterate(&bat_txc);
  *_dout << " Rocksdb transaction: " << bat_txc.seen << dendl;

  rocksdb::Status s = db->Write(woptions, &_t->bat);
  if (!s.ok()) {
    RocksWBHandler rocks_txc;
    _t->bat.Iterate(&rocks_txc);
    derr << __func__ << " error: " << s.ToString() << " code = " << s.code()
         << " Rocksdb transaction: " << rocks_txc.seen << dendl;
  }
  utime_t lat = ceph_clock_now() - start;

  if (g_conf->rocksdb_perf) {
    utime_t write_memtable_time;
    utime_t write_delay_time;
    utime_t write_wal_time;
    utime_t write_pre_and_post_process_time;
    write_wal_time.set_from_double(
      static_cast<double>(rocksdb::perf_context.write_wal_time)/1000000000);
    write_memtable_time.set_from_double(
      static_cast<double>(rocksdb::perf_context.write_memtable_time)/1000000000);
    write_delay_time.set_from_double(
      static_cast<double>(rocksdb::perf_context.write_delay_time)/1000000000);
    write_pre_and_post_process_time.set_from_double(
      static_cast<double>(rocksdb::perf_context.write_pre_and_post_process_time)/1000000000);
    logger->tinc(l_rocksdb_write_memtable_time, write_memtable_time);
    logger->tinc(l_rocksdb_write_delay_time, write_delay_time);
    logger->tinc(l_rocksdb_write_wal_time, write_wal_time);
    logger->tinc(l_rocksdb_write_pre_and_post_process_time, write_pre_and_post_process_time);
  }

  logger->inc(l_rocksdb_txns);
  logger->tinc(l_rocksdb_submit_latency, lat);

  return s.ok() ? 0 : -1;
}

int RocksDBStore::submit_transaction_sync(KeyValueDB::Transaction t)
{
  utime_t start = ceph_clock_now();
  // enable the rocksdb perf breakdown; because of its performance
  // overhead it is disabled by default
  if (g_conf->rocksdb_perf) {
    rocksdb::SetPerfLevel(rocksdb::PerfLevel::kEnableTimeExceptForMutex);
    rocksdb::perf_context.Reset();
  }

  RocksDBTransactionImpl * _t =
    static_cast<RocksDBTransactionImpl *>(t.get());
  rocksdb::WriteOptions woptions;
  woptions.sync = true;
  woptions.disableWAL = disableWAL;
  lgeneric_subdout(cct, rocksdb, 30) << __func__;
  RocksWBHandler bat_txc;
  _t->bat.Iterate(&bat_txc);
  *_dout << " Rocksdb transaction: " << bat_txc.seen << dendl;

  rocksdb::Status s = db->Write(woptions, &_t->bat);
  if (!s.ok()) {
    RocksWBHandler rocks_txc;
    _t->bat.Iterate(&rocks_txc);
    derr << __func__ << " error: " << s.ToString() << " code = " << s.code()
         << " Rocksdb transaction: " << rocks_txc.seen << dendl;
  }
  utime_t lat = ceph_clock_now() - start;

  if (g_conf->rocksdb_perf) {
    utime_t write_memtable_time;
    utime_t write_delay_time;
    utime_t write_wal_time;
    utime_t write_pre_and_post_process_time;
    write_wal_time.set_from_double(
      static_cast<double>(rocksdb::perf_context.write_wal_time)/1000000000);
    write_memtable_time.set_from_double(
      static_cast<double>(rocksdb::perf_context.write_memtable_time)/1000000000);
    write_delay_time.set_from_double(
      static_cast<double>(rocksdb::perf_context.write_delay_time)/1000000000);
    write_pre_and_post_process_time.set_from_double(
      static_cast<double>(rocksdb::perf_context.write_pre_and_post_process_time)/1000000000);
    logger->tinc(l_rocksdb_write_memtable_time, write_memtable_time);
    logger->tinc(l_rocksdb_write_delay_time, write_delay_time);
    logger->tinc(l_rocksdb_write_wal_time, write_wal_time);
    logger->tinc(l_rocksdb_write_pre_and_post_process_time, write_pre_and_post_process_time);
  }

  logger->inc(l_rocksdb_txns_sync);
  logger->tinc(l_rocksdb_submit_sync_latency, lat);

  return s.ok() ? 0 : -1;
}

RocksDBStore::RocksDBTransactionImpl::RocksDBTransactionImpl(RocksDBStore *_db)
{
  db = _db;
}

void RocksDBStore::RocksDBTransactionImpl::set(
  const string &prefix,
  const string &k,
  const bufferlist &to_set_bl)
{
  string key = combine_strings(prefix, k);

  // bufferlist::c_str() is non-constant, so we can't call c_str()
  if (to_set_bl.is_contiguous() && to_set_bl.length() > 0) {
    bat.Put(rocksdb::Slice(key),
            rocksdb::Slice(to_set_bl.buffers().front().c_str(),
                           to_set_bl.length()));
  } else {
    rocksdb::Slice key_slice(key);
    rocksdb::Slice value_slices[to_set_bl.buffers().size()];
    bat.Put(nullptr, rocksdb::SliceParts(&key_slice, 1),
            prepare_sliceparts(to_set_bl, value_slices));
  }
}

void RocksDBStore::RocksDBTransactionImpl::set(
  const string &prefix,
  const char *k, size_t keylen,
  const bufferlist &to_set_bl)
{
  string key;
  combine_strings(prefix, k, keylen, &key);

  // bufferlist::c_str() is non-constant, so we can't call c_str()
  if (to_set_bl.is_contiguous() && to_set_bl.length() > 0) {
    bat.Put(rocksdb::Slice(key),
            rocksdb::Slice(to_set_bl.buffers().front().c_str(),
                           to_set_bl.length()));
  } else {
    rocksdb::Slice key_slice(key);
    rocksdb::Slice value_slices[to_set_bl.buffers().size()];
    bat.Put(nullptr, rocksdb::SliceParts(&key_slice, 1),
            prepare_sliceparts(to_set_bl, value_slices));
  }
}

void RocksDBStore::RocksDBTransactionImpl::rmkey(const string &prefix,
                                                 const string &k)
{
  bat.Delete(combine_strings(prefix, k));
}

void RocksDBStore::RocksDBTransactionImpl::rmkey(const string &prefix,
                                                 const char *k,
                                                 size_t keylen)
{
  string key;
  combine_strings(prefix, k, keylen, &key);
  bat.Delete(key);
}

void RocksDBStore::RocksDBTransactionImpl::rm_single_key(const string &prefix,
                                                         const string &k)
{
  bat.SingleDelete(combine_strings(prefix, k));
}

void RocksDBStore::RocksDBTransactionImpl::rmkeys_by_prefix(const string &prefix)
{
  if (db->enable_rmrange) {
    string endprefix = prefix;
    endprefix.push_back('\x01');
    bat.DeleteRange(combine_strings(prefix, string()),
                    combine_strings(endprefix, string()));
  } else {
    KeyValueDB::Iterator it = db->get_iterator(prefix);
    for (it->seek_to_first();
         it->valid();
         it->next()) {
      bat.Delete(combine_strings(prefix, it->key()));
    }
  }
}
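
// Illustrative note on the DeleteRange bounds above: keys are stored as
// "<prefix>\0<key>" (see split_key() below), so for prefix "p" the deleted
// range is ["p\0", "p\x01\0"), which covers every key whose prefix is exactly
// "p" and nothing that sorts after it.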

void RocksDBStore::RocksDBTransactionImpl::rm_range_keys(const string &prefix,
                                                         const string &start,
                                                         const string &end)
{
  if (db->enable_rmrange) {
    bat.DeleteRange(combine_strings(prefix, start), combine_strings(prefix, end));
  } else {
    auto it = db->get_iterator(prefix);
    it->lower_bound(start);
    while (it->valid()) {
      if (it->key() >= end) {
        break;
      }
      bat.Delete(combine_strings(prefix, it->key()));
      it->next();
    }
  }
}

void RocksDBStore::RocksDBTransactionImpl::merge(
  const string &prefix,
  const string &k,
  const bufferlist &to_set_bl)
{
  string key = combine_strings(prefix, k);

  // bufferlist::c_str() is non-constant, so we can't call c_str()
  if (to_set_bl.is_contiguous() && to_set_bl.length() > 0) {
    bat.Merge(rocksdb::Slice(key),
              rocksdb::Slice(to_set_bl.buffers().front().c_str(),
                             to_set_bl.length()));
  } else {
    // make a copy
    rocksdb::Slice key_slice(key);
    rocksdb::Slice value_slices[to_set_bl.buffers().size()];
    bat.Merge(nullptr, rocksdb::SliceParts(&key_slice, 1),
              prepare_sliceparts(to_set_bl, value_slices));
  }
}

// gets will bypass the RocksDB row cache, since it uses an iterator
int RocksDBStore::get(
  const string &prefix,
  const std::set<string> &keys,
  std::map<string, bufferlist> *out)
{
  utime_t start = ceph_clock_now();
  for (std::set<string>::const_iterator i = keys.begin();
       i != keys.end(); ++i) {
    std::string value;
    std::string bound = combine_strings(prefix, *i);
    auto status = db->Get(rocksdb::ReadOptions(), rocksdb::Slice(bound), &value);
    if (status.ok())
      (*out)[*i].append(value);
  }
  utime_t lat = ceph_clock_now() - start;
  logger->inc(l_rocksdb_gets);
  logger->tinc(l_rocksdb_get_latency, lat);
  return 0;
}

int RocksDBStore::get(
  const string &prefix,
  const string &key,
  bufferlist *out)
{
  assert(out && (out->length() == 0));
  utime_t start = ceph_clock_now();
  int r = 0;
  string value, k;
  rocksdb::Status s;
  k = combine_strings(prefix, key);
  s = db->Get(rocksdb::ReadOptions(), rocksdb::Slice(k), &value);
  if (s.ok()) {
    out->append(value);
  } else {
    r = -ENOENT;
  }
  utime_t lat = ceph_clock_now() - start;
  logger->inc(l_rocksdb_gets);
  logger->tinc(l_rocksdb_get_latency, lat);
  return r;
}

int RocksDBStore::get(
  const string& prefix,
  const char *key,
  size_t keylen,
  bufferlist *out)
{
  assert(out && (out->length() == 0));
  utime_t start = ceph_clock_now();
  int r = 0;
  string value, k;
  combine_strings(prefix, key, keylen, &k);
  rocksdb::Status s;
  s = db->Get(rocksdb::ReadOptions(), rocksdb::Slice(k), &value);
  if (s.ok()) {
    out->append(value);
  } else {
    r = -ENOENT;
  }
  utime_t lat = ceph_clock_now() - start;
  logger->inc(l_rocksdb_gets);
  logger->tinc(l_rocksdb_get_latency, lat);
  return r;
}

int RocksDBStore::split_key(rocksdb::Slice in, string *prefix, string *key)
{
  size_t prefix_len = 0;

  // Find the separator inside the Slice
  char* separator = (char*) memchr(in.data(), 0, in.size());
  if (separator == NULL)
    return -EINVAL;
  prefix_len = size_t(separator - in.data());
  if (prefix_len >= in.size())
    return -EINVAL;

  // Fetch prefix and/or key directly from the Slice
  if (prefix)
    *prefix = string(in.data(), prefix_len);
  if (key)
    *key = string(separator+1, in.size()-prefix_len-1);
  return 0;
}
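
// Illustrative example of the key layout assumed above: the logical pair
// ("p", "foo") is stored as the 5-byte string "p\0foo" (prefix, a NUL
// separator, then the key), and split_key() simply finds the first NUL to
// undo that combination.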

void RocksDBStore::compact()
{
  logger->inc(l_rocksdb_compact);
  rocksdb::CompactRangeOptions options;
  db->CompactRange(options, nullptr, nullptr);
}

void RocksDBStore::compact_thread_entry()
{
  compact_queue_lock.Lock();
  while (!compact_queue_stop) {
    while (!compact_queue.empty()) {
      pair<string,string> range = compact_queue.front();
      compact_queue.pop_front();
      logger->set(l_rocksdb_compact_queue_len, compact_queue.size());
      compact_queue_lock.Unlock();
      logger->inc(l_rocksdb_compact_range);
      compact_range(range.first, range.second);
      compact_queue_lock.Lock();
      continue;
    }
    compact_queue_cond.Wait(compact_queue_lock);
  }
  compact_queue_lock.Unlock();
}

void RocksDBStore::compact_range_async(const string& start, const string& end)
{
  Mutex::Locker l(compact_queue_lock);

  // try to merge adjacent ranges.  this is O(n), but the queue should
  // be short.  note that we do not cover all overlap cases and merge
  // opportunities here, but we capture the ones we currently need.
  list< pair<string,string> >::iterator p = compact_queue.begin();
  while (p != compact_queue.end()) {
    if (p->first == start && p->second == end) {
      // dup; no-op
      return;
    }
    if (p->first <= end && p->first > start) {
      // merge with existing range to the right
      compact_queue.push_back(make_pair(start, p->second));
      compact_queue.erase(p);
      logger->inc(l_rocksdb_compact_queue_merge);
      break;
    }
    if (p->second >= start && p->second < end) {
      // merge with existing range to the left
      compact_queue.push_back(make_pair(p->first, end));
      compact_queue.erase(p);
      logger->inc(l_rocksdb_compact_queue_merge);
      break;
    }
    ++p;
  }
  if (p == compact_queue.end()) {
    // no merge, new entry.
    compact_queue.push_back(make_pair(start, end));
    logger->set(l_rocksdb_compact_queue_len, compact_queue.size());
  }
  compact_queue_cond.Signal();
  if (!compact_thread.is_started()) {
    compact_thread.create("rstore_compact");
  }
}
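
// A small worked example of the merging above (illustrative values): if the
// queue already holds the range ("b", "d") and compact_range_async("a", "c")
// is called, then p->first "b" <= end "c" and "b" > start "a", so the two
// ranges are merged into ("a", "d") and compact_queue_merge is incremented.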

bool RocksDBStore::check_omap_dir(string &omap_dir)
{
  rocksdb::Options options;
  options.create_if_missing = true;
  rocksdb::DB *db;
  rocksdb::Status status = rocksdb::DB::Open(options, omap_dir, &db);
  delete db;
  db = nullptr;
  return status.ok();
}

void RocksDBStore::compact_range(const string& start, const string& end)
{
  rocksdb::CompactRangeOptions options;
  rocksdb::Slice cstart(start);
  rocksdb::Slice cend(end);
  db->CompactRange(options, &cstart, &cend);
}

RocksDBStore::RocksDBWholeSpaceIteratorImpl::~RocksDBWholeSpaceIteratorImpl()
{
  delete dbiter;
}

int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_first()
{
  dbiter->SeekToFirst();
  return dbiter->status().ok() ? 0 : -1;
}

int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_first(const string &prefix)
{
  rocksdb::Slice slice_prefix(prefix);
  dbiter->Seek(slice_prefix);
  return dbiter->status().ok() ? 0 : -1;
}

int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_last()
{
  dbiter->SeekToLast();
  return dbiter->status().ok() ? 0 : -1;
}

int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_last(const string &prefix)
{
  string limit = past_prefix(prefix);
  rocksdb::Slice slice_limit(limit);
  dbiter->Seek(slice_limit);

  if (!dbiter->Valid()) {
    dbiter->SeekToLast();
  } else {
    dbiter->Prev();
  }
  return dbiter->status().ok() ? 0 : -1;
}

int RocksDBStore::RocksDBWholeSpaceIteratorImpl::upper_bound(const string &prefix, const string &after)
{
  lower_bound(prefix, after);
  if (valid()) {
    pair<string,string> key = raw_key();
    if (key.first == prefix && key.second == after)
      next();
  }
  return dbiter->status().ok() ? 0 : -1;
}

int RocksDBStore::RocksDBWholeSpaceIteratorImpl::lower_bound(const string &prefix, const string &to)
{
  string bound = combine_strings(prefix, to);
  rocksdb::Slice slice_bound(bound);
  dbiter->Seek(slice_bound);
  return dbiter->status().ok() ? 0 : -1;
}

bool RocksDBStore::RocksDBWholeSpaceIteratorImpl::valid()
{
  return dbiter->Valid();
}

int RocksDBStore::RocksDBWholeSpaceIteratorImpl::next()
{
  if (valid()) {
    dbiter->Next();
  }
  return dbiter->status().ok() ? 0 : -1;
}

int RocksDBStore::RocksDBWholeSpaceIteratorImpl::prev()
{
  if (valid()) {
    dbiter->Prev();
  }
  return dbiter->status().ok() ? 0 : -1;
}

string RocksDBStore::RocksDBWholeSpaceIteratorImpl::key()
{
  string out_key;
  split_key(dbiter->key(), 0, &out_key);
  return out_key;
}

pair<string,string> RocksDBStore::RocksDBWholeSpaceIteratorImpl::raw_key()
{
  string prefix, key;
  split_key(dbiter->key(), &prefix, &key);
  return make_pair(prefix, key);
}

bool RocksDBStore::RocksDBWholeSpaceIteratorImpl::raw_key_is_prefixed(const string &prefix) {
  // Look for "prefix\0" right in the rocksdb::Slice
  rocksdb::Slice key = dbiter->key();
  if ((key.size() > prefix.length()) && (key[prefix.length()] == '\0')) {
    return memcmp(key.data(), prefix.c_str(), prefix.length()) == 0;
  } else {
    return false;
  }
}

bufferlist RocksDBStore::RocksDBWholeSpaceIteratorImpl::value()
{
  return to_bufferlist(dbiter->value());
}

size_t RocksDBStore::RocksDBWholeSpaceIteratorImpl::key_size()
{
  return dbiter->key().size();
}

size_t RocksDBStore::RocksDBWholeSpaceIteratorImpl::value_size()
{
  return dbiter->value().size();
}

bufferptr RocksDBStore::RocksDBWholeSpaceIteratorImpl::value_as_ptr()
{
  rocksdb::Slice val = dbiter->value();
  return bufferptr(val.data(), val.size());
}

int RocksDBStore::RocksDBWholeSpaceIteratorImpl::status()
{
  return dbiter->status().ok() ? 0 : -1;
}

string RocksDBStore::past_prefix(const string &prefix)
{
  string limit = prefix;
  limit.push_back(1);
  return limit;
}
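
// Illustrative note: past_prefix("p") returns "p\x01".  Since stored keys for
// prefix "p" all look like "p\0<key>" and '\x01' sorts immediately after the
// '\0' separator, "p\x01" is the smallest string that sorts after every key
// with that prefix; seek_to_last(prefix) above seeks there and steps back one.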

RocksDBStore::WholeSpaceIterator RocksDBStore::_get_iterator()
{
  return std::make_shared<RocksDBWholeSpaceIteratorImpl>(
    db->NewIterator(rocksdb::ReadOptions()));
}