]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
4 | #include <set> | |
5 | #include <map> | |
6 | #include <string> | |
7 | #include <memory> | |
8 | #include <errno.h> | |
9 | #include <unistd.h> | |
10 | #include <sys/types.h> | |
11 | #include <sys/stat.h> | |
12 | ||
13 | #include "rocksdb/db.h" | |
14 | #include "rocksdb/table.h" | |
15 | #include "rocksdb/env.h" | |
16 | #include "rocksdb/slice.h" | |
17 | #include "rocksdb/cache.h" | |
18 | #include "rocksdb/filter_policy.h" | |
19 | #include "rocksdb/utilities/convenience.h" | |
20 | #include "rocksdb/merge_operator.h" | |
21 | using std::string; | |
22 | #include "common/perf_counters.h" | |
23 | #include "common/debug.h" | |
24 | #include "include/str_list.h" | |
25 | #include "include/stringify.h" | |
26 | #include "include/str_map.h" | |
27 | #include "KeyValueDB.h" | |
28 | #include "RocksDBStore.h" | |
29 | ||
30 | #include "common/debug.h" | |
31 | ||
32 | #define dout_context cct | |
33 | #define dout_subsys ceph_subsys_rocksdb | |
34 | #undef dout_prefix | |
35 | #define dout_prefix *_dout << "rocksdb: " | |
36 | ||
31f18b77 FG |
37 | static rocksdb::SliceParts prepare_sliceparts(const bufferlist &bl, rocksdb::Slice *slices) |
38 | { | |
39 | unsigned n = 0; | |
40 | for (std::list<buffer::ptr>::const_iterator p = bl.buffers().begin(); | |
41 | p != bl.buffers().end(); ++p, ++n) { | |
42 | slices[n].data_ = p->c_str(); | |
43 | slices[n].size_ = p->length(); | |
44 | } | |
45 | return rocksdb::SliceParts(slices, n); | |
46 | } | |
47 | ||
7c673cae FG |
48 | // |
49 | // One of these per rocksdb instance, implements the merge operator prefix stuff | |
50 | // | |
51 | class RocksDBStore::MergeOperatorRouter : public rocksdb::AssociativeMergeOperator { | |
52 | RocksDBStore& store; | |
53 | public: | |
54 | const char *Name() const override { | |
55 | // Construct a name that rocksDB will validate against. We want to | |
56 | // do this in a way that doesn't constrain the ordering of calls | |
57 | // to set_merge_operator, so sort the merge operators and then | |
58 | // construct a name from all of those parts. | |
59 | store.assoc_name.clear(); | |
60 | map<std::string,std::string> names; | |
61 | for (auto& p : store.merge_ops) names[p.first] = p.second->name(); | |
62 | for (auto& p : names) { | |
63 | store.assoc_name += '.'; | |
64 | store.assoc_name += p.first; | |
65 | store.assoc_name += ':'; | |
66 | store.assoc_name += p.second; | |
67 | } | |
68 | return store.assoc_name.c_str(); | |
69 | } | |
70 | ||
71 | MergeOperatorRouter(RocksDBStore &_store) : store(_store) {} | |
72 | ||
73 | bool Merge(const rocksdb::Slice& key, | |
74 | const rocksdb::Slice* existing_value, | |
75 | const rocksdb::Slice& value, | |
76 | std::string* new_value, | |
77 | rocksdb::Logger* logger) const override { | |
78 | // Check each prefix | |
79 | for (auto& p : store.merge_ops) { | |
80 | if (p.first.compare(0, p.first.length(), | |
81 | key.data(), p.first.length()) == 0 && | |
82 | key.data()[p.first.length()] == 0) { | |
83 | if (existing_value) { | |
84 | p.second->merge(existing_value->data(), existing_value->size(), | |
85 | value.data(), value.size(), | |
86 | new_value); | |
87 | } else { | |
88 | p.second->merge_nonexistent(value.data(), value.size(), new_value); | |
89 | } | |
90 | break; | |
91 | } | |
92 | } | |
93 | return true; // OK :) | |
94 | } | |
95 | ||
96 | }; | |
97 | ||
98 | int RocksDBStore::set_merge_operator( | |
99 | const string& prefix, | |
100 | std::shared_ptr<KeyValueDB::MergeOperator> mop) | |
101 | { | |
102 | // If you fail here, it's because you can't do this on an open database | |
103 | assert(db == nullptr); | |
104 | merge_ops.push_back(std::make_pair(prefix,mop)); | |
105 | return 0; | |
106 | } | |
107 | ||
108 | class CephRocksdbLogger : public rocksdb::Logger { | |
109 | CephContext *cct; | |
110 | public: | |
111 | explicit CephRocksdbLogger(CephContext *c) : cct(c) { | |
112 | cct->get(); | |
113 | } | |
114 | ~CephRocksdbLogger() override { | |
115 | cct->put(); | |
116 | } | |
117 | ||
118 | // Write an entry to the log file with the specified format. | |
119 | void Logv(const char* format, va_list ap) override { | |
120 | Logv(rocksdb::INFO_LEVEL, format, ap); | |
121 | } | |
122 | ||
123 | // Write an entry to the log file with the specified log level | |
124 | // and format. Any log with level under the internal log level | |
125 | // of *this (see @SetInfoLogLevel and @GetInfoLogLevel) will not be | |
126 | // printed. | |
127 | void Logv(const rocksdb::InfoLogLevel log_level, const char* format, | |
128 | va_list ap) override { | |
129 | int v = rocksdb::NUM_INFO_LOG_LEVELS - log_level - 1; | |
130 | dout(v); | |
131 | char buf[65536]; | |
132 | vsnprintf(buf, sizeof(buf), format, ap); | |
133 | *_dout << buf << dendl; | |
134 | } | |
135 | }; | |
136 | ||
137 | rocksdb::Logger *create_rocksdb_ceph_logger() | |
138 | { | |
139 | return new CephRocksdbLogger(g_ceph_context); | |
140 | } | |
141 | ||
142 | int string2bool(string val, bool &b_val) | |
143 | { | |
144 | if (strcasecmp(val.c_str(), "false") == 0) { | |
145 | b_val = false; | |
146 | return 0; | |
147 | } else if (strcasecmp(val.c_str(), "true") == 0) { | |
148 | b_val = true; | |
149 | return 0; | |
150 | } else { | |
151 | std::string err; | |
152 | int b = strict_strtol(val.c_str(), 10, &err); | |
153 | if (!err.empty()) | |
154 | return -EINVAL; | |
155 | b_val = !!b; | |
156 | return 0; | |
157 | } | |
158 | } | |
159 | ||
160 | int RocksDBStore::tryInterpret(const string key, const string val, rocksdb::Options &opt) | |
161 | { | |
162 | if (key == "compaction_threads") { | |
163 | std::string err; | |
164 | int f = strict_sistrtoll(val.c_str(), &err); | |
165 | if (!err.empty()) | |
166 | return -EINVAL; | |
167 | //Low priority threadpool is used for compaction | |
168 | opt.env->SetBackgroundThreads(f, rocksdb::Env::Priority::LOW); | |
169 | } else if (key == "flusher_threads") { | |
170 | std::string err; | |
171 | int f = strict_sistrtoll(val.c_str(), &err); | |
172 | if (!err.empty()) | |
173 | return -EINVAL; | |
174 | //High priority threadpool is used for flusher | |
175 | opt.env->SetBackgroundThreads(f, rocksdb::Env::Priority::HIGH); | |
176 | } else if (key == "compact_on_mount") { | |
177 | int ret = string2bool(val, compact_on_mount); | |
178 | if (ret != 0) | |
179 | return ret; | |
180 | } else if (key == "disableWAL") { | |
181 | int ret = string2bool(val, disableWAL); | |
182 | if (ret != 0) | |
183 | return ret; | |
184 | } else { | |
185 | //unrecognize config options. | |
186 | return -EINVAL; | |
187 | } | |
188 | return 0; | |
189 | } | |
190 | ||
191 | int RocksDBStore::ParseOptionsFromString(const string opt_str, rocksdb::Options &opt) | |
192 | { | |
193 | map<string, string> str_map; | |
194 | int r = get_str_map(opt_str, &str_map, ",\n;"); | |
195 | if (r < 0) | |
196 | return r; | |
197 | map<string, string>::iterator it; | |
198 | for(it = str_map.begin(); it != str_map.end(); ++it) { | |
199 | string this_opt = it->first + "=" + it->second; | |
200 | rocksdb::Status status = rocksdb::GetOptionsFromString(opt, this_opt , &opt); | |
201 | if (!status.ok()) { | |
202 | //unrecognized by rocksdb, try to interpret by ourselves. | |
203 | r = tryInterpret(it->first, it->second, opt); | |
204 | if (r < 0) { | |
205 | derr << status.ToString() << dendl; | |
206 | return -EINVAL; | |
207 | } | |
208 | } | |
209 | lgeneric_dout(cct, 0) << " set rocksdb option " << it->first | |
210 | << " = " << it->second << dendl; | |
211 | } | |
212 | return 0; | |
213 | } | |
214 | ||
215 | int RocksDBStore::init(string _options_str) | |
216 | { | |
217 | options_str = _options_str; | |
218 | rocksdb::Options opt; | |
219 | //try parse options | |
220 | if (options_str.length()) { | |
221 | int r = ParseOptionsFromString(options_str, opt); | |
222 | if (r != 0) { | |
223 | return -EINVAL; | |
224 | } | |
225 | } | |
226 | return 0; | |
227 | } | |
228 | ||
229 | int RocksDBStore::create_and_open(ostream &out) | |
230 | { | |
231 | if (env) { | |
232 | unique_ptr<rocksdb::Directory> dir; | |
233 | env->NewDirectory(path, &dir); | |
234 | } else { | |
235 | int r = ::mkdir(path.c_str(), 0755); | |
236 | if (r < 0) | |
237 | r = -errno; | |
238 | if (r < 0 && r != -EEXIST) { | |
239 | derr << __func__ << " failed to create " << path << ": " << cpp_strerror(r) | |
240 | << dendl; | |
241 | return r; | |
242 | } | |
243 | } | |
244 | return do_open(out, true); | |
245 | } | |
246 | ||
247 | int RocksDBStore::do_open(ostream &out, bool create_if_missing) | |
248 | { | |
249 | rocksdb::Options opt; | |
250 | rocksdb::Status status; | |
251 | ||
252 | if (options_str.length()) { | |
253 | int r = ParseOptionsFromString(options_str, opt); | |
254 | if (r != 0) { | |
255 | return -EINVAL; | |
256 | } | |
257 | } | |
258 | ||
259 | if (g_conf->rocksdb_perf) { | |
260 | dbstats = rocksdb::CreateDBStatistics(); | |
261 | opt.statistics = dbstats; | |
262 | } | |
263 | ||
264 | opt.create_if_missing = create_if_missing; | |
265 | if (g_conf->rocksdb_separate_wal_dir) { | |
266 | opt.wal_dir = path + ".wal"; | |
267 | } | |
268 | if (g_conf->get_val<std::string>("rocksdb_db_paths").length()) { | |
269 | list<string> paths; | |
270 | get_str_list(g_conf->get_val<std::string>("rocksdb_db_paths"), "; \t", paths); | |
271 | for (auto& p : paths) { | |
272 | size_t pos = p.find(','); | |
273 | if (pos == std::string::npos) { | |
274 | derr << __func__ << " invalid db path item " << p << " in " | |
275 | << g_conf->get_val<std::string>("rocksdb_db_paths") << dendl; | |
276 | return -EINVAL; | |
277 | } | |
278 | string path = p.substr(0, pos); | |
279 | string size_str = p.substr(pos + 1); | |
280 | uint64_t size = atoll(size_str.c_str()); | |
281 | if (!size) { | |
282 | derr << __func__ << " invalid db path item " << p << " in " | |
283 | << g_conf->get_val<std::string>("rocksdb_db_paths") << dendl; | |
284 | return -EINVAL; | |
285 | } | |
286 | opt.db_paths.push_back(rocksdb::DbPath(path, size)); | |
287 | dout(10) << __func__ << " db_path " << path << " size " << size << dendl; | |
288 | } | |
289 | } | |
290 | ||
291 | if (g_conf->rocksdb_log_to_ceph_log) { | |
292 | opt.info_log.reset(new CephRocksdbLogger(g_ceph_context)); | |
293 | } | |
294 | ||
295 | if (priv) { | |
296 | dout(10) << __func__ << " using custom Env " << priv << dendl; | |
297 | opt.env = static_cast<rocksdb::Env*>(priv); | |
298 | } | |
299 | ||
31f18b77 | 300 | // caches |
224ce89b | 301 | if (!set_cache_flag) { |
31f18b77 FG |
302 | cache_size = g_conf->rocksdb_cache_size; |
303 | } | |
304 | uint64_t row_cache_size = cache_size * g_conf->rocksdb_cache_row_ratio; | |
305 | uint64_t block_cache_size = cache_size - row_cache_size; | |
224ce89b WB |
306 | |
307 | if (block_cache_size == 0) { | |
308 | // disable block cache | |
309 | dout(10) << __func__ << " block_cache_size " << block_cache_size | |
310 | << ", setting no_block_cache " << dendl; | |
311 | bbt_opts.no_block_cache = true; | |
31f18b77 | 312 | } else { |
224ce89b WB |
313 | if (g_conf->rocksdb_cache_type == "lru") { |
314 | bbt_opts.block_cache = rocksdb::NewLRUCache( | |
315 | block_cache_size, | |
316 | g_conf->rocksdb_cache_shard_bits); | |
317 | } else if (g_conf->rocksdb_cache_type == "clock") { | |
318 | bbt_opts.block_cache = rocksdb::NewClockCache( | |
319 | block_cache_size, | |
320 | g_conf->rocksdb_cache_shard_bits); | |
321 | } else { | |
322 | derr << "unrecognized rocksdb_cache_type '" << g_conf->rocksdb_cache_type | |
323 | << "'" << dendl; | |
324 | return -EINVAL; | |
325 | } | |
31f18b77 | 326 | } |
7c673cae | 327 | bbt_opts.block_size = g_conf->rocksdb_block_size; |
31f18b77 | 328 | |
224ce89b WB |
329 | if (row_cache_size > 0) |
330 | opt.row_cache = rocksdb::NewLRUCache(row_cache_size, | |
31f18b77 FG |
331 | g_conf->rocksdb_cache_shard_bits); |
332 | ||
7c673cae FG |
333 | if (g_conf->kstore_rocksdb_bloom_bits_per_key > 0) { |
334 | dout(10) << __func__ << " set bloom filter bits per key to " | |
335 | << g_conf->kstore_rocksdb_bloom_bits_per_key << dendl; | |
31f18b77 FG |
336 | bbt_opts.filter_policy.reset(rocksdb::NewBloomFilterPolicy( |
337 | g_conf->kstore_rocksdb_bloom_bits_per_key)); | |
7c673cae FG |
338 | } |
339 | opt.table_factory.reset(rocksdb::NewBlockBasedTableFactory(bbt_opts)); | |
31f18b77 FG |
340 | dout(10) << __func__ << " block size " << g_conf->rocksdb_block_size |
341 | << ", block_cache size " << prettybyte_t(block_cache_size) | |
342 | << ", row_cache size " << prettybyte_t(row_cache_size) | |
343 | << "; shards " | |
344 | << (1 << g_conf->rocksdb_cache_shard_bits) | |
345 | << ", type " << g_conf->rocksdb_cache_type | |
346 | << dendl; | |
7c673cae FG |
347 | |
348 | opt.merge_operator.reset(new MergeOperatorRouter(*this)); | |
349 | status = rocksdb::DB::Open(opt, path, &db); | |
350 | if (!status.ok()) { | |
351 | derr << status.ToString() << dendl; | |
352 | return -EINVAL; | |
353 | } | |
354 | ||
355 | PerfCountersBuilder plb(g_ceph_context, "rocksdb", l_rocksdb_first, l_rocksdb_last); | |
356 | plb.add_u64_counter(l_rocksdb_gets, "get", "Gets"); | |
357 | plb.add_u64_counter(l_rocksdb_txns, "submit_transaction", "Submit transactions"); | |
358 | plb.add_u64_counter(l_rocksdb_txns_sync, "submit_transaction_sync", "Submit transactions sync"); | |
359 | plb.add_time_avg(l_rocksdb_get_latency, "get_latency", "Get latency"); | |
360 | plb.add_time_avg(l_rocksdb_submit_latency, "submit_latency", "Submit Latency"); | |
361 | plb.add_time_avg(l_rocksdb_submit_sync_latency, "submit_sync_latency", "Submit Sync Latency"); | |
362 | plb.add_u64_counter(l_rocksdb_compact, "compact", "Compactions"); | |
363 | plb.add_u64_counter(l_rocksdb_compact_range, "compact_range", "Compactions by range"); | |
364 | plb.add_u64_counter(l_rocksdb_compact_queue_merge, "compact_queue_merge", "Mergings of ranges in compaction queue"); | |
365 | plb.add_u64(l_rocksdb_compact_queue_len, "compact_queue_len", "Length of compaction queue"); | |
366 | plb.add_time_avg(l_rocksdb_write_wal_time, "rocksdb_write_wal_time", "Rocksdb write wal time"); | |
367 | plb.add_time_avg(l_rocksdb_write_memtable_time, "rocksdb_write_memtable_time", "Rocksdb write memtable time"); | |
368 | plb.add_time_avg(l_rocksdb_write_delay_time, "rocksdb_write_delay_time", "Rocksdb write delay time"); | |
369 | plb.add_time_avg(l_rocksdb_write_pre_and_post_process_time, | |
370 | "rocksdb_write_pre_and_post_time", "total time spent on writing a record, excluding write process"); | |
371 | logger = plb.create_perf_counters(); | |
372 | cct->get_perfcounters_collection()->add(logger); | |
373 | ||
374 | if (compact_on_mount) { | |
375 | derr << "Compacting rocksdb store..." << dendl; | |
376 | compact(); | |
377 | derr << "Finished compacting rocksdb store" << dendl; | |
378 | } | |
379 | return 0; | |
380 | } | |
381 | ||
382 | int RocksDBStore::_test_init(const string& dir) | |
383 | { | |
384 | rocksdb::Options options; | |
385 | options.create_if_missing = true; | |
386 | rocksdb::DB *db; | |
387 | rocksdb::Status status = rocksdb::DB::Open(options, dir, &db); | |
388 | delete db; | |
389 | db = nullptr; | |
390 | return status.ok() ? 0 : -EIO; | |
391 | } | |
392 | ||
393 | RocksDBStore::~RocksDBStore() | |
394 | { | |
395 | close(); | |
396 | delete logger; | |
397 | ||
398 | // Ensure db is destroyed before dependent db_cache and filterpolicy | |
399 | delete db; | |
400 | db = nullptr; | |
401 | ||
402 | if (priv) { | |
403 | delete static_cast<rocksdb::Env*>(priv); | |
404 | } | |
405 | } | |
406 | ||
407 | void RocksDBStore::close() | |
408 | { | |
409 | // stop compaction thread | |
410 | compact_queue_lock.Lock(); | |
411 | if (compact_thread.is_started()) { | |
412 | compact_queue_stop = true; | |
413 | compact_queue_cond.Signal(); | |
414 | compact_queue_lock.Unlock(); | |
415 | compact_thread.join(); | |
416 | } else { | |
417 | compact_queue_lock.Unlock(); | |
418 | } | |
419 | ||
420 | if (logger) | |
421 | cct->get_perfcounters_collection()->remove(logger); | |
422 | } | |
423 | ||
424 | void RocksDBStore::split_stats(const std::string &s, char delim, std::vector<std::string> &elems) { | |
425 | std::stringstream ss; | |
426 | ss.str(s); | |
427 | std::string item; | |
428 | while (std::getline(ss, item, delim)) { | |
429 | elems.push_back(item); | |
430 | } | |
431 | } | |
432 | ||
433 | void RocksDBStore::get_statistics(Formatter *f) | |
434 | { | |
435 | if (!g_conf->rocksdb_perf) { | |
436 | dout(20) << __func__ << "RocksDB perf is disabled, can't probe for stats" | |
437 | << dendl; | |
438 | return; | |
439 | } | |
440 | ||
441 | if (g_conf->rocksdb_collect_compaction_stats) { | |
442 | std::string stat_str; | |
443 | bool status = db->GetProperty("rocksdb.stats", &stat_str); | |
444 | if (status) { | |
445 | f->open_object_section("rocksdb_statistics"); | |
446 | f->dump_string("rocksdb_compaction_statistics", ""); | |
447 | vector<string> stats; | |
448 | split_stats(stat_str, '\n', stats); | |
449 | for (auto st :stats) { | |
450 | f->dump_string("", st); | |
451 | } | |
452 | f->close_section(); | |
453 | } | |
454 | } | |
455 | if (g_conf->rocksdb_collect_extended_stats) { | |
456 | if (dbstats) { | |
457 | f->open_object_section("rocksdb_extended_statistics"); | |
458 | string stat_str = dbstats->ToString(); | |
459 | vector<string> stats; | |
460 | split_stats(stat_str, '\n', stats); | |
461 | f->dump_string("rocksdb_extended_statistics", ""); | |
462 | for (auto st :stats) { | |
463 | f->dump_string(".", st); | |
464 | } | |
465 | f->close_section(); | |
466 | } | |
467 | f->open_object_section("rocksdbstore_perf_counters"); | |
468 | logger->dump_formatted(f,0); | |
469 | f->close_section(); | |
470 | } | |
471 | if (g_conf->rocksdb_collect_memory_stats) { | |
472 | f->open_object_section("rocksdb_memtable_statistics"); | |
473 | std::string str(stringify(bbt_opts.block_cache->GetUsage())); | |
474 | f->dump_string("block_cache_usage", str.data()); | |
475 | str.clear(); | |
476 | str.append(stringify(bbt_opts.block_cache->GetPinnedUsage())); | |
477 | f->dump_string("block_cache_pinned_blocks_usage", str); | |
478 | str.clear(); | |
479 | db->GetProperty("rocksdb.cur-size-all-mem-tables", &str); | |
480 | f->dump_string("rocksdb_memtable_usage", str); | |
481 | f->close_section(); | |
482 | } | |
483 | } | |
484 | ||
485 | int RocksDBStore::submit_transaction(KeyValueDB::Transaction t) | |
486 | { | |
487 | utime_t start = ceph_clock_now(); | |
488 | // enable rocksdb breakdown | |
489 | // considering performance overhead, default is disabled | |
490 | if (g_conf->rocksdb_perf) { | |
491 | rocksdb::SetPerfLevel(rocksdb::PerfLevel::kEnableTimeExceptForMutex); | |
492 | rocksdb::perf_context.Reset(); | |
493 | } | |
494 | ||
495 | RocksDBTransactionImpl * _t = | |
496 | static_cast<RocksDBTransactionImpl *>(t.get()); | |
497 | rocksdb::WriteOptions woptions; | |
498 | woptions.disableWAL = disableWAL; | |
499 | lgeneric_subdout(cct, rocksdb, 30) << __func__; | |
500 | RocksWBHandler bat_txc; | |
501 | _t->bat.Iterate(&bat_txc); | |
502 | *_dout << " Rocksdb transaction: " << bat_txc.seen << dendl; | |
503 | ||
504 | rocksdb::Status s = db->Write(woptions, &_t->bat); | |
505 | if (!s.ok()) { | |
506 | RocksWBHandler rocks_txc; | |
507 | _t->bat.Iterate(&rocks_txc); | |
508 | derr << __func__ << " error: " << s.ToString() << " code = " << s.code() | |
509 | << " Rocksdb transaction: " << rocks_txc.seen << dendl; | |
510 | } | |
511 | utime_t lat = ceph_clock_now() - start; | |
512 | ||
513 | if (g_conf->rocksdb_perf) { | |
514 | utime_t write_memtable_time; | |
515 | utime_t write_delay_time; | |
516 | utime_t write_wal_time; | |
517 | utime_t write_pre_and_post_process_time; | |
518 | write_wal_time.set_from_double( | |
519 | static_cast<double>(rocksdb::perf_context.write_wal_time)/1000000000); | |
520 | write_memtable_time.set_from_double( | |
521 | static_cast<double>(rocksdb::perf_context.write_memtable_time)/1000000000); | |
522 | write_delay_time.set_from_double( | |
523 | static_cast<double>(rocksdb::perf_context.write_delay_time)/1000000000); | |
524 | write_pre_and_post_process_time.set_from_double( | |
525 | static_cast<double>(rocksdb::perf_context.write_pre_and_post_process_time)/1000000000); | |
526 | logger->tinc(l_rocksdb_write_memtable_time, write_memtable_time); | |
527 | logger->tinc(l_rocksdb_write_delay_time, write_delay_time); | |
528 | logger->tinc(l_rocksdb_write_wal_time, write_wal_time); | |
529 | logger->tinc(l_rocksdb_write_pre_and_post_process_time, write_pre_and_post_process_time); | |
530 | } | |
531 | ||
532 | logger->inc(l_rocksdb_txns); | |
533 | logger->tinc(l_rocksdb_submit_latency, lat); | |
534 | ||
535 | return s.ok() ? 0 : -1; | |
536 | } | |
537 | ||
538 | int RocksDBStore::submit_transaction_sync(KeyValueDB::Transaction t) | |
539 | { | |
540 | utime_t start = ceph_clock_now(); | |
541 | // enable rocksdb breakdown | |
542 | // considering performance overhead, default is disabled | |
543 | if (g_conf->rocksdb_perf) { | |
544 | rocksdb::SetPerfLevel(rocksdb::PerfLevel::kEnableTimeExceptForMutex); | |
545 | rocksdb::perf_context.Reset(); | |
546 | } | |
547 | ||
548 | RocksDBTransactionImpl * _t = | |
549 | static_cast<RocksDBTransactionImpl *>(t.get()); | |
550 | rocksdb::WriteOptions woptions; | |
551 | woptions.sync = true; | |
552 | woptions.disableWAL = disableWAL; | |
553 | lgeneric_subdout(cct, rocksdb, 30) << __func__; | |
554 | RocksWBHandler bat_txc; | |
555 | _t->bat.Iterate(&bat_txc); | |
556 | *_dout << " Rocksdb transaction: " << bat_txc.seen << dendl; | |
557 | ||
558 | rocksdb::Status s = db->Write(woptions, &_t->bat); | |
559 | if (!s.ok()) { | |
560 | RocksWBHandler rocks_txc; | |
561 | _t->bat.Iterate(&rocks_txc); | |
562 | derr << __func__ << " error: " << s.ToString() << " code = " << s.code() | |
563 | << " Rocksdb transaction: " << rocks_txc.seen << dendl; | |
564 | } | |
565 | utime_t lat = ceph_clock_now() - start; | |
566 | ||
567 | if (g_conf->rocksdb_perf) { | |
568 | utime_t write_memtable_time; | |
569 | utime_t write_delay_time; | |
570 | utime_t write_wal_time; | |
571 | utime_t write_pre_and_post_process_time; | |
572 | write_wal_time.set_from_double( | |
573 | static_cast<double>(rocksdb::perf_context.write_wal_time)/1000000000); | |
574 | write_memtable_time.set_from_double( | |
575 | static_cast<double>(rocksdb::perf_context.write_memtable_time)/1000000000); | |
576 | write_delay_time.set_from_double( | |
577 | static_cast<double>(rocksdb::perf_context.write_delay_time)/1000000000); | |
578 | write_pre_and_post_process_time.set_from_double( | |
579 | static_cast<double>(rocksdb::perf_context.write_pre_and_post_process_time)/1000000000); | |
580 | logger->tinc(l_rocksdb_write_memtable_time, write_memtable_time); | |
581 | logger->tinc(l_rocksdb_write_delay_time, write_delay_time); | |
582 | logger->tinc(l_rocksdb_write_wal_time, write_wal_time); | |
583 | logger->tinc(l_rocksdb_write_pre_and_post_process_time, write_pre_and_post_process_time); | |
584 | } | |
585 | ||
586 | logger->inc(l_rocksdb_txns_sync); | |
587 | logger->tinc(l_rocksdb_submit_sync_latency, lat); | |
588 | ||
589 | return s.ok() ? 0 : -1; | |
590 | } | |
591 | ||
592 | RocksDBStore::RocksDBTransactionImpl::RocksDBTransactionImpl(RocksDBStore *_db) | |
593 | { | |
594 | db = _db; | |
595 | } | |
596 | ||
597 | void RocksDBStore::RocksDBTransactionImpl::set( | |
598 | const string &prefix, | |
599 | const string &k, | |
600 | const bufferlist &to_set_bl) | |
601 | { | |
602 | string key = combine_strings(prefix, k); | |
603 | ||
604 | // bufferlist::c_str() is non-constant, so we can't call c_str() | |
605 | if (to_set_bl.is_contiguous() && to_set_bl.length() > 0) { | |
606 | bat.Put(rocksdb::Slice(key), | |
607 | rocksdb::Slice(to_set_bl.buffers().front().c_str(), | |
608 | to_set_bl.length())); | |
609 | } else { | |
31f18b77 FG |
610 | rocksdb::Slice key_slice(key); |
611 | rocksdb::Slice value_slices[to_set_bl.buffers().size()]; | |
612 | bat.Put(nullptr, rocksdb::SliceParts(&key_slice, 1), | |
613 | prepare_sliceparts(to_set_bl, value_slices)); | |
7c673cae FG |
614 | } |
615 | } | |
616 | ||
617 | void RocksDBStore::RocksDBTransactionImpl::set( | |
618 | const string &prefix, | |
619 | const char *k, size_t keylen, | |
620 | const bufferlist &to_set_bl) | |
621 | { | |
622 | string key; | |
623 | combine_strings(prefix, k, keylen, &key); | |
624 | ||
625 | // bufferlist::c_str() is non-constant, so we can't call c_str() | |
626 | if (to_set_bl.is_contiguous() && to_set_bl.length() > 0) { | |
627 | bat.Put(rocksdb::Slice(key), | |
628 | rocksdb::Slice(to_set_bl.buffers().front().c_str(), | |
629 | to_set_bl.length())); | |
630 | } else { | |
31f18b77 FG |
631 | rocksdb::Slice key_slice(key); |
632 | rocksdb::Slice value_slices[to_set_bl.buffers().size()]; | |
633 | bat.Put(nullptr, rocksdb::SliceParts(&key_slice, 1), | |
634 | prepare_sliceparts(to_set_bl, value_slices)); | |
7c673cae FG |
635 | } |
636 | } | |
637 | ||
638 | void RocksDBStore::RocksDBTransactionImpl::rmkey(const string &prefix, | |
639 | const string &k) | |
640 | { | |
641 | bat.Delete(combine_strings(prefix, k)); | |
642 | } | |
643 | ||
644 | void RocksDBStore::RocksDBTransactionImpl::rmkey(const string &prefix, | |
645 | const char *k, | |
646 | size_t keylen) | |
647 | { | |
648 | string key; | |
649 | combine_strings(prefix, k, keylen, &key); | |
650 | bat.Delete(key); | |
651 | } | |
652 | ||
653 | void RocksDBStore::RocksDBTransactionImpl::rm_single_key(const string &prefix, | |
654 | const string &k) | |
655 | { | |
656 | bat.SingleDelete(combine_strings(prefix, k)); | |
657 | } | |
658 | ||
659 | void RocksDBStore::RocksDBTransactionImpl::rmkeys_by_prefix(const string &prefix) | |
660 | { | |
31f18b77 FG |
661 | if (db->enable_rmrange) { |
662 | string endprefix = prefix; | |
663 | endprefix.push_back('\x01'); | |
664 | bat.DeleteRange(combine_strings(prefix, string()), | |
665 | combine_strings(endprefix, string())); | |
666 | } else { | |
667 | KeyValueDB::Iterator it = db->get_iterator(prefix); | |
668 | for (it->seek_to_first(); | |
669 | it->valid(); | |
670 | it->next()) { | |
671 | bat.Delete(combine_strings(prefix, it->key())); | |
672 | } | |
7c673cae FG |
673 | } |
674 | } | |
675 | ||
676 | void RocksDBStore::RocksDBTransactionImpl::rm_range_keys(const string &prefix, | |
677 | const string &start, | |
678 | const string &end) | |
679 | { | |
680 | if (db->enable_rmrange) { | |
681 | bat.DeleteRange(combine_strings(prefix, start), combine_strings(prefix, end)); | |
682 | } else { | |
683 | auto it = db->get_iterator(prefix); | |
684 | it->lower_bound(start); | |
685 | while (it->valid()) { | |
686 | if (it->key() >= end) { | |
687 | break; | |
688 | } | |
689 | bat.Delete(combine_strings(prefix, it->key())); | |
690 | it->next(); | |
691 | } | |
692 | } | |
693 | } | |
694 | ||
695 | void RocksDBStore::RocksDBTransactionImpl::merge( | |
696 | const string &prefix, | |
697 | const string &k, | |
698 | const bufferlist &to_set_bl) | |
699 | { | |
700 | string key = combine_strings(prefix, k); | |
701 | ||
702 | // bufferlist::c_str() is non-constant, so we can't call c_str() | |
703 | if (to_set_bl.is_contiguous() && to_set_bl.length() > 0) { | |
704 | bat.Merge(rocksdb::Slice(key), | |
705 | rocksdb::Slice(to_set_bl.buffers().front().c_str(), | |
706 | to_set_bl.length())); | |
707 | } else { | |
708 | // make a copy | |
31f18b77 FG |
709 | rocksdb::Slice key_slice(key); |
710 | rocksdb::Slice value_slices[to_set_bl.buffers().size()]; | |
711 | bat.Merge(nullptr, rocksdb::SliceParts(&key_slice, 1), | |
712 | prepare_sliceparts(to_set_bl, value_slices)); | |
7c673cae FG |
713 | } |
714 | } | |
715 | ||
716 | //gets will bypass RocksDB row cache, since it uses iterator | |
717 | int RocksDBStore::get( | |
718 | const string &prefix, | |
719 | const std::set<string> &keys, | |
720 | std::map<string, bufferlist> *out) | |
721 | { | |
722 | utime_t start = ceph_clock_now(); | |
723 | for (std::set<string>::const_iterator i = keys.begin(); | |
724 | i != keys.end(); ++i) { | |
725 | std::string value; | |
726 | std::string bound = combine_strings(prefix, *i); | |
727 | auto status = db->Get(rocksdb::ReadOptions(), rocksdb::Slice(bound), &value); | |
224ce89b | 728 | if (status.ok()) { |
7c673cae | 729 | (*out)[*i].append(value); |
224ce89b WB |
730 | } else if (status.IsIOError()) { |
731 | ceph_abort_msg(cct, status.ToString()); | |
732 | } | |
733 | ||
7c673cae FG |
734 | } |
735 | utime_t lat = ceph_clock_now() - start; | |
736 | logger->inc(l_rocksdb_gets); | |
737 | logger->tinc(l_rocksdb_get_latency, lat); | |
738 | return 0; | |
739 | } | |
740 | ||
741 | int RocksDBStore::get( | |
742 | const string &prefix, | |
743 | const string &key, | |
744 | bufferlist *out) | |
745 | { | |
746 | assert(out && (out->length() == 0)); | |
747 | utime_t start = ceph_clock_now(); | |
748 | int r = 0; | |
749 | string value, k; | |
750 | rocksdb::Status s; | |
751 | k = combine_strings(prefix, key); | |
752 | s = db->Get(rocksdb::ReadOptions(), rocksdb::Slice(k), &value); | |
753 | if (s.ok()) { | |
754 | out->append(value); | |
224ce89b | 755 | } else if (s.IsNotFound()) { |
7c673cae | 756 | r = -ENOENT; |
224ce89b WB |
757 | } else { |
758 | ceph_abort_msg(cct, s.ToString()); | |
7c673cae FG |
759 | } |
760 | utime_t lat = ceph_clock_now() - start; | |
761 | logger->inc(l_rocksdb_gets); | |
762 | logger->tinc(l_rocksdb_get_latency, lat); | |
763 | return r; | |
764 | } | |
765 | ||
766 | int RocksDBStore::get( | |
767 | const string& prefix, | |
768 | const char *key, | |
769 | size_t keylen, | |
770 | bufferlist *out) | |
771 | { | |
772 | assert(out && (out->length() == 0)); | |
773 | utime_t start = ceph_clock_now(); | |
774 | int r = 0; | |
775 | string value, k; | |
776 | combine_strings(prefix, key, keylen, &k); | |
777 | rocksdb::Status s; | |
778 | s = db->Get(rocksdb::ReadOptions(), rocksdb::Slice(k), &value); | |
779 | if (s.ok()) { | |
780 | out->append(value); | |
224ce89b | 781 | } else if (s.IsNotFound()) { |
7c673cae | 782 | r = -ENOENT; |
224ce89b WB |
783 | } else { |
784 | ceph_abort_msg(cct, s.ToString()); | |
7c673cae FG |
785 | } |
786 | utime_t lat = ceph_clock_now() - start; | |
787 | logger->inc(l_rocksdb_gets); | |
788 | logger->tinc(l_rocksdb_get_latency, lat); | |
789 | return r; | |
790 | } | |
791 | ||
792 | int RocksDBStore::split_key(rocksdb::Slice in, string *prefix, string *key) | |
793 | { | |
794 | size_t prefix_len = 0; | |
795 | ||
796 | // Find separator inside Slice | |
797 | char* separator = (char*) memchr(in.data(), 0, in.size()); | |
798 | if (separator == NULL) | |
799 | return -EINVAL; | |
800 | prefix_len = size_t(separator - in.data()); | |
801 | if (prefix_len >= in.size()) | |
802 | return -EINVAL; | |
803 | ||
804 | // Fetch prefix and/or key directly from Slice | |
805 | if (prefix) | |
806 | *prefix = string(in.data(), prefix_len); | |
807 | if (key) | |
808 | *key = string(separator+1, in.size()-prefix_len-1); | |
809 | return 0; | |
810 | } | |
811 | ||
812 | void RocksDBStore::compact() | |
813 | { | |
814 | logger->inc(l_rocksdb_compact); | |
815 | rocksdb::CompactRangeOptions options; | |
816 | db->CompactRange(options, nullptr, nullptr); | |
817 | } | |
818 | ||
819 | ||
820 | void RocksDBStore::compact_thread_entry() | |
821 | { | |
822 | compact_queue_lock.Lock(); | |
823 | while (!compact_queue_stop) { | |
824 | while (!compact_queue.empty()) { | |
825 | pair<string,string> range = compact_queue.front(); | |
826 | compact_queue.pop_front(); | |
827 | logger->set(l_rocksdb_compact_queue_len, compact_queue.size()); | |
828 | compact_queue_lock.Unlock(); | |
829 | logger->inc(l_rocksdb_compact_range); | |
830 | compact_range(range.first, range.second); | |
831 | compact_queue_lock.Lock(); | |
832 | continue; | |
833 | } | |
834 | compact_queue_cond.Wait(compact_queue_lock); | |
835 | } | |
836 | compact_queue_lock.Unlock(); | |
837 | } | |
838 | ||
839 | void RocksDBStore::compact_range_async(const string& start, const string& end) | |
840 | { | |
841 | Mutex::Locker l(compact_queue_lock); | |
842 | ||
843 | // try to merge adjacent ranges. this is O(n), but the queue should | |
844 | // be short. note that we do not cover all overlap cases and merge | |
845 | // opportunities here, but we capture the ones we currently need. | |
846 | list< pair<string,string> >::iterator p = compact_queue.begin(); | |
847 | while (p != compact_queue.end()) { | |
848 | if (p->first == start && p->second == end) { | |
849 | // dup; no-op | |
850 | return; | |
851 | } | |
852 | if (p->first <= end && p->first > start) { | |
853 | // merge with existing range to the right | |
854 | compact_queue.push_back(make_pair(start, p->second)); | |
855 | compact_queue.erase(p); | |
856 | logger->inc(l_rocksdb_compact_queue_merge); | |
857 | break; | |
858 | } | |
859 | if (p->second >= start && p->second < end) { | |
860 | // merge with existing range to the left | |
861 | compact_queue.push_back(make_pair(p->first, end)); | |
862 | compact_queue.erase(p); | |
863 | logger->inc(l_rocksdb_compact_queue_merge); | |
864 | break; | |
865 | } | |
866 | ++p; | |
867 | } | |
868 | if (p == compact_queue.end()) { | |
869 | // no merge, new entry. | |
870 | compact_queue.push_back(make_pair(start, end)); | |
871 | logger->set(l_rocksdb_compact_queue_len, compact_queue.size()); | |
872 | } | |
873 | compact_queue_cond.Signal(); | |
874 | if (!compact_thread.is_started()) { | |
875 | compact_thread.create("rstore_compact"); | |
876 | } | |
877 | } | |
878 | bool RocksDBStore::check_omap_dir(string &omap_dir) | |
879 | { | |
880 | rocksdb::Options options; | |
881 | options.create_if_missing = true; | |
882 | rocksdb::DB *db; | |
883 | rocksdb::Status status = rocksdb::DB::Open(options, omap_dir, &db); | |
884 | delete db; | |
885 | db = nullptr; | |
886 | return status.ok(); | |
887 | } | |
888 | void RocksDBStore::compact_range(const string& start, const string& end) | |
889 | { | |
890 | rocksdb::CompactRangeOptions options; | |
891 | rocksdb::Slice cstart(start); | |
892 | rocksdb::Slice cend(end); | |
893 | db->CompactRange(options, &cstart, &cend); | |
894 | } | |
895 | RocksDBStore::RocksDBWholeSpaceIteratorImpl::~RocksDBWholeSpaceIteratorImpl() | |
896 | { | |
897 | delete dbiter; | |
898 | } | |
899 | int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_first() | |
900 | { | |
901 | dbiter->SeekToFirst(); | |
224ce89b | 902 | assert(!dbiter->status().IsIOError()); |
7c673cae FG |
903 | return dbiter->status().ok() ? 0 : -1; |
904 | } | |
905 | int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_first(const string &prefix) | |
906 | { | |
907 | rocksdb::Slice slice_prefix(prefix); | |
908 | dbiter->Seek(slice_prefix); | |
224ce89b | 909 | assert(!dbiter->status().IsIOError()); |
7c673cae FG |
910 | return dbiter->status().ok() ? 0 : -1; |
911 | } | |
912 | int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_last() | |
913 | { | |
914 | dbiter->SeekToLast(); | |
224ce89b | 915 | assert(!dbiter->status().IsIOError()); |
7c673cae FG |
916 | return dbiter->status().ok() ? 0 : -1; |
917 | } | |
918 | int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_last(const string &prefix) | |
919 | { | |
920 | string limit = past_prefix(prefix); | |
921 | rocksdb::Slice slice_limit(limit); | |
922 | dbiter->Seek(slice_limit); | |
923 | ||
924 | if (!dbiter->Valid()) { | |
925 | dbiter->SeekToLast(); | |
926 | } else { | |
927 | dbiter->Prev(); | |
928 | } | |
929 | return dbiter->status().ok() ? 0 : -1; | |
930 | } | |
931 | int RocksDBStore::RocksDBWholeSpaceIteratorImpl::upper_bound(const string &prefix, const string &after) | |
932 | { | |
933 | lower_bound(prefix, after); | |
934 | if (valid()) { | |
935 | pair<string,string> key = raw_key(); | |
936 | if (key.first == prefix && key.second == after) | |
937 | next(); | |
938 | } | |
939 | return dbiter->status().ok() ? 0 : -1; | |
940 | } | |
941 | int RocksDBStore::RocksDBWholeSpaceIteratorImpl::lower_bound(const string &prefix, const string &to) | |
942 | { | |
943 | string bound = combine_strings(prefix, to); | |
944 | rocksdb::Slice slice_bound(bound); | |
945 | dbiter->Seek(slice_bound); | |
946 | return dbiter->status().ok() ? 0 : -1; | |
947 | } | |
948 | bool RocksDBStore::RocksDBWholeSpaceIteratorImpl::valid() | |
949 | { | |
950 | return dbiter->Valid(); | |
951 | } | |
952 | int RocksDBStore::RocksDBWholeSpaceIteratorImpl::next() | |
953 | { | |
954 | if (valid()) { | |
955 | dbiter->Next(); | |
956 | } | |
224ce89b | 957 | assert(!dbiter->status().IsIOError()); |
7c673cae FG |
958 | return dbiter->status().ok() ? 0 : -1; |
959 | } | |
960 | int RocksDBStore::RocksDBWholeSpaceIteratorImpl::prev() | |
961 | { | |
962 | if (valid()) { | |
963 | dbiter->Prev(); | |
964 | } | |
224ce89b | 965 | assert(!dbiter->status().IsIOError()); |
7c673cae FG |
966 | return dbiter->status().ok() ? 0 : -1; |
967 | } | |
968 | string RocksDBStore::RocksDBWholeSpaceIteratorImpl::key() | |
969 | { | |
970 | string out_key; | |
971 | split_key(dbiter->key(), 0, &out_key); | |
972 | return out_key; | |
973 | } | |
974 | pair<string,string> RocksDBStore::RocksDBWholeSpaceIteratorImpl::raw_key() | |
975 | { | |
976 | string prefix, key; | |
977 | split_key(dbiter->key(), &prefix, &key); | |
978 | return make_pair(prefix, key); | |
979 | } | |
980 | ||
981 | bool RocksDBStore::RocksDBWholeSpaceIteratorImpl::raw_key_is_prefixed(const string &prefix) { | |
982 | // Look for "prefix\0" right in rocksb::Slice | |
983 | rocksdb::Slice key = dbiter->key(); | |
984 | if ((key.size() > prefix.length()) && (key[prefix.length()] == '\0')) { | |
985 | return memcmp(key.data(), prefix.c_str(), prefix.length()) == 0; | |
986 | } else { | |
987 | return false; | |
988 | } | |
989 | } | |
990 | ||
991 | bufferlist RocksDBStore::RocksDBWholeSpaceIteratorImpl::value() | |
992 | { | |
993 | return to_bufferlist(dbiter->value()); | |
994 | } | |
995 | ||
996 | size_t RocksDBStore::RocksDBWholeSpaceIteratorImpl::key_size() | |
997 | { | |
998 | return dbiter->key().size(); | |
999 | } | |
1000 | ||
1001 | size_t RocksDBStore::RocksDBWholeSpaceIteratorImpl::value_size() | |
1002 | { | |
1003 | return dbiter->value().size(); | |
1004 | } | |
1005 | ||
1006 | bufferptr RocksDBStore::RocksDBWholeSpaceIteratorImpl::value_as_ptr() | |
1007 | { | |
1008 | rocksdb::Slice val = dbiter->value(); | |
1009 | return bufferptr(val.data(), val.size()); | |
1010 | } | |
1011 | ||
1012 | int RocksDBStore::RocksDBWholeSpaceIteratorImpl::status() | |
1013 | { | |
1014 | return dbiter->status().ok() ? 0 : -1; | |
1015 | } | |
1016 | ||
1017 | string RocksDBStore::past_prefix(const string &prefix) | |
1018 | { | |
1019 | string limit = prefix; | |
1020 | limit.push_back(1); | |
1021 | return limit; | |
1022 | } | |
1023 | ||
1024 | RocksDBStore::WholeSpaceIterator RocksDBStore::_get_iterator() | |
1025 | { | |
1026 | return std::make_shared<RocksDBWholeSpaceIteratorImpl>( | |
1027 | db->NewIterator(rocksdb::ReadOptions())); | |
1028 | } | |
1029 |