]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
#include <errno.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>

#include <map>
#include <memory>
#include <set>
#include <string>
#include <vector>

#include "rocksdb/db.h"
#include "rocksdb/table.h"
#include "rocksdb/env.h"
#include "rocksdb/slice.h"
#include "rocksdb/cache.h"
#include "rocksdb/filter_policy.h"
#include "rocksdb/utilities/convenience.h"
#include "rocksdb/merge_operator.h"

#include "common/perf_counters.h"
#include "common/debug.h"
#include "include/str_list.h"
#include "include/stringify.h"
#include "include/str_map.h"
#include "KeyValueDB.h"
#include "RocksDBStore.h"

#include "common/debug.h"

using std::string;
31 | ||
32 | #define dout_context cct | |
33 | #define dout_subsys ceph_subsys_rocksdb | |
34 | #undef dout_prefix | |
35 | #define dout_prefix *_dout << "rocksdb: " | |
36 | ||
31f18b77 FG |
37 | static rocksdb::SliceParts prepare_sliceparts(const bufferlist &bl, rocksdb::Slice *slices) |
38 | { | |
39 | unsigned n = 0; | |
40 | for (std::list<buffer::ptr>::const_iterator p = bl.buffers().begin(); | |
41 | p != bl.buffers().end(); ++p, ++n) { | |
42 | slices[n].data_ = p->c_str(); | |
43 | slices[n].size_ = p->length(); | |
44 | } | |
45 | return rocksdb::SliceParts(slices, n); | |
46 | } | |
47 | ||
7c673cae FG |
48 | // |
49 | // One of these per rocksdb instance, implements the merge operator prefix stuff | |
50 | // | |
51 | class RocksDBStore::MergeOperatorRouter : public rocksdb::AssociativeMergeOperator { | |
52 | RocksDBStore& store; | |
53 | public: | |
54 | const char *Name() const override { | |
55 | // Construct a name that rocksDB will validate against. We want to | |
56 | // do this in a way that doesn't constrain the ordering of calls | |
57 | // to set_merge_operator, so sort the merge operators and then | |
58 | // construct a name from all of those parts. | |
59 | store.assoc_name.clear(); | |
60 | map<std::string,std::string> names; | |
61 | for (auto& p : store.merge_ops) names[p.first] = p.second->name(); | |
62 | for (auto& p : names) { | |
63 | store.assoc_name += '.'; | |
64 | store.assoc_name += p.first; | |
65 | store.assoc_name += ':'; | |
66 | store.assoc_name += p.second; | |
67 | } | |
68 | return store.assoc_name.c_str(); | |
69 | } | |
70 | ||
71 | MergeOperatorRouter(RocksDBStore &_store) : store(_store) {} | |
72 | ||
73 | bool Merge(const rocksdb::Slice& key, | |
74 | const rocksdb::Slice* existing_value, | |
75 | const rocksdb::Slice& value, | |
76 | std::string* new_value, | |
77 | rocksdb::Logger* logger) const override { | |
78 | // Check each prefix | |
79 | for (auto& p : store.merge_ops) { | |
80 | if (p.first.compare(0, p.first.length(), | |
81 | key.data(), p.first.length()) == 0 && | |
82 | key.data()[p.first.length()] == 0) { | |
83 | if (existing_value) { | |
84 | p.second->merge(existing_value->data(), existing_value->size(), | |
85 | value.data(), value.size(), | |
86 | new_value); | |
87 | } else { | |
88 | p.second->merge_nonexistent(value.data(), value.size(), new_value); | |
89 | } | |
90 | break; | |
91 | } | |
92 | } | |
93 | return true; // OK :) | |
94 | } | |
95 | ||
96 | }; | |
97 | ||
98 | int RocksDBStore::set_merge_operator( | |
99 | const string& prefix, | |
100 | std::shared_ptr<KeyValueDB::MergeOperator> mop) | |
101 | { | |
102 | // If you fail here, it's because you can't do this on an open database | |
103 | assert(db == nullptr); | |
104 | merge_ops.push_back(std::make_pair(prefix,mop)); | |
105 | return 0; | |
106 | } | |
107 | ||
// Adapter that routes rocksdb's internal logging into the Ceph debug
// log (dout).  Holds a reference on the CephContext for its lifetime.
class CephRocksdbLogger : public rocksdb::Logger {
  CephContext *cct;
public:
  explicit CephRocksdbLogger(CephContext *c) : cct(c) {
    cct->get();
  }
  ~CephRocksdbLogger() override {
    cct->put();
  }

  // Write an entry to the log file with the specified format.
  void Logv(const char* format, va_list ap) override {
    Logv(rocksdb::INFO_LEVEL, format, ap);
  }

  // Write an entry to the log file with the specified log level
  // and format. Any log with level under the internal log level
  // of *this (see @SetInfoLogLevel and @GetInfoLogLevel) will not be
  // printed.
  void Logv(const rocksdb::InfoLogLevel log_level, const char* format,
	    va_list ap) override {
    // Invert the scale: rocksdb levels grow with severity, ceph dout
    // levels shrink with severity.
    int v = rocksdb::NUM_INFO_LOG_LEVELS - log_level - 1;
    dout(v);
    char buf[65536];
    vsnprintf(buf, sizeof(buf), format, ap);
    *_dout << buf << dendl;
  }
};
136 | ||
137 | rocksdb::Logger *create_rocksdb_ceph_logger() | |
138 | { | |
139 | return new CephRocksdbLogger(g_ceph_context); | |
140 | } | |
141 | ||
142 | int string2bool(string val, bool &b_val) | |
143 | { | |
144 | if (strcasecmp(val.c_str(), "false") == 0) { | |
145 | b_val = false; | |
146 | return 0; | |
147 | } else if (strcasecmp(val.c_str(), "true") == 0) { | |
148 | b_val = true; | |
149 | return 0; | |
150 | } else { | |
151 | std::string err; | |
152 | int b = strict_strtol(val.c_str(), 10, &err); | |
153 | if (!err.empty()) | |
154 | return -EINVAL; | |
155 | b_val = !!b; | |
156 | return 0; | |
157 | } | |
158 | } | |
159 | ||
// Interpret ceph-specific option keys that rocksdb's own option parser
// rejected (see ParseOptionsFromString).  Returns 0 on success,
// -EINVAL on a bad value or an unknown key.
int RocksDBStore::tryInterpret(const string key, const string val, rocksdb::Options &opt)
{
  if (key == "compaction_threads") {
    std::string err;
    int f = strict_sistrtoll(val.c_str(), &err);
    if (!err.empty())
      return -EINVAL;
    //Low priority threadpool is used for compaction
    opt.env->SetBackgroundThreads(f, rocksdb::Env::Priority::LOW);
  } else if (key == "flusher_threads") {
    std::string err;
    int f = strict_sistrtoll(val.c_str(), &err);
    if (!err.empty())
      return -EINVAL;
    //High priority threadpool is used for flusher
    opt.env->SetBackgroundThreads(f, rocksdb::Env::Priority::HIGH);
  } else if (key == "compact_on_mount") {
    // Stored on the store itself; acted on at the end of do_open().
    int ret = string2bool(val, compact_on_mount);
    if (ret != 0)
      return ret;
  } else if (key == "disableWAL") {
    // Stored on the store; applied to WriteOptions on every submit.
    int ret = string2bool(val, disableWAL);
    if (ret != 0)
      return ret;
  } else {
    //unrecognize config options.
    return -EINVAL;
  }
  return 0;
}
190 | ||
// Parse a comma/newline/semicolon-separated "key=value" option string.
// Each pair is first offered to rocksdb's GetOptionsFromString; pairs
// rocksdb does not understand fall back to tryInterpret() for the
// ceph-specific keys.  Returns 0 on success, negative errno on error.
int RocksDBStore::ParseOptionsFromString(const string opt_str, rocksdb::Options &opt)
{
  map<string, string> str_map;
  int r = get_str_map(opt_str, &str_map, ",\n;");
  if (r < 0)
    return r;
  map<string, string>::iterator it;
  for(it = str_map.begin(); it != str_map.end(); ++it) {
    string this_opt = it->first + "=" + it->second;
    rocksdb::Status status = rocksdb::GetOptionsFromString(opt, this_opt , &opt);
    if (!status.ok()) {
      //unrecognized by rocksdb, try to interpret by ourselves.
      r = tryInterpret(it->first, it->second, opt);
      if (r < 0) {
	derr << status.ToString() << dendl;
	return -EINVAL;
      }
    }
    lgeneric_dout(cct, 0) << " set rocksdb option " << it->first
			  << " = " << it->second << dendl;
  }
  return 0;
}
214 | ||
215 | int RocksDBStore::init(string _options_str) | |
216 | { | |
217 | options_str = _options_str; | |
218 | rocksdb::Options opt; | |
219 | //try parse options | |
220 | if (options_str.length()) { | |
221 | int r = ParseOptionsFromString(options_str, opt); | |
222 | if (r != 0) { | |
223 | return -EINVAL; | |
224 | } | |
225 | } | |
226 | return 0; | |
227 | } | |
228 | ||
// Create the database directory (or, with a custom Env, ask it for the
// directory) and then open the store with create_if_missing=true.
int RocksDBStore::create_and_open(ostream &out)
{
  if (env) {
    unique_ptr<rocksdb::Directory> dir;
    // NOTE(review): the status returned by NewDirectory is ignored; a
    // failing custom Env will only surface later in do_open() -- confirm
    // this is intended.
    env->NewDirectory(path, &dir);
  } else {
    int r = ::mkdir(path.c_str(), 0755);
    if (r < 0)
      r = -errno;
    // An already-existing directory is fine (e.g. re-run after a crash).
    if (r < 0 && r != -EEXIST) {
      derr << __func__ << " failed to create " << path << ": " << cpp_strerror(r)
	   << dendl;
      return r;
    }
  }
  return do_open(out, true);
}
246 | ||
// Open the rocksdb database at 'path', applying the stored option
// string, cache/bloom configuration, logging hooks and the merge
// operator router, then register perf counters.  Returns 0 on success
// or a negative errno (-EINVAL on option/open failure).
int RocksDBStore::do_open(ostream &out, bool create_if_missing)
{
  rocksdb::Options opt;
  rocksdb::Status status;

  if (options_str.length()) {
    int r = ParseOptionsFromString(options_str, opt);
    if (r != 0) {
      return -EINVAL;
    }
  }

  // Optional perf statistics collection (off by default for overhead).
  if (g_conf->rocksdb_perf) {
    dbstats = rocksdb::CreateDBStatistics();
    opt.statistics = dbstats;
  }

  opt.create_if_missing = create_if_missing;
  if (g_conf->rocksdb_separate_wal_dir) {
    opt.wal_dir = path + ".wal";
  }
  // Optional multi-path layout: "path,size[; path,size ...]".
  if (g_conf->get_val<std::string>("rocksdb_db_paths").length()) {
    list<string> paths;
    get_str_list(g_conf->get_val<std::string>("rocksdb_db_paths"), "; \t", paths);
    for (auto& p : paths) {
      size_t pos = p.find(',');
      if (pos == std::string::npos) {
	derr << __func__ << " invalid db path item " << p << " in "
	     << g_conf->get_val<std::string>("rocksdb_db_paths") << dendl;
	return -EINVAL;
      }
      // NOTE(review): this local 'path' shadows the member 'path' that
      // DB::Open uses below; the shadowing is loop-scoped so the open
      // still sees the member -- confirm intended.
      string path = p.substr(0, pos);
      string size_str = p.substr(pos + 1);
      uint64_t size = atoll(size_str.c_str());
      if (!size) {
	derr << __func__ << " invalid db path item " << p << " in "
	     << g_conf->get_val<std::string>("rocksdb_db_paths") << dendl;
	return -EINVAL;
      }
      opt.db_paths.push_back(rocksdb::DbPath(path, size));
      dout(10) << __func__ << " db_path " << path << " size " << size << dendl;
    }
  }

  if (g_conf->rocksdb_log_to_ceph_log) {
    opt.info_log.reset(new CephRocksdbLogger(g_ceph_context));
  }

  if (priv) {
    dout(10) << __func__ << " using custom Env " << priv << dendl;
    opt.env = static_cast<rocksdb::Env*>(priv);
  }

  // caches: split the configured cache size between the row cache and
  // the block cache according to rocksdb_cache_row_ratio.
  if (!cache_size) {
    cache_size = g_conf->rocksdb_cache_size;
  }
  uint64_t row_cache_size = cache_size * g_conf->rocksdb_cache_row_ratio;
  uint64_t block_cache_size = cache_size - row_cache_size;
  if (g_conf->rocksdb_cache_type == "lru") {
    bbt_opts.block_cache = rocksdb::NewLRUCache(
      block_cache_size,
      g_conf->rocksdb_cache_shard_bits);
  } else if (g_conf->rocksdb_cache_type == "clock") {
    bbt_opts.block_cache = rocksdb::NewClockCache(
      block_cache_size,
      g_conf->rocksdb_cache_shard_bits);
  } else {
    derr << "unrecognized rocksdb_cache_type '" << g_conf->rocksdb_cache_type
	 << "'" << dendl;
    return -EINVAL;
  }
  bbt_opts.block_size = g_conf->rocksdb_block_size;

  opt.row_cache = rocksdb::NewLRUCache(row_cache_size,
				       g_conf->rocksdb_cache_shard_bits);

  if (g_conf->kstore_rocksdb_bloom_bits_per_key > 0) {
    dout(10) << __func__ << " set bloom filter bits per key to "
	     << g_conf->kstore_rocksdb_bloom_bits_per_key << dendl;
    bbt_opts.filter_policy.reset(rocksdb::NewBloomFilterPolicy(
				  g_conf->kstore_rocksdb_bloom_bits_per_key));
  }
  opt.table_factory.reset(rocksdb::NewBlockBasedTableFactory(bbt_opts));
  dout(10) << __func__ << " block size " << g_conf->rocksdb_block_size
	   << ", block_cache size " << prettybyte_t(block_cache_size)
	   << ", row_cache size " << prettybyte_t(row_cache_size)
	   << "; shards "
	   << (1 << g_conf->rocksdb_cache_shard_bits)
	   << ", type " << g_conf->rocksdb_cache_type
	   << dendl;

  opt.merge_operator.reset(new MergeOperatorRouter(*this));
  status = rocksdb::DB::Open(opt, path, &db);
  if (!status.ok()) {
    derr << status.ToString() << dendl;
    return -EINVAL;
  }

  // Perf counters for the rocksdb subsystem.
  PerfCountersBuilder plb(g_ceph_context, "rocksdb", l_rocksdb_first, l_rocksdb_last);
  plb.add_u64_counter(l_rocksdb_gets, "get", "Gets");
  plb.add_u64_counter(l_rocksdb_txns, "submit_transaction", "Submit transactions");
  plb.add_u64_counter(l_rocksdb_txns_sync, "submit_transaction_sync", "Submit transactions sync");
  plb.add_time_avg(l_rocksdb_get_latency, "get_latency", "Get latency");
  plb.add_time_avg(l_rocksdb_submit_latency, "submit_latency", "Submit Latency");
  plb.add_time_avg(l_rocksdb_submit_sync_latency, "submit_sync_latency", "Submit Sync Latency");
  plb.add_u64_counter(l_rocksdb_compact, "compact", "Compactions");
  plb.add_u64_counter(l_rocksdb_compact_range, "compact_range", "Compactions by range");
  plb.add_u64_counter(l_rocksdb_compact_queue_merge, "compact_queue_merge", "Mergings of ranges in compaction queue");
  plb.add_u64(l_rocksdb_compact_queue_len, "compact_queue_len", "Length of compaction queue");
  plb.add_time_avg(l_rocksdb_write_wal_time, "rocksdb_write_wal_time", "Rocksdb write wal time");
  plb.add_time_avg(l_rocksdb_write_memtable_time, "rocksdb_write_memtable_time", "Rocksdb write memtable time");
  plb.add_time_avg(l_rocksdb_write_delay_time, "rocksdb_write_delay_time", "Rocksdb write delay time");
  plb.add_time_avg(l_rocksdb_write_pre_and_post_process_time,
		   "rocksdb_write_pre_and_post_time", "total time spent on writing a record, excluding write process");
  logger = plb.create_perf_counters();
  cct->get_perfcounters_collection()->add(logger);

  // Honor the "compact_on_mount" option parsed in tryInterpret().
  if (compact_on_mount) {
    derr << "Compacting rocksdb store..." << dendl;
    compact();
    derr << "Finished compacting rocksdb store" << dendl;
  }
  return 0;
}
372 | ||
373 | int RocksDBStore::_test_init(const string& dir) | |
374 | { | |
375 | rocksdb::Options options; | |
376 | options.create_if_missing = true; | |
377 | rocksdb::DB *db; | |
378 | rocksdb::Status status = rocksdb::DB::Open(options, dir, &db); | |
379 | delete db; | |
380 | db = nullptr; | |
381 | return status.ok() ? 0 : -EIO; | |
382 | } | |
383 | ||
// Tear down in dependency order: stop the compaction thread and
// unregister counters (close), then the perf counters, then the DB
// handle, and finally any privately-owned Env.
RocksDBStore::~RocksDBStore()
{
  close();
  delete logger;

  // Ensure db is destroyed before dependent db_cache and filterpolicy
  delete db;
  db = nullptr;

  if (priv) {
    delete static_cast<rocksdb::Env*>(priv);
  }
}
397 | ||
// Stop the background compaction thread (if running) and unregister
// the perf counters.  Safe to call more than once.
void RocksDBStore::close()
{
  // stop compaction thread
  compact_queue_lock.Lock();
  if (compact_thread.is_started()) {
    compact_queue_stop = true;
    compact_queue_cond.Signal();
    // Drop the lock before join() so the thread can re-acquire it and
    // observe compact_queue_stop.
    compact_queue_lock.Unlock();
    compact_thread.join();
  } else {
    compact_queue_lock.Unlock();
  }

  if (logger)
    cct->get_perfcounters_collection()->remove(logger);
}
414 | ||
415 | void RocksDBStore::split_stats(const std::string &s, char delim, std::vector<std::string> &elems) { | |
416 | std::stringstream ss; | |
417 | ss.str(s); | |
418 | std::string item; | |
419 | while (std::getline(ss, item, delim)) { | |
420 | elems.push_back(item); | |
421 | } | |
422 | } | |
423 | ||
424 | void RocksDBStore::get_statistics(Formatter *f) | |
425 | { | |
426 | if (!g_conf->rocksdb_perf) { | |
427 | dout(20) << __func__ << "RocksDB perf is disabled, can't probe for stats" | |
428 | << dendl; | |
429 | return; | |
430 | } | |
431 | ||
432 | if (g_conf->rocksdb_collect_compaction_stats) { | |
433 | std::string stat_str; | |
434 | bool status = db->GetProperty("rocksdb.stats", &stat_str); | |
435 | if (status) { | |
436 | f->open_object_section("rocksdb_statistics"); | |
437 | f->dump_string("rocksdb_compaction_statistics", ""); | |
438 | vector<string> stats; | |
439 | split_stats(stat_str, '\n', stats); | |
440 | for (auto st :stats) { | |
441 | f->dump_string("", st); | |
442 | } | |
443 | f->close_section(); | |
444 | } | |
445 | } | |
446 | if (g_conf->rocksdb_collect_extended_stats) { | |
447 | if (dbstats) { | |
448 | f->open_object_section("rocksdb_extended_statistics"); | |
449 | string stat_str = dbstats->ToString(); | |
450 | vector<string> stats; | |
451 | split_stats(stat_str, '\n', stats); | |
452 | f->dump_string("rocksdb_extended_statistics", ""); | |
453 | for (auto st :stats) { | |
454 | f->dump_string(".", st); | |
455 | } | |
456 | f->close_section(); | |
457 | } | |
458 | f->open_object_section("rocksdbstore_perf_counters"); | |
459 | logger->dump_formatted(f,0); | |
460 | f->close_section(); | |
461 | } | |
462 | if (g_conf->rocksdb_collect_memory_stats) { | |
463 | f->open_object_section("rocksdb_memtable_statistics"); | |
464 | std::string str(stringify(bbt_opts.block_cache->GetUsage())); | |
465 | f->dump_string("block_cache_usage", str.data()); | |
466 | str.clear(); | |
467 | str.append(stringify(bbt_opts.block_cache->GetPinnedUsage())); | |
468 | f->dump_string("block_cache_pinned_blocks_usage", str); | |
469 | str.clear(); | |
470 | db->GetProperty("rocksdb.cur-size-all-mem-tables", &str); | |
471 | f->dump_string("rocksdb_memtable_usage", str); | |
472 | f->close_section(); | |
473 | } | |
474 | } | |
475 | ||
// Apply a batched transaction to the DB (asynchronously with respect
// to the WAL fsync; see submit_transaction_sync for the synchronous
// variant).  Updates perf counters; returns 0 on success, -1 on error.
int RocksDBStore::submit_transaction(KeyValueDB::Transaction t)
{
  utime_t start = ceph_clock_now();
  // enable rocksdb breakdown
  // considering performance overhead, default is disabled
  if (g_conf->rocksdb_perf) {
    rocksdb::SetPerfLevel(rocksdb::PerfLevel::kEnableTimeExceptForMutex);
    rocksdb::perf_context.Reset();
  }

  RocksDBTransactionImpl * _t =
    static_cast<RocksDBTransactionImpl *>(t.get());
  rocksdb::WriteOptions woptions;
  woptions.disableWAL = disableWAL;
  // At debug level 30, dump a human-readable rendering of the batch.
  lgeneric_subdout(cct, rocksdb, 30) << __func__;
  RocksWBHandler bat_txc;
  _t->bat.Iterate(&bat_txc);
  *_dout << " Rocksdb transaction: " << bat_txc.seen << dendl;

  rocksdb::Status s = db->Write(woptions, &_t->bat);
  if (!s.ok()) {
    // Re-render the batch so the error log shows what failed.
    RocksWBHandler rocks_txc;
    _t->bat.Iterate(&rocks_txc);
    derr << __func__ << " error: " << s.ToString() << " code = " << s.code()
	 << " Rocksdb transaction: " << rocks_txc.seen << dendl;
  }
  utime_t lat = ceph_clock_now() - start;

  if (g_conf->rocksdb_perf) {
    utime_t write_memtable_time;
    utime_t write_delay_time;
    utime_t write_wal_time;
    utime_t write_pre_and_post_process_time;
    // perf_context reports nanoseconds; convert to seconds for utime_t.
    write_wal_time.set_from_double(
      static_cast<double>(rocksdb::perf_context.write_wal_time)/1000000000);
    write_memtable_time.set_from_double(
      static_cast<double>(rocksdb::perf_context.write_memtable_time)/1000000000);
    write_delay_time.set_from_double(
      static_cast<double>(rocksdb::perf_context.write_delay_time)/1000000000);
    write_pre_and_post_process_time.set_from_double(
      static_cast<double>(rocksdb::perf_context.write_pre_and_post_process_time)/1000000000);
    logger->tinc(l_rocksdb_write_memtable_time, write_memtable_time);
    logger->tinc(l_rocksdb_write_delay_time, write_delay_time);
    logger->tinc(l_rocksdb_write_wal_time, write_wal_time);
    logger->tinc(l_rocksdb_write_pre_and_post_process_time, write_pre_and_post_process_time);
  }

  logger->inc(l_rocksdb_txns);
  logger->tinc(l_rocksdb_submit_latency, lat);

  return s.ok() ? 0 : -1;
}
528 | ||
// Same as submit_transaction() but with woptions.sync=true, so the
// write does not return until the WAL is durable.  Updates the *_sync
// perf counters; returns 0 on success, -1 on error.
int RocksDBStore::submit_transaction_sync(KeyValueDB::Transaction t)
{
  utime_t start = ceph_clock_now();
  // enable rocksdb breakdown
  // considering performance overhead, default is disabled
  if (g_conf->rocksdb_perf) {
    rocksdb::SetPerfLevel(rocksdb::PerfLevel::kEnableTimeExceptForMutex);
    rocksdb::perf_context.Reset();
  }

  RocksDBTransactionImpl * _t =
    static_cast<RocksDBTransactionImpl *>(t.get());
  rocksdb::WriteOptions woptions;
  // Synchronous variant: wait for WAL durability before returning.
  woptions.sync = true;
  woptions.disableWAL = disableWAL;
  lgeneric_subdout(cct, rocksdb, 30) << __func__;
  RocksWBHandler bat_txc;
  _t->bat.Iterate(&bat_txc);
  *_dout << " Rocksdb transaction: " << bat_txc.seen << dendl;

  rocksdb::Status s = db->Write(woptions, &_t->bat);
  if (!s.ok()) {
    RocksWBHandler rocks_txc;
    _t->bat.Iterate(&rocks_txc);
    derr << __func__ << " error: " << s.ToString() << " code = " << s.code()
	 << " Rocksdb transaction: " << rocks_txc.seen << dendl;
  }
  utime_t lat = ceph_clock_now() - start;

  if (g_conf->rocksdb_perf) {
    utime_t write_memtable_time;
    utime_t write_delay_time;
    utime_t write_wal_time;
    utime_t write_pre_and_post_process_time;
    // perf_context reports nanoseconds; convert to seconds for utime_t.
    write_wal_time.set_from_double(
      static_cast<double>(rocksdb::perf_context.write_wal_time)/1000000000);
    write_memtable_time.set_from_double(
      static_cast<double>(rocksdb::perf_context.write_memtable_time)/1000000000);
    write_delay_time.set_from_double(
      static_cast<double>(rocksdb::perf_context.write_delay_time)/1000000000);
    write_pre_and_post_process_time.set_from_double(
      static_cast<double>(rocksdb::perf_context.write_pre_and_post_process_time)/1000000000);
    logger->tinc(l_rocksdb_write_memtable_time, write_memtable_time);
    logger->tinc(l_rocksdb_write_delay_time, write_delay_time);
    logger->tinc(l_rocksdb_write_wal_time, write_wal_time);
    logger->tinc(l_rocksdb_write_pre_and_post_process_time, write_pre_and_post_process_time);
  }

  logger->inc(l_rocksdb_txns_sync);
  logger->tinc(l_rocksdb_submit_sync_latency, lat);

  return s.ok() ? 0 : -1;
}
582 | ||
583 | RocksDBStore::RocksDBTransactionImpl::RocksDBTransactionImpl(RocksDBStore *_db) | |
584 | { | |
585 | db = _db; | |
586 | } | |
587 | ||
588 | void RocksDBStore::RocksDBTransactionImpl::set( | |
589 | const string &prefix, | |
590 | const string &k, | |
591 | const bufferlist &to_set_bl) | |
592 | { | |
593 | string key = combine_strings(prefix, k); | |
594 | ||
595 | // bufferlist::c_str() is non-constant, so we can't call c_str() | |
596 | if (to_set_bl.is_contiguous() && to_set_bl.length() > 0) { | |
597 | bat.Put(rocksdb::Slice(key), | |
598 | rocksdb::Slice(to_set_bl.buffers().front().c_str(), | |
599 | to_set_bl.length())); | |
600 | } else { | |
31f18b77 FG |
601 | rocksdb::Slice key_slice(key); |
602 | rocksdb::Slice value_slices[to_set_bl.buffers().size()]; | |
603 | bat.Put(nullptr, rocksdb::SliceParts(&key_slice, 1), | |
604 | prepare_sliceparts(to_set_bl, value_slices)); | |
7c673cae FG |
605 | } |
606 | } | |
607 | ||
608 | void RocksDBStore::RocksDBTransactionImpl::set( | |
609 | const string &prefix, | |
610 | const char *k, size_t keylen, | |
611 | const bufferlist &to_set_bl) | |
612 | { | |
613 | string key; | |
614 | combine_strings(prefix, k, keylen, &key); | |
615 | ||
616 | // bufferlist::c_str() is non-constant, so we can't call c_str() | |
617 | if (to_set_bl.is_contiguous() && to_set_bl.length() > 0) { | |
618 | bat.Put(rocksdb::Slice(key), | |
619 | rocksdb::Slice(to_set_bl.buffers().front().c_str(), | |
620 | to_set_bl.length())); | |
621 | } else { | |
31f18b77 FG |
622 | rocksdb::Slice key_slice(key); |
623 | rocksdb::Slice value_slices[to_set_bl.buffers().size()]; | |
624 | bat.Put(nullptr, rocksdb::SliceParts(&key_slice, 1), | |
625 | prepare_sliceparts(to_set_bl, value_slices)); | |
7c673cae FG |
626 | } |
627 | } | |
628 | ||
629 | void RocksDBStore::RocksDBTransactionImpl::rmkey(const string &prefix, | |
630 | const string &k) | |
631 | { | |
632 | bat.Delete(combine_strings(prefix, k)); | |
633 | } | |
634 | ||
635 | void RocksDBStore::RocksDBTransactionImpl::rmkey(const string &prefix, | |
636 | const char *k, | |
637 | size_t keylen) | |
638 | { | |
639 | string key; | |
640 | combine_strings(prefix, k, keylen, &key); | |
641 | bat.Delete(key); | |
642 | } | |
643 | ||
644 | void RocksDBStore::RocksDBTransactionImpl::rm_single_key(const string &prefix, | |
645 | const string &k) | |
646 | { | |
647 | bat.SingleDelete(combine_strings(prefix, k)); | |
648 | } | |
649 | ||
// Queue deletion of every key under 'prefix'.  With enable_rmrange we
// use rocksdb's DeleteRange; otherwise fall back to iterating and
// deleting one key at a time.
void RocksDBStore::RocksDBTransactionImpl::rmkeys_by_prefix(const string &prefix)
{
  if (db->enable_rmrange) {
    // Keys are "<prefix>\0<suffix>"; bumping the last byte past '\0'
    // ('\x01') yields an exclusive upper bound covering the prefix.
    string endprefix = prefix;
    endprefix.push_back('\x01');
    bat.DeleteRange(combine_strings(prefix, string()),
		    combine_strings(endprefix, string()));
  } else {
    KeyValueDB::Iterator it = db->get_iterator(prefix);
    for (it->seek_to_first();
	 it->valid();
	 it->next()) {
      bat.Delete(combine_strings(prefix, it->key()));
    }
  }
}
666 | ||
// Queue deletion of keys under 'prefix' in [start, end).  Uses
// DeleteRange when enabled, otherwise iterates and deletes each key.
void RocksDBStore::RocksDBTransactionImpl::rm_range_keys(const string &prefix,
                                                         const string &start,
                                                         const string &end)
{
  if (db->enable_rmrange) {
    bat.DeleteRange(combine_strings(prefix, start), combine_strings(prefix, end));
  } else {
    auto it = db->get_iterator(prefix);
    it->lower_bound(start);
    while (it->valid()) {
      // 'end' is exclusive, matching DeleteRange semantics.
      if (it->key() >= end) {
	break;
      }
      bat.Delete(combine_strings(prefix, it->key()));
      it->next();
    }
  }
}
685 | ||
686 | void RocksDBStore::RocksDBTransactionImpl::merge( | |
687 | const string &prefix, | |
688 | const string &k, | |
689 | const bufferlist &to_set_bl) | |
690 | { | |
691 | string key = combine_strings(prefix, k); | |
692 | ||
693 | // bufferlist::c_str() is non-constant, so we can't call c_str() | |
694 | if (to_set_bl.is_contiguous() && to_set_bl.length() > 0) { | |
695 | bat.Merge(rocksdb::Slice(key), | |
696 | rocksdb::Slice(to_set_bl.buffers().front().c_str(), | |
697 | to_set_bl.length())); | |
698 | } else { | |
699 | // make a copy | |
31f18b77 FG |
700 | rocksdb::Slice key_slice(key); |
701 | rocksdb::Slice value_slices[to_set_bl.buffers().size()]; | |
702 | bat.Merge(nullptr, rocksdb::SliceParts(&key_slice, 1), | |
703 | prepare_sliceparts(to_set_bl, value_slices)); | |
7c673cae FG |
704 | } |
705 | } | |
706 | ||
707 | //gets will bypass RocksDB row cache, since it uses iterator | |
708 | int RocksDBStore::get( | |
709 | const string &prefix, | |
710 | const std::set<string> &keys, | |
711 | std::map<string, bufferlist> *out) | |
712 | { | |
713 | utime_t start = ceph_clock_now(); | |
714 | for (std::set<string>::const_iterator i = keys.begin(); | |
715 | i != keys.end(); ++i) { | |
716 | std::string value; | |
717 | std::string bound = combine_strings(prefix, *i); | |
718 | auto status = db->Get(rocksdb::ReadOptions(), rocksdb::Slice(bound), &value); | |
719 | if (status.ok()) | |
720 | (*out)[*i].append(value); | |
721 | } | |
722 | utime_t lat = ceph_clock_now() - start; | |
723 | logger->inc(l_rocksdb_gets); | |
724 | logger->tinc(l_rocksdb_get_latency, lat); | |
725 | return 0; | |
726 | } | |
727 | ||
728 | int RocksDBStore::get( | |
729 | const string &prefix, | |
730 | const string &key, | |
731 | bufferlist *out) | |
732 | { | |
733 | assert(out && (out->length() == 0)); | |
734 | utime_t start = ceph_clock_now(); | |
735 | int r = 0; | |
736 | string value, k; | |
737 | rocksdb::Status s; | |
738 | k = combine_strings(prefix, key); | |
739 | s = db->Get(rocksdb::ReadOptions(), rocksdb::Slice(k), &value); | |
740 | if (s.ok()) { | |
741 | out->append(value); | |
742 | } else { | |
743 | r = -ENOENT; | |
744 | } | |
745 | utime_t lat = ceph_clock_now() - start; | |
746 | logger->inc(l_rocksdb_gets); | |
747 | logger->tinc(l_rocksdb_get_latency, lat); | |
748 | return r; | |
749 | } | |
750 | ||
751 | int RocksDBStore::get( | |
752 | const string& prefix, | |
753 | const char *key, | |
754 | size_t keylen, | |
755 | bufferlist *out) | |
756 | { | |
757 | assert(out && (out->length() == 0)); | |
758 | utime_t start = ceph_clock_now(); | |
759 | int r = 0; | |
760 | string value, k; | |
761 | combine_strings(prefix, key, keylen, &k); | |
762 | rocksdb::Status s; | |
763 | s = db->Get(rocksdb::ReadOptions(), rocksdb::Slice(k), &value); | |
764 | if (s.ok()) { | |
765 | out->append(value); | |
766 | } else { | |
767 | r = -ENOENT; | |
768 | } | |
769 | utime_t lat = ceph_clock_now() - start; | |
770 | logger->inc(l_rocksdb_gets); | |
771 | logger->tinc(l_rocksdb_get_latency, lat); | |
772 | return r; | |
773 | } | |
774 | ||
775 | int RocksDBStore::split_key(rocksdb::Slice in, string *prefix, string *key) | |
776 | { | |
777 | size_t prefix_len = 0; | |
778 | ||
779 | // Find separator inside Slice | |
780 | char* separator = (char*) memchr(in.data(), 0, in.size()); | |
781 | if (separator == NULL) | |
782 | return -EINVAL; | |
783 | prefix_len = size_t(separator - in.data()); | |
784 | if (prefix_len >= in.size()) | |
785 | return -EINVAL; | |
786 | ||
787 | // Fetch prefix and/or key directly from Slice | |
788 | if (prefix) | |
789 | *prefix = string(in.data(), prefix_len); | |
790 | if (key) | |
791 | *key = string(separator+1, in.size()-prefix_len-1); | |
792 | return 0; | |
793 | } | |
794 | ||
795 | void RocksDBStore::compact() | |
796 | { | |
797 | logger->inc(l_rocksdb_compact); | |
798 | rocksdb::CompactRangeOptions options; | |
799 | db->CompactRange(options, nullptr, nullptr); | |
800 | } | |
801 | ||
802 | ||
// Background compaction thread: drains compact_queue, running one
// compact_range() per queued range.  The queue lock is dropped around
// the (slow) compaction itself and re-taken to re-check the queue.
// Exits when close() sets compact_queue_stop.
void RocksDBStore::compact_thread_entry()
{
  compact_queue_lock.Lock();
  while (!compact_queue_stop) {
    while (!compact_queue.empty()) {
      pair<string,string> range = compact_queue.front();
      compact_queue.pop_front();
      logger->set(l_rocksdb_compact_queue_len, compact_queue.size());
      // Drop the lock while compacting so producers can keep queueing.
      compact_queue_lock.Unlock();
      logger->inc(l_rocksdb_compact_range);
      compact_range(range.first, range.second);
      compact_queue_lock.Lock();
      continue;
    }
    // Queue empty: sleep until compact_range_async() or close() signals.
    compact_queue_cond.Wait(compact_queue_lock);
  }
  compact_queue_lock.Unlock();
}
821 | ||
822 | void RocksDBStore::compact_range_async(const string& start, const string& end) | |
823 | { | |
824 | Mutex::Locker l(compact_queue_lock); | |
825 | ||
826 | // try to merge adjacent ranges. this is O(n), but the queue should | |
827 | // be short. note that we do not cover all overlap cases and merge | |
828 | // opportunities here, but we capture the ones we currently need. | |
829 | list< pair<string,string> >::iterator p = compact_queue.begin(); | |
830 | while (p != compact_queue.end()) { | |
831 | if (p->first == start && p->second == end) { | |
832 | // dup; no-op | |
833 | return; | |
834 | } | |
835 | if (p->first <= end && p->first > start) { | |
836 | // merge with existing range to the right | |
837 | compact_queue.push_back(make_pair(start, p->second)); | |
838 | compact_queue.erase(p); | |
839 | logger->inc(l_rocksdb_compact_queue_merge); | |
840 | break; | |
841 | } | |
842 | if (p->second >= start && p->second < end) { | |
843 | // merge with existing range to the left | |
844 | compact_queue.push_back(make_pair(p->first, end)); | |
845 | compact_queue.erase(p); | |
846 | logger->inc(l_rocksdb_compact_queue_merge); | |
847 | break; | |
848 | } | |
849 | ++p; | |
850 | } | |
851 | if (p == compact_queue.end()) { | |
852 | // no merge, new entry. | |
853 | compact_queue.push_back(make_pair(start, end)); | |
854 | logger->set(l_rocksdb_compact_queue_len, compact_queue.size()); | |
855 | } | |
856 | compact_queue_cond.Signal(); | |
857 | if (!compact_thread.is_started()) { | |
858 | compact_thread.create("rstore_compact"); | |
859 | } | |
860 | } | |
861 | bool RocksDBStore::check_omap_dir(string &omap_dir) | |
862 | { | |
863 | rocksdb::Options options; | |
864 | options.create_if_missing = true; | |
865 | rocksdb::DB *db; | |
866 | rocksdb::Status status = rocksdb::DB::Open(options, omap_dir, &db); | |
867 | delete db; | |
868 | db = nullptr; | |
869 | return status.ok(); | |
870 | } | |
871 | void RocksDBStore::compact_range(const string& start, const string& end) | |
872 | { | |
873 | rocksdb::CompactRangeOptions options; | |
874 | rocksdb::Slice cstart(start); | |
875 | rocksdb::Slice cend(end); | |
876 | db->CompactRange(options, &cstart, &cend); | |
877 | } | |
// The impl owns the raw rocksdb iterator; release it on destruction.
RocksDBStore::RocksDBWholeSpaceIteratorImpl::~RocksDBWholeSpaceIteratorImpl()
{
  delete dbiter;
}
882 | int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_first() | |
883 | { | |
884 | dbiter->SeekToFirst(); | |
885 | return dbiter->status().ok() ? 0 : -1; | |
886 | } | |
887 | int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_first(const string &prefix) | |
888 | { | |
889 | rocksdb::Slice slice_prefix(prefix); | |
890 | dbiter->Seek(slice_prefix); | |
891 | return dbiter->status().ok() ? 0 : -1; | |
892 | } | |
893 | int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_last() | |
894 | { | |
895 | dbiter->SeekToLast(); | |
896 | return dbiter->status().ok() ? 0 : -1; | |
897 | } | |
898 | int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_last(const string &prefix) | |
899 | { | |
900 | string limit = past_prefix(prefix); | |
901 | rocksdb::Slice slice_limit(limit); | |
902 | dbiter->Seek(slice_limit); | |
903 | ||
904 | if (!dbiter->Valid()) { | |
905 | dbiter->SeekToLast(); | |
906 | } else { | |
907 | dbiter->Prev(); | |
908 | } | |
909 | return dbiter->status().ok() ? 0 : -1; | |
910 | } | |
911 | int RocksDBStore::RocksDBWholeSpaceIteratorImpl::upper_bound(const string &prefix, const string &after) | |
912 | { | |
913 | lower_bound(prefix, after); | |
914 | if (valid()) { | |
915 | pair<string,string> key = raw_key(); | |
916 | if (key.first == prefix && key.second == after) | |
917 | next(); | |
918 | } | |
919 | return dbiter->status().ok() ? 0 : -1; | |
920 | } | |
921 | int RocksDBStore::RocksDBWholeSpaceIteratorImpl::lower_bound(const string &prefix, const string &to) | |
922 | { | |
923 | string bound = combine_strings(prefix, to); | |
924 | rocksdb::Slice slice_bound(bound); | |
925 | dbiter->Seek(slice_bound); | |
926 | return dbiter->status().ok() ? 0 : -1; | |
927 | } | |
// True when the iterator currently points at a live entry.
bool RocksDBStore::RocksDBWholeSpaceIteratorImpl::valid()
{
  return dbiter->Valid();
}
932 | int RocksDBStore::RocksDBWholeSpaceIteratorImpl::next() | |
933 | { | |
934 | if (valid()) { | |
935 | dbiter->Next(); | |
936 | } | |
937 | return dbiter->status().ok() ? 0 : -1; | |
938 | } | |
939 | int RocksDBStore::RocksDBWholeSpaceIteratorImpl::prev() | |
940 | { | |
941 | if (valid()) { | |
942 | dbiter->Prev(); | |
943 | } | |
944 | return dbiter->status().ok() ? 0 : -1; | |
945 | } | |
946 | string RocksDBStore::RocksDBWholeSpaceIteratorImpl::key() | |
947 | { | |
948 | string out_key; | |
949 | split_key(dbiter->key(), 0, &out_key); | |
950 | return out_key; | |
951 | } | |
952 | pair<string,string> RocksDBStore::RocksDBWholeSpaceIteratorImpl::raw_key() | |
953 | { | |
954 | string prefix, key; | |
955 | split_key(dbiter->key(), &prefix, &key); | |
956 | return make_pair(prefix, key); | |
957 | } | |
958 | ||
959 | bool RocksDBStore::RocksDBWholeSpaceIteratorImpl::raw_key_is_prefixed(const string &prefix) { | |
960 | // Look for "prefix\0" right in rocksb::Slice | |
961 | rocksdb::Slice key = dbiter->key(); | |
962 | if ((key.size() > prefix.length()) && (key[prefix.length()] == '\0')) { | |
963 | return memcmp(key.data(), prefix.c_str(), prefix.length()) == 0; | |
964 | } else { | |
965 | return false; | |
966 | } | |
967 | } | |
968 | ||
// Return the current entry's value wrapped in a bufferlist.
bufferlist RocksDBStore::RocksDBWholeSpaceIteratorImpl::value()
{
  return to_bufferlist(dbiter->value());
}
973 | ||
// Size in bytes of the current raw (combined "prefix\0key") key.
size_t RocksDBStore::RocksDBWholeSpaceIteratorImpl::key_size()
{
  return dbiter->key().size();
}
978 | ||
// Size in bytes of the current entry's value.
size_t RocksDBStore::RocksDBWholeSpaceIteratorImpl::value_size()
{
  return dbiter->value().size();
}
983 | ||
984 | bufferptr RocksDBStore::RocksDBWholeSpaceIteratorImpl::value_as_ptr() | |
985 | { | |
986 | rocksdb::Slice val = dbiter->value(); | |
987 | return bufferptr(val.data(), val.size()); | |
988 | } | |
989 | ||
990 | int RocksDBStore::RocksDBWholeSpaceIteratorImpl::status() | |
991 | { | |
992 | return dbiter->status().ok() ? 0 : -1; | |
993 | } | |
994 | ||
995 | string RocksDBStore::past_prefix(const string &prefix) | |
996 | { | |
997 | string limit = prefix; | |
998 | limit.push_back(1); | |
999 | return limit; | |
1000 | } | |
1001 | ||
// Create a fresh whole-space iterator over the current DB state; the
// impl takes ownership of the raw rocksdb iterator.
RocksDBStore::WholeSpaceIterator RocksDBStore::_get_iterator()
{
  return std::make_shared<RocksDBWholeSpaceIteratorImpl>(
    db->NewIterator(rocksdb::ReadOptions()));
}
1007 |