]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
4 | #include <set> | |
5 | #include <map> | |
6 | #include <string> | |
7 | #include <memory> | |
8 | #include <errno.h> | |
9 | #include <unistd.h> | |
10 | #include <sys/types.h> | |
11 | #include <sys/stat.h> | |
12 | ||
13 | #include "rocksdb/db.h" | |
14 | #include "rocksdb/table.h" | |
15 | #include "rocksdb/env.h" | |
16 | #include "rocksdb/slice.h" | |
17 | #include "rocksdb/cache.h" | |
18 | #include "rocksdb/filter_policy.h" | |
19 | #include "rocksdb/utilities/convenience.h" | |
20 | #include "rocksdb/merge_operator.h" | |
21 | using std::string; | |
22 | #include "common/perf_counters.h" | |
23 | #include "common/debug.h" | |
24 | #include "include/str_list.h" | |
25 | #include "include/stringify.h" | |
26 | #include "include/str_map.h" | |
27 | #include "KeyValueDB.h" | |
28 | #include "RocksDBStore.h" | |
29 | ||
30 | #include "common/debug.h" | |
31 | ||
32 | #define dout_context cct | |
33 | #define dout_subsys ceph_subsys_rocksdb | |
34 | #undef dout_prefix | |
35 | #define dout_prefix *_dout << "rocksdb: " | |
36 | ||
37 | // | |
38 | // One of these per rocksdb instance, implements the merge operator prefix stuff | |
39 | // | |
40 | class RocksDBStore::MergeOperatorRouter : public rocksdb::AssociativeMergeOperator { | |
41 | RocksDBStore& store; | |
42 | public: | |
43 | const char *Name() const override { | |
44 | // Construct a name that rocksDB will validate against. We want to | |
45 | // do this in a way that doesn't constrain the ordering of calls | |
46 | // to set_merge_operator, so sort the merge operators and then | |
47 | // construct a name from all of those parts. | |
48 | store.assoc_name.clear(); | |
49 | map<std::string,std::string> names; | |
50 | for (auto& p : store.merge_ops) names[p.first] = p.second->name(); | |
51 | for (auto& p : names) { | |
52 | store.assoc_name += '.'; | |
53 | store.assoc_name += p.first; | |
54 | store.assoc_name += ':'; | |
55 | store.assoc_name += p.second; | |
56 | } | |
57 | return store.assoc_name.c_str(); | |
58 | } | |
59 | ||
60 | MergeOperatorRouter(RocksDBStore &_store) : store(_store) {} | |
61 | ||
62 | bool Merge(const rocksdb::Slice& key, | |
63 | const rocksdb::Slice* existing_value, | |
64 | const rocksdb::Slice& value, | |
65 | std::string* new_value, | |
66 | rocksdb::Logger* logger) const override { | |
67 | // Check each prefix | |
68 | for (auto& p : store.merge_ops) { | |
69 | if (p.first.compare(0, p.first.length(), | |
70 | key.data(), p.first.length()) == 0 && | |
71 | key.data()[p.first.length()] == 0) { | |
72 | if (existing_value) { | |
73 | p.second->merge(existing_value->data(), existing_value->size(), | |
74 | value.data(), value.size(), | |
75 | new_value); | |
76 | } else { | |
77 | p.second->merge_nonexistent(value.data(), value.size(), new_value); | |
78 | } | |
79 | break; | |
80 | } | |
81 | } | |
82 | return true; // OK :) | |
83 | } | |
84 | ||
85 | }; | |
86 | ||
87 | int RocksDBStore::set_merge_operator( | |
88 | const string& prefix, | |
89 | std::shared_ptr<KeyValueDB::MergeOperator> mop) | |
90 | { | |
91 | // If you fail here, it's because you can't do this on an open database | |
92 | assert(db == nullptr); | |
93 | merge_ops.push_back(std::make_pair(prefix,mop)); | |
94 | return 0; | |
95 | } | |
96 | ||
// Adapter that forwards rocksdb's internal log messages into the ceph
// logging (dout) infrastructure. Holds a reference on the CephContext
// for its whole lifetime.
class CephRocksdbLogger : public rocksdb::Logger {
  CephContext *cct;
public:
  explicit CephRocksdbLogger(CephContext *c) : cct(c) {
    cct->get();  // pin the context so it outlives this logger
  }
  ~CephRocksdbLogger() override {
    cct->put();
  }

  // Write an entry to the log file with the specified format.
  void Logv(const char* format, va_list ap) override {
    Logv(rocksdb::INFO_LEVEL, format, ap);
  }

  // Write an entry to the log file with the specified log level
  // and format. Any log with level under the internal log level
  // of *this (see @SetInfoLogLevel and @GetInfoLogLevel) will not be
  // printed.
  void Logv(const rocksdb::InfoLogLevel log_level, const char* format,
            va_list ap) override {
    // Invert rocksdb's severity scale into a ceph debug level: the
    // most severe rocksdb level maps to dout(0), INFO to a higher
    // (less urgent) level.
    int v = rocksdb::NUM_INFO_LOG_LEVELS - log_level - 1;
    // dout(v) opens a log statement; *_dout below continues it and
    // dendl closes it (ceph logging macro convention).
    dout(v);
    char buf[65536];
    vsnprintf(buf, sizeof(buf), format, ap);
    *_dout << buf << dendl;
  }
};
125 | ||
126 | rocksdb::Logger *create_rocksdb_ceph_logger() | |
127 | { | |
128 | return new CephRocksdbLogger(g_ceph_context); | |
129 | } | |
130 | ||
131 | int string2bool(string val, bool &b_val) | |
132 | { | |
133 | if (strcasecmp(val.c_str(), "false") == 0) { | |
134 | b_val = false; | |
135 | return 0; | |
136 | } else if (strcasecmp(val.c_str(), "true") == 0) { | |
137 | b_val = true; | |
138 | return 0; | |
139 | } else { | |
140 | std::string err; | |
141 | int b = strict_strtol(val.c_str(), 10, &err); | |
142 | if (!err.empty()) | |
143 | return -EINVAL; | |
144 | b_val = !!b; | |
145 | return 0; | |
146 | } | |
147 | } | |
148 | ||
149 | int RocksDBStore::tryInterpret(const string key, const string val, rocksdb::Options &opt) | |
150 | { | |
151 | if (key == "compaction_threads") { | |
152 | std::string err; | |
153 | int f = strict_sistrtoll(val.c_str(), &err); | |
154 | if (!err.empty()) | |
155 | return -EINVAL; | |
156 | //Low priority threadpool is used for compaction | |
157 | opt.env->SetBackgroundThreads(f, rocksdb::Env::Priority::LOW); | |
158 | } else if (key == "flusher_threads") { | |
159 | std::string err; | |
160 | int f = strict_sistrtoll(val.c_str(), &err); | |
161 | if (!err.empty()) | |
162 | return -EINVAL; | |
163 | //High priority threadpool is used for flusher | |
164 | opt.env->SetBackgroundThreads(f, rocksdb::Env::Priority::HIGH); | |
165 | } else if (key == "compact_on_mount") { | |
166 | int ret = string2bool(val, compact_on_mount); | |
167 | if (ret != 0) | |
168 | return ret; | |
169 | } else if (key == "disableWAL") { | |
170 | int ret = string2bool(val, disableWAL); | |
171 | if (ret != 0) | |
172 | return ret; | |
173 | } else { | |
174 | //unrecognize config options. | |
175 | return -EINVAL; | |
176 | } | |
177 | return 0; | |
178 | } | |
179 | ||
180 | int RocksDBStore::ParseOptionsFromString(const string opt_str, rocksdb::Options &opt) | |
181 | { | |
182 | map<string, string> str_map; | |
183 | int r = get_str_map(opt_str, &str_map, ",\n;"); | |
184 | if (r < 0) | |
185 | return r; | |
186 | map<string, string>::iterator it; | |
187 | for(it = str_map.begin(); it != str_map.end(); ++it) { | |
188 | string this_opt = it->first + "=" + it->second; | |
189 | rocksdb::Status status = rocksdb::GetOptionsFromString(opt, this_opt , &opt); | |
190 | if (!status.ok()) { | |
191 | //unrecognized by rocksdb, try to interpret by ourselves. | |
192 | r = tryInterpret(it->first, it->second, opt); | |
193 | if (r < 0) { | |
194 | derr << status.ToString() << dendl; | |
195 | return -EINVAL; | |
196 | } | |
197 | } | |
198 | lgeneric_dout(cct, 0) << " set rocksdb option " << it->first | |
199 | << " = " << it->second << dendl; | |
200 | } | |
201 | return 0; | |
202 | } | |
203 | ||
204 | int RocksDBStore::init(string _options_str) | |
205 | { | |
206 | options_str = _options_str; | |
207 | rocksdb::Options opt; | |
208 | //try parse options | |
209 | if (options_str.length()) { | |
210 | int r = ParseOptionsFromString(options_str, opt); | |
211 | if (r != 0) { | |
212 | return -EINVAL; | |
213 | } | |
214 | } | |
215 | return 0; | |
216 | } | |
217 | ||
218 | int RocksDBStore::create_and_open(ostream &out) | |
219 | { | |
220 | if (env) { | |
221 | unique_ptr<rocksdb::Directory> dir; | |
222 | env->NewDirectory(path, &dir); | |
223 | } else { | |
224 | int r = ::mkdir(path.c_str(), 0755); | |
225 | if (r < 0) | |
226 | r = -errno; | |
227 | if (r < 0 && r != -EEXIST) { | |
228 | derr << __func__ << " failed to create " << path << ": " << cpp_strerror(r) | |
229 | << dendl; | |
230 | return r; | |
231 | } | |
232 | } | |
233 | return do_open(out, true); | |
234 | } | |
235 | ||
// Open the rocksdb instance: build rocksdb::Options from the stored
// option string plus ceph config, open the DB, and register the perf
// counters. Returns 0 on success, -EINVAL on any parse/open failure.
int RocksDBStore::do_open(ostream &out, bool create_if_missing)
{
  rocksdb::Options opt;
  rocksdb::Status status;

  // Re-parse the option string captured in init() on every open.
  if (options_str.length()) {
    int r = ParseOptionsFromString(options_str, opt);
    if (r != 0) {
      return -EINVAL;
    }
  }

  // Optional rocksdb-internal statistics (used by get_statistics()).
  if (g_conf->rocksdb_perf) {
    dbstats = rocksdb::CreateDBStatistics();
    opt.statistics = dbstats;
  }

  opt.create_if_missing = create_if_missing;
  if (g_conf->rocksdb_separate_wal_dir) {
    opt.wal_dir = path + ".wal";
  }
  // rocksdb_db_paths: "path,size[; path,size]..." — spread SST files
  // across multiple directories with per-path size targets.
  if (g_conf->get_val<std::string>("rocksdb_db_paths").length()) {
    list<string> paths;
    get_str_list(g_conf->get_val<std::string>("rocksdb_db_paths"), "; \t", paths);
    for (auto& p : paths) {
      size_t pos = p.find(',');
      if (pos == std::string::npos) {
        derr << __func__ << " invalid db path item " << p << " in "
             << g_conf->get_val<std::string>("rocksdb_db_paths") << dendl;
        return -EINVAL;
      }
      // NOTE: this local shadows the member 'path' for the rest of the loop.
      string path = p.substr(0, pos);
      string size_str = p.substr(pos + 1);
      uint64_t size = atoll(size_str.c_str());
      // atoll yields 0 for unparseable sizes; treat 0 as invalid.
      if (!size) {
        derr << __func__ << " invalid db path item " << p << " in "
             << g_conf->get_val<std::string>("rocksdb_db_paths") << dendl;
        return -EINVAL;
      }
      opt.db_paths.push_back(rocksdb::DbPath(path, size));
      dout(10) << __func__ << " db_path " << path << " size " << size << dendl;
    }
  }

  // Route rocksdb's own log output through the ceph log.
  if (g_conf->rocksdb_log_to_ceph_log) {
    opt.info_log.reset(new CephRocksdbLogger(g_ceph_context));
  }

  // priv, when set, carries a caller-supplied rocksdb::Env (e.g. for
  // BlueStore's BlueRocksEnv); it is owned by us (see destructor).
  if (priv) {
    dout(10) << __func__ << " using custom Env " << priv << dendl;
    opt.env = static_cast<rocksdb::Env*>(priv);
  }

  // Block-based table tuning: shared LRU cache, block size, and an
  // optional bloom filter (kstore_rocksdb_bloom_bits_per_key > 0).
  auto cache = rocksdb::NewLRUCache(g_conf->rocksdb_cache_size, g_conf->rocksdb_cache_shard_bits);
  bbt_opts.block_size = g_conf->rocksdb_block_size;
  bbt_opts.block_cache = cache;
  if (g_conf->kstore_rocksdb_bloom_bits_per_key > 0) {
    dout(10) << __func__ << " set bloom filter bits per key to "
             << g_conf->kstore_rocksdb_bloom_bits_per_key << dendl;
    bbt_opts.filter_policy.reset(rocksdb::NewBloomFilterPolicy(g_conf->kstore_rocksdb_bloom_bits_per_key));
  }
  opt.table_factory.reset(rocksdb::NewBlockBasedTableFactory(bbt_opts));
  dout(10) << __func__ << " set block size to " << g_conf->rocksdb_block_size
           << " cache size to " << g_conf->rocksdb_cache_size
           << " num of cache shards to " << (1 << g_conf->rocksdb_cache_shard_bits) << dendl;

  // Route all merges through the per-prefix router.
  opt.merge_operator.reset(new MergeOperatorRouter(*this));
  status = rocksdb::DB::Open(opt, path, &db);
  if (!status.ok()) {
    derr << status.ToString() << dendl;
    return -EINVAL;
  }

  // Register the "rocksdb" perf counter set with this context.
  PerfCountersBuilder plb(g_ceph_context, "rocksdb", l_rocksdb_first, l_rocksdb_last);
  plb.add_u64_counter(l_rocksdb_gets, "get", "Gets");
  plb.add_u64_counter(l_rocksdb_txns, "submit_transaction", "Submit transactions");
  plb.add_u64_counter(l_rocksdb_txns_sync, "submit_transaction_sync", "Submit transactions sync");
  plb.add_time_avg(l_rocksdb_get_latency, "get_latency", "Get latency");
  plb.add_time_avg(l_rocksdb_submit_latency, "submit_latency", "Submit Latency");
  plb.add_time_avg(l_rocksdb_submit_sync_latency, "submit_sync_latency", "Submit Sync Latency");
  plb.add_u64_counter(l_rocksdb_compact, "compact", "Compactions");
  plb.add_u64_counter(l_rocksdb_compact_range, "compact_range", "Compactions by range");
  plb.add_u64_counter(l_rocksdb_compact_queue_merge, "compact_queue_merge", "Mergings of ranges in compaction queue");
  plb.add_u64(l_rocksdb_compact_queue_len, "compact_queue_len", "Length of compaction queue");
  plb.add_time_avg(l_rocksdb_write_wal_time, "rocksdb_write_wal_time", "Rocksdb write wal time");
  plb.add_time_avg(l_rocksdb_write_memtable_time, "rocksdb_write_memtable_time", "Rocksdb write memtable time");
  plb.add_time_avg(l_rocksdb_write_delay_time, "rocksdb_write_delay_time", "Rocksdb write delay time");
  plb.add_time_avg(l_rocksdb_write_pre_and_post_process_time,
      "rocksdb_write_pre_and_post_time", "total time spent on writing a record, excluding write process");
  logger = plb.create_perf_counters();
  cct->get_perfcounters_collection()->add(logger);

  // compact_on_mount is set via the "compact_on_mount" option string key.
  if (compact_on_mount) {
    derr << "Compacting rocksdb store..." << dendl;
    compact();
    derr << "Finished compacting rocksdb store" << dendl;
  }
  return 0;
}
335 | ||
336 | int RocksDBStore::_test_init(const string& dir) | |
337 | { | |
338 | rocksdb::Options options; | |
339 | options.create_if_missing = true; | |
340 | rocksdb::DB *db; | |
341 | rocksdb::Status status = rocksdb::DB::Open(options, dir, &db); | |
342 | delete db; | |
343 | db = nullptr; | |
344 | return status.ok() ? 0 : -EIO; | |
345 | } | |
346 | ||
RocksDBStore::~RocksDBStore()
{
  // Stop the compaction thread and unregister perf counters first.
  close();
  delete logger;

  // Ensure db is destroyed before dependent db_cache and filterpolicy
  delete db;
  db = nullptr;

  // priv holds a caller-supplied rocksdb::Env (see do_open); we own it.
  if (priv) {
    delete static_cast<rocksdb::Env*>(priv);
  }
}
360 | ||
void RocksDBStore::close()
{
  // stop compaction thread
  compact_queue_lock.Lock();
  if (compact_thread.is_started()) {
    // Ask the thread to exit, wake it, then drop the lock BEFORE
    // joining so the thread can re-acquire it and observe the stop
    // flag (joining while holding the lock would deadlock).
    compact_queue_stop = true;
    compact_queue_cond.Signal();
    compact_queue_lock.Unlock();
    compact_thread.join();
  } else {
    compact_queue_lock.Unlock();
  }

  // Unregister perf counters; the counters themselves are freed in
  // the destructor.
  if (logger)
    cct->get_perfcounters_collection()->remove(logger);
}
377 | ||
378 | void RocksDBStore::split_stats(const std::string &s, char delim, std::vector<std::string> &elems) { | |
379 | std::stringstream ss; | |
380 | ss.str(s); | |
381 | std::string item; | |
382 | while (std::getline(ss, item, delim)) { | |
383 | elems.push_back(item); | |
384 | } | |
385 | } | |
386 | ||
387 | void RocksDBStore::get_statistics(Formatter *f) | |
388 | { | |
389 | if (!g_conf->rocksdb_perf) { | |
390 | dout(20) << __func__ << "RocksDB perf is disabled, can't probe for stats" | |
391 | << dendl; | |
392 | return; | |
393 | } | |
394 | ||
395 | if (g_conf->rocksdb_collect_compaction_stats) { | |
396 | std::string stat_str; | |
397 | bool status = db->GetProperty("rocksdb.stats", &stat_str); | |
398 | if (status) { | |
399 | f->open_object_section("rocksdb_statistics"); | |
400 | f->dump_string("rocksdb_compaction_statistics", ""); | |
401 | vector<string> stats; | |
402 | split_stats(stat_str, '\n', stats); | |
403 | for (auto st :stats) { | |
404 | f->dump_string("", st); | |
405 | } | |
406 | f->close_section(); | |
407 | } | |
408 | } | |
409 | if (g_conf->rocksdb_collect_extended_stats) { | |
410 | if (dbstats) { | |
411 | f->open_object_section("rocksdb_extended_statistics"); | |
412 | string stat_str = dbstats->ToString(); | |
413 | vector<string> stats; | |
414 | split_stats(stat_str, '\n', stats); | |
415 | f->dump_string("rocksdb_extended_statistics", ""); | |
416 | for (auto st :stats) { | |
417 | f->dump_string(".", st); | |
418 | } | |
419 | f->close_section(); | |
420 | } | |
421 | f->open_object_section("rocksdbstore_perf_counters"); | |
422 | logger->dump_formatted(f,0); | |
423 | f->close_section(); | |
424 | } | |
425 | if (g_conf->rocksdb_collect_memory_stats) { | |
426 | f->open_object_section("rocksdb_memtable_statistics"); | |
427 | std::string str(stringify(bbt_opts.block_cache->GetUsage())); | |
428 | f->dump_string("block_cache_usage", str.data()); | |
429 | str.clear(); | |
430 | str.append(stringify(bbt_opts.block_cache->GetPinnedUsage())); | |
431 | f->dump_string("block_cache_pinned_blocks_usage", str); | |
432 | str.clear(); | |
433 | db->GetProperty("rocksdb.cur-size-all-mem-tables", &str); | |
434 | f->dump_string("rocksdb_memtable_usage", str); | |
435 | f->close_section(); | |
436 | } | |
437 | } | |
438 | ||
// Apply a batched transaction (async WAL write). Returns 0 on
// success, -1 on any rocksdb write error.
int RocksDBStore::submit_transaction(KeyValueDB::Transaction t)
{
  utime_t start = ceph_clock_now();
  // enable rocksdb breakdown
  // considering performance overhead, default is disabled
  if (g_conf->rocksdb_perf) {
    rocksdb::SetPerfLevel(rocksdb::PerfLevel::kEnableTimeExceptForMutex);
    rocksdb::perf_context.Reset();
  }

  RocksDBTransactionImpl * _t =
    static_cast<RocksDBTransactionImpl *>(t.get());
  rocksdb::WriteOptions woptions;
  woptions.disableWAL = disableWAL;
  // The lgeneric_subdout macro opens a log statement that the
  // *_dout line below completes (dendl closes it).
  // NOTE(review): bat.Iterate runs unconditionally, even when debug
  // level 30 is disabled, so every submit pays for stringifying the
  // whole batch — candidate for a should_gather guard.
  lgeneric_subdout(cct, rocksdb, 30) << __func__;
  RocksWBHandler bat_txc;
  _t->bat.Iterate(&bat_txc);
  *_dout << " Rocksdb transaction: " << bat_txc.seen << dendl;

  rocksdb::Status s = db->Write(woptions, &_t->bat);
  if (!s.ok()) {
    // Re-dump the batch so the failed transaction appears in the log.
    RocksWBHandler rocks_txc;
    _t->bat.Iterate(&rocks_txc);
    derr << __func__ << " error: " << s.ToString() << " code = " << s.code()
         << " Rocksdb transaction: " << rocks_txc.seen << dendl;
  }
  utime_t lat = ceph_clock_now() - start;

  // Fold rocksdb's thread-local perf_context timings (nanoseconds)
  // into our perf counters.
  if (g_conf->rocksdb_perf) {
    utime_t write_memtable_time;
    utime_t write_delay_time;
    utime_t write_wal_time;
    utime_t write_pre_and_post_process_time;
    write_wal_time.set_from_double(
        static_cast<double>(rocksdb::perf_context.write_wal_time)/1000000000);
    write_memtable_time.set_from_double(
        static_cast<double>(rocksdb::perf_context.write_memtable_time)/1000000000);
    write_delay_time.set_from_double(
        static_cast<double>(rocksdb::perf_context.write_delay_time)/1000000000);
    write_pre_and_post_process_time.set_from_double(
        static_cast<double>(rocksdb::perf_context.write_pre_and_post_process_time)/1000000000);
    logger->tinc(l_rocksdb_write_memtable_time, write_memtable_time);
    logger->tinc(l_rocksdb_write_delay_time, write_delay_time);
    logger->tinc(l_rocksdb_write_wal_time, write_wal_time);
    logger->tinc(l_rocksdb_write_pre_and_post_process_time, write_pre_and_post_process_time);
  }

  logger->inc(l_rocksdb_txns);
  logger->tinc(l_rocksdb_submit_latency, lat);

  return s.ok() ? 0 : -1;
}
491 | ||
// Same as submit_transaction(), but with woptions.sync = true so the
// write is durable (fsync'd WAL) before returning. Accounts against
// the *_sync perf counters.
int RocksDBStore::submit_transaction_sync(KeyValueDB::Transaction t)
{
  utime_t start = ceph_clock_now();
  // enable rocksdb breakdown
  // considering performance overhead, default is disabled
  if (g_conf->rocksdb_perf) {
    rocksdb::SetPerfLevel(rocksdb::PerfLevel::kEnableTimeExceptForMutex);
    rocksdb::perf_context.Reset();
  }

  RocksDBTransactionImpl * _t =
    static_cast<RocksDBTransactionImpl *>(t.get());
  rocksdb::WriteOptions woptions;
  woptions.sync = true;       // the only difference vs submit_transaction
  woptions.disableWAL = disableWAL;
  // Log statement opened by the macro is completed by *_dout below.
  // NOTE(review): bat.Iterate runs even when debug level 30 is off
  // (same cost issue as submit_transaction).
  lgeneric_subdout(cct, rocksdb, 30) << __func__;
  RocksWBHandler bat_txc;
  _t->bat.Iterate(&bat_txc);
  *_dout << " Rocksdb transaction: " << bat_txc.seen << dendl;

  rocksdb::Status s = db->Write(woptions, &_t->bat);
  if (!s.ok()) {
    // Re-dump the batch so the failed transaction appears in the log.
    RocksWBHandler rocks_txc;
    _t->bat.Iterate(&rocks_txc);
    derr << __func__ << " error: " << s.ToString() << " code = " << s.code()
         << " Rocksdb transaction: " << rocks_txc.seen << dendl;
  }
  utime_t lat = ceph_clock_now() - start;

  // Fold rocksdb's thread-local perf_context timings (nanoseconds)
  // into our perf counters.
  if (g_conf->rocksdb_perf) {
    utime_t write_memtable_time;
    utime_t write_delay_time;
    utime_t write_wal_time;
    utime_t write_pre_and_post_process_time;
    write_wal_time.set_from_double(
        static_cast<double>(rocksdb::perf_context.write_wal_time)/1000000000);
    write_memtable_time.set_from_double(
        static_cast<double>(rocksdb::perf_context.write_memtable_time)/1000000000);
    write_delay_time.set_from_double(
        static_cast<double>(rocksdb::perf_context.write_delay_time)/1000000000);
    write_pre_and_post_process_time.set_from_double(
        static_cast<double>(rocksdb::perf_context.write_pre_and_post_process_time)/1000000000);
    logger->tinc(l_rocksdb_write_memtable_time, write_memtable_time);
    logger->tinc(l_rocksdb_write_delay_time, write_delay_time);
    logger->tinc(l_rocksdb_write_wal_time, write_wal_time);
    logger->tinc(l_rocksdb_write_pre_and_post_process_time, write_pre_and_post_process_time);
  }

  logger->inc(l_rocksdb_txns_sync);
  logger->tinc(l_rocksdb_submit_sync_latency, lat);

  return s.ok() ? 0 : -1;
}
545 | ||
546 | RocksDBStore::RocksDBTransactionImpl::RocksDBTransactionImpl(RocksDBStore *_db) | |
547 | { | |
548 | db = _db; | |
549 | } | |
550 | ||
551 | void RocksDBStore::RocksDBTransactionImpl::set( | |
552 | const string &prefix, | |
553 | const string &k, | |
554 | const bufferlist &to_set_bl) | |
555 | { | |
556 | string key = combine_strings(prefix, k); | |
557 | ||
558 | // bufferlist::c_str() is non-constant, so we can't call c_str() | |
559 | if (to_set_bl.is_contiguous() && to_set_bl.length() > 0) { | |
560 | bat.Put(rocksdb::Slice(key), | |
561 | rocksdb::Slice(to_set_bl.buffers().front().c_str(), | |
562 | to_set_bl.length())); | |
563 | } else { | |
564 | // make a copy | |
565 | bufferlist val = to_set_bl; | |
566 | bat.Put(rocksdb::Slice(key), | |
567 | rocksdb::Slice(val.c_str(), val.length())); | |
568 | } | |
569 | } | |
570 | ||
571 | void RocksDBStore::RocksDBTransactionImpl::set( | |
572 | const string &prefix, | |
573 | const char *k, size_t keylen, | |
574 | const bufferlist &to_set_bl) | |
575 | { | |
576 | string key; | |
577 | combine_strings(prefix, k, keylen, &key); | |
578 | ||
579 | // bufferlist::c_str() is non-constant, so we can't call c_str() | |
580 | if (to_set_bl.is_contiguous() && to_set_bl.length() > 0) { | |
581 | bat.Put(rocksdb::Slice(key), | |
582 | rocksdb::Slice(to_set_bl.buffers().front().c_str(), | |
583 | to_set_bl.length())); | |
584 | } else { | |
585 | // make a copy | |
586 | bufferlist val = to_set_bl; | |
587 | bat.Put(rocksdb::Slice(key), | |
588 | rocksdb::Slice(val.c_str(), val.length())); | |
589 | } | |
590 | } | |
591 | ||
592 | void RocksDBStore::RocksDBTransactionImpl::rmkey(const string &prefix, | |
593 | const string &k) | |
594 | { | |
595 | bat.Delete(combine_strings(prefix, k)); | |
596 | } | |
597 | ||
598 | void RocksDBStore::RocksDBTransactionImpl::rmkey(const string &prefix, | |
599 | const char *k, | |
600 | size_t keylen) | |
601 | { | |
602 | string key; | |
603 | combine_strings(prefix, k, keylen, &key); | |
604 | bat.Delete(key); | |
605 | } | |
606 | ||
607 | void RocksDBStore::RocksDBTransactionImpl::rm_single_key(const string &prefix, | |
608 | const string &k) | |
609 | { | |
610 | bat.SingleDelete(combine_strings(prefix, k)); | |
611 | } | |
612 | ||
613 | void RocksDBStore::RocksDBTransactionImpl::rmkeys_by_prefix(const string &prefix) | |
614 | { | |
615 | KeyValueDB::Iterator it = db->get_iterator(prefix); | |
616 | for (it->seek_to_first(); | |
617 | it->valid(); | |
618 | it->next()) { | |
619 | bat.Delete(combine_strings(prefix, it->key())); | |
620 | } | |
621 | } | |
622 | ||
623 | void RocksDBStore::RocksDBTransactionImpl::rm_range_keys(const string &prefix, | |
624 | const string &start, | |
625 | const string &end) | |
626 | { | |
627 | if (db->enable_rmrange) { | |
628 | bat.DeleteRange(combine_strings(prefix, start), combine_strings(prefix, end)); | |
629 | } else { | |
630 | auto it = db->get_iterator(prefix); | |
631 | it->lower_bound(start); | |
632 | while (it->valid()) { | |
633 | if (it->key() >= end) { | |
634 | break; | |
635 | } | |
636 | bat.Delete(combine_strings(prefix, it->key())); | |
637 | it->next(); | |
638 | } | |
639 | } | |
640 | } | |
641 | ||
642 | void RocksDBStore::RocksDBTransactionImpl::merge( | |
643 | const string &prefix, | |
644 | const string &k, | |
645 | const bufferlist &to_set_bl) | |
646 | { | |
647 | string key = combine_strings(prefix, k); | |
648 | ||
649 | // bufferlist::c_str() is non-constant, so we can't call c_str() | |
650 | if (to_set_bl.is_contiguous() && to_set_bl.length() > 0) { | |
651 | bat.Merge(rocksdb::Slice(key), | |
652 | rocksdb::Slice(to_set_bl.buffers().front().c_str(), | |
653 | to_set_bl.length())); | |
654 | } else { | |
655 | // make a copy | |
656 | bufferlist val = to_set_bl; | |
657 | bat.Merge(rocksdb::Slice(key), | |
658 | rocksdb::Slice(val.c_str(), val.length())); | |
659 | } | |
660 | } | |
661 | ||
662 | //gets will bypass RocksDB row cache, since it uses iterator | |
663 | int RocksDBStore::get( | |
664 | const string &prefix, | |
665 | const std::set<string> &keys, | |
666 | std::map<string, bufferlist> *out) | |
667 | { | |
668 | utime_t start = ceph_clock_now(); | |
669 | for (std::set<string>::const_iterator i = keys.begin(); | |
670 | i != keys.end(); ++i) { | |
671 | std::string value; | |
672 | std::string bound = combine_strings(prefix, *i); | |
673 | auto status = db->Get(rocksdb::ReadOptions(), rocksdb::Slice(bound), &value); | |
674 | if (status.ok()) | |
675 | (*out)[*i].append(value); | |
676 | } | |
677 | utime_t lat = ceph_clock_now() - start; | |
678 | logger->inc(l_rocksdb_gets); | |
679 | logger->tinc(l_rocksdb_get_latency, lat); | |
680 | return 0; | |
681 | } | |
682 | ||
683 | int RocksDBStore::get( | |
684 | const string &prefix, | |
685 | const string &key, | |
686 | bufferlist *out) | |
687 | { | |
688 | assert(out && (out->length() == 0)); | |
689 | utime_t start = ceph_clock_now(); | |
690 | int r = 0; | |
691 | string value, k; | |
692 | rocksdb::Status s; | |
693 | k = combine_strings(prefix, key); | |
694 | s = db->Get(rocksdb::ReadOptions(), rocksdb::Slice(k), &value); | |
695 | if (s.ok()) { | |
696 | out->append(value); | |
697 | } else { | |
698 | r = -ENOENT; | |
699 | } | |
700 | utime_t lat = ceph_clock_now() - start; | |
701 | logger->inc(l_rocksdb_gets); | |
702 | logger->tinc(l_rocksdb_get_latency, lat); | |
703 | return r; | |
704 | } | |
705 | ||
706 | int RocksDBStore::get( | |
707 | const string& prefix, | |
708 | const char *key, | |
709 | size_t keylen, | |
710 | bufferlist *out) | |
711 | { | |
712 | assert(out && (out->length() == 0)); | |
713 | utime_t start = ceph_clock_now(); | |
714 | int r = 0; | |
715 | string value, k; | |
716 | combine_strings(prefix, key, keylen, &k); | |
717 | rocksdb::Status s; | |
718 | s = db->Get(rocksdb::ReadOptions(), rocksdb::Slice(k), &value); | |
719 | if (s.ok()) { | |
720 | out->append(value); | |
721 | } else { | |
722 | r = -ENOENT; | |
723 | } | |
724 | utime_t lat = ceph_clock_now() - start; | |
725 | logger->inc(l_rocksdb_gets); | |
726 | logger->tinc(l_rocksdb_get_latency, lat); | |
727 | return r; | |
728 | } | |
729 | ||
730 | int RocksDBStore::split_key(rocksdb::Slice in, string *prefix, string *key) | |
731 | { | |
732 | size_t prefix_len = 0; | |
733 | ||
734 | // Find separator inside Slice | |
735 | char* separator = (char*) memchr(in.data(), 0, in.size()); | |
736 | if (separator == NULL) | |
737 | return -EINVAL; | |
738 | prefix_len = size_t(separator - in.data()); | |
739 | if (prefix_len >= in.size()) | |
740 | return -EINVAL; | |
741 | ||
742 | // Fetch prefix and/or key directly from Slice | |
743 | if (prefix) | |
744 | *prefix = string(in.data(), prefix_len); | |
745 | if (key) | |
746 | *key = string(separator+1, in.size()-prefix_len-1); | |
747 | return 0; | |
748 | } | |
749 | ||
// Synchronously compact the entire keyspace (nullptr bounds = full
// range) and bump the compaction counter.
void RocksDBStore::compact()
{
  logger->inc(l_rocksdb_compact);
  rocksdb::CompactRangeOptions options;
  db->CompactRange(options, nullptr, nullptr);
}
756 | ||
757 | ||
// Body of the background compaction thread: drain the queue, running
// each range compaction with the queue lock DROPPED, and sleep on the
// condvar when idle. Exits when close() sets compact_queue_stop.
void RocksDBStore::compact_thread_entry()
{
  compact_queue_lock.Lock();
  while (!compact_queue_stop) {
    while (!compact_queue.empty()) {
      // Pop under the lock, then release it for the (slow) compaction
      // so producers can keep queueing ranges.
      pair<string,string> range = compact_queue.front();
      compact_queue.pop_front();
      logger->set(l_rocksdb_compact_queue_len, compact_queue.size());
      compact_queue_lock.Unlock();
      logger->inc(l_rocksdb_compact_range);
      compact_range(range.first, range.second);
      compact_queue_lock.Lock();
      continue;
    }
    // Queue empty: wait (releases the lock) until signalled by
    // compact_range_async() or close().
    compact_queue_cond.Wait(compact_queue_lock);
  }
  compact_queue_lock.Unlock();
}
776 | ||
777 | void RocksDBStore::compact_range_async(const string& start, const string& end) | |
778 | { | |
779 | Mutex::Locker l(compact_queue_lock); | |
780 | ||
781 | // try to merge adjacent ranges. this is O(n), but the queue should | |
782 | // be short. note that we do not cover all overlap cases and merge | |
783 | // opportunities here, but we capture the ones we currently need. | |
784 | list< pair<string,string> >::iterator p = compact_queue.begin(); | |
785 | while (p != compact_queue.end()) { | |
786 | if (p->first == start && p->second == end) { | |
787 | // dup; no-op | |
788 | return; | |
789 | } | |
790 | if (p->first <= end && p->first > start) { | |
791 | // merge with existing range to the right | |
792 | compact_queue.push_back(make_pair(start, p->second)); | |
793 | compact_queue.erase(p); | |
794 | logger->inc(l_rocksdb_compact_queue_merge); | |
795 | break; | |
796 | } | |
797 | if (p->second >= start && p->second < end) { | |
798 | // merge with existing range to the left | |
799 | compact_queue.push_back(make_pair(p->first, end)); | |
800 | compact_queue.erase(p); | |
801 | logger->inc(l_rocksdb_compact_queue_merge); | |
802 | break; | |
803 | } | |
804 | ++p; | |
805 | } | |
806 | if (p == compact_queue.end()) { | |
807 | // no merge, new entry. | |
808 | compact_queue.push_back(make_pair(start, end)); | |
809 | logger->set(l_rocksdb_compact_queue_len, compact_queue.size()); | |
810 | } | |
811 | compact_queue_cond.Signal(); | |
812 | if (!compact_thread.is_started()) { | |
813 | compact_thread.create("rstore_compact"); | |
814 | } | |
815 | } | |
816 | bool RocksDBStore::check_omap_dir(string &omap_dir) | |
817 | { | |
818 | rocksdb::Options options; | |
819 | options.create_if_missing = true; | |
820 | rocksdb::DB *db; | |
821 | rocksdb::Status status = rocksdb::DB::Open(options, omap_dir, &db); | |
822 | delete db; | |
823 | db = nullptr; | |
824 | return status.ok(); | |
825 | } | |
// Synchronously compact the raw key range [start, end].
void RocksDBStore::compact_range(const string& start, const string& end)
{
  rocksdb::CompactRangeOptions options;
  rocksdb::Slice cstart(start);
  rocksdb::Slice cend(end);
  db->CompactRange(options, &cstart, &cend);
}
// The impl owns the underlying rocksdb iterator.
RocksDBStore::RocksDBWholeSpaceIteratorImpl::~RocksDBWholeSpaceIteratorImpl()
{
  delete dbiter;
}
837 | int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_first() | |
838 | { | |
839 | dbiter->SeekToFirst(); | |
840 | return dbiter->status().ok() ? 0 : -1; | |
841 | } | |
842 | int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_first(const string &prefix) | |
843 | { | |
844 | rocksdb::Slice slice_prefix(prefix); | |
845 | dbiter->Seek(slice_prefix); | |
846 | return dbiter->status().ok() ? 0 : -1; | |
847 | } | |
848 | int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_last() | |
849 | { | |
850 | dbiter->SeekToLast(); | |
851 | return dbiter->status().ok() ? 0 : -1; | |
852 | } | |
853 | int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_last(const string &prefix) | |
854 | { | |
855 | string limit = past_prefix(prefix); | |
856 | rocksdb::Slice slice_limit(limit); | |
857 | dbiter->Seek(slice_limit); | |
858 | ||
859 | if (!dbiter->Valid()) { | |
860 | dbiter->SeekToLast(); | |
861 | } else { | |
862 | dbiter->Prev(); | |
863 | } | |
864 | return dbiter->status().ok() ? 0 : -1; | |
865 | } | |
866 | int RocksDBStore::RocksDBWholeSpaceIteratorImpl::upper_bound(const string &prefix, const string &after) | |
867 | { | |
868 | lower_bound(prefix, after); | |
869 | if (valid()) { | |
870 | pair<string,string> key = raw_key(); | |
871 | if (key.first == prefix && key.second == after) | |
872 | next(); | |
873 | } | |
874 | return dbiter->status().ok() ? 0 : -1; | |
875 | } | |
876 | int RocksDBStore::RocksDBWholeSpaceIteratorImpl::lower_bound(const string &prefix, const string &to) | |
877 | { | |
878 | string bound = combine_strings(prefix, to); | |
879 | rocksdb::Slice slice_bound(bound); | |
880 | dbiter->Seek(slice_bound); | |
881 | return dbiter->status().ok() ? 0 : -1; | |
882 | } | |
// True when the iterator is currently positioned on an entry.
bool RocksDBStore::RocksDBWholeSpaceIteratorImpl::valid()
{
  return dbiter->Valid();
}
887 | int RocksDBStore::RocksDBWholeSpaceIteratorImpl::next() | |
888 | { | |
889 | if (valid()) { | |
890 | dbiter->Next(); | |
891 | } | |
892 | return dbiter->status().ok() ? 0 : -1; | |
893 | } | |
894 | int RocksDBStore::RocksDBWholeSpaceIteratorImpl::prev() | |
895 | { | |
896 | if (valid()) { | |
897 | dbiter->Prev(); | |
898 | } | |
899 | return dbiter->status().ok() ? 0 : -1; | |
900 | } | |
901 | string RocksDBStore::RocksDBWholeSpaceIteratorImpl::key() | |
902 | { | |
903 | string out_key; | |
904 | split_key(dbiter->key(), 0, &out_key); | |
905 | return out_key; | |
906 | } | |
907 | pair<string,string> RocksDBStore::RocksDBWholeSpaceIteratorImpl::raw_key() | |
908 | { | |
909 | string prefix, key; | |
910 | split_key(dbiter->key(), &prefix, &key); | |
911 | return make_pair(prefix, key); | |
912 | } | |
913 | ||
914 | bool RocksDBStore::RocksDBWholeSpaceIteratorImpl::raw_key_is_prefixed(const string &prefix) { | |
915 | // Look for "prefix\0" right in rocksb::Slice | |
916 | rocksdb::Slice key = dbiter->key(); | |
917 | if ((key.size() > prefix.length()) && (key[prefix.length()] == '\0')) { | |
918 | return memcmp(key.data(), prefix.c_str(), prefix.length()) == 0; | |
919 | } else { | |
920 | return false; | |
921 | } | |
922 | } | |
923 | ||
// Return the current entry's value converted into a bufferlist.
bufferlist RocksDBStore::RocksDBWholeSpaceIteratorImpl::value()
{
  return to_bufferlist(dbiter->value());
}
928 | ||
// Size in bytes of the current raw (combined prefix+key) key.
size_t RocksDBStore::RocksDBWholeSpaceIteratorImpl::key_size()
{
  return dbiter->key().size();
}
933 | ||
// Size in bytes of the current entry's value.
size_t RocksDBStore::RocksDBWholeSpaceIteratorImpl::value_size()
{
  return dbiter->value().size();
}
938 | ||
939 | bufferptr RocksDBStore::RocksDBWholeSpaceIteratorImpl::value_as_ptr() | |
940 | { | |
941 | rocksdb::Slice val = dbiter->value(); | |
942 | return bufferptr(val.data(), val.size()); | |
943 | } | |
944 | ||
// Map the underlying rocksdb status to the KeyValueDB convention:
// 0 on ok, -1 on any error.
int RocksDBStore::RocksDBWholeSpaceIteratorImpl::status()
{
  return dbiter->status().ok() ? 0 : -1;
}
949 | ||
950 | string RocksDBStore::past_prefix(const string &prefix) | |
951 | { | |
952 | string limit = prefix; | |
953 | limit.push_back(1); | |
954 | return limit; | |
955 | } | |
956 | ||
957 | RocksDBStore::WholeSpaceIterator RocksDBStore::_get_iterator() | |
958 | { | |
959 | return std::make_shared<RocksDBWholeSpaceIteratorImpl>( | |
960 | db->NewIterator(rocksdb::ReadOptions())); | |
961 | } | |
962 |