1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include <set>
5 #include <map>
6 #include <string>
7 #include <memory>
8 #include <errno.h>
9 #include <unistd.h>
10 #include <sys/types.h>
11 #include <sys/stat.h>
12
13 #include "rocksdb/db.h"
14 #include "rocksdb/table.h"
15 #include "rocksdb/env.h"
16 #include "rocksdb/slice.h"
17 #include "rocksdb/cache.h"
18 #include "rocksdb/filter_policy.h"
19 #include "rocksdb/utilities/convenience.h"
20 #include "rocksdb/merge_operator.h"
21 #include "kv/rocksdb_cache/BinnedLRUCache.h"
22
23 using std::string;
24 #include "common/perf_counters.h"
25 #include "common/PriorityCache.h"
26 #include "include/str_list.h"
27 #include "include/stringify.h"
28 #include "include/str_map.h"
29 #include "KeyValueDB.h"
30 #include "RocksDBStore.h"
31
32 #include "common/debug.h"
33
34 #define dout_context cct
35 #define dout_subsys ceph_subsys_rocksdb
36 #undef dout_prefix
37 #define dout_prefix *_dout << "rocksdb: "
38
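// Build a rocksdb::SliceParts view over a (possibly fragmented) bufferlist
// without copying the data: each bufferlist segment becomes one Slice in
// *slices, which the caller is expected to have sized to bl.buffers().size().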
39 static rocksdb::SliceParts prepare_sliceparts(const bufferlist &bl,
40 vector<rocksdb::Slice> *slices)
41 {
42 unsigned n = 0;
43 for (auto& buf : bl.buffers()) {
44 (*slices)[n].data_ = buf.c_str();
45 (*slices)[n].size_ = buf.length();
46 n++;
47 }
48 return rocksdb::SliceParts(slices->data(), slices->size());
49 }
50
51 //
52 // One of these per rocksdb instance; routes Merge() calls to the per-prefix merge operators registered via set_merge_operator()
53 //
54 class RocksDBStore::MergeOperatorRouter : public rocksdb::AssociativeMergeOperator {
55 RocksDBStore& store;
56 public:
57 const char *Name() const override {
58 // Construct a name that RocksDB will validate against. We want to
59 // do this in a way that doesn't constrain the ordering of calls
60 // to set_merge_operator, so sort the merge operators and then
61 // construct a name from all of those parts.
62 store.assoc_name.clear();
63 map<std::string,std::string> names;
64 for (auto& p : store.merge_ops) names[p.first] = p.second->name();
65 for (auto& p : names) {
66 store.assoc_name += '.';
67 store.assoc_name += p.first;
68 store.assoc_name += ':';
69 store.assoc_name += p.second;
70 }
71 return store.assoc_name.c_str();
72 }
73
74 MergeOperatorRouter(RocksDBStore &_store) : store(_store) {}
75
76 bool Merge(const rocksdb::Slice& key,
77 const rocksdb::Slice* existing_value,
78 const rocksdb::Slice& value,
79 std::string* new_value,
80 rocksdb::Logger* logger) const override {
81 // Check each registered prefix; keys are stored as "prefix\0key", so a matching prefix must be followed by a NUL byte
82 for (auto& p : store.merge_ops) {
83 if (p.first.compare(0, p.first.length(),
84 key.data(), p.first.length()) == 0 &&
85 key.data()[p.first.length()] == 0) {
86 if (existing_value) {
87 p.second->merge(existing_value->data(), existing_value->size(),
88 value.data(), value.size(),
89 new_value);
90 } else {
91 p.second->merge_nonexistent(value.data(), value.size(), new_value);
92 }
93 break;
94 }
95 }
96 return true; // OK :)
97 }
98
99 };
100
101 int RocksDBStore::set_merge_operator(
102 const string& prefix,
103 std::shared_ptr<KeyValueDB::MergeOperator> mop)
104 {
105 // If you fail here, it's because you can't do this on an open database
106 assert(db == nullptr);
107 merge_ops.push_back(std::make_pair(prefix,mop));
108 return 0;
109 }
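// Illustrative (hypothetical) caller-side usage, before the database is opened:
//
//   std::shared_ptr<KeyValueDB::MergeOperator> op = make_my_merge_operator();  // hypothetical factory
//   store->set_merge_operator("S", op);   // "S" is an example prefix
//   store->create_and_open(out);
//
// MergeOperatorRouter::Merge() will then route merge operands for keys of the
// form "S\0<key>" to that operator.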
110
111 class CephRocksdbLogger : public rocksdb::Logger {
112 CephContext *cct;
113 public:
114 explicit CephRocksdbLogger(CephContext *c) : cct(c) {
115 cct->get();
116 }
117 ~CephRocksdbLogger() override {
118 cct->put();
119 }
120
121 // Write an entry to the log file with the specified format.
122 void Logv(const char* format, va_list ap) override {
123 Logv(rocksdb::INFO_LEVEL, format, ap);
124 }
125
126 // Write an entry to the log file with the specified log level
127 // and format. Any log with level under the internal log level
128 // of *this (see @SetInfoLogLevel and @GetInfoLogLevel) will not be
129 // printed.
130 void Logv(const rocksdb::InfoLogLevel log_level, const char* format,
131 va_list ap) override {
132 int v = rocksdb::NUM_INFO_LOG_LEVELS - log_level - 1;
133 dout(v);
134 char buf[65536];
135 vsnprintf(buf, sizeof(buf), format, ap);
136 *_dout << buf << dendl;
137 }
138 };
139
140 rocksdb::Logger *create_rocksdb_ceph_logger()
141 {
142 return new CephRocksdbLogger(g_ceph_context);
143 }
144
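// Parse a boolean option value: accepts "true"/"false" (case-insensitive) or
// any integer, where non-zero means true.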
145 static int string2bool(const string &val, bool &b_val)
146 {
147 if (strcasecmp(val.c_str(), "false") == 0) {
148 b_val = false;
149 return 0;
150 } else if (strcasecmp(val.c_str(), "true") == 0) {
151 b_val = true;
152 return 0;
153 } else {
154 std::string err;
155 int b = strict_strtol(val.c_str(), 10, &err);
156 if (!err.empty())
157 return -EINVAL;
158 b_val = !!b;
159 return 0;
160 }
161 }
162
163 int RocksDBStore::tryInterpret(const string &key, const string &val, rocksdb::Options &opt)
164 {
165 if (key == "compaction_threads") {
166 std::string err;
167 int f = strict_iecstrtoll(val.c_str(), &err);
168 if (!err.empty())
169 return -EINVAL;
170 // The low-priority threadpool is used for compaction
171 opt.env->SetBackgroundThreads(f, rocksdb::Env::Priority::LOW);
172 } else if (key == "flusher_threads") {
173 std::string err;
174 int f = strict_iecstrtoll(val.c_str(), &err);
175 if (!err.empty())
176 return -EINVAL;
177 // The high-priority threadpool is used for the flusher
178 opt.env->SetBackgroundThreads(f, rocksdb::Env::Priority::HIGH);
179 } else if (key == "compact_on_mount") {
180 int ret = string2bool(val, compact_on_mount);
181 if (ret != 0)
182 return ret;
183 } else if (key == "disableWAL") {
184 int ret = string2bool(val, disableWAL);
185 if (ret != 0)
186 return ret;
187 } else {
188 // unrecognized config option
189 return -EINVAL;
190 }
191 return 0;
192 }
193
194 int RocksDBStore::ParseOptionsFromString(const string &opt_str, rocksdb::Options &opt)
195 {
196 map<string, string> str_map;
197 int r = get_str_map(opt_str, &str_map, ",\n;");
198 if (r < 0)
199 return r;
200 map<string, string>::iterator it;
201 for(it = str_map.begin(); it != str_map.end(); ++it) {
202 string this_opt = it->first + "=" + it->second;
203 rocksdb::Status status = rocksdb::GetOptionsFromString(opt, this_opt , &opt);
204 if (!status.ok()) {
205 // not recognized by rocksdb; try to interpret it ourselves
206 r = tryInterpret(it->first, it->second, opt);
207 if (r < 0) {
208 derr << status.ToString() << dendl;
209 return -EINVAL;
210 }
211 }
212 lgeneric_dout(cct, 0) << " set rocksdb option " << it->first
213 << " = " << it->second << dendl;
214 }
215 return 0;
216 }
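// The options string is a ','/';'/newline separated list of key=value pairs.
// Each pair is first offered to rocksdb::GetOptionsFromString(); anything
// RocksDB rejects falls back to tryInterpret() above. An illustrative value
// (option names chosen only as an example, not a recommendation):
//
//   compression=kNoCompression,max_write_buffer_number=4,compaction_threads=8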
217
218 int RocksDBStore::init(string _options_str)
219 {
220 options_str = _options_str;
221 rocksdb::Options opt;
222 // try to parse the options string
223 if (options_str.length()) {
224 int r = ParseOptionsFromString(options_str, opt);
225 if (r != 0) {
226 return -EINVAL;
227 }
228 }
229 return 0;
230 }
231
232 int RocksDBStore::create_and_open(ostream &out)
233 {
234 if (env) {
235 unique_ptr<rocksdb::Directory> dir;
236 env->NewDirectory(path, &dir);
237 } else {
238 int r = ::mkdir(path.c_str(), 0755);
239 if (r < 0)
240 r = -errno;
241 if (r < 0 && r != -EEXIST) {
242 derr << __func__ << " failed to create " << path << ": " << cpp_strerror(r)
243 << dendl;
244 return r;
245 }
246 }
247 return do_open(out, true);
248 }
249
250 int RocksDBStore::do_open(ostream &out, bool create_if_missing)
251 {
252 rocksdb::Options opt;
253 rocksdb::Status status;
254
255 if (options_str.length()) {
256 int r = ParseOptionsFromString(options_str, opt);
257 if (r != 0) {
258 return -EINVAL;
259 }
260 }
261
262 if (g_conf->rocksdb_perf) {
263 dbstats = rocksdb::CreateDBStatistics();
264 opt.statistics = dbstats;
265 }
266
267 opt.create_if_missing = create_if_missing;
268 if (g_conf->rocksdb_separate_wal_dir) {
269 opt.wal_dir = path + ".wal";
270 }
271 if (g_conf->get_val<std::string>("rocksdb_db_paths").length()) {
272 list<string> paths;
273 get_str_list(g_conf->get_val<std::string>("rocksdb_db_paths"), "; \t", paths);
274 for (auto& p : paths) {
275 size_t pos = p.find(',');
276 if (pos == std::string::npos) {
277 derr << __func__ << " invalid db path item " << p << " in "
278 << g_conf->get_val<std::string>("rocksdb_db_paths") << dendl;
279 return -EINVAL;
280 }
281 string path = p.substr(0, pos);
282 string size_str = p.substr(pos + 1);
283 uint64_t size = atoll(size_str.c_str());
284 if (!size) {
285 derr << __func__ << " invalid db path item " << p << " in "
286 << g_conf->get_val<std::string>("rocksdb_db_paths") << dendl;
287 return -EINVAL;
288 }
289 opt.db_paths.push_back(rocksdb::DbPath(path, size));
290 dout(10) << __func__ << " db_path " << path << " size " << size << dendl;
291 }
292 }
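// (The value parsed above is a list of "path,size-in-bytes" entries separated
// by ';', spaces or tabs, e.g. an illustrative
// "/fast/db,300000000000;/slow/db,1000000000000".)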
293
294 if (g_conf->rocksdb_log_to_ceph_log) {
295 opt.info_log.reset(new CephRocksdbLogger(g_ceph_context));
296 }
297
298 if (priv) {
299 dout(10) << __func__ << " using custom Env " << priv << dendl;
300 opt.env = static_cast<rocksdb::Env*>(priv);
301 }
302
303 // caches
304 if (!set_cache_flag) {
305 cache_size = g_conf->rocksdb_cache_size;
306 }
307 uint64_t row_cache_size = cache_size * g_conf->rocksdb_cache_row_ratio;
308 uint64_t block_cache_size = cache_size - row_cache_size;
309
310 if (g_conf->rocksdb_cache_type == "binned_lru") {
311 bbt_opts.block_cache = rocksdb_cache::NewBinnedLRUCache(
312 block_cache_size,
313 g_conf->rocksdb_cache_shard_bits);
314 } else if (g_conf->rocksdb_cache_type == "lru") {
315 bbt_opts.block_cache = rocksdb::NewLRUCache(
316 block_cache_size,
317 g_conf->rocksdb_cache_shard_bits);
318 } else if (g_conf->rocksdb_cache_type == "clock") {
319 bbt_opts.block_cache = rocksdb::NewClockCache(
320 block_cache_size,
321 g_conf->rocksdb_cache_shard_bits);
322 if (!bbt_opts.block_cache) {
323 derr << "rocksdb_cache_type '" << g_conf->rocksdb_cache_type
324 << "' chosen, but RocksDB not compiled with LibTBB. "
325 << dendl;
326 return -EINVAL;
327 }
328 } else {
329 derr << "unrecognized rocksdb_cache_type '" << g_conf->rocksdb_cache_type
330 << "'" << dendl;
331 return -EINVAL;
332 }
333 bbt_opts.block_size = g_conf->rocksdb_block_size;
334
335 if (row_cache_size > 0)
336 opt.row_cache = rocksdb::NewLRUCache(row_cache_size,
337 g_conf->rocksdb_cache_shard_bits);
338 uint64_t bloom_bits = g_conf->get_val<uint64_t>("rocksdb_bloom_bits_per_key");
339 if (bloom_bits > 0) {
340 dout(10) << __func__ << " set bloom filter bits per key to "
341 << bloom_bits << dendl;
342 bbt_opts.filter_policy.reset(rocksdb::NewBloomFilterPolicy(bloom_bits));
343 }
344 if (g_conf->get_val<std::string>("rocksdb_index_type") == "binary_search")
345 bbt_opts.index_type = rocksdb::BlockBasedTableOptions::IndexType::kBinarySearch;
346 if (g_conf->get_val<std::string>("rocksdb_index_type") == "hash_search")
347 bbt_opts.index_type = rocksdb::BlockBasedTableOptions::IndexType::kHashSearch;
348 if (g_conf->get_val<std::string>("rocksdb_index_type") == "two_level")
349 bbt_opts.index_type = rocksdb::BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
350 bbt_opts.cache_index_and_filter_blocks =
351 g_conf->get_val<bool>("rocksdb_cache_index_and_filter_blocks");
352 bbt_opts.cache_index_and_filter_blocks_with_high_priority =
353 g_conf->get_val<bool>("rocksdb_cache_index_and_filter_blocks_with_high_priority");
354 bbt_opts.partition_filters = g_conf->get_val<bool>("rocksdb_partition_filters");
355 if (g_conf->get_val<uint64_t>("rocksdb_metadata_block_size") > 0)
356 bbt_opts.metadata_block_size = g_conf->get_val<uint64_t>("rocksdb_metadata_block_size");
357 bbt_opts.pin_l0_filter_and_index_blocks_in_cache =
358 g_conf->get_val<bool>("rocksdb_pin_l0_filter_and_index_blocks_in_cache");
359
360 opt.table_factory.reset(rocksdb::NewBlockBasedTableFactory(bbt_opts));
361 dout(10) << __func__ << " block size " << g_conf->rocksdb_block_size
362 << ", block_cache size " << byte_u_t(block_cache_size)
363 << ", row_cache size " << byte_u_t(row_cache_size)
364 << "; shards "
365 << (1 << g_conf->rocksdb_cache_shard_bits)
366 << ", type " << g_conf->rocksdb_cache_type
367 << dendl;
368
369 opt.merge_operator.reset(new MergeOperatorRouter(*this));
370 status = rocksdb::DB::Open(opt, path, &db);
371 if (!status.ok()) {
372 derr << status.ToString() << dendl;
373 return -EINVAL;
374 }
375
376 PerfCountersBuilder plb(g_ceph_context, "rocksdb", l_rocksdb_first, l_rocksdb_last);
377 plb.add_u64_counter(l_rocksdb_gets, "get", "Gets");
378 plb.add_u64_counter(l_rocksdb_txns, "submit_transaction", "Submit transactions");
379 plb.add_u64_counter(l_rocksdb_txns_sync, "submit_transaction_sync", "Submit transactions sync");
380 plb.add_time_avg(l_rocksdb_get_latency, "get_latency", "Get latency");
381 plb.add_time_avg(l_rocksdb_submit_latency, "submit_latency", "Submit Latency");
382 plb.add_time_avg(l_rocksdb_submit_sync_latency, "submit_sync_latency", "Submit Sync Latency");
383 plb.add_u64_counter(l_rocksdb_compact, "compact", "Compactions");
384 plb.add_u64_counter(l_rocksdb_compact_range, "compact_range", "Compactions by range");
385 plb.add_u64_counter(l_rocksdb_compact_queue_merge, "compact_queue_merge", "Mergings of ranges in compaction queue");
386 plb.add_u64(l_rocksdb_compact_queue_len, "compact_queue_len", "Length of compaction queue");
387 plb.add_time_avg(l_rocksdb_write_wal_time, "rocksdb_write_wal_time", "Rocksdb write wal time");
388 plb.add_time_avg(l_rocksdb_write_memtable_time, "rocksdb_write_memtable_time", "Rocksdb write memtable time");
389 plb.add_time_avg(l_rocksdb_write_delay_time, "rocksdb_write_delay_time", "Rocksdb write delay time");
390 plb.add_time_avg(l_rocksdb_write_pre_and_post_process_time,
391 "rocksdb_write_pre_and_post_time", "total time spent on writing a record, excluding write process");
392 logger = plb.create_perf_counters();
393 cct->get_perfcounters_collection()->add(logger);
394
395 if (compact_on_mount) {
396 derr << "Compacting rocksdb store..." << dendl;
397 compact();
398 derr << "Finished compacting rocksdb store" << dendl;
399 }
400 return 0;
401 }
402
403 int RocksDBStore::_test_init(const string& dir)
404 {
405 rocksdb::Options options;
406 options.create_if_missing = true;
407 rocksdb::DB *db;
408 rocksdb::Status status = rocksdb::DB::Open(options, dir, &db);
409 delete db;
410 db = nullptr;
411 return status.ok() ? 0 : -EIO;
412 }
413
414 RocksDBStore::~RocksDBStore()
415 {
416 close();
417 delete logger;
418
419 // Ensure db is destroyed before dependent db_cache and filterpolicy
420 delete db;
421 db = nullptr;
422
423 if (priv) {
424 delete static_cast<rocksdb::Env*>(priv);
425 }
426 }
427
428 void RocksDBStore::close()
429 {
430 // stop compaction thread
431 compact_queue_lock.Lock();
432 if (compact_thread.is_started()) {
433 compact_queue_stop = true;
434 compact_queue_cond.Signal();
435 compact_queue_lock.Unlock();
436 compact_thread.join();
437 } else {
438 compact_queue_lock.Unlock();
439 }
440
441 if (logger)
442 cct->get_perfcounters_collection()->remove(logger);
443 }
444
445 void RocksDBStore::split_stats(const std::string &s, char delim, std::vector<std::string> &elems) {
446 std::stringstream ss;
447 ss.str(s);
448 std::string item;
449 while (std::getline(ss, item, delim)) {
450 elems.push_back(item);
451 }
452 }
453
454 void RocksDBStore::get_statistics(Formatter *f)
455 {
456 if (!g_conf->rocksdb_perf) {
457 dout(20) << __func__ << " RocksDB perf is disabled, can't probe for stats"
458 << dendl;
459 return;
460 }
461
462 if (g_conf->rocksdb_collect_compaction_stats) {
463 std::string stat_str;
464 bool status = db->GetProperty("rocksdb.stats", &stat_str);
465 if (status) {
466 f->open_object_section("rocksdb_statistics");
467 f->dump_string("rocksdb_compaction_statistics", "");
468 vector<string> stats;
469 split_stats(stat_str, '\n', stats);
470 for (const auto& st : stats) {
471 f->dump_string("", st);
472 }
473 f->close_section();
474 }
475 }
476 if (g_conf->rocksdb_collect_extended_stats) {
477 if (dbstats) {
478 f->open_object_section("rocksdb_extended_statistics");
479 string stat_str = dbstats->ToString();
480 vector<string> stats;
481 split_stats(stat_str, '\n', stats);
482 f->dump_string("rocksdb_extended_statistics", "");
483 for (const auto& st : stats) {
484 f->dump_string(".", st);
485 }
486 f->close_section();
487 }
488 f->open_object_section("rocksdbstore_perf_counters");
489 logger->dump_formatted(f, 0);
490 f->close_section();
491 }
492 if (g_conf->rocksdb_collect_memory_stats) {
493 f->open_object_section("rocksdb_memtable_statistics");
494 std::string str(stringify(bbt_opts.block_cache->GetUsage()));
495 f->dump_string("block_cache_usage", str.data());
496 str.clear();
497 str.append(stringify(bbt_opts.block_cache->GetPinnedUsage()));
498 f->dump_string("block_cache_pinned_blocks_usage", str);
499 str.clear();
500 db->GetProperty("rocksdb.cur-size-all-mem-tables", &str);
501 f->dump_string("rocksdb_memtable_usage", str);
502 f->close_section();
503 }
504 }
505
506 int RocksDBStore::submit_transaction(KeyValueDB::Transaction t)
507 {
508 utime_t start = ceph_clock_now();
509 // enable the rocksdb perf breakdown; because of its performance
510 // overhead it is disabled by default
511 if (g_conf->rocksdb_perf) {
512 rocksdb::SetPerfLevel(rocksdb::PerfLevel::kEnableTimeExceptForMutex);
513 rocksdb::perf_context.Reset();
514 }
515
516 RocksDBTransactionImpl * _t =
517 static_cast<RocksDBTransactionImpl *>(t.get());
518 rocksdb::WriteOptions woptions;
519 woptions.disableWAL = disableWAL;
520 lgeneric_subdout(cct, rocksdb, 30) << __func__;
521 RocksWBHandler bat_txc;
522 _t->bat.Iterate(&bat_txc);
523 *_dout << " Rocksdb transaction: " << bat_txc.seen << dendl;
524
525 rocksdb::Status s = db->Write(woptions, &_t->bat);
526 if (!s.ok()) {
527 RocksWBHandler rocks_txc;
528 _t->bat.Iterate(&rocks_txc);
529 derr << __func__ << " error: " << s.ToString() << " code = " << s.code()
530 << " Rocksdb transaction: " << rocks_txc.seen << dendl;
531 }
532 utime_t lat = ceph_clock_now() - start;
533
534 if (g_conf->rocksdb_perf) {
535 utime_t write_memtable_time;
536 utime_t write_delay_time;
537 utime_t write_wal_time;
538 utime_t write_pre_and_post_process_time;
539 write_wal_time.set_from_double(
540 static_cast<double>(rocksdb::perf_context.write_wal_time)/1000000000);
541 write_memtable_time.set_from_double(
542 static_cast<double>(rocksdb::perf_context.write_memtable_time)/1000000000);
543 write_delay_time.set_from_double(
544 static_cast<double>(rocksdb::perf_context.write_delay_time)/1000000000);
545 write_pre_and_post_process_time.set_from_double(
546 static_cast<double>(rocksdb::perf_context.write_pre_and_post_process_time)/1000000000);
547 logger->tinc(l_rocksdb_write_memtable_time, write_memtable_time);
548 logger->tinc(l_rocksdb_write_delay_time, write_delay_time);
549 logger->tinc(l_rocksdb_write_wal_time, write_wal_time);
550 logger->tinc(l_rocksdb_write_pre_and_post_process_time, write_pre_and_post_process_time);
551 }
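// (rocksdb::perf_context reports these timers in nanoseconds; the divisions
// above convert them to seconds before they are stored in utime_t.)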
552
553 logger->inc(l_rocksdb_txns);
554 logger->tinc(l_rocksdb_submit_latency, lat);
555
556 return s.ok() ? 0 : -1;
557 }
558
559 int RocksDBStore::submit_transaction_sync(KeyValueDB::Transaction t)
560 {
561 utime_t start = ceph_clock_now();
562 // enable the rocksdb perf breakdown; because of its performance
563 // overhead it is disabled by default
564 if (g_conf->rocksdb_perf) {
565 rocksdb::SetPerfLevel(rocksdb::PerfLevel::kEnableTimeExceptForMutex);
566 rocksdb::perf_context.Reset();
567 }
568
569 RocksDBTransactionImpl * _t =
570 static_cast<RocksDBTransactionImpl *>(t.get());
571 rocksdb::WriteOptions woptions;
572 woptions.sync = true;
573 woptions.disableWAL = disableWAL;
574 lgeneric_subdout(cct, rocksdb, 30) << __func__;
575 RocksWBHandler bat_txc;
576 _t->bat.Iterate(&bat_txc);
577 *_dout << " Rocksdb transaction: " << bat_txc.seen << dendl;
578
579 rocksdb::Status s = db->Write(woptions, &_t->bat);
580 if (!s.ok()) {
581 RocksWBHandler rocks_txc;
582 _t->bat.Iterate(&rocks_txc);
583 derr << __func__ << " error: " << s.ToString() << " code = " << s.code()
584 << " Rocksdb transaction: " << rocks_txc.seen << dendl;
585 }
586 utime_t lat = ceph_clock_now() - start;
587
588 if (g_conf->rocksdb_perf) {
589 utime_t write_memtable_time;
590 utime_t write_delay_time;
591 utime_t write_wal_time;
592 utime_t write_pre_and_post_process_time;
593 write_wal_time.set_from_double(
594 static_cast<double>(rocksdb::perf_context.write_wal_time)/1000000000);
595 write_memtable_time.set_from_double(
596 static_cast<double>(rocksdb::perf_context.write_memtable_time)/1000000000);
597 write_delay_time.set_from_double(
598 static_cast<double>(rocksdb::perf_context.write_delay_time)/1000000000);
599 write_pre_and_post_process_time.set_from_double(
600 static_cast<double>(rocksdb::perf_context.write_pre_and_post_process_time)/1000000000);
601 logger->tinc(l_rocksdb_write_memtable_time, write_memtable_time);
602 logger->tinc(l_rocksdb_write_delay_time, write_delay_time);
603 logger->tinc(l_rocksdb_write_wal_time, write_wal_time);
604 logger->tinc(l_rocksdb_write_pre_and_post_process_time, write_pre_and_post_process_time);
605 }
606
607 logger->inc(l_rocksdb_txns_sync);
608 logger->tinc(l_rocksdb_submit_sync_latency, lat);
609
610 return s.ok() ? 0 : -1;
611 }
612
613 RocksDBStore::RocksDBTransactionImpl::RocksDBTransactionImpl(RocksDBStore *_db)
614 {
615 db = _db;
616 }
617
618 void RocksDBStore::RocksDBTransactionImpl::set(
619 const string &prefix,
620 const string &k,
621 const bufferlist &to_set_bl)
622 {
623 string key = combine_strings(prefix, k);
624
625 // bufferlist::c_str() is non-constant, so we can't call c_str()
626 if (to_set_bl.is_contiguous() && to_set_bl.length() > 0) {
627 bat.Put(rocksdb::Slice(key),
628 rocksdb::Slice(to_set_bl.buffers().front().c_str(),
629 to_set_bl.length()));
630 } else {
631 rocksdb::Slice key_slice(key);
632 vector<rocksdb::Slice> value_slices(to_set_bl.buffers().size());
633 bat.Put(nullptr, rocksdb::SliceParts(&key_slice, 1),
634 prepare_sliceparts(to_set_bl, &value_slices));
635 }
636 }
637
638 void RocksDBStore::RocksDBTransactionImpl::set(
639 const string &prefix,
640 const char *k, size_t keylen,
641 const bufferlist &to_set_bl)
642 {
643 string key;
644 combine_strings(prefix, k, keylen, &key);
645
646 // bufferlist::c_str() is non-constant, so we can't call c_str()
647 if (to_set_bl.is_contiguous() && to_set_bl.length() > 0) {
648 bat.Put(rocksdb::Slice(key),
649 rocksdb::Slice(to_set_bl.buffers().front().c_str(),
650 to_set_bl.length()));
651 } else {
652 rocksdb::Slice key_slice(key);
653 vector<rocksdb::Slice> value_slices(to_set_bl.buffers().size());
654 bat.Put(nullptr, rocksdb::SliceParts(&key_slice, 1),
655 prepare_sliceparts(to_set_bl, &value_slices));
656 }
657 }
658
659 void RocksDBStore::RocksDBTransactionImpl::rmkey(const string &prefix,
660 const string &k)
661 {
662 bat.Delete(combine_strings(prefix, k));
663 }
664
665 void RocksDBStore::RocksDBTransactionImpl::rmkey(const string &prefix,
666 const char *k,
667 size_t keylen)
668 {
669 string key;
670 combine_strings(prefix, k, keylen, &key);
671 bat.Delete(key);
672 }
673
674 void RocksDBStore::RocksDBTransactionImpl::rm_single_key(const string &prefix,
675 const string &k)
676 {
677 bat.SingleDelete(combine_strings(prefix, k));
678 }
679
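// Keys are stored as "prefix\0key", so every key for a given prefix sorts
// between "prefix\0" and "prefix\x01"; the DeleteRange path below exploits
// that, while the fallback path deletes keys one by one through an iterator.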
680 void RocksDBStore::RocksDBTransactionImpl::rmkeys_by_prefix(const string &prefix)
681 {
682 if (db->enable_rmrange) {
683 string endprefix = prefix;
684 endprefix.push_back('\x01');
685 bat.DeleteRange(combine_strings(prefix, string()),
686 combine_strings(endprefix, string()));
687 } else {
688 KeyValueDB::Iterator it = db->get_iterator(prefix);
689 for (it->seek_to_first();
690 it->valid();
691 it->next()) {
692 bat.Delete(combine_strings(prefix, it->key()));
693 }
694 }
695 }
696
697 void RocksDBStore::RocksDBTransactionImpl::rm_range_keys(const string &prefix,
698 const string &start,
699 const string &end)
700 {
701 if (db->enable_rmrange) {
702 bat.DeleteRange(combine_strings(prefix, start), combine_strings(prefix, end));
703 } else {
704 auto it = db->get_iterator(prefix);
705 it->lower_bound(start);
706 while (it->valid()) {
707 if (it->key() >= end) {
708 break;
709 }
710 bat.Delete(combine_strings(prefix, it->key()));
711 it->next();
712 }
713 }
714 }
715
716 void RocksDBStore::RocksDBTransactionImpl::merge(
717 const string &prefix,
718 const string &k,
719 const bufferlist &to_set_bl)
720 {
721 string key = combine_strings(prefix, k);
722
723 // bufferlist::c_str() is non-constant, so we can't call c_str()
724 if (to_set_bl.is_contiguous() && to_set_bl.length() > 0) {
725 bat.Merge(rocksdb::Slice(key),
726 rocksdb::Slice(to_set_bl.buffers().front().c_str(),
727 to_set_bl.length()));
728 } else {
729 // build SliceParts referencing each buffer segment
730 rocksdb::Slice key_slice(key);
731 vector<rocksdb::Slice> value_slices(to_set_bl.buffers().size());
732 bat.Merge(nullptr, rocksdb::SliceParts(&key_slice, 1),
733 prepare_sliceparts(to_set_bl, &value_slices));
734 }
735 }
736
737 // bulk get: each key is looked up with an individual Get() call
738 int RocksDBStore::get(
739 const string &prefix,
740 const std::set<string> &keys,
741 std::map<string, bufferlist> *out)
742 {
743 utime_t start = ceph_clock_now();
744 for (std::set<string>::const_iterator i = keys.begin();
745 i != keys.end(); ++i) {
746 std::string value;
747 std::string bound = combine_strings(prefix, *i);
748 auto status = db->Get(rocksdb::ReadOptions(), rocksdb::Slice(bound), &value);
749 if (status.ok()) {
750 (*out)[*i].append(value);
751 } else if (status.IsIOError()) {
752 ceph_abort_msg(cct, status.ToString());
753 }
754
755 }
756 utime_t lat = ceph_clock_now() - start;
757 logger->inc(l_rocksdb_gets);
758 logger->tinc(l_rocksdb_get_latency, lat);
759 return 0;
760 }
761
762 int RocksDBStore::get(
763 const string &prefix,
764 const string &key,
765 bufferlist *out)
766 {
767 assert(out && (out->length() == 0));
768 utime_t start = ceph_clock_now();
769 int r = 0;
770 string value, k;
771 rocksdb::Status s;
772 k = combine_strings(prefix, key);
773 s = db->Get(rocksdb::ReadOptions(), rocksdb::Slice(k), &value);
774 if (s.ok()) {
775 out->append(value);
776 } else if (s.IsNotFound()) {
777 r = -ENOENT;
778 } else {
779 ceph_abort_msg(cct, s.ToString());
780 }
781 utime_t lat = ceph_clock_now() - start;
782 logger->inc(l_rocksdb_gets);
783 logger->tinc(l_rocksdb_get_latency, lat);
784 return r;
785 }
786
787 int RocksDBStore::get(
788 const string& prefix,
789 const char *key,
790 size_t keylen,
791 bufferlist *out)
792 {
793 assert(out && (out->length() == 0));
794 utime_t start = ceph_clock_now();
795 int r = 0;
796 string value, k;
797 combine_strings(prefix, key, keylen, &k);
798 rocksdb::Status s;
799 s = db->Get(rocksdb::ReadOptions(), rocksdb::Slice(k), &value);
800 if (s.ok()) {
801 out->append(value);
802 } else if (s.IsNotFound()) {
803 r = -ENOENT;
804 } else {
805 ceph_abort_msg(cct, s.ToString());
806 }
807 utime_t lat = ceph_clock_now() - start;
808 logger->inc(l_rocksdb_gets);
809 logger->tinc(l_rocksdb_get_latency, lat);
810 return r;
811 }
812
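// On-disk keys are "prefix" + '\0' + "key" (see combine_strings); split_key
// undoes that encoding by locating the first NUL byte in the slice.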
813 int RocksDBStore::split_key(rocksdb::Slice in, string *prefix, string *key)
814 {
815 size_t prefix_len = 0;
816
817 // Find separator inside Slice
818 char* separator = (char*) memchr(in.data(), 0, in.size());
819 if (separator == NULL)
820 return -EINVAL;
821 prefix_len = size_t(separator - in.data());
822 if (prefix_len >= in.size())
823 return -EINVAL;
824
825 // Fetch prefix and/or key directly from Slice
826 if (prefix)
827 *prefix = string(in.data(), prefix_len);
828 if (key)
829 *key = string(separator+1, in.size()-prefix_len-1);
830 return 0;
831 }
832
833 void RocksDBStore::compact()
834 {
835 logger->inc(l_rocksdb_compact);
836 rocksdb::CompactRangeOptions options;
837 db->CompactRange(options, nullptr, nullptr);
838 }
839
840
841 void RocksDBStore::compact_thread_entry()
842 {
843 compact_queue_lock.Lock();
844 while (!compact_queue_stop) {
845 while (!compact_queue.empty()) {
846 pair<string,string> range = compact_queue.front();
847 compact_queue.pop_front();
848 logger->set(l_rocksdb_compact_queue_len, compact_queue.size());
849 compact_queue_lock.Unlock();
850 logger->inc(l_rocksdb_compact_range);
851 compact_range(range.first, range.second);
852 compact_queue_lock.Lock();
853 continue;
854 }
855 compact_queue_cond.Wait(compact_queue_lock);
856 }
857 compact_queue_lock.Unlock();
858 }
859
860 void RocksDBStore::compact_range_async(const string& start, const string& end)
861 {
862 Mutex::Locker l(compact_queue_lock);
863
864 // try to merge adjacent ranges. this is O(n), but the queue should
865 // be short. note that we do not cover all overlap cases and merge
866 // opportunities here, but we capture the ones we currently need.
867 list< pair<string,string> >::iterator p = compact_queue.begin();
868 while (p != compact_queue.end()) {
869 if (p->first == start && p->second == end) {
870 // dup; no-op
871 return;
872 }
873 if (p->first <= end && p->first > start) {
874 // merge with existing range to the right
875 compact_queue.push_back(make_pair(start, p->second));
876 compact_queue.erase(p);
877 logger->inc(l_rocksdb_compact_queue_merge);
878 break;
879 }
880 if (p->second >= start && p->second < end) {
881 // merge with existing range to the left
882 compact_queue.push_back(make_pair(p->first, end));
883 compact_queue.erase(p);
884 logger->inc(l_rocksdb_compact_queue_merge);
885 break;
886 }
887 ++p;
888 }
889 if (p == compact_queue.end()) {
890 // no merge, new entry.
891 compact_queue.push_back(make_pair(start, end));
892 logger->set(l_rocksdb_compact_queue_len, compact_queue.size());
893 }
894 compact_queue_cond.Signal();
895 if (!compact_thread.is_started()) {
896 compact_thread.create("rstore_compact");
897 }
898 }
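// Sanity-check that an omap directory is usable by opening (and, if needed,
// creating) a RocksDB instance there and reporting whether the open succeeded.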
899 bool RocksDBStore::check_omap_dir(string &omap_dir)
900 {
901 rocksdb::Options options;
902 options.create_if_missing = true;
903 rocksdb::DB *db;
904 rocksdb::Status status = rocksdb::DB::Open(options, omap_dir, &db);
905 delete db;
906 db = nullptr;
907 return status.ok();
908 }
909 void RocksDBStore::compact_range(const string& start, const string& end)
910 {
911 rocksdb::CompactRangeOptions options;
912 rocksdb::Slice cstart(start);
913 rocksdb::Slice cend(end);
914 db->CompactRange(options, &cstart, &cend);
915 }
916
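// PriorityCache hook: report how many additional bytes (rounded to chunk_bytes
// granularity) this store would like at the given priority, based on current
// block cache usage versus what has already been assigned to it.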
917 int64_t RocksDBStore::request_cache_bytes(PriorityCache::Priority pri, uint64_t chunk_bytes) const
918 {
919 auto cache = bbt_opts.block_cache;
920
921 int64_t assigned = get_cache_bytes(pri);
922 int64_t usage = 0;
923 int64_t request = 0;
924 switch (pri) {
925 // PRI0 is for rocksdb's high priority items (indexes/filters)
926 case PriorityCache::Priority::PRI0:
927 {
928 usage += cache->GetPinnedUsage();
929 if (g_conf->rocksdb_cache_type == "binned_lru") {
930 auto binned_cache =
931 std::static_pointer_cast<rocksdb_cache::BinnedLRUCache>(cache);
932 usage += binned_cache->GetHighPriPoolUsage();
933 }
934 break;
935 }
936 // All other cache items are currently shoved into the LAST priority.
937 case PriorityCache::Priority::LAST:
938 {
939 usage = get_cache_usage() - cache->GetPinnedUsage();
940 if (g_conf->rocksdb_cache_type == "binned_lru") {
941 auto binned_cache =
942 std::static_pointer_cast<rocksdb_cache::BinnedLRUCache>(cache);
943 usage -= binned_cache->GetHighPriPoolUsage();
944 }
945 break;
946 }
947 default:
948 break;
949 }
950 request = PriorityCache::get_chunk(usage, chunk_bytes);
951 request = (request > assigned) ? request - assigned : 0;
952 dout(10) << __func__ << " Priority: " << static_cast<uint32_t>(pri)
953 << " Usage: " << usage << " Request: " << request << dendl;
954 return request;
955 }
956
957 int64_t RocksDBStore::get_cache_usage() const
958 {
959 return static_cast<int64_t>(bbt_opts.block_cache->GetUsage());
960 }
961
962 int64_t RocksDBStore::commit_cache_size()
963 {
964 size_t old_bytes = bbt_opts.block_cache->GetCapacity();
965 int64_t total_bytes = get_cache_bytes();
966 dout(10) << __func__ << " old: " << old_bytes
967 << " new: " << total_bytes << dendl;
968 bbt_opts.block_cache->SetCapacity((size_t) total_bytes);
969
970 // Set the high priority pool ratio if this is the binned LRU cache.
971 if (g_conf->rocksdb_cache_type == "binned_lru") {
972 auto binned_cache =
973 std::static_pointer_cast<rocksdb_cache::BinnedLRUCache>(bbt_opts.block_cache);
974 int64_t high_pri_bytes = get_cache_bytes(PriorityCache::Priority::PRI0);
975 double ratio = (double) high_pri_bytes / total_bytes;
976 dout(10) << __func__ << " High Pri Pool Ratio set to " << ratio << dendl;
977 binned_cache->SetHighPriPoolRatio(ratio);
978 }
979 return total_bytes;
980 }
981
982 int64_t RocksDBStore::get_cache_capacity() {
983 return bbt_opts.block_cache->GetCapacity();
984 }
985
986 RocksDBStore::RocksDBWholeSpaceIteratorImpl::~RocksDBWholeSpaceIteratorImpl()
987 {
988 delete dbiter;
989 }
990 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_first()
991 {
992 dbiter->SeekToFirst();
993 assert(!dbiter->status().IsIOError());
994 return dbiter->status().ok() ? 0 : -1;
995 }
996 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_first(const string &prefix)
997 {
998 rocksdb::Slice slice_prefix(prefix);
999 dbiter->Seek(slice_prefix);
1000 assert(!dbiter->status().IsIOError());
1001 return dbiter->status().ok() ? 0 : -1;
1002 }
1003 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_last()
1004 {
1005 dbiter->SeekToLast();
1006 assert(!dbiter->status().IsIOError());
1007 return dbiter->status().ok() ? 0 : -1;
1008 }
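// Position on the last key of the given prefix: seek to the first key at or
// past past_prefix(prefix), then step back one entry (or to the very last key
// if the seek ran off the end).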
1009 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_last(const string &prefix)
1010 {
1011 string limit = past_prefix(prefix);
1012 rocksdb::Slice slice_limit(limit);
1013 dbiter->Seek(slice_limit);
1014
1015 if (!dbiter->Valid()) {
1016 dbiter->SeekToLast();
1017 } else {
1018 dbiter->Prev();
1019 }
1020 return dbiter->status().ok() ? 0 : -1;
1021 }
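// upper_bound(): position at the first key after (prefix, after), skipping an
// exact match; lower_bound(): position at the first key at or after (prefix, to).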
1022 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::upper_bound(const string &prefix, const string &after)
1023 {
1024 lower_bound(prefix, after);
1025 if (valid()) {
1026 pair<string,string> key = raw_key();
1027 if (key.first == prefix && key.second == after)
1028 next();
1029 }
1030 return dbiter->status().ok() ? 0 : -1;
1031 }
1032 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::lower_bound(const string &prefix, const string &to)
1033 {
1034 string bound = combine_strings(prefix, to);
1035 rocksdb::Slice slice_bound(bound);
1036 dbiter->Seek(slice_bound);
1037 return dbiter->status().ok() ? 0 : -1;
1038 }
1039 bool RocksDBStore::RocksDBWholeSpaceIteratorImpl::valid()
1040 {
1041 return dbiter->Valid();
1042 }
1043 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::next()
1044 {
1045 if (valid()) {
1046 dbiter->Next();
1047 }
1048 assert(!dbiter->status().IsIOError());
1049 return dbiter->status().ok() ? 0 : -1;
1050 }
1051 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::prev()
1052 {
1053 if (valid()) {
1054 dbiter->Prev();
1055 }
1056 assert(!dbiter->status().IsIOError());
1057 return dbiter->status().ok() ? 0 : -1;
1058 }
1059 string RocksDBStore::RocksDBWholeSpaceIteratorImpl::key()
1060 {
1061 string out_key;
1062 split_key(dbiter->key(), 0, &out_key);
1063 return out_key;
1064 }
1065 pair<string,string> RocksDBStore::RocksDBWholeSpaceIteratorImpl::raw_key()
1066 {
1067 string prefix, key;
1068 split_key(dbiter->key(), &prefix, &key);
1069 return make_pair(prefix, key);
1070 }
1071
1072 bool RocksDBStore::RocksDBWholeSpaceIteratorImpl::raw_key_is_prefixed(const string &prefix) {
1073 // Look for "prefix\0" right in the rocksdb::Slice
1074 rocksdb::Slice key = dbiter->key();
1075 if ((key.size() > prefix.length()) && (key[prefix.length()] == '\0')) {
1076 return memcmp(key.data(), prefix.c_str(), prefix.length()) == 0;
1077 } else {
1078 return false;
1079 }
1080 }
1081
1082 bufferlist RocksDBStore::RocksDBWholeSpaceIteratorImpl::value()
1083 {
1084 return to_bufferlist(dbiter->value());
1085 }
1086
1087 size_t RocksDBStore::RocksDBWholeSpaceIteratorImpl::key_size()
1088 {
1089 return dbiter->key().size();
1090 }
1091
1092 size_t RocksDBStore::RocksDBWholeSpaceIteratorImpl::value_size()
1093 {
1094 return dbiter->value().size();
1095 }
1096
1097 bufferptr RocksDBStore::RocksDBWholeSpaceIteratorImpl::value_as_ptr()
1098 {
1099 rocksdb::Slice val = dbiter->value();
1100 return bufferptr(val.data(), val.size());
1101 }
1102
1103 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::status()
1104 {
1105 return dbiter->status().ok() ? 0 : -1;
1106 }
1107
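// Smallest string that sorts after every "prefix\0..." key: bump the
// separator byte from '\0' to '\1'.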
1108 string RocksDBStore::past_prefix(const string &prefix)
1109 {
1110 string limit = prefix;
1111 limit.push_back(1);
1112 return limit;
1113 }
1114
1115 RocksDBStore::WholeSpaceIterator RocksDBStore::_get_iterator()
1116 {
1117 return std::make_shared<RocksDBWholeSpaceIteratorImpl>(
1118 db->NewIterator(rocksdb::ReadOptions()));
1119 }
1120