]> git.proxmox.com Git - ceph.git/blob - ceph/src/kv/RocksDBStore.cc
add subtree-ish sources for 12.0.3
[ceph.git] / ceph / src / kv / RocksDBStore.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include <set>
5 #include <map>
6 #include <string>
7 #include <memory>
8 #include <errno.h>
9 #include <unistd.h>
10 #include <sys/types.h>
11 #include <sys/stat.h>
12
13 #include "rocksdb/db.h"
14 #include "rocksdb/table.h"
15 #include "rocksdb/env.h"
16 #include "rocksdb/slice.h"
17 #include "rocksdb/cache.h"
18 #include "rocksdb/filter_policy.h"
19 #include "rocksdb/utilities/convenience.h"
20 #include "rocksdb/merge_operator.h"
21 using std::string;
22 #include "common/perf_counters.h"
23 #include "common/debug.h"
24 #include "include/str_list.h"
25 #include "include/stringify.h"
26 #include "include/str_map.h"
27 #include "KeyValueDB.h"
28 #include "RocksDBStore.h"
29
30 #include "common/debug.h"
31
32 #define dout_context cct
33 #define dout_subsys ceph_subsys_rocksdb
34 #undef dout_prefix
35 #define dout_prefix *_dout << "rocksdb: "
36
37 //
38 // One of these per rocksdb instance, implements the merge operator prefix stuff
39 //
40 class RocksDBStore::MergeOperatorRouter : public rocksdb::AssociativeMergeOperator {
41 RocksDBStore& store;
42 public:
43 const char *Name() const override {
44 // Construct a name that rocksDB will validate against. We want to
45 // do this in a way that doesn't constrain the ordering of calls
46 // to set_merge_operator, so sort the merge operators and then
47 // construct a name from all of those parts.
48 store.assoc_name.clear();
49 map<std::string,std::string> names;
50 for (auto& p : store.merge_ops) names[p.first] = p.second->name();
51 for (auto& p : names) {
52 store.assoc_name += '.';
53 store.assoc_name += p.first;
54 store.assoc_name += ':';
55 store.assoc_name += p.second;
56 }
57 return store.assoc_name.c_str();
58 }
59
60 MergeOperatorRouter(RocksDBStore &_store) : store(_store) {}
61
62 bool Merge(const rocksdb::Slice& key,
63 const rocksdb::Slice* existing_value,
64 const rocksdb::Slice& value,
65 std::string* new_value,
66 rocksdb::Logger* logger) const override {
67 // Check each prefix
68 for (auto& p : store.merge_ops) {
69 if (p.first.compare(0, p.first.length(),
70 key.data(), p.first.length()) == 0 &&
71 key.data()[p.first.length()] == 0) {
72 if (existing_value) {
73 p.second->merge(existing_value->data(), existing_value->size(),
74 value.data(), value.size(),
75 new_value);
76 } else {
77 p.second->merge_nonexistent(value.data(), value.size(), new_value);
78 }
79 break;
80 }
81 }
82 return true; // OK :)
83 }
84
85 };
86
87 int RocksDBStore::set_merge_operator(
88 const string& prefix,
89 std::shared_ptr<KeyValueDB::MergeOperator> mop)
90 {
91 // If you fail here, it's because you can't do this on an open database
92 assert(db == nullptr);
93 merge_ops.push_back(std::make_pair(prefix,mop));
94 return 0;
95 }
96
/**
 * rocksdb::Logger implementation that forwards RocksDB log lines to the
 * Ceph debug log of the CephContext it was constructed with.
 *
 * The context is pinned with get() in the constructor and released with
 * put() in the destructor.
 */
class CephRocksdbLogger : public rocksdb::Logger {
  CephContext *cct;
public:
  explicit CephRocksdbLogger(CephContext *c) : cct(c) {
    cct->get();
  }
  ~CephRocksdbLogger() override {
    cct->put();
  }

  // Write an entry to the log file with the specified format.
  // Delegates to the levelled overload using INFO_LEVEL.
  void Logv(const char* format, va_list ap) override {
    Logv(rocksdb::INFO_LEVEL, format, ap);
  }

  // Write an entry to the log file with the specified log level
  // and format. Any log with level under the internal log level
  // of *this (see @SetInfoLogLevel and @GetInfoLogLevel) will not be
  // printed.
  void Logv(const rocksdb::InfoLogLevel log_level, const char* format,
            va_list ap) override {
    // Invert the RocksDB level so that higher RocksDB levels map to
    // lower dout levels.
    int v = rocksdb::NUM_INFO_LOG_LEVELS - log_level - 1;
    // NOTE(review): dout(v) presumably opens a logging scope that
    // provides _dout and is closed by dendl below; keep these
    // statements in this order.
    dout(v);
    // Output longer than the buffer is truncated by vsnprintf.
    char buf[65536];
    vsnprintf(buf, sizeof(buf), format, ap);
    *_dout << buf << dendl;
  }
};
125
126 rocksdb::Logger *create_rocksdb_ceph_logger()
127 {
128 return new CephRocksdbLogger(g_ceph_context);
129 }
130
131 int string2bool(string val, bool &b_val)
132 {
133 if (strcasecmp(val.c_str(), "false") == 0) {
134 b_val = false;
135 return 0;
136 } else if (strcasecmp(val.c_str(), "true") == 0) {
137 b_val = true;
138 return 0;
139 } else {
140 std::string err;
141 int b = strict_strtol(val.c_str(), 10, &err);
142 if (!err.empty())
143 return -EINVAL;
144 b_val = !!b;
145 return 0;
146 }
147 }
148
149 int RocksDBStore::tryInterpret(const string key, const string val, rocksdb::Options &opt)
150 {
151 if (key == "compaction_threads") {
152 std::string err;
153 int f = strict_sistrtoll(val.c_str(), &err);
154 if (!err.empty())
155 return -EINVAL;
156 //Low priority threadpool is used for compaction
157 opt.env->SetBackgroundThreads(f, rocksdb::Env::Priority::LOW);
158 } else if (key == "flusher_threads") {
159 std::string err;
160 int f = strict_sistrtoll(val.c_str(), &err);
161 if (!err.empty())
162 return -EINVAL;
163 //High priority threadpool is used for flusher
164 opt.env->SetBackgroundThreads(f, rocksdb::Env::Priority::HIGH);
165 } else if (key == "compact_on_mount") {
166 int ret = string2bool(val, compact_on_mount);
167 if (ret != 0)
168 return ret;
169 } else if (key == "disableWAL") {
170 int ret = string2bool(val, disableWAL);
171 if (ret != 0)
172 return ret;
173 } else {
174 //unrecognize config options.
175 return -EINVAL;
176 }
177 return 0;
178 }
179
180 int RocksDBStore::ParseOptionsFromString(const string opt_str, rocksdb::Options &opt)
181 {
182 map<string, string> str_map;
183 int r = get_str_map(opt_str, &str_map, ",\n;");
184 if (r < 0)
185 return r;
186 map<string, string>::iterator it;
187 for(it = str_map.begin(); it != str_map.end(); ++it) {
188 string this_opt = it->first + "=" + it->second;
189 rocksdb::Status status = rocksdb::GetOptionsFromString(opt, this_opt , &opt);
190 if (!status.ok()) {
191 //unrecognized by rocksdb, try to interpret by ourselves.
192 r = tryInterpret(it->first, it->second, opt);
193 if (r < 0) {
194 derr << status.ToString() << dendl;
195 return -EINVAL;
196 }
197 }
198 lgeneric_dout(cct, 0) << " set rocksdb option " << it->first
199 << " = " << it->second << dendl;
200 }
201 return 0;
202 }
203
204 int RocksDBStore::init(string _options_str)
205 {
206 options_str = _options_str;
207 rocksdb::Options opt;
208 //try parse options
209 if (options_str.length()) {
210 int r = ParseOptionsFromString(options_str, opt);
211 if (r != 0) {
212 return -EINVAL;
213 }
214 }
215 return 0;
216 }
217
218 int RocksDBStore::create_and_open(ostream &out)
219 {
220 if (env) {
221 unique_ptr<rocksdb::Directory> dir;
222 env->NewDirectory(path, &dir);
223 } else {
224 int r = ::mkdir(path.c_str(), 0755);
225 if (r < 0)
226 r = -errno;
227 if (r < 0 && r != -EEXIST) {
228 derr << __func__ << " failed to create " << path << ": " << cpp_strerror(r)
229 << dendl;
230 return r;
231 }
232 }
233 return do_open(out, true);
234 }
235
236 int RocksDBStore::do_open(ostream &out, bool create_if_missing)
237 {
238 rocksdb::Options opt;
239 rocksdb::Status status;
240
241 if (options_str.length()) {
242 int r = ParseOptionsFromString(options_str, opt);
243 if (r != 0) {
244 return -EINVAL;
245 }
246 }
247
248 if (g_conf->rocksdb_perf) {
249 dbstats = rocksdb::CreateDBStatistics();
250 opt.statistics = dbstats;
251 }
252
253 opt.create_if_missing = create_if_missing;
254 if (g_conf->rocksdb_separate_wal_dir) {
255 opt.wal_dir = path + ".wal";
256 }
257 if (g_conf->get_val<std::string>("rocksdb_db_paths").length()) {
258 list<string> paths;
259 get_str_list(g_conf->get_val<std::string>("rocksdb_db_paths"), "; \t", paths);
260 for (auto& p : paths) {
261 size_t pos = p.find(',');
262 if (pos == std::string::npos) {
263 derr << __func__ << " invalid db path item " << p << " in "
264 << g_conf->get_val<std::string>("rocksdb_db_paths") << dendl;
265 return -EINVAL;
266 }
267 string path = p.substr(0, pos);
268 string size_str = p.substr(pos + 1);
269 uint64_t size = atoll(size_str.c_str());
270 if (!size) {
271 derr << __func__ << " invalid db path item " << p << " in "
272 << g_conf->get_val<std::string>("rocksdb_db_paths") << dendl;
273 return -EINVAL;
274 }
275 opt.db_paths.push_back(rocksdb::DbPath(path, size));
276 dout(10) << __func__ << " db_path " << path << " size " << size << dendl;
277 }
278 }
279
280 if (g_conf->rocksdb_log_to_ceph_log) {
281 opt.info_log.reset(new CephRocksdbLogger(g_ceph_context));
282 }
283
284 if (priv) {
285 dout(10) << __func__ << " using custom Env " << priv << dendl;
286 opt.env = static_cast<rocksdb::Env*>(priv);
287 }
288
289 auto cache = rocksdb::NewLRUCache(g_conf->rocksdb_cache_size, g_conf->rocksdb_cache_shard_bits);
290 bbt_opts.block_size = g_conf->rocksdb_block_size;
291 bbt_opts.block_cache = cache;
292 if (g_conf->kstore_rocksdb_bloom_bits_per_key > 0) {
293 dout(10) << __func__ << " set bloom filter bits per key to "
294 << g_conf->kstore_rocksdb_bloom_bits_per_key << dendl;
295 bbt_opts.filter_policy.reset(rocksdb::NewBloomFilterPolicy(g_conf->kstore_rocksdb_bloom_bits_per_key));
296 }
297 opt.table_factory.reset(rocksdb::NewBlockBasedTableFactory(bbt_opts));
298 dout(10) << __func__ << " set block size to " << g_conf->rocksdb_block_size
299 << " cache size to " << g_conf->rocksdb_cache_size
300 << " num of cache shards to " << (1 << g_conf->rocksdb_cache_shard_bits) << dendl;
301
302 opt.merge_operator.reset(new MergeOperatorRouter(*this));
303 status = rocksdb::DB::Open(opt, path, &db);
304 if (!status.ok()) {
305 derr << status.ToString() << dendl;
306 return -EINVAL;
307 }
308
309 PerfCountersBuilder plb(g_ceph_context, "rocksdb", l_rocksdb_first, l_rocksdb_last);
310 plb.add_u64_counter(l_rocksdb_gets, "get", "Gets");
311 plb.add_u64_counter(l_rocksdb_txns, "submit_transaction", "Submit transactions");
312 plb.add_u64_counter(l_rocksdb_txns_sync, "submit_transaction_sync", "Submit transactions sync");
313 plb.add_time_avg(l_rocksdb_get_latency, "get_latency", "Get latency");
314 plb.add_time_avg(l_rocksdb_submit_latency, "submit_latency", "Submit Latency");
315 plb.add_time_avg(l_rocksdb_submit_sync_latency, "submit_sync_latency", "Submit Sync Latency");
316 plb.add_u64_counter(l_rocksdb_compact, "compact", "Compactions");
317 plb.add_u64_counter(l_rocksdb_compact_range, "compact_range", "Compactions by range");
318 plb.add_u64_counter(l_rocksdb_compact_queue_merge, "compact_queue_merge", "Mergings of ranges in compaction queue");
319 plb.add_u64(l_rocksdb_compact_queue_len, "compact_queue_len", "Length of compaction queue");
320 plb.add_time_avg(l_rocksdb_write_wal_time, "rocksdb_write_wal_time", "Rocksdb write wal time");
321 plb.add_time_avg(l_rocksdb_write_memtable_time, "rocksdb_write_memtable_time", "Rocksdb write memtable time");
322 plb.add_time_avg(l_rocksdb_write_delay_time, "rocksdb_write_delay_time", "Rocksdb write delay time");
323 plb.add_time_avg(l_rocksdb_write_pre_and_post_process_time,
324 "rocksdb_write_pre_and_post_time", "total time spent on writing a record, excluding write process");
325 logger = plb.create_perf_counters();
326 cct->get_perfcounters_collection()->add(logger);
327
328 if (compact_on_mount) {
329 derr << "Compacting rocksdb store..." << dendl;
330 compact();
331 derr << "Finished compacting rocksdb store" << dendl;
332 }
333 return 0;
334 }
335
336 int RocksDBStore::_test_init(const string& dir)
337 {
338 rocksdb::Options options;
339 options.create_if_missing = true;
340 rocksdb::DB *db;
341 rocksdb::Status status = rocksdb::DB::Open(options, dir, &db);
342 delete db;
343 db = nullptr;
344 return status.ok() ? 0 : -EIO;
345 }
346
347 RocksDBStore::~RocksDBStore()
348 {
349 close();
350 delete logger;
351
352 // Ensure db is destroyed before dependent db_cache and filterpolicy
353 delete db;
354 db = nullptr;
355
356 if (priv) {
357 delete static_cast<rocksdb::Env*>(priv);
358 }
359 }
360
361 void RocksDBStore::close()
362 {
363 // stop compaction thread
364 compact_queue_lock.Lock();
365 if (compact_thread.is_started()) {
366 compact_queue_stop = true;
367 compact_queue_cond.Signal();
368 compact_queue_lock.Unlock();
369 compact_thread.join();
370 } else {
371 compact_queue_lock.Unlock();
372 }
373
374 if (logger)
375 cct->get_perfcounters_collection()->remove(logger);
376 }
377
378 void RocksDBStore::split_stats(const std::string &s, char delim, std::vector<std::string> &elems) {
379 std::stringstream ss;
380 ss.str(s);
381 std::string item;
382 while (std::getline(ss, item, delim)) {
383 elems.push_back(item);
384 }
385 }
386
387 void RocksDBStore::get_statistics(Formatter *f)
388 {
389 if (!g_conf->rocksdb_perf) {
390 dout(20) << __func__ << "RocksDB perf is disabled, can't probe for stats"
391 << dendl;
392 return;
393 }
394
395 if (g_conf->rocksdb_collect_compaction_stats) {
396 std::string stat_str;
397 bool status = db->GetProperty("rocksdb.stats", &stat_str);
398 if (status) {
399 f->open_object_section("rocksdb_statistics");
400 f->dump_string("rocksdb_compaction_statistics", "");
401 vector<string> stats;
402 split_stats(stat_str, '\n', stats);
403 for (auto st :stats) {
404 f->dump_string("", st);
405 }
406 f->close_section();
407 }
408 }
409 if (g_conf->rocksdb_collect_extended_stats) {
410 if (dbstats) {
411 f->open_object_section("rocksdb_extended_statistics");
412 string stat_str = dbstats->ToString();
413 vector<string> stats;
414 split_stats(stat_str, '\n', stats);
415 f->dump_string("rocksdb_extended_statistics", "");
416 for (auto st :stats) {
417 f->dump_string(".", st);
418 }
419 f->close_section();
420 }
421 f->open_object_section("rocksdbstore_perf_counters");
422 logger->dump_formatted(f,0);
423 f->close_section();
424 }
425 if (g_conf->rocksdb_collect_memory_stats) {
426 f->open_object_section("rocksdb_memtable_statistics");
427 std::string str(stringify(bbt_opts.block_cache->GetUsage()));
428 f->dump_string("block_cache_usage", str.data());
429 str.clear();
430 str.append(stringify(bbt_opts.block_cache->GetPinnedUsage()));
431 f->dump_string("block_cache_pinned_blocks_usage", str);
432 str.clear();
433 db->GetProperty("rocksdb.cur-size-all-mem-tables", &str);
434 f->dump_string("rocksdb_memtable_usage", str);
435 f->close_section();
436 }
437 }
438
439 int RocksDBStore::submit_transaction(KeyValueDB::Transaction t)
440 {
441 utime_t start = ceph_clock_now();
442 // enable rocksdb breakdown
443 // considering performance overhead, default is disabled
444 if (g_conf->rocksdb_perf) {
445 rocksdb::SetPerfLevel(rocksdb::PerfLevel::kEnableTimeExceptForMutex);
446 rocksdb::perf_context.Reset();
447 }
448
449 RocksDBTransactionImpl * _t =
450 static_cast<RocksDBTransactionImpl *>(t.get());
451 rocksdb::WriteOptions woptions;
452 woptions.disableWAL = disableWAL;
453 lgeneric_subdout(cct, rocksdb, 30) << __func__;
454 RocksWBHandler bat_txc;
455 _t->bat.Iterate(&bat_txc);
456 *_dout << " Rocksdb transaction: " << bat_txc.seen << dendl;
457
458 rocksdb::Status s = db->Write(woptions, &_t->bat);
459 if (!s.ok()) {
460 RocksWBHandler rocks_txc;
461 _t->bat.Iterate(&rocks_txc);
462 derr << __func__ << " error: " << s.ToString() << " code = " << s.code()
463 << " Rocksdb transaction: " << rocks_txc.seen << dendl;
464 }
465 utime_t lat = ceph_clock_now() - start;
466
467 if (g_conf->rocksdb_perf) {
468 utime_t write_memtable_time;
469 utime_t write_delay_time;
470 utime_t write_wal_time;
471 utime_t write_pre_and_post_process_time;
472 write_wal_time.set_from_double(
473 static_cast<double>(rocksdb::perf_context.write_wal_time)/1000000000);
474 write_memtable_time.set_from_double(
475 static_cast<double>(rocksdb::perf_context.write_memtable_time)/1000000000);
476 write_delay_time.set_from_double(
477 static_cast<double>(rocksdb::perf_context.write_delay_time)/1000000000);
478 write_pre_and_post_process_time.set_from_double(
479 static_cast<double>(rocksdb::perf_context.write_pre_and_post_process_time)/1000000000);
480 logger->tinc(l_rocksdb_write_memtable_time, write_memtable_time);
481 logger->tinc(l_rocksdb_write_delay_time, write_delay_time);
482 logger->tinc(l_rocksdb_write_wal_time, write_wal_time);
483 logger->tinc(l_rocksdb_write_pre_and_post_process_time, write_pre_and_post_process_time);
484 }
485
486 logger->inc(l_rocksdb_txns);
487 logger->tinc(l_rocksdb_submit_latency, lat);
488
489 return s.ok() ? 0 : -1;
490 }
491
492 int RocksDBStore::submit_transaction_sync(KeyValueDB::Transaction t)
493 {
494 utime_t start = ceph_clock_now();
495 // enable rocksdb breakdown
496 // considering performance overhead, default is disabled
497 if (g_conf->rocksdb_perf) {
498 rocksdb::SetPerfLevel(rocksdb::PerfLevel::kEnableTimeExceptForMutex);
499 rocksdb::perf_context.Reset();
500 }
501
502 RocksDBTransactionImpl * _t =
503 static_cast<RocksDBTransactionImpl *>(t.get());
504 rocksdb::WriteOptions woptions;
505 woptions.sync = true;
506 woptions.disableWAL = disableWAL;
507 lgeneric_subdout(cct, rocksdb, 30) << __func__;
508 RocksWBHandler bat_txc;
509 _t->bat.Iterate(&bat_txc);
510 *_dout << " Rocksdb transaction: " << bat_txc.seen << dendl;
511
512 rocksdb::Status s = db->Write(woptions, &_t->bat);
513 if (!s.ok()) {
514 RocksWBHandler rocks_txc;
515 _t->bat.Iterate(&rocks_txc);
516 derr << __func__ << " error: " << s.ToString() << " code = " << s.code()
517 << " Rocksdb transaction: " << rocks_txc.seen << dendl;
518 }
519 utime_t lat = ceph_clock_now() - start;
520
521 if (g_conf->rocksdb_perf) {
522 utime_t write_memtable_time;
523 utime_t write_delay_time;
524 utime_t write_wal_time;
525 utime_t write_pre_and_post_process_time;
526 write_wal_time.set_from_double(
527 static_cast<double>(rocksdb::perf_context.write_wal_time)/1000000000);
528 write_memtable_time.set_from_double(
529 static_cast<double>(rocksdb::perf_context.write_memtable_time)/1000000000);
530 write_delay_time.set_from_double(
531 static_cast<double>(rocksdb::perf_context.write_delay_time)/1000000000);
532 write_pre_and_post_process_time.set_from_double(
533 static_cast<double>(rocksdb::perf_context.write_pre_and_post_process_time)/1000000000);
534 logger->tinc(l_rocksdb_write_memtable_time, write_memtable_time);
535 logger->tinc(l_rocksdb_write_delay_time, write_delay_time);
536 logger->tinc(l_rocksdb_write_wal_time, write_wal_time);
537 logger->tinc(l_rocksdb_write_pre_and_post_process_time, write_pre_and_post_process_time);
538 }
539
540 logger->inc(l_rocksdb_txns_sync);
541 logger->tinc(l_rocksdb_submit_sync_latency, lat);
542
543 return s.ok() ? 0 : -1;
544 }
545
546 RocksDBStore::RocksDBTransactionImpl::RocksDBTransactionImpl(RocksDBStore *_db)
547 {
548 db = _db;
549 }
550
551 void RocksDBStore::RocksDBTransactionImpl::set(
552 const string &prefix,
553 const string &k,
554 const bufferlist &to_set_bl)
555 {
556 string key = combine_strings(prefix, k);
557
558 // bufferlist::c_str() is non-constant, so we can't call c_str()
559 if (to_set_bl.is_contiguous() && to_set_bl.length() > 0) {
560 bat.Put(rocksdb::Slice(key),
561 rocksdb::Slice(to_set_bl.buffers().front().c_str(),
562 to_set_bl.length()));
563 } else {
564 // make a copy
565 bufferlist val = to_set_bl;
566 bat.Put(rocksdb::Slice(key),
567 rocksdb::Slice(val.c_str(), val.length()));
568 }
569 }
570
571 void RocksDBStore::RocksDBTransactionImpl::set(
572 const string &prefix,
573 const char *k, size_t keylen,
574 const bufferlist &to_set_bl)
575 {
576 string key;
577 combine_strings(prefix, k, keylen, &key);
578
579 // bufferlist::c_str() is non-constant, so we can't call c_str()
580 if (to_set_bl.is_contiguous() && to_set_bl.length() > 0) {
581 bat.Put(rocksdb::Slice(key),
582 rocksdb::Slice(to_set_bl.buffers().front().c_str(),
583 to_set_bl.length()));
584 } else {
585 // make a copy
586 bufferlist val = to_set_bl;
587 bat.Put(rocksdb::Slice(key),
588 rocksdb::Slice(val.c_str(), val.length()));
589 }
590 }
591
592 void RocksDBStore::RocksDBTransactionImpl::rmkey(const string &prefix,
593 const string &k)
594 {
595 bat.Delete(combine_strings(prefix, k));
596 }
597
598 void RocksDBStore::RocksDBTransactionImpl::rmkey(const string &prefix,
599 const char *k,
600 size_t keylen)
601 {
602 string key;
603 combine_strings(prefix, k, keylen, &key);
604 bat.Delete(key);
605 }
606
607 void RocksDBStore::RocksDBTransactionImpl::rm_single_key(const string &prefix,
608 const string &k)
609 {
610 bat.SingleDelete(combine_strings(prefix, k));
611 }
612
613 void RocksDBStore::RocksDBTransactionImpl::rmkeys_by_prefix(const string &prefix)
614 {
615 KeyValueDB::Iterator it = db->get_iterator(prefix);
616 for (it->seek_to_first();
617 it->valid();
618 it->next()) {
619 bat.Delete(combine_strings(prefix, it->key()));
620 }
621 }
622
623 void RocksDBStore::RocksDBTransactionImpl::rm_range_keys(const string &prefix,
624 const string &start,
625 const string &end)
626 {
627 if (db->enable_rmrange) {
628 bat.DeleteRange(combine_strings(prefix, start), combine_strings(prefix, end));
629 } else {
630 auto it = db->get_iterator(prefix);
631 it->lower_bound(start);
632 while (it->valid()) {
633 if (it->key() >= end) {
634 break;
635 }
636 bat.Delete(combine_strings(prefix, it->key()));
637 it->next();
638 }
639 }
640 }
641
642 void RocksDBStore::RocksDBTransactionImpl::merge(
643 const string &prefix,
644 const string &k,
645 const bufferlist &to_set_bl)
646 {
647 string key = combine_strings(prefix, k);
648
649 // bufferlist::c_str() is non-constant, so we can't call c_str()
650 if (to_set_bl.is_contiguous() && to_set_bl.length() > 0) {
651 bat.Merge(rocksdb::Slice(key),
652 rocksdb::Slice(to_set_bl.buffers().front().c_str(),
653 to_set_bl.length()));
654 } else {
655 // make a copy
656 bufferlist val = to_set_bl;
657 bat.Merge(rocksdb::Slice(key),
658 rocksdb::Slice(val.c_str(), val.length()));
659 }
660 }
661
662 //gets will bypass RocksDB row cache, since it uses iterator
663 int RocksDBStore::get(
664 const string &prefix,
665 const std::set<string> &keys,
666 std::map<string, bufferlist> *out)
667 {
668 utime_t start = ceph_clock_now();
669 for (std::set<string>::const_iterator i = keys.begin();
670 i != keys.end(); ++i) {
671 std::string value;
672 std::string bound = combine_strings(prefix, *i);
673 auto status = db->Get(rocksdb::ReadOptions(), rocksdb::Slice(bound), &value);
674 if (status.ok())
675 (*out)[*i].append(value);
676 }
677 utime_t lat = ceph_clock_now() - start;
678 logger->inc(l_rocksdb_gets);
679 logger->tinc(l_rocksdb_get_latency, lat);
680 return 0;
681 }
682
683 int RocksDBStore::get(
684 const string &prefix,
685 const string &key,
686 bufferlist *out)
687 {
688 assert(out && (out->length() == 0));
689 utime_t start = ceph_clock_now();
690 int r = 0;
691 string value, k;
692 rocksdb::Status s;
693 k = combine_strings(prefix, key);
694 s = db->Get(rocksdb::ReadOptions(), rocksdb::Slice(k), &value);
695 if (s.ok()) {
696 out->append(value);
697 } else {
698 r = -ENOENT;
699 }
700 utime_t lat = ceph_clock_now() - start;
701 logger->inc(l_rocksdb_gets);
702 logger->tinc(l_rocksdb_get_latency, lat);
703 return r;
704 }
705
706 int RocksDBStore::get(
707 const string& prefix,
708 const char *key,
709 size_t keylen,
710 bufferlist *out)
711 {
712 assert(out && (out->length() == 0));
713 utime_t start = ceph_clock_now();
714 int r = 0;
715 string value, k;
716 combine_strings(prefix, key, keylen, &k);
717 rocksdb::Status s;
718 s = db->Get(rocksdb::ReadOptions(), rocksdb::Slice(k), &value);
719 if (s.ok()) {
720 out->append(value);
721 } else {
722 r = -ENOENT;
723 }
724 utime_t lat = ceph_clock_now() - start;
725 logger->inc(l_rocksdb_gets);
726 logger->tinc(l_rocksdb_get_latency, lat);
727 return r;
728 }
729
730 int RocksDBStore::split_key(rocksdb::Slice in, string *prefix, string *key)
731 {
732 size_t prefix_len = 0;
733
734 // Find separator inside Slice
735 char* separator = (char*) memchr(in.data(), 0, in.size());
736 if (separator == NULL)
737 return -EINVAL;
738 prefix_len = size_t(separator - in.data());
739 if (prefix_len >= in.size())
740 return -EINVAL;
741
742 // Fetch prefix and/or key directly from Slice
743 if (prefix)
744 *prefix = string(in.data(), prefix_len);
745 if (key)
746 *key = string(separator+1, in.size()-prefix_len-1);
747 return 0;
748 }
749
750 void RocksDBStore::compact()
751 {
752 logger->inc(l_rocksdb_compact);
753 rocksdb::CompactRangeOptions options;
754 db->CompactRange(options, nullptr, nullptr);
755 }
756
757
758 void RocksDBStore::compact_thread_entry()
759 {
760 compact_queue_lock.Lock();
761 while (!compact_queue_stop) {
762 while (!compact_queue.empty()) {
763 pair<string,string> range = compact_queue.front();
764 compact_queue.pop_front();
765 logger->set(l_rocksdb_compact_queue_len, compact_queue.size());
766 compact_queue_lock.Unlock();
767 logger->inc(l_rocksdb_compact_range);
768 compact_range(range.first, range.second);
769 compact_queue_lock.Lock();
770 continue;
771 }
772 compact_queue_cond.Wait(compact_queue_lock);
773 }
774 compact_queue_lock.Unlock();
775 }
776
777 void RocksDBStore::compact_range_async(const string& start, const string& end)
778 {
779 Mutex::Locker l(compact_queue_lock);
780
781 // try to merge adjacent ranges. this is O(n), but the queue should
782 // be short. note that we do not cover all overlap cases and merge
783 // opportunities here, but we capture the ones we currently need.
784 list< pair<string,string> >::iterator p = compact_queue.begin();
785 while (p != compact_queue.end()) {
786 if (p->first == start && p->second == end) {
787 // dup; no-op
788 return;
789 }
790 if (p->first <= end && p->first > start) {
791 // merge with existing range to the right
792 compact_queue.push_back(make_pair(start, p->second));
793 compact_queue.erase(p);
794 logger->inc(l_rocksdb_compact_queue_merge);
795 break;
796 }
797 if (p->second >= start && p->second < end) {
798 // merge with existing range to the left
799 compact_queue.push_back(make_pair(p->first, end));
800 compact_queue.erase(p);
801 logger->inc(l_rocksdb_compact_queue_merge);
802 break;
803 }
804 ++p;
805 }
806 if (p == compact_queue.end()) {
807 // no merge, new entry.
808 compact_queue.push_back(make_pair(start, end));
809 logger->set(l_rocksdb_compact_queue_len, compact_queue.size());
810 }
811 compact_queue_cond.Signal();
812 if (!compact_thread.is_started()) {
813 compact_thread.create("rstore_compact");
814 }
815 }
816 bool RocksDBStore::check_omap_dir(string &omap_dir)
817 {
818 rocksdb::Options options;
819 options.create_if_missing = true;
820 rocksdb::DB *db;
821 rocksdb::Status status = rocksdb::DB::Open(options, omap_dir, &db);
822 delete db;
823 db = nullptr;
824 return status.ok();
825 }
826 void RocksDBStore::compact_range(const string& start, const string& end)
827 {
828 rocksdb::CompactRangeOptions options;
829 rocksdb::Slice cstart(start);
830 rocksdb::Slice cend(end);
831 db->CompactRange(options, &cstart, &cend);
832 }
// Releases the wrapped RocksDB iterator.
RocksDBStore::RocksDBWholeSpaceIteratorImpl::~RocksDBWholeSpaceIteratorImpl()
{
  delete dbiter;
}
837 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_first()
838 {
839 dbiter->SeekToFirst();
840 return dbiter->status().ok() ? 0 : -1;
841 }
842 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_first(const string &prefix)
843 {
844 rocksdb::Slice slice_prefix(prefix);
845 dbiter->Seek(slice_prefix);
846 return dbiter->status().ok() ? 0 : -1;
847 }
848 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_last()
849 {
850 dbiter->SeekToLast();
851 return dbiter->status().ok() ? 0 : -1;
852 }
853 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_last(const string &prefix)
854 {
855 string limit = past_prefix(prefix);
856 rocksdb::Slice slice_limit(limit);
857 dbiter->Seek(slice_limit);
858
859 if (!dbiter->Valid()) {
860 dbiter->SeekToLast();
861 } else {
862 dbiter->Prev();
863 }
864 return dbiter->status().ok() ? 0 : -1;
865 }
866 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::upper_bound(const string &prefix, const string &after)
867 {
868 lower_bound(prefix, after);
869 if (valid()) {
870 pair<string,string> key = raw_key();
871 if (key.first == prefix && key.second == after)
872 next();
873 }
874 return dbiter->status().ok() ? 0 : -1;
875 }
876 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::lower_bound(const string &prefix, const string &to)
877 {
878 string bound = combine_strings(prefix, to);
879 rocksdb::Slice slice_bound(bound);
880 dbiter->Seek(slice_bound);
881 return dbiter->status().ok() ? 0 : -1;
882 }
883 bool RocksDBStore::RocksDBWholeSpaceIteratorImpl::valid()
884 {
885 return dbiter->Valid();
886 }
887 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::next()
888 {
889 if (valid()) {
890 dbiter->Next();
891 }
892 return dbiter->status().ok() ? 0 : -1;
893 }
894 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::prev()
895 {
896 if (valid()) {
897 dbiter->Prev();
898 }
899 return dbiter->status().ok() ? 0 : -1;
900 }
901 string RocksDBStore::RocksDBWholeSpaceIteratorImpl::key()
902 {
903 string out_key;
904 split_key(dbiter->key(), 0, &out_key);
905 return out_key;
906 }
907 pair<string,string> RocksDBStore::RocksDBWholeSpaceIteratorImpl::raw_key()
908 {
909 string prefix, key;
910 split_key(dbiter->key(), &prefix, &key);
911 return make_pair(prefix, key);
912 }
913
914 bool RocksDBStore::RocksDBWholeSpaceIteratorImpl::raw_key_is_prefixed(const string &prefix) {
915 // Look for "prefix\0" right in rocksb::Slice
916 rocksdb::Slice key = dbiter->key();
917 if ((key.size() > prefix.length()) && (key[prefix.length()] == '\0')) {
918 return memcmp(key.data(), prefix.c_str(), prefix.length()) == 0;
919 } else {
920 return false;
921 }
922 }
923
924 bufferlist RocksDBStore::RocksDBWholeSpaceIteratorImpl::value()
925 {
926 return to_bufferlist(dbiter->value());
927 }
928
929 size_t RocksDBStore::RocksDBWholeSpaceIteratorImpl::key_size()
930 {
931 return dbiter->key().size();
932 }
933
934 size_t RocksDBStore::RocksDBWholeSpaceIteratorImpl::value_size()
935 {
936 return dbiter->value().size();
937 }
938
939 bufferptr RocksDBStore::RocksDBWholeSpaceIteratorImpl::value_as_ptr()
940 {
941 rocksdb::Slice val = dbiter->value();
942 return bufferptr(val.data(), val.size());
943 }
944
945 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::status()
946 {
947 return dbiter->status().ok() ? 0 : -1;
948 }
949
950 string RocksDBStore::past_prefix(const string &prefix)
951 {
952 string limit = prefix;
953 limit.push_back(1);
954 return limit;
955 }
956
957 RocksDBStore::WholeSpaceIterator RocksDBStore::_get_iterator()
958 {
959 return std::make_shared<RocksDBWholeSpaceIteratorImpl>(
960 db->NewIterator(rocksdb::ReadOptions()));
961 }
962