]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
#include <filesystem>
#include <map>
#include <memory>
#include <set>
#include <string>
#include <utility>

#include <errno.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>

#include "rocksdb/db.h"
#include "rocksdb/table.h"
#include "rocksdb/env.h"
#include "rocksdb/slice.h"
#include "rocksdb/cache.h"
#include "rocksdb/filter_policy.h"
#include "rocksdb/utilities/convenience.h"
#include "rocksdb/merge_operator.h"

#include "common/perf_counters.h"
#include "common/PriorityCache.h"
#include "include/common_fwd.h"
#include "include/scope_guard.h"
#include "include/str_list.h"
#include "include/stringify.h"
#include "include/str_map.h"
#include "KeyValueDB.h"
#include "RocksDBStore.h"

#include "common/debug.h"
35 | #define dout_context cct | |
36 | #define dout_subsys ceph_subsys_rocksdb | |
37 | #undef dout_prefix | |
38 | #define dout_prefix *_dout << "rocksdb: " | |
39 | ||
20effc67 TL |
40 | namespace fs = std::filesystem; |
41 | ||
f67539c2 TL |
42 | using std::function; |
43 | using std::list; | |
44 | using std::map; | |
45 | using std::ostream; | |
46 | using std::pair; | |
47 | using std::set; | |
48 | using std::string; | |
49 | using std::unique_ptr; | |
50 | using std::vector; | |
51 | ||
52 | using ceph::bufferlist; | |
53 | using ceph::bufferptr; | |
54 | using ceph::Formatter; | |
55 | ||
56 | static const char* sharding_def_dir = "sharding"; | |
57 | static const char* sharding_def_file = "sharding/def"; | |
58 | static const char* sharding_recreate = "sharding/recreate_columns"; | |
59 | static const char* resharding_column_lock = "reshardingXcommencingXlocked"; | |
60 | ||
11fdf7f2 TL |
61 | static bufferlist to_bufferlist(rocksdb::Slice in) { |
62 | bufferlist bl; | |
63 | bl.append(bufferptr(in.data(), in.size())); | |
64 | return bl; | |
65 | } | |
66 | ||
c07f9fc5 FG |
// Build a rocksdb::SliceParts view over the raw buffer segments of bl.
// Precondition: *slices must already be sized to at least the number of
// buffers in bl; entries are overwritten in place by the caller's vector.
// No data is copied -- the returned SliceParts points into bl's memory,
// so bl must outlive any use of the result.
static rocksdb::SliceParts prepare_sliceparts(const bufferlist &bl,
					      vector<rocksdb::Slice> *slices)
{
  unsigned n = 0;
  // one Slice per underlying buffer segment
  for (auto& buf : bl.buffers()) {
    (*slices)[n].data_ = buf.c_str();
    (*slices)[n].size_ = buf.length();
    n++;
  }
  return rocksdb::SliceParts(slices->data(), slices->size());
}
78 | ||
11fdf7f2 | 79 | |
//
// One of these for the default rocksdb column family, routing each prefix
// to the appropriate MergeOperator.
//
class RocksDBStore::MergeOperatorRouter
  : public rocksdb::AssociativeMergeOperator
{
  RocksDBStore& store;
public:
  // Report the operator name that RocksDB persists and validates on reopen.
  // The name is rebuilt from all registered merge operators, sorted by
  // prefix (via std::map), so it does not depend on the order in which
  // set_merge_operator() was called.
  // NOTE(review): this mutates store.assoc_name and returns a pointer into
  // it; the store must outlive any use RocksDB makes of the returned name.
  const char *Name() const override {
    // Construct a name that rocksDB will validate against. We want to
    // do this in a way that doesn't constrain the ordering of calls
    // to set_merge_operator, so sort the merge operators and then
    // construct a name from all of those parts.
    store.assoc_name.clear();
    map<std::string,std::string> names;   // sorted by prefix

    for (auto& p : store.merge_ops) {
      names[p.first] = p.second->name();
    }
    for (auto& p : names) {
      store.assoc_name += '.';
      store.assoc_name += p.first;
      store.assoc_name += ':';
      store.assoc_name += p.second;
    }
    return store.assoc_name.c_str();
  }

  explicit MergeOperatorRouter(RocksDBStore &_store) : store(_store) {}

  // Dispatch a merge in the default column family.  Keys are matched as
  // "<prefix>\0..." -- a registered prefix followed by a NUL separator --
  // and the merge is forwarded to that prefix's operator.  Keys that match
  // no registered prefix are left unmerged (the call still returns true).
  bool Merge(const rocksdb::Slice& key,
	     const rocksdb::Slice* existing_value,
	     const rocksdb::Slice& value,
	     std::string* new_value,
	     rocksdb::Logger* logger) const override {
    // for default column family
    // extract prefix from key and compare against each registered merge op;
    // even though merge operator for explicit CF is included in merge_ops,
    // it won't be picked up, since it won't match.
    for (auto& p : store.merge_ops) {
      if (p.first.compare(0, p.first.length(),
			  key.data(), p.first.length()) == 0 &&
	  key.data()[p.first.length()] == 0) {
	if (existing_value) {
	  // merge the operand against the current value
	  p.second->merge(existing_value->data(), existing_value->size(),
			  value.data(), value.size(),
			  new_value);
	} else {
	  // no prior value stored for this key
	  p.second->merge_nonexistent(value.data(), value.size(), new_value);
	}
	break;
      }
    }
    return true; // OK :)
  }
};
137 | ||
//
// One of these per non-default column family, linked directly to the
// merge operator for that CF/prefix (if any).
//
class RocksDBStore::MergeOperatorLinker
  : public rocksdb::AssociativeMergeOperator
{
private:
  // the single merge operator serving this column family
  std::shared_ptr<KeyValueDB::MergeOperator> mop;
public:
  explicit MergeOperatorLinker(const std::shared_ptr<KeyValueDB::MergeOperator> &o) : mop(o) {}

  // The wrapped operator's name is reported verbatim; RocksDB validates it
  // against the name recorded when the column family was created.
  const char *Name() const override {
    return mop->name();
  }

  // Forward every merge straight to the wrapped operator.  No prefix
  // routing is needed: a dedicated CF holds keys of a single prefix.
  bool Merge(const rocksdb::Slice& key,
	     const rocksdb::Slice* existing_value,
	     const rocksdb::Slice& value,
	     std::string* new_value,
	     rocksdb::Logger* logger) const override {
    if (existing_value) {
      mop->merge(existing_value->data(), existing_value->size(),
		 value.data(), value.size(),
		 new_value);
    } else {
      mop->merge_nonexistent(value.data(), value.size(), new_value);
    }
    return true;
  }
};
169 | ||
170 | int RocksDBStore::set_merge_operator( | |
171 | const string& prefix, | |
172 | std::shared_ptr<KeyValueDB::MergeOperator> mop) | |
173 | { | |
174 | // If you fail here, it's because you can't do this on an open database | |
11fdf7f2 | 175 | ceph_assert(db == nullptr); |
7c673cae FG |
176 | merge_ops.push_back(std::make_pair(prefix,mop)); |
177 | return 0; | |
178 | } | |
179 | ||
// Adapter that routes RocksDB's internal log output into Ceph's debug log
// (dout) infrastructure.  Holds a reference on the CephContext for its
// whole lifetime.
class CephRocksdbLogger : public rocksdb::Logger {
  CephContext *cct;
public:
  explicit CephRocksdbLogger(CephContext *c) : cct(c) {
    cct->get();   // pin the context while this logger exists
  }
  ~CephRocksdbLogger() override {
    cct->put();
  }

  // Write an entry to the log file with the specified format.
  void Logv(const char* format, va_list ap) override {
    Logv(rocksdb::INFO_LEVEL, format, ap);
  }

  // Write an entry to the log file with the specified log level
  // and format. Any log with level under the internal log level
  // of *this (see @SetInfoLogLevel and @GetInfoLogLevel) will not be
  // printed.
  void Logv(const rocksdb::InfoLogLevel log_level, const char* format,
	    va_list ap) override {
    // invert the level: higher rocksdb severity maps to a lower (more
    // visible) ceph debug level
    int v = rocksdb::NUM_INFO_LOG_LEVELS - log_level - 1;
    dout(ceph::dout::need_dynamic(v));
    // messages longer than 64 KiB are silently truncated by vsnprintf
    char buf[65536];
    vsnprintf(buf, sizeof(buf), format, ap);
    *_dout << buf << dendl;
  }
};
208 | ||
209 | rocksdb::Logger *create_rocksdb_ceph_logger() | |
210 | { | |
211 | return new CephRocksdbLogger(g_ceph_context); | |
212 | } | |
213 | ||
c07f9fc5 | 214 | static int string2bool(const string &val, bool &b_val) |
7c673cae FG |
215 | { |
216 | if (strcasecmp(val.c_str(), "false") == 0) { | |
217 | b_val = false; | |
218 | return 0; | |
219 | } else if (strcasecmp(val.c_str(), "true") == 0) { | |
220 | b_val = true; | |
221 | return 0; | |
222 | } else { | |
223 | std::string err; | |
224 | int b = strict_strtol(val.c_str(), 10, &err); | |
225 | if (!err.empty()) | |
226 | return -EINVAL; | |
227 | b_val = !!b; | |
228 | return 0; | |
229 | } | |
230 | } | |
f67539c2 TL |
231 | |
// rocksdb's internal whitespace trimmer, reused here.
namespace rocksdb {
extern std::string trim(const std::string& str);
}

// this function is a modification of rocksdb's StringToMap:
// 1) accepts ' \n ; as separators
// 2) leaves compound options with enclosing { and }
// Parses "k1=v1;k2={nested};k3=v3" into opts_map.  Returns InvalidArgument
// on a missing '=', empty key, unbalanced braces, or trailing junk after a
// nested value.
rocksdb::Status StringToMap(const std::string& opts_str,
			    std::unordered_map<std::string, std::string>* opts_map)
{
  using rocksdb::Status;
  using rocksdb::trim;
  assert(opts_map);
  // Example:
  // opts_str = "write_buffer_size=1024;max_write_buffer_number=2;"
  //            "nested_opt={opt1=1;opt2=2};max_bytes_for_level_base=100"
  size_t pos = 0;
  std::string opts = trim(opts_str);
  while (pos < opts.size()) {
    size_t eq_pos = opts.find('=', pos);
    if (eq_pos == std::string::npos) {
      return Status::InvalidArgument("Mismatched key value pair, '=' expected");
    }
    std::string key = trim(opts.substr(pos, eq_pos - pos));
    if (key.empty()) {
      return Status::InvalidArgument("Empty key found");
    }

    // skip space after '=' and look for '{' for possible nested options
    pos = eq_pos + 1;
    while (pos < opts.size() && isspace(opts[pos])) {
      ++pos;
    }
    // Empty value at the end
    if (pos >= opts.size()) {
      (*opts_map)[key] = "";
      break;
    }
    if (opts[pos] == '{') {
      // scan for the matching '}', tracking nesting depth
      int count = 1;
      size_t brace_pos = pos + 1;
      while (brace_pos < opts.size()) {
	if (opts[brace_pos] == '{') {
	  ++count;
	} else if (opts[brace_pos] == '}') {
	  --count;
	  if (count == 0) {
	    break;
	  }
	}
	++brace_pos;
      }
      // found the matching closing brace
      if (count == 0) {
	//include both '{' and '}'
	(*opts_map)[key] = trim(opts.substr(pos, brace_pos - pos + 1));
	// skip all whitespace and move to the next ';,'
	// brace_pos points to the matching '}'
	pos = brace_pos + 1;
	while (pos < opts.size() && isspace(opts[pos])) {
	  ++pos;
	}
	if (pos < opts.size() && opts[pos] != ';' && opts[pos] != ',') {
	  return Status::InvalidArgument(
	    "Unexpected chars after nested options");
	}
	++pos;   // consume the separator (or step past end, ending the loop)
      } else {
	return Status::InvalidArgument(
	  "Mismatched curly braces for nested options");
      }
    } else {
      // plain value: runs to the next ',' or ';' (or end of string)
      size_t sc_pos = opts.find_first_of(",;", pos);
      if (sc_pos == std::string::npos) {
	(*opts_map)[key] = trim(opts.substr(pos));
	// It either ends with a trailing , ; or the last key-value pair
	break;
      } else {
	(*opts_map)[key] = trim(opts.substr(pos, sc_pos - pos));
      }
      pos = sc_pos + 1;
    }
  }
  return Status::OK();
}
317 | ||
c07f9fc5 | 318 | int RocksDBStore::tryInterpret(const string &key, const string &val, rocksdb::Options &opt) |
7c673cae FG |
319 | { |
320 | if (key == "compaction_threads") { | |
321 | std::string err; | |
20effc67 | 322 | int f = strict_iecstrtoll(val, &err); |
7c673cae FG |
323 | if (!err.empty()) |
324 | return -EINVAL; | |
325 | //Low priority threadpool is used for compaction | |
326 | opt.env->SetBackgroundThreads(f, rocksdb::Env::Priority::LOW); | |
327 | } else if (key == "flusher_threads") { | |
328 | std::string err; | |
20effc67 | 329 | int f = strict_iecstrtoll(val, &err); |
7c673cae FG |
330 | if (!err.empty()) |
331 | return -EINVAL; | |
332 | //High priority threadpool is used for flusher | |
333 | opt.env->SetBackgroundThreads(f, rocksdb::Env::Priority::HIGH); | |
334 | } else if (key == "compact_on_mount") { | |
335 | int ret = string2bool(val, compact_on_mount); | |
336 | if (ret != 0) | |
337 | return ret; | |
338 | } else if (key == "disableWAL") { | |
339 | int ret = string2bool(val, disableWAL); | |
340 | if (ret != 0) | |
341 | return ret; | |
342 | } else { | |
343 | //unrecognize config options. | |
344 | return -EINVAL; | |
345 | } | |
346 | return 0; | |
347 | } | |
348 | ||
c07f9fc5 | 349 | int RocksDBStore::ParseOptionsFromString(const string &opt_str, rocksdb::Options &opt) |
7c673cae | 350 | { |
9f95a23c TL |
351 | return ParseOptionsFromStringStatic(cct, opt_str, opt, |
352 | [&](const string& k, const string& v, rocksdb::Options& o) { | |
353 | return tryInterpret(k, v, o); | |
354 | } | |
355 | ); | |
356 | } | |
357 | ||
// Parse opt_str (see StringToMap for the accepted syntax) and apply each
// key=value to `opt` via rocksdb::GetOptionsFromString.  Keys rocksdb
// rejects are handed to `interp` (if provided); with no interpreter, keys
// listed in need_interp_keys are silently tolerated so the same option
// string can be parsed without a live RocksDBStore.  Returns 0 or -EINVAL.
int RocksDBStore::ParseOptionsFromStringStatic(
  CephContext *cct,
  const string& opt_str,
  rocksdb::Options& opt,
  function<int(const string&, const string&, rocksdb::Options&)> interp)
{
  // keep aligned with func tryInterpret
  const set<string> need_interp_keys = {"compaction_threads", "flusher_threads", "compact_on_mount", "disableWAL"};
  rocksdb::Status status;
  std::unordered_map<std::string, std::string> str_map;
  status = StringToMap(opt_str, &str_map);
  if (!status.ok()) {
    dout(5) << __func__ << " error '" << status.getState() <<
      "' while parsing options '" << opt_str << "'" << dendl;
    return -EINVAL;
  }

  for (auto it = str_map.begin(); it != str_map.end(); ++it) {
    string this_opt = it->first + "=" + it->second;
    // NOTE: this inner `status` intentionally shadows the outer one
    rocksdb::Status status =
      rocksdb::GetOptionsFromString(opt, this_opt, &opt);
    int r = 0;
    if (!status.ok()) {
      if (interp != nullptr) {
	r = interp(it->first, it->second, opt);
      } else if (!need_interp_keys.count(it->first)) {
	// no interpreter and not a known ceph-special key: hard error
	r = -1;
      }
      if (r < 0) {
	derr << status.ToString() << dendl;
	return -EINVAL;
      }
    }
    lgeneric_dout(cct, 1) << " set rocksdb option " << it->first
			  << " = " << it->second << dendl;
  }
  return 0;
}
396 | ||
397 | int RocksDBStore::init(string _options_str) | |
398 | { | |
399 | options_str = _options_str; | |
400 | rocksdb::Options opt; | |
401 | //try parse options | |
402 | if (options_str.length()) { | |
403 | int r = ParseOptionsFromString(options_str, opt); | |
404 | if (r != 0) { | |
405 | return -EINVAL; | |
406 | } | |
407 | } | |
408 | return 0; | |
409 | } | |
410 | ||
// Ensure the directory that will hold the database exists.
// With a custom rocksdb Env the Env is asked for the directory; otherwise
// it is created on the local filesystem with rwxr-xr-x permissions.
// Returns 0 on success (or if the directory already exists), negative
// errno on creation failure.
int RocksDBStore::create_db_dir()
{
  if (env) {
    unique_ptr<rocksdb::Directory> dir;
    // NOTE(review): the returned Status is ignored here; presumably the
    // custom Env materializes the directory as needed -- confirm.
    env->NewDirectory(path, &dir);
  } else {
    if (!fs::exists(path)) {
      std::error_code ec;
      if (!fs::create_directory(path, ec)) {
	derr << __func__ << " failed to create " << path
	     << ": " << ec.message() << dendl;
	return -ec.value();
      }
      // 0755: owner full, group/other read+exec
      fs::permissions(path,
		      fs::perms::owner_all |
		      fs::perms::group_read | fs::perms::group_exec |
		      fs::perms::others_read | fs::perms::others_exec);
    }
  }
  return 0;
}
432 | ||
433 | int RocksDBStore::install_cf_mergeop( | |
f67539c2 | 434 | const string &key_prefix, |
11fdf7f2 TL |
435 | rocksdb::ColumnFamilyOptions *cf_opt) |
436 | { | |
437 | ceph_assert(cf_opt != nullptr); | |
438 | cf_opt->merge_operator.reset(); | |
439 | for (auto& i : merge_ops) { | |
f67539c2 | 440 | if (i.first == key_prefix) { |
11fdf7f2 TL |
441 | cf_opt->merge_operator.reset(new MergeOperatorLinker(i.second)); |
442 | } | |
443 | } | |
444 | return 0; | |
7c673cae FG |
445 | } |
446 | ||
11fdf7f2 | 447 | int RocksDBStore::create_and_open(ostream &out, |
f67539c2 | 448 | const std::string& cfs) |
11fdf7f2 TL |
449 | { |
450 | int r = create_db_dir(); | |
451 | if (r < 0) | |
452 | return r; | |
f67539c2 TL |
453 | return do_open(out, true, false, cfs); |
454 | } | |
455 | ||
456 | std::shared_ptr<rocksdb::Cache> RocksDBStore::create_block_cache( | |
457 | const std::string& cache_type, size_t cache_size, double cache_prio_high) { | |
458 | std::shared_ptr<rocksdb::Cache> cache; | |
459 | auto shard_bits = cct->_conf->rocksdb_cache_shard_bits; | |
460 | if (cache_type == "binned_lru") { | |
461 | cache = rocksdb_cache::NewBinnedLRUCache(cct, cache_size, shard_bits, false, cache_prio_high); | |
462 | } else if (cache_type == "lru") { | |
463 | cache = rocksdb::NewLRUCache(cache_size, shard_bits); | |
464 | } else if (cache_type == "clock") { | |
465 | cache = rocksdb::NewClockCache(cache_size, shard_bits); | |
466 | if (!cache) { | |
467 | derr << "rocksdb_cache_type '" << cache | |
468 | << "' chosen, but RocksDB not compiled with LibTBB. " | |
469 | << dendl; | |
470 | } | |
11fdf7f2 | 471 | } else { |
f67539c2 | 472 | derr << "unrecognized rocksdb_cache_type '" << cache_type << "'" << dendl; |
11fdf7f2 | 473 | } |
f67539c2 | 474 | return cache; |
11fdf7f2 TL |
475 | } |
476 | ||
// Assemble the effective rocksdb::Options: parse the configured option
// string, then layer on ceph config values (statistics, WAL dir, extra db
// paths, logger, env, block/row caches, bloom filter, index type, block
// options) and install the prefix-routing merge operator.  Fills `opt` in
// place.  Returns 0, -EINVAL on bad configuration, or a negative errno
// propagated from db_paths parsing.
int RocksDBStore::load_rocksdb_options(bool create_if_missing, rocksdb::Options& opt)
{
  rocksdb::Status status;

  if (options_str.length()) {
    int r = ParseOptionsFromString(options_str, opt);
    if (r != 0) {
      return -EINVAL;
    }
  }

  // optional perf statistics collection
  if (cct->_conf->rocksdb_perf)  {
    dbstats = rocksdb::CreateDBStatistics();
    opt.statistics = dbstats;
  }

  opt.create_if_missing = create_if_missing;
  if (kv_options.count("separate_wal_dir")) {
    // WAL lives next to the db dir with a ".wal" suffix
    opt.wal_dir = path + ".wal";
  }

  // Since ceph::for_each_substr doesn't return a value and
  // std::stoull does throw, we may as well just catch everything here.
  try {
    if (kv_options.count("db_paths")) {
      // each item is "<path>,<size>"; a zero/unparsable size is an error
      list<string> paths;
      get_str_list(kv_options["db_paths"], "; \t", paths);
      for (auto& p : paths) {
	size_t pos = p.find(',');
	if (pos == std::string::npos) {
	  derr << __func__ << " invalid db path item " << p << " in "
	       << kv_options["db_paths"] << dendl;
	  return -EINVAL;
	}
	string path = p.substr(0, pos);
	string size_str = p.substr(pos + 1);
	uint64_t size = atoll(size_str.c_str());
	if (!size) {
	  derr << __func__ << " invalid db path item " << p << " in "
	       << kv_options["db_paths"] << dendl;
	  return -EINVAL;
	}
	opt.db_paths.push_back(rocksdb::DbPath(path, size));
	dout(10) << __func__ << " db_path " << path << " size " << size << dendl;
      }
    }
  } catch (const std::system_error& e) {
    return -e.code().value();
  }

  if (cct->_conf->rocksdb_log_to_ceph_log) {
    opt.info_log.reset(new CephRocksdbLogger(cct));
  }

  if (priv) {
    // a custom Env was injected (e.g. BlueRocksEnv); use it
    dout(10) << __func__ << " using custom Env " << priv << dendl;
    opt.env = static_cast<rocksdb::Env*>(priv);
  } else {
    env = opt.env;
  }

  opt.env->SetAllowNonOwnerAccess(false);

  // caches
  if (!set_cache_flag) {
    cache_size = cct->_conf->rocksdb_cache_size;
  }
  // split the budget between the row cache and the block cache
  uint64_t row_cache_size = cache_size * cct->_conf->rocksdb_cache_row_ratio;
  uint64_t block_cache_size = cache_size - row_cache_size;

  bbt_opts.block_cache = create_block_cache(cct->_conf->rocksdb_cache_type, block_cache_size);
  if (!bbt_opts.block_cache) {
    return -EINVAL;
  }
  bbt_opts.block_size = cct->_conf->rocksdb_block_size;

  if (row_cache_size > 0)
    opt.row_cache = rocksdb::NewLRUCache(row_cache_size,
				     cct->_conf->rocksdb_cache_shard_bits);
  uint64_t bloom_bits = cct->_conf.get_val<uint64_t>("rocksdb_bloom_bits_per_key");
  if (bloom_bits > 0) {
    dout(10) << __func__ << " set bloom filter bits per key to "
	     << bloom_bits << dendl;
    bbt_opts.filter_policy.reset(rocksdb::NewBloomFilterPolicy(bloom_bits));
  }
  // choose the SST index layout from config
  using std::placeholders::_1;
  if (cct->_conf.with_val<std::string>("rocksdb_index_type",
				       std::bind(std::equal_to<std::string>(), _1,
						 "binary_search")))
    bbt_opts.index_type = rocksdb::BlockBasedTableOptions::IndexType::kBinarySearch;
  if (cct->_conf.with_val<std::string>("rocksdb_index_type",
				       std::bind(std::equal_to<std::string>(), _1,
						 "hash_search")))
    bbt_opts.index_type = rocksdb::BlockBasedTableOptions::IndexType::kHashSearch;
  if (cct->_conf.with_val<std::string>("rocksdb_index_type",
				       std::bind(std::equal_to<std::string>(), _1,
						 "two_level")))
    bbt_opts.index_type = rocksdb::BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
  if (!bbt_opts.no_block_cache) {
    bbt_opts.cache_index_and_filter_blocks =
        cct->_conf.get_val<bool>("rocksdb_cache_index_and_filter_blocks");
    bbt_opts.cache_index_and_filter_blocks_with_high_priority =
        cct->_conf.get_val<bool>("rocksdb_cache_index_and_filter_blocks_with_high_priority");
    bbt_opts.pin_l0_filter_and_index_blocks_in_cache =
      cct->_conf.get_val<bool>("rocksdb_pin_l0_filter_and_index_blocks_in_cache");
  }
  bbt_opts.partition_filters = cct->_conf.get_val<bool>("rocksdb_partition_filters");
  if (cct->_conf.get_val<Option::size_t>("rocksdb_metadata_block_size") > 0)
    bbt_opts.metadata_block_size = cct->_conf.get_val<Option::size_t>("rocksdb_metadata_block_size");

  opt.table_factory.reset(rocksdb::NewBlockBasedTableFactory(bbt_opts));
  dout(10) << __func__ << " block size " << cct->_conf->rocksdb_block_size
	   << ", block_cache size " << byte_u_t(block_cache_size)
	   << ", row_cache size " << byte_u_t(row_cache_size)
	   << "; shards "
	   << (1 << cct->_conf->rocksdb_cache_shard_bits)
	   << ", type " << cct->_conf->rocksdb_cache_type
	   << dendl;

  // the default CF routes merges by key prefix (see MergeOperatorRouter)
  opt.merge_operator.reset(new MergeOperatorRouter(*this));
  comparator = opt.comparator;
  return 0;
}
600 | ||
// Record a created/opened column-family handle for prefix `cf_name`.
// hash_l/hash_h delimit the key substring used to pick a shard; every call
// for the same column must pass the same window.  shard_idx is the slot in
// the column's handle vector, which is grown on demand.  Also records the
// reverse CF-id -> prefix mapping.
void RocksDBStore::add_column_family(const std::string& cf_name, uint32_t hash_l, uint32_t hash_h,
				     size_t shard_idx, rocksdb::ColumnFamilyHandle *handle) {
  dout(10) << __func__ << " column_name=" << cf_name << " shard_idx=" << shard_idx <<
    " hash_l=" << hash_l << " hash_h=" << hash_h << " handle=" << (void*) handle << dendl;
  bool exists = cf_handles.count(cf_name) > 0;
  auto& column = cf_handles[cf_name];   // default-creates the entry if absent
  if (exists) {
    // all shards of a column must agree on the hash window
    ceph_assert(hash_l == column.hash_l);
    ceph_assert(hash_h == column.hash_h);
  } else {
    ceph_assert(hash_l < hash_h);
    column.hash_l = hash_l;
    column.hash_h = hash_h;
  }
  if (column.handles.size() <= shard_idx)
    column.handles.resize(shard_idx + 1);
  column.handles[shard_idx] = handle;
  cf_ids_to_prefix.emplace(handle->GetID(), cf_name);
}
620 | ||
621 | bool RocksDBStore::is_column_family(const std::string& prefix) { | |
622 | return cf_handles.count(prefix); | |
623 | } | |
624 | ||
33c7a0ef TL |
625 | std::string_view RocksDBStore::get_key_hash_view(const prefix_shards& shards, const char* key, const size_t keylen) { |
626 | uint32_t hash_l = std::min<uint32_t>(shards.hash_l, keylen); | |
627 | uint32_t hash_h = std::min<uint32_t>(shards.hash_h, keylen); | |
628 | return { key + hash_l, hash_h - hash_l }; | |
629 | } | |
630 | ||
631 | rocksdb::ColumnFamilyHandle *RocksDBStore::get_key_cf(const prefix_shards& shards, const char* key, const size_t keylen) { | |
632 | auto sv = get_key_hash_view(shards, key, keylen); | |
633 | uint32_t hash = ceph_str_hash_rjenkins(sv.data(), sv.size()); | |
634 | return shards.handles[hash % shards.handles.size()]; | |
635 | } | |
636 | ||
f67539c2 TL |
637 | rocksdb::ColumnFamilyHandle *RocksDBStore::get_cf_handle(const std::string& prefix, const std::string& key) { |
638 | auto iter = cf_handles.find(prefix); | |
639 | if (iter == cf_handles.end()) { | |
640 | return nullptr; | |
641 | } else { | |
642 | if (iter->second.handles.size() == 1) { | |
643 | return iter->second.handles[0]; | |
644 | } else { | |
33c7a0ef | 645 | return get_key_cf(iter->second, key.data(), key.size()); |
f67539c2 TL |
646 | } |
647 | } | |
648 | } | |
649 | ||
650 | rocksdb::ColumnFamilyHandle *RocksDBStore::get_cf_handle(const std::string& prefix, const char* key, size_t keylen) { | |
651 | auto iter = cf_handles.find(prefix); | |
652 | if (iter == cf_handles.end()) { | |
653 | return nullptr; | |
654 | } else { | |
655 | if (iter->second.handles.size() == 1) { | |
656 | return iter->second.handles[0]; | |
657 | } else { | |
33c7a0ef | 658 | return get_key_cf(iter->second, key, keylen); |
f67539c2 TL |
659 | } |
660 | } | |
661 | } | |
662 | ||
33c7a0ef TL |
/**
 * If the specified IteratorBounds arg has both an upper and a lower bound defined, and they have equal placement hash
 * strings, we can be sure that the entire iteration range exists in a single CF. In that case, we return the relevant
 * CF handle. In all other cases, we return a nullptr to indicate that the specified bounds cannot necessarily be mapped
 * to a single CF.
 */
rocksdb::ColumnFamilyHandle *RocksDBStore::get_cf_handle(const std::string& prefix, const IteratorBounds& bounds) {
  if (!bounds.lower_bound || !bounds.upper_bound) {
    return nullptr;
  }
  auto iter = cf_handles.find(prefix);
  // this overload is only valid for sharded, registered prefixes
  ceph_assert(iter != cf_handles.end());
  ceph_assert(iter->second.handles.size() != 1);
  if (iter->second.hash_l != 0) {
    // NOTE(review): mapping is only attempted when the hash window starts
    // at key offset 0; otherwise bail out conservatively.
    return nullptr;
  }
  auto lower_bound_hash_str = get_key_hash_view(iter->second, bounds.lower_bound->data(), bounds.lower_bound->size());
  auto upper_bound_hash_str = get_key_hash_view(iter->second, bounds.upper_bound->data(), bounds.upper_bound->size());
  if (lower_bound_hash_str == upper_bound_hash_str) {
    // both bounds hash identically => whole range lives in one shard
    auto key = *bounds.lower_bound;
    return get_key_cf(iter->second, key.data(), key.size());
  } else {
    return nullptr;
  }
}
688 | ||
f67539c2 TL |
/**
 * Definition of sharding:
 * space-separated list of: column_def [ '=' options ]
 * column_def := column_name '(' shard_count ')'
 * column_def := column_name '(' shard_count ',' hash_begin '-' ')'
 * column_def := column_name '(' shard_count ',' hash_begin '-' hash_end ')'
 * I=write_buffer_size=1048576 O(6) m(7,10-) prefix(4,0-10)=disable_auto_compactions=true,max_bytes_for_level_base=1048576
 *
 * Parses text_def_in into sharding_def.  On failure, *error_position points
 * at the offending character and *error_msg describes the problem; both
 * out-params may be nullptr.  Returns true on success.
 */
bool RocksDBStore::parse_sharding_def(const std::string_view text_def_in,
				      std::vector<ColumnFamily>& sharding_def,
				      char const* *error_position,
				      std::string *error_msg)
{
  std::string_view text_def = text_def_in;
  char const* error_position_local = nullptr;
  std::string error_msg_local;
  // tolerate nullptr out-params by redirecting them to locals
  if (error_position == nullptr) {
    error_position = &error_position_local;
  }
  *error_position = nullptr;
  if (error_msg == nullptr) {
    error_msg = &error_msg_local;
    error_msg->clear();
  }

  sharding_def.clear();
  while (!text_def.empty()) {
    std::string_view options;
    std::string_view name;
    // defaults: one shard, hash over the whole key
    size_t shard_cnt = 1;
    uint32_t l_bound = 0;
    uint32_t h_bound = std::numeric_limits<uint32_t>::max();

    // peel off the next space-separated column definition
    std::string_view column_def;
    size_t spos = text_def.find(' ');
    if (spos == std::string_view::npos) {
      column_def = text_def;
      text_def = std::string_view(text_def.end(), 0);
    } else {
      column_def = text_def.substr(0, spos);
      text_def = text_def.substr(spos + 1);
    }
    // optional per-column rocksdb options follow '='
    size_t eqpos = column_def.find('=');
    if (eqpos != std::string_view::npos) {
      options = column_def.substr(eqpos + 1);
      column_def = column_def.substr(0, eqpos);
    }

    // optional "(shards[,begin-[end]])" suffix
    // NOTE(review): strtol is run on pointers into the string_view; this
    // assumes the underlying buffer is NUL-terminated (true when callers
    // pass a std::string) -- confirm for any new caller.
    size_t bpos = column_def.find('(');
    if (bpos != std::string_view::npos) {
      name = column_def.substr(0, bpos);
      const char* nptr = &column_def[bpos + 1];
      char* endptr;
      shard_cnt = strtol(nptr, &endptr, 10);
      if (nptr == endptr) {
	*error_position = nptr;
	*error_msg = "expecting integer";
	break;
      }
      nptr = endptr;
      if (*nptr == ',') {
	nptr++;
	l_bound = strtol(nptr, &endptr, 10);
	if (nptr == endptr) {
	  *error_position = nptr;
	  *error_msg = "expecting integer";
	  break;
	}
	nptr = endptr;
	if (*nptr != '-') {
	  *error_position = nptr;
	  *error_msg = "expecting '-'";
	  break;
	}
	nptr++;
	h_bound = strtol(nptr, &endptr, 10);
	if (nptr == endptr) {
	  // "begin-" with no end means an open-ended range
	  h_bound = std::numeric_limits<uint32_t>::max();
	}
	nptr = endptr;
      }
      if (*nptr != ')') {
	*error_position = nptr;
	*error_msg = "expecting ')'";
	break;
      }
    } else {
      name = column_def;
    }
    sharding_def.emplace_back(std::string(name), shard_cnt, std::string(options), l_bound, h_bound);
  }
  // success iff no error was recorded above
  return *error_position == nullptr;
}
782 | ||
783 | void RocksDBStore::sharding_def_to_columns(const std::vector<ColumnFamily>& sharding_def, | |
784 | std::vector<std::string>& columns) | |
785 | { | |
786 | columns.clear(); | |
787 | for (size_t i = 0; i < sharding_def.size(); i++) { | |
788 | if (sharding_def[i].shard_cnt == 1) { | |
789 | columns.push_back(sharding_def[i].name); | |
790 | } else { | |
791 | for (size_t j = 0; j < sharding_def[i].shard_cnt; j++) { | |
20effc67 | 792 | columns.push_back(sharding_def[i].name + "-" + std::to_string(j)); |
f67539c2 TL |
793 | } |
794 | } | |
795 | } | |
796 | } | |
797 | ||
// Creates the column families described by 'sharding_def' in a freshly
// opened database.
//
// For each column definition, per-column options are layered on top of the
// database-wide defaults, then one RocksDB column family is created per
// shard and registered via add_column_family().
//
// Returns 0 on success, the error from update_column_family_options() if the
// per-column option string is invalid, or -EINVAL if RocksDB refuses to
// create a column family.  On failure, families created so far are left in
// place (cleanup is presumably the caller's responsibility — see do_open's
// error handling; TODO confirm).
int RocksDBStore::create_shards(const rocksdb::Options& opt,
				const std::vector<ColumnFamily>& sharding_def)
{
  for (auto& p : sharding_def) {
    // copy default CF settings, block cache, merge operators as
    // the base for new CF
    rocksdb::ColumnFamilyOptions cf_opt(opt);
    rocksdb::Status status;
    // apply options to column family
    int r = update_column_family_options(p.name, p.options, &cf_opt);
    if (r != 0) {
      return r;
    }
    for (size_t idx = 0; idx < p.shard_cnt; idx++) {
      std::string cf_name;
      // single-shard columns use the bare column name; sharded columns get
      // a "-<idx>" suffix (must match sharding_def_to_columns())
      if (p.shard_cnt == 1)
	cf_name = p.name;
      else
	cf_name = p.name + "-" + std::to_string(idx);
      rocksdb::ColumnFamilyHandle *cf;
      status = db->CreateColumnFamily(cf_opt, cf_name, &cf);
      if (!status.ok()) {
	derr << __func__ << " Failed to create rocksdb column family: "
	     << cf_name << dendl;
	return -EINVAL;
      }
      // store the new CF handle
      add_column_family(p.name, p.hash_l, p.hash_h, idx, cf);
    }
  }
  return 0;
}
830 | ||
// Applies a sharding definition to a newly created database.
//
// When 'sharding_text' is non-empty it is parsed, the corresponding column
// families are created, and the definition text is persisted to
// sharding/def so later opens can verify it (see verify_sharding()).
// An empty definition removes any stale sharding/def file instead.
//
// Returns 0 on success, -EINVAL on a parse/creation failure, -EIO if the
// definition cannot be persisted.
int RocksDBStore::apply_sharding(const rocksdb::Options& opt,
				 const std::string& sharding_text)
{
  // create and open column families
  if (!sharding_text.empty()) {
    bool b;
    int r;
    rocksdb::Status status;
    std::vector<ColumnFamily> sharding_def;
    char const* error_position;
    std::string error_msg;
    b = parse_sharding_def(sharding_text, sharding_def, &error_position, &error_msg);
    if (!b) {
      // error_position points into sharding_text; the pad of spaces lines
      // the "^" marker up under the offending character
      dout(1) << __func__ << " bad sharding: " << dendl;
      dout(1) << __func__ << sharding_text << dendl;
      dout(1) << __func__ << std::string(error_position - &sharding_text[0], ' ') << "^" << error_msg << dendl;
      return -EINVAL;
    }
    r = create_shards(opt, sharding_def);
    if (r != 0 ) {
      derr << __func__ << " create_shards failed error=" << r << dendl;
      return r;
    }
    // NOTE(review): CreateDir's status is deliberately ignored here —
    // presumably because the directory may already exist; the subsequent
    // WriteStringToFile is what gets checked.
    opt.env->CreateDir(sharding_def_dir);
    status = rocksdb::WriteStringToFile(opt.env, sharding_text,
					sharding_def_file, true);
    if (!status.ok()) {
      derr << __func__ << " cannot write to " << sharding_def_file << dendl;
      return -EIO;
    }
  } else {
    // no sharding requested: make sure no stale definition survives
    opt.env->DeleteFile(sharding_def_file);
  }
  return 0;
}
522d829b | 866 | |
f67539c2 TL |
867 | // linking to rocksdb function defined in options_helper.cc |
868 | // it can parse nested params like "nested_opt={opt1=1;opt2=2}" | |
f67539c2 TL |
869 | extern rocksdb::Status rocksdb::StringToMap(const std::string& opts_str, |
870 | std::unordered_map<std::string, std::string>* opts_map); | |
871 | ||
522d829b TL |
872 | // Splits column family options from single string into name->value column_opts_map. |
873 | // The split is done using RocksDB parser that understands "{" and "}", so it | |
874 | // properly extracts compound options. | |
875 | // If non-RocksDB option "block_cache" is defined it is extracted to block_cache_opt. | |
876 | int RocksDBStore::split_column_family_options(const std::string& options, | |
877 | std::unordered_map<std::string, std::string>* opt_map, | |
f67539c2 TL |
878 | std::string* block_cache_opt) |
879 | { | |
522d829b TL |
880 | dout(20) << __func__ << " options=" << options << dendl; |
881 | rocksdb::Status status = rocksdb::StringToMap(options, opt_map); | |
f67539c2 | 882 | if (!status.ok()) { |
522d829b TL |
883 | dout(5) << __func__ << " error '" << status.getState() |
884 | << "' while parsing options '" << options << "'" << dendl; | |
f67539c2 TL |
885 | return -EINVAL; |
886 | } | |
522d829b TL |
887 | // if "block_cache" option exists, then move it to separate string |
888 | if (auto it = opt_map->find("block_cache"); it != opt_map->end()) { | |
f67539c2 | 889 | *block_cache_opt = it->second; |
522d829b | 890 | opt_map->erase(it); |
f67539c2 TL |
891 | } else { |
892 | block_cache_opt->clear(); | |
893 | } | |
894 | return 0; | |
895 | } | |
896 | ||
522d829b TL |
897 | // Updates column family options. |
898 | // Take options from more_options and apply them to cf_opt. | |
899 | // Allowed options are exactly the same as allowed for column families in RocksDB. | |
900 | // Ceph addition is "block_cache" option that is translated to block_cache and | |
901 | // allows to specialize separate block cache for O column family. | |
902 | // | |
903 | // base_name - name of column without shard suffix: "-"+number | |
904 | // options - additional options to apply | |
905 | // cf_opt - column family options to update | |
906 | int RocksDBStore::update_column_family_options(const std::string& base_name, | |
907 | const std::string& more_options, | |
908 | rocksdb::ColumnFamilyOptions* cf_opt) | |
909 | { | |
910 | std::unordered_map<std::string, std::string> options_map; | |
911 | std::string block_cache_opt; | |
912 | rocksdb::Status status; | |
913 | int r = split_column_family_options(more_options, &options_map, &block_cache_opt); | |
914 | if (r != 0) { | |
915 | dout(5) << __func__ << " failed to parse options; column family=" << base_name | |
916 | << " options=" << more_options << dendl; | |
917 | return r; | |
918 | } | |
919 | status = rocksdb::GetColumnFamilyOptionsFromMap(*cf_opt, options_map, cf_opt); | |
920 | if (!status.ok()) { | |
921 | dout(5) << __func__ << " invalid column family optionsp; column family=" | |
922 | << base_name << " options=" << more_options << dendl; | |
923 | dout(5) << __func__ << " RocksDB error='" << status.getState() << "'" << dendl; | |
924 | return -EINVAL; | |
925 | } | |
926 | if (base_name != rocksdb::kDefaultColumnFamilyName) { | |
927 | // default cf has its merge operator defined in load_rocksdb_options, should not override it | |
928 | install_cf_mergeop(base_name, cf_opt); | |
929 | } | |
930 | if (!block_cache_opt.empty()) { | |
931 | r = apply_block_cache_options(base_name, block_cache_opt, cf_opt); | |
932 | if (r != 0) { | |
933 | // apply_block_cache_options already does all necessary douts | |
934 | return r; | |
935 | } | |
936 | } | |
937 | return 0; | |
938 | } | |
939 | ||
// Applies the Ceph-specific "block_cache" option string to a column family.
//
// The string is parsed into key/value pairs.  Three keys are consumed here:
//   type       - cache implementation name (default: rocksdb_cache_type conf)
//   size       - cache capacity, IEC suffixes allowed (default: rocksdb_cache_size conf)
//   high_ratio - high-priority pool ratio as a float (default: 0.0)
// Presence of any of them forces a dedicated cache to be built for this
// column; otherwise the shared bbt_opts.block_cache is reused.  Remaining
// keys are passed through to GetBlockBasedTableOptionsFromMap as regular
// BlockBasedTableOptions.  The resulting table options are remembered in
// cf_bbt_opts[column_name] and installed as the CF's table factory.
//
// Returns 0 on success, -EINVAL on any parse or cache-creation failure.
int RocksDBStore::apply_block_cache_options(const std::string& column_name,
					    const std::string& block_cache_opt,
					    rocksdb::ColumnFamilyOptions* cf_opt)
{
  rocksdb::Status status;
  std::unordered_map<std::string, std::string> cache_options_map;
  status = rocksdb::StringToMap(block_cache_opt, &cache_options_map);
  if (!status.ok()) {
    dout(5) << __func__ << " invalid block cache options; column=" << column_name
	    << " options=" << block_cache_opt << dendl;
    dout(5) << __func__ << " RocksDB error='" << status.getState() << "'" << dendl;
    return -EINVAL;
  }
  bool require_new_block_cache = false;
  std::string cache_type = cct->_conf->rocksdb_cache_type;
  if (const auto it = cache_options_map.find("type"); it != cache_options_map.end()) {
    cache_type = it->second;
    cache_options_map.erase(it);
    require_new_block_cache = true;
  }
  size_t cache_size = cct->_conf->rocksdb_cache_size;
  if (auto it = cache_options_map.find("size"); it != cache_options_map.end()) {
    std::string error;
    cache_size = strict_iecstrtoll(it->second.c_str(), &error);
    if (!error.empty()) {
      dout(10) << __func__ << " invalid size: '" << it->second << "'" << dendl;
      return -EINVAL;
    }
    cache_options_map.erase(it);
    require_new_block_cache = true;
  }
  double high_pri_pool_ratio = 0.0;
  if (auto it = cache_options_map.find("high_ratio"); it != cache_options_map.end()) {
    std::string error;
    high_pri_pool_ratio = strict_strtod(it->second.c_str(), &error);
    if (!error.empty()) {
      dout(10) << __func__ << " invalid high_pri (float): '" << it->second << "'" << dendl;
      return -EINVAL;
    }
    cache_options_map.erase(it);
    require_new_block_cache = true;
  }

  // whatever keys remain are regular block-based-table options, layered on
  // top of the store-wide bbt_opts
  rocksdb::BlockBasedTableOptions column_bbt_opts;
  status = GetBlockBasedTableOptionsFromMap(bbt_opts, cache_options_map, &column_bbt_opts);
  if (!status.ok()) {
    dout(5) << __func__ << " invalid block cache options; column=" << column_name
	    << " options=" << block_cache_opt << dendl;
    dout(5) << __func__ << " RocksDB error='" << status.getState() << "'" << dendl;
    return -EINVAL;
  }
  std::shared_ptr<rocksdb::Cache> block_cache;
  if (column_bbt_opts.no_block_cache) {
    // clear all settings except no_block_cache
    // rocksdb does not like them
    column_bbt_opts = rocksdb::BlockBasedTableOptions();
    column_bbt_opts.no_block_cache = true;
  } else {
    if (require_new_block_cache) {
      // a cache-related key was overridden: build a cache private to this column
      block_cache = create_block_cache(cache_type, cache_size, high_pri_pool_ratio);
      if (!block_cache) {
	dout(5) << __func__ << " failed to create block cache for params: " << block_cache_opt << dendl;
	return -EINVAL;
      }
    } else {
      // no overrides: share the store-wide block cache
      block_cache = bbt_opts.block_cache;
    }
  }
  column_bbt_opts.block_cache = block_cache;
  cf_bbt_opts[column_name] = column_bbt_opts;
  cf_opt->table_factory.reset(NewBlockBasedTableFactory(cf_bbt_opts[column_name]));
  return 0;
}
f67539c2 TL |
1013 | |
// Reconciles the stored sharding definition (sharding/def) against the
// column families that actually exist in the RocksDB directory.
//
// Output parameters:
//   existing_cfs / existing_cfs_shard - descriptors (and shard-id/column
//     pairs) for families present in both the definition and the DB; the
//     "default" family is appended last with no shard entry.
//   missing_cfs / missing_cfs_shard - families required by the definition
//     but absent from the DB (e.g. after an interrupted reshard).
//
// Returns 0 on success, -EIO on unreadable definition / unlistable DB /
// extra families present in the DB, or the error from
// update_column_family_options() for a bad per-column option string.
int RocksDBStore::verify_sharding(const rocksdb::Options& opt,
				  std::vector<rocksdb::ColumnFamilyDescriptor>& existing_cfs,
				  std::vector<std::pair<size_t, RocksDBStore::ColumnFamily> >& existing_cfs_shard,
				  std::vector<rocksdb::ColumnFamilyDescriptor>& missing_cfs,
				  std::vector<std::pair<size_t, RocksDBStore::ColumnFamily> >& missing_cfs_shard)
{
  rocksdb::Status status;
  std::string stored_sharding_text;
  status = opt.env->FileExists(sharding_def_file);
  if (status.ok()) {
    status = rocksdb::ReadFileToString(opt.env,
				       sharding_def_file,
				       &stored_sharding_text);
    if(!status.ok()) {
      derr << __func__ << " cannot read from " << sharding_def_file << dendl;
      return -EIO;
    }
    dout(20) << __func__ << " sharding=" << stored_sharding_text << dendl;
  } else {
    dout(30) << __func__ << " no sharding" << dendl;
    //no "sharding_def" present
  }
  //check if sharding_def matches stored_sharding_def
  // (empty text parses to an empty definition, so only "default" is expected)
  std::vector<ColumnFamily> stored_sharding_def;
  parse_sharding_def(stored_sharding_text, stored_sharding_def);

  std::sort(stored_sharding_def.begin(), stored_sharding_def.end(),
	    [](ColumnFamily& a, ColumnFamily& b) { return a.name < b.name; } );

  std::vector<string> rocksdb_cfs;
  status = rocksdb::DB::ListColumnFamilies(rocksdb::DBOptions(opt),
					   path, &rocksdb_cfs);
  if (!status.ok()) {
    derr << __func__ << " unable to list column families: " << status.ToString() << dendl;
    return -EIO;
  }
  dout(5) << __func__ << " column families from rocksdb: " << rocksdb_cfs << dendl;

  // route one shard into either the existing_* or missing_* output pair,
  // depending on whether RocksDB reports the family on disk
  auto emplace_cf = [&] (const RocksDBStore::ColumnFamily& column,
			 int32_t shard_id,
			 const std::string& shard_name,
			 const rocksdb::ColumnFamilyOptions& opt) {
    if (std::find(rocksdb_cfs.begin(), rocksdb_cfs.end(), shard_name) != rocksdb_cfs.end()) {
      existing_cfs.emplace_back(shard_name, opt);
      existing_cfs_shard.emplace_back(shard_id, column);
    } else {
      missing_cfs.emplace_back(shard_name, opt);
      missing_cfs_shard.emplace_back(shard_id, column);
    }
  };

  for (auto& column : stored_sharding_def) {
    rocksdb::ColumnFamilyOptions cf_opt(opt);
    int r = update_column_family_options(column.name, column.options, &cf_opt);
    if (r != 0) {
      return r;
    }
    if (column.shard_cnt == 1) {
      emplace_cf(column, 0, column.name, cf_opt);
    } else {
      for (size_t i = 0; i < column.shard_cnt; i++) {
	std::string cf_name = column.name + "-" + std::to_string(i);
	emplace_cf(column, i, cf_name, cf_opt);
      }
    }
  }
  // "default" always exists and is opened last (callers rely on this order)
  existing_cfs.emplace_back("default", opt);

  if (existing_cfs.size() != rocksdb_cfs.size()) {
    // DB contains families the definition does not know about; refuse to
    // open rather than silently ignore data
    std::vector<std::string> columns_from_stored;
    sharding_def_to_columns(stored_sharding_def, columns_from_stored);
    derr << __func__ << " extra columns in rocksdb. rocksdb columns = " << rocksdb_cfs
	 << " target columns = " << columns_from_stored << dendl;
    return -EIO;
  }

  return 0;
}
1091 | ||
f67539c2 TL |
1092 | std::ostream& operator<<(std::ostream& out, const RocksDBStore::ColumnFamily& cf) |
1093 | { | |
1094 | out << "("; | |
1095 | out << cf.name; | |
1096 | out << ","; | |
1097 | out << cf.shard_cnt; | |
1098 | out << ","; | |
1099 | out << cf.hash_l; | |
1100 | out << "-"; | |
1101 | if (cf.hash_h != std::numeric_limits<uint32_t>::max()) { | |
1102 | out << cf.hash_h; | |
1103 | } | |
1104 | out << ","; | |
1105 | out << cf.options; | |
1106 | out << ")"; | |
1107 | return out; | |
1108 | } | |
1109 | ||
11fdf7f2 TL |
// Opens (or creates) the RocksDB database and all sharded column families,
// then installs perf counters and optionally runs an initial compaction.
//
// create_if_missing - create a fresh DB and apply 'sharding_text' to it.
// open_readonly     - open via OpenForReadOnly; incompatible with creation
//                     (asserted below) and with reshard-recovery mode.
// sharding_text     - sharding definition; only used on creation.
//
// On a non-create open, the stored sharding definition is verified against
// the on-disk column families; families missing because of an interrupted
// reshard are recreated when the sharding/recreate_columns marker is "1".
// Returns 0 on success or a negative errno.
int RocksDBStore::do_open(ostream &out,
			  bool create_if_missing,
			  bool open_readonly,
			  const std::string& sharding_text)
{
  ceph_assert(!(create_if_missing && open_readonly));
  rocksdb::Options opt;
  int r = load_rocksdb_options(create_if_missing, opt);
  if (r) {
    dout(1) << __func__ << " load rocksdb options failed" << dendl;
    return r;
  }
  rocksdb::Status status;
  if (create_if_missing) {
    // fresh database: open plain, then create the sharded families
    status = rocksdb::DB::Open(opt, path, &db);
    if (!status.ok()) {
      derr << status.ToString() << dendl;
      return -EINVAL;
    }
    r = apply_sharding(opt, sharding_text);
    if (r < 0) {
      return r;
    }
    default_cf = db->DefaultColumnFamily();
  } else {
    std::vector<rocksdb::ColumnFamilyDescriptor> existing_cfs;
    std::vector<std::pair<size_t, RocksDBStore::ColumnFamily> > existing_cfs_shard;
    std::vector<rocksdb::ColumnFamilyDescriptor> missing_cfs;
    std::vector<std::pair<size_t, RocksDBStore::ColumnFamily> > missing_cfs_shard;

    r = verify_sharding(opt,
			existing_cfs, existing_cfs_shard,
			missing_cfs, missing_cfs_shard);
    if (r < 0) {
      return r;
    }
    // "1" in sharding/recreate_columns marks an interrupted reshard whose
    // missing families may be recreated (written by repair()/resharding)
    std::string sharding_recreate_text;
    status = rocksdb::ReadFileToString(opt.env,
				       sharding_recreate,
				       &sharding_recreate_text);
    bool recreate_mode = status.ok() && sharding_recreate_text == "1";

    ceph_assert(!recreate_mode || !open_readonly);
    if (recreate_mode == false && missing_cfs.size() != 0) {
      // We do not accept when there are missing column families, except case that we are during resharding.
      // We can get into this case if resharding was interrupted. It gives a chance to continue.
      // Opening DB is only allowed in read-only mode.
      // NOTE(review): the error fires only when NOT read-only AND the
      // resharding lock column is among the missing ones — verify this is
      // the intended condition given the comment above.
      if (open_readonly == false &&
	  std::find_if(missing_cfs.begin(), missing_cfs.end(),
		       [](const rocksdb::ColumnFamilyDescriptor& c) { return c.name == resharding_column_lock; }
		       ) != missing_cfs.end()) {
	derr << __func__ << " missing column families: " << missing_cfs_shard << dendl;
	return -EIO;
      }
    }

    if (existing_cfs.empty()) {
      // no column families
      if (open_readonly) {
	status = rocksdb::DB::OpenForReadOnly(opt, path, &db);
      } else {
	status = rocksdb::DB::Open(opt, path, &db);
      }
      if (!status.ok()) {
	derr << status.ToString() << dendl;
	return -EINVAL;
      }
      default_cf = db->DefaultColumnFamily();
    } else {
      std::vector<rocksdb::ColumnFamilyHandle*> handles;
      if (open_readonly) {
	status = rocksdb::DB::OpenForReadOnly(rocksdb::DBOptions(opt),
					      path, existing_cfs,
					      &handles, &db);
      } else {
	status = rocksdb::DB::Open(rocksdb::DBOptions(opt),
				   path, existing_cfs, &handles, &db);
      }
      if (!status.ok()) {
	derr << status.ToString() << dendl;
	return -EINVAL;
      }
      // verify_sharding appended "default" last, so handles back maps 1:1
      // onto existing_cfs_shard plus the trailing default handle
      ceph_assert(existing_cfs.size() == existing_cfs_shard.size() + 1);
      ceph_assert(handles.size() == existing_cfs.size());
      dout(10) << __func__ << " existing_cfs=" << existing_cfs.size() << dendl;
      for (size_t i = 0; i < existing_cfs_shard.size(); i++) {
	add_column_family(existing_cfs_shard[i].second.name,
			  existing_cfs_shard[i].second.hash_l,
			  existing_cfs_shard[i].second.hash_h,
			  existing_cfs_shard[i].first,
			  handles[i]);
      }
      default_cf = handles[handles.size() - 1];
      must_close_default_cf = true;

      // recreate families lost to an interrupted reshard, unless the
      // resharding lock column itself is among them
      if (missing_cfs.size() > 0 &&
	  std::find_if(missing_cfs.begin(), missing_cfs.end(),
		       [](const rocksdb::ColumnFamilyDescriptor& c) { return c.name == resharding_column_lock; }
		       ) == missing_cfs.end())
      {
	dout(10) << __func__ << " missing_cfs=" << missing_cfs.size() << dendl;
	ceph_assert(recreate_mode);
	ceph_assert(missing_cfs.size() == missing_cfs_shard.size());
	for (size_t i = 0; i < missing_cfs.size(); i++) {
	  rocksdb::ColumnFamilyHandle *cf;
	  status = db->CreateColumnFamily(missing_cfs[i].options, missing_cfs[i].name, &cf);
	  if (!status.ok()) {
	    derr << __func__ << " Failed to create rocksdb column family: "
		 << missing_cfs[i].name << dendl;
	    return -EINVAL;
	  }
	  add_column_family(missing_cfs_shard[i].second.name,
			    missing_cfs_shard[i].second.hash_l,
			    missing_cfs_shard[i].second.hash_h,
			    missing_cfs_shard[i].first,
			    cf);
	}
	// recovery finished; drop the recreate marker
	opt.env->DeleteFile(sharding_recreate);
      }
    }
  }
  ceph_assert(default_cf != nullptr);

  // register perf counters for this store instance
  PerfCountersBuilder plb(cct, "rocksdb", l_rocksdb_first, l_rocksdb_last);
  plb.add_time_avg(l_rocksdb_get_latency, "get_latency", "Get latency");
  plb.add_time_avg(l_rocksdb_submit_latency, "submit_latency", "Submit Latency");
  plb.add_time_avg(l_rocksdb_submit_sync_latency, "submit_sync_latency", "Submit Sync Latency");
  plb.add_u64_counter(l_rocksdb_compact, "compact", "Compactions");
  plb.add_u64_counter(l_rocksdb_compact_range, "compact_range", "Compactions by range");
  plb.add_u64_counter(l_rocksdb_compact_queue_merge, "compact_queue_merge", "Mergings of ranges in compaction queue");
  plb.add_u64(l_rocksdb_compact_queue_len, "compact_queue_len", "Length of compaction queue");
  plb.add_time_avg(l_rocksdb_write_wal_time, "rocksdb_write_wal_time", "Rocksdb write wal time");
  plb.add_time_avg(l_rocksdb_write_memtable_time, "rocksdb_write_memtable_time", "Rocksdb write memtable time");
  plb.add_time_avg(l_rocksdb_write_delay_time, "rocksdb_write_delay_time", "Rocksdb write delay time");
  plb.add_time_avg(l_rocksdb_write_pre_and_post_process_time,
      "rocksdb_write_pre_and_post_time", "total time spent on writing a record, excluding write process");
  logger = plb.create_perf_counters();
  cct->get_perfcounters_collection()->add(logger);

  if (compact_on_mount) {
    derr << "Compacting rocksdb store..." << dendl;
    compact();
    derr << "Finished compacting rocksdb store" << dendl;
  }
  return 0;
}
1256 | ||
1257 | int RocksDBStore::_test_init(const string& dir) | |
1258 | { | |
1259 | rocksdb::Options options; | |
1260 | options.create_if_missing = true; | |
1261 | rocksdb::DB *db; | |
1262 | rocksdb::Status status = rocksdb::DB::Open(options, dir, &db); | |
1263 | delete db; | |
1264 | db = nullptr; | |
1265 | return status.ok() ? 0 : -EIO; | |
1266 | } | |
1267 | ||
// Destructor: tears down the DB (close() is idempotent for an already
// closed store) and releases the privately owned rocksdb::Env, if any.
RocksDBStore::~RocksDBStore()
{
  close();

  if (priv) {
    // priv is stored as void*; it holds an owned rocksdb::Env
    delete static_cast<rocksdb::Env*>(priv);
  }
}
1275 | ||
1276 | void RocksDBStore::close() | |
1277 | { | |
1278 | // stop compaction thread | |
9f95a23c | 1279 | compact_queue_lock.lock(); |
7c673cae | 1280 | if (compact_thread.is_started()) { |
92f5a8d4 | 1281 | dout(1) << __func__ << " waiting for compaction thread to stop" << dendl; |
7c673cae | 1282 | compact_queue_stop = true; |
9f95a23c TL |
1283 | compact_queue_cond.notify_all(); |
1284 | compact_queue_lock.unlock(); | |
7c673cae | 1285 | compact_thread.join(); |
f67539c2 | 1286 | dout(1) << __func__ << " compaction thread to stopped" << dendl; |
7c673cae | 1287 | } else { |
9f95a23c | 1288 | compact_queue_lock.unlock(); |
7c673cae FG |
1289 | } |
1290 | ||
f67539c2 | 1291 | if (logger) { |
7c673cae | 1292 | cct->get_perfcounters_collection()->remove(logger); |
f67539c2 TL |
1293 | delete logger; |
1294 | logger = nullptr; | |
1295 | } | |
1296 | ||
1297 | // Ensure db is destroyed before dependent db_cache and filterpolicy | |
1298 | for (auto& p : cf_handles) { | |
1299 | for (size_t i = 0; i < p.second.handles.size(); i++) { | |
1300 | db->DestroyColumnFamilyHandle(p.second.handles[i]); | |
1301 | } | |
1302 | } | |
1303 | cf_handles.clear(); | |
1304 | if (must_close_default_cf) { | |
1305 | db->DestroyColumnFamilyHandle(default_cf); | |
1306 | must_close_default_cf = false; | |
1307 | } | |
1308 | default_cf = nullptr; | |
1309 | delete db; | |
1310 | db = nullptr; | |
7c673cae FG |
1311 | } |
1312 | ||
11fdf7f2 TL |
// Runs rocksdb::RepairDB on the store and restores the sharding metadata
// files that RepairDB deletes (it removes files it does not recognize).
//
// If a sharding definition existed, it is rewritten together with the
// "recreate_columns" marker, and the DB is opened once (do_open) so that any
// column families lost during repair are recreated, then closed again.
// Returns 0 only if both the repair and the finalization steps succeeded;
// -1 (with a message on 'out') otherwise.
int RocksDBStore::repair(std::ostream &out)
{
  rocksdb::Status status;
  rocksdb::Options opt;
  int r = load_rocksdb_options(false, opt);
  if (r) {
    dout(1) << __func__ << " load rocksdb options failed" << dendl;
    out << "load rocksdb options failed" << std::endl;
    return r;
  }
  //need to save sharding definition, repairDB will delete files it does not know
  std::string stored_sharding_text;
  status = opt.env->FileExists(sharding_def_file);
  if (status.ok()) {
    status = rocksdb::ReadFileToString(opt.env,
				       sharding_def_file,
				       &stored_sharding_text);
    if (!status.ok()) {
      // unreadable definition is treated as absent
      stored_sharding_text.clear();
    }
  }
  dout(10) << __func__ << " stored_sharding: " << stored_sharding_text << dendl;
  status = rocksdb::RepairDB(path, opt);
  bool repaired = status.ok();
  if (!stored_sharding_text.empty()) {
    //recreate markers even if repair failed
    opt.env->CreateDir(sharding_def_dir);
    status = rocksdb::WriteStringToFile(opt.env, stored_sharding_text,
					sharding_def_file, true);
    if (!status.ok()) {
      derr << __func__ << " cannot write to " << sharding_def_file << dendl;
      return -1;
    }
    status = rocksdb::WriteStringToFile(opt.env, "1",
					sharding_recreate, true);
    if (!status.ok()) {
      derr << __func__ << " cannot write to " << sharding_recreate << dendl;
      return -1;
    }
    // finalize sharding recreate: a full open recreates missing column
    // families because the recreate marker is set (see do_open)
    if (do_open(out, false, false)) {
      derr << __func__ << " cannot finalize repair" << dendl;
      return -1;
    }
    close();
  }

  // 'status' here reflects the last marker write (or RepairDB itself when
  // there was no sharding definition); both it and the repair must be ok
  if (repaired && status.ok()) {
    return 0;
  } else {
    out << "repair rocksdb failed : " << status.ToString() << std::endl;
    return -1;
  }
}
1367 | ||
7c673cae FG |
1368 | void RocksDBStore::split_stats(const std::string &s, char delim, std::vector<std::string> &elems) { |
1369 | std::stringstream ss; | |
1370 | ss.str(s); | |
1371 | std::string item; | |
1372 | while (std::getline(ss, item, delim)) { | |
1373 | elems.push_back(item); | |
1374 | } | |
1375 | } | |
1376 | ||
9f95a23c TL |
// Thin forwarder to rocksdb::DB::GetIntProperty.
// Returns true and stores the value in *out when 'property' names a valid
// integer DB property; false otherwise.
bool RocksDBStore::get_property(
  const std::string &property,
  uint64_t *out)
{
  return db->GetIntProperty(property, out);
}
1383 | ||
// Estimates the on-disk size of all keys under (prefix, key_prefix) using
// RocksDB's approximate-size machinery.
//
// If 'prefix' has dedicated (possibly sharded) column families, the range
// [key_prefix+'\x00', key_prefix+"\xff\xff\xff\xff") is summed across every
// shard handle; otherwise the prefix is folded into the key via
// combine_strings() and the default column family is queried.  The result
// is an estimate only (GetApproximateSizes semantics).
int64_t RocksDBStore::estimate_prefix_size(const string& prefix,
					   const string& key_prefix)
{
  uint64_t size = 0;
  auto p_iter = cf_handles.find(prefix);
  if (p_iter != cf_handles.end()) {
    for (auto cf : p_iter->second.handles) {
      uint64_t s = 0;
      string start = key_prefix + string(1, '\x00');
      string limit = key_prefix + string("\xff\xff\xff\xff");
      rocksdb::Range r(start, limit);
      db->GetApproximateSizes(cf, &r, 1, &s);
      size += s;
    }
  } else {
    string start = combine_strings(prefix , key_prefix);
    string limit = combine_strings(prefix , key_prefix + "\xff\xff\xff\xff");
    rocksdb::Range r(start, limit);
    db->GetApproximateSizes(default_cf, &r, 1, &size);
  }
  return size;
}
1406 | ||
7c673cae FG |
// Dumps RocksDB statistics into the given Formatter.
//
// Gated entirely on the rocksdb_perf config flag; each sub-section is then
// gated on its own rocksdb_collect_* flag:
//   - compaction stats:   the "rocksdb.stats" DB property, line by line
//   - extended stats:     dbstats->ToString() plus this store's perf counters
//   - memory stats:       block cache usage, memtable and table-reader memory
void RocksDBStore::get_statistics(Formatter *f)
{
  if (!cct->_conf->rocksdb_perf) {
    dout(20) << __func__ << " RocksDB perf is disabled, can't probe for stats"
	     << dendl;
    return;
  }

  if (cct->_conf->rocksdb_collect_compaction_stats) {
    std::string stat_str;
    bool status = db->GetProperty("rocksdb.stats", &stat_str);
    if (status) {
      f->open_object_section("rocksdb_statistics");
      f->dump_string("rocksdb_compaction_statistics", "");
      vector<string> stats;
      split_stats(stat_str, '\n', stats);
      for (auto st :stats) {
	f->dump_string("", st);
      }
      f->close_section();
    }
  }
  if (cct->_conf->rocksdb_collect_extended_stats) {
    if (dbstats) {
      f->open_object_section("rocksdb_extended_statistics");
      string stat_str = dbstats->ToString();
      vector<string> stats;
      split_stats(stat_str, '\n', stats);
      f->dump_string("rocksdb_extended_statistics", "");
      for (auto st :stats) {
	f->dump_string(".", st);
      }
      f->close_section();
    }
    f->open_object_section("rocksdbstore_perf_counters");
    logger->dump_formatted(f,0);
    f->close_section();
  }
  if (cct->_conf->rocksdb_collect_memory_stats) {
    f->open_object_section("rocksdb_memtable_statistics");
    std::string str;
    if (!bbt_opts.no_block_cache) {
      str.append(stringify(bbt_opts.block_cache->GetUsage()));
      f->dump_string("block_cache_usage", str.data());
      str.clear();
      str.append(stringify(bbt_opts.block_cache->GetPinnedUsage()));
      f->dump_string("block_cache_pinned_blocks_usage", str);
      str.clear();
    }
    db->GetProperty("rocksdb.cur-size-all-mem-tables", &str);
    f->dump_string("rocksdb_memtable_usage", str);
    str.clear();
    db->GetProperty("rocksdb.estimate-table-readers-mem", &str);
    f->dump_string("rocksdb_index_filter_blocks_usage", str);
    f->close_section();
  }
}
1464 | ||
f67539c2 TL |
1465 | struct RocksDBStore::RocksWBHandler: public rocksdb::WriteBatch::Handler { |
1466 | RocksWBHandler(const RocksDBStore& db) : db(db) {} | |
1467 | const RocksDBStore& db; | |
1468 | std::stringstream seen; | |
1469 | int num_seen = 0; | |
1470 | ||
1471 | void dump(const char* op_name, | |
1472 | uint32_t column_family_id, | |
1473 | const rocksdb::Slice& key_in, | |
1474 | const rocksdb::Slice* value = nullptr) { | |
1475 | string prefix; | |
1476 | string key; | |
1477 | ssize_t size = value ? value->size() : -1; | |
1478 | seen << std::endl << op_name << "("; | |
1479 | ||
1480 | if (column_family_id == 0) { | |
1481 | db.split_key(key_in, &prefix, &key); | |
1482 | } else { | |
1483 | auto it = db.cf_ids_to_prefix.find(column_family_id); | |
1484 | ceph_assert(it != db.cf_ids_to_prefix.end()); | |
1485 | prefix = it->second; | |
1486 | key = key_in.ToString(); | |
1487 | } | |
1488 | seen << " prefix = " << prefix; | |
1489 | seen << " key = " << pretty_binary_string(key); | |
1490 | if (size != -1) | |
1491 | seen << " value size = " << std::to_string(size); | |
1492 | seen << ")"; | |
1493 | num_seen++; | |
1494 | } | |
1495 | void Put(const rocksdb::Slice& key, | |
1496 | const rocksdb::Slice& value) override { | |
1497 | dump("Put", 0, key, &value); | |
1498 | } | |
1499 | rocksdb::Status PutCF(uint32_t column_family_id, const rocksdb::Slice& key, | |
1500 | const rocksdb::Slice& value) override { | |
1501 | dump("PutCF", column_family_id, key, &value); | |
1502 | return rocksdb::Status::OK(); | |
1503 | } | |
1504 | void SingleDelete(const rocksdb::Slice& key) override { | |
1505 | dump("SingleDelete", 0, key); | |
1506 | } | |
1507 | rocksdb::Status SingleDeleteCF(uint32_t column_family_id, const rocksdb::Slice& key) override { | |
1508 | dump("SingleDeleteCF", column_family_id, key); | |
1509 | return rocksdb::Status::OK(); | |
1510 | } | |
1511 | void Delete(const rocksdb::Slice& key) override { | |
1512 | dump("Delete", 0, key); | |
1513 | } | |
1514 | rocksdb::Status DeleteCF(uint32_t column_family_id, const rocksdb::Slice& key) override { | |
1515 | dump("DeleteCF", column_family_id, key); | |
1516 | return rocksdb::Status::OK(); | |
1517 | } | |
1518 | void Merge(const rocksdb::Slice& key, | |
1519 | const rocksdb::Slice& value) override { | |
1520 | dump("Merge", 0, key, &value); | |
1521 | } | |
  // WriteBatch::Handler hook: record a Merge on a specific CF.
  rocksdb::Status MergeCF(uint32_t column_family_id, const rocksdb::Slice& key,
                          const rocksdb::Slice& value) override {
    dump("MergeCF", column_family_id, key, &value);
    return rocksdb::Status::OK();
  }
  // Stop walking the batch after dumping 50 ops to keep log output bounded.
  bool Continue() override { return num_seen < 50; }
1528 | }; | |
1529 | ||
// Shared write path for submit_transaction{,_sync}: apply the batched
// transaction `t` with the given write options.  Optionally collects
// rocksdb perf-context timing breakdowns into the perf counters.
// Returns 0 on success, -1 on any rocksdb write error.
int RocksDBStore::submit_common(rocksdb::WriteOptions& woptions, KeyValueDB::Transaction t)
{
  // enable rocksdb breakdown
  // considering performance overhead, default is disabled
  if (cct->_conf->rocksdb_perf) {
    rocksdb::SetPerfLevel(rocksdb::PerfLevel::kEnableTimeExceptForMutex);
    rocksdb::get_perf_context()->Reset();
  }

  RocksDBTransactionImpl * _t =
    static_cast<RocksDBTransactionImpl *>(t.get());
  woptions.disableWAL = disableWAL;
  // NOTE(review): the statements between lgeneric_subdout() and dendl are
  // intended to sit inside the dout macro's "should gather" branch, so the
  // (expensive) batch dump should only execute at debug_rocksdb >= 30 --
  // confirm against the macro definition.
  lgeneric_subdout(cct, rocksdb, 30) << __func__;
  RocksWBHandler bat_txc(*this);
  _t->bat.Iterate(&bat_txc);
  *_dout << " Rocksdb transaction: " << bat_txc.seen.str() << dendl;

  rocksdb::Status s = db->Write(woptions, &_t->bat);
  if (!s.ok()) {
    // dump (the first ops of) the failed batch to aid debugging
    RocksWBHandler rocks_txc(*this);
    _t->bat.Iterate(&rocks_txc);
    derr << __func__ << " error: " << s.ToString() << " code = " << s.code()
         << " Rocksdb transaction: " << rocks_txc.seen.str() << dendl;
  }

  if (cct->_conf->rocksdb_perf) {
    // convert perf-context nanosecond counters to utime_t and publish
    utime_t write_memtable_time;
    utime_t write_delay_time;
    utime_t write_wal_time;
    utime_t write_pre_and_post_process_time;
    write_wal_time.set_from_double(
      static_cast<double>(rocksdb::get_perf_context()->write_wal_time)/1000000000);
    write_memtable_time.set_from_double(
      static_cast<double>(rocksdb::get_perf_context()->write_memtable_time)/1000000000);
    write_delay_time.set_from_double(
      static_cast<double>(rocksdb::get_perf_context()->write_delay_time)/1000000000);
    write_pre_and_post_process_time.set_from_double(
      static_cast<double>(rocksdb::get_perf_context()->write_pre_and_post_process_time)/1000000000);
    logger->tinc(l_rocksdb_write_memtable_time, write_memtable_time);
    logger->tinc(l_rocksdb_write_delay_time, write_delay_time);
    logger->tinc(l_rocksdb_write_wal_time, write_wal_time);
    logger->tinc(l_rocksdb_write_pre_and_post_process_time, write_pre_and_post_process_time);
  }

  return s.ok() ? 0 : -1;
}
1576 | ||
11fdf7f2 | 1577 | int RocksDBStore::submit_transaction(KeyValueDB::Transaction t) |
7c673cae FG |
1578 | { |
1579 | utime_t start = ceph_clock_now(); | |
7c673cae | 1580 | rocksdb::WriteOptions woptions; |
11fdf7f2 | 1581 | woptions.sync = false; |
7c673cae | 1582 | |
11fdf7f2 | 1583 | int result = submit_common(woptions, t); |
7c673cae | 1584 | |
11fdf7f2 | 1585 | utime_t lat = ceph_clock_now() - start; |
11fdf7f2 TL |
1586 | logger->tinc(l_rocksdb_submit_latency, lat); |
1587 | ||
1588 | return result; | |
1589 | } | |
7c673cae | 1590 | |
11fdf7f2 TL |
1591 | int RocksDBStore::submit_transaction_sync(KeyValueDB::Transaction t) |
1592 | { | |
1593 | utime_t start = ceph_clock_now(); | |
1594 | rocksdb::WriteOptions woptions; | |
1595 | // if disableWAL, sync can't set | |
1596 | woptions.sync = !disableWAL; | |
1597 | ||
1598 | int result = submit_common(woptions, t); | |
1599 | ||
1600 | utime_t lat = ceph_clock_now() - start; | |
7c673cae FG |
1601 | logger->tinc(l_rocksdb_submit_sync_latency, lat); |
1602 | ||
11fdf7f2 | 1603 | return result; |
7c673cae FG |
1604 | } |
1605 | ||
1606 | RocksDBStore::RocksDBTransactionImpl::RocksDBTransactionImpl(RocksDBStore *_db) | |
1607 | { | |
1608 | db = _db; | |
1609 | } | |
1610 | ||
11fdf7f2 TL |
// Append Put(key -> to_set_bl) to batch `bat`, targeting column family
// `cf`.  Picks between one contiguous Slice and SliceParts depending on
// the bufferlist layout so the value never has to be rebuilt/copied.
void RocksDBStore::RocksDBTransactionImpl::put_bat(
  rocksdb::WriteBatch& bat,
  rocksdb::ColumnFamilyHandle *cf,
  const string &key,
  const bufferlist &to_set_bl)
{
  // bufferlist::c_str() is non-constant, so we can't call c_str()
  if (to_set_bl.is_contiguous() && to_set_bl.length() > 0) {
    // single underlying buffer: pass it through directly
    bat.Put(cf,
            rocksdb::Slice(key),
            rocksdb::Slice(to_set_bl.buffers().front().c_str(),
                           to_set_bl.length()));
  } else {
    // multiple buffers: let rocksdb gather them via SliceParts
    rocksdb::Slice key_slice(key);
    vector<rocksdb::Slice> value_slices(to_set_bl.get_num_buffers());
    bat.Put(cf,
            rocksdb::SliceParts(&key_slice, 1),
            prepare_sliceparts(to_set_bl, &value_slices));
  }
}
1631 | ||
1632 | void RocksDBStore::RocksDBTransactionImpl::set( | |
1633 | const string &prefix, | |
11fdf7f2 | 1634 | const string &k, |
7c673cae FG |
1635 | const bufferlist &to_set_bl) |
1636 | { | |
f67539c2 | 1637 | auto cf = db->get_cf_handle(prefix, k); |
11fdf7f2 TL |
1638 | if (cf) { |
1639 | put_bat(bat, cf, k, to_set_bl); | |
1640 | } else { | |
1641 | string key = combine_strings(prefix, k); | |
1642 | put_bat(bat, db->default_cf, key, to_set_bl); | |
1643 | } | |
1644 | } | |
7c673cae | 1645 | |
11fdf7f2 TL |
1646 | void RocksDBStore::RocksDBTransactionImpl::set( |
1647 | const string &prefix, | |
1648 | const char *k, size_t keylen, | |
1649 | const bufferlist &to_set_bl) | |
1650 | { | |
f67539c2 | 1651 | auto cf = db->get_cf_handle(prefix, k, keylen); |
11fdf7f2 TL |
1652 | if (cf) { |
1653 | string key(k, keylen); // fixme? | |
1654 | put_bat(bat, cf, key, to_set_bl); | |
7c673cae | 1655 | } else { |
11fdf7f2 TL |
1656 | string key; |
1657 | combine_strings(prefix, k, keylen, &key); | |
1658 | put_bat(bat, cf, key, to_set_bl); | |
7c673cae FG |
1659 | } |
1660 | } | |
1661 | ||
1662 | void RocksDBStore::RocksDBTransactionImpl::rmkey(const string &prefix, | |
1663 | const string &k) | |
1664 | { | |
f67539c2 | 1665 | auto cf = db->get_cf_handle(prefix, k); |
11fdf7f2 TL |
1666 | if (cf) { |
1667 | bat.Delete(cf, rocksdb::Slice(k)); | |
1668 | } else { | |
1669 | bat.Delete(db->default_cf, combine_strings(prefix, k)); | |
1670 | } | |
7c673cae FG |
1671 | } |
1672 | ||
1673 | void RocksDBStore::RocksDBTransactionImpl::rmkey(const string &prefix, | |
1674 | const char *k, | |
1675 | size_t keylen) | |
1676 | { | |
f67539c2 | 1677 | auto cf = db->get_cf_handle(prefix, k, keylen); |
11fdf7f2 TL |
1678 | if (cf) { |
1679 | bat.Delete(cf, rocksdb::Slice(k, keylen)); | |
1680 | } else { | |
1681 | string key; | |
1682 | combine_strings(prefix, k, keylen, &key); | |
1683 | bat.Delete(db->default_cf, rocksdb::Slice(key)); | |
1684 | } | |
7c673cae FG |
1685 | } |
1686 | ||
1687 | void RocksDBStore::RocksDBTransactionImpl::rm_single_key(const string &prefix, | |
1688 | const string &k) | |
1689 | { | |
f67539c2 | 1690 | auto cf = db->get_cf_handle(prefix, k); |
11fdf7f2 TL |
1691 | if (cf) { |
1692 | bat.SingleDelete(cf, k); | |
1693 | } else { | |
1694 | bat.SingleDelete(db->default_cf, combine_strings(prefix, k)); | |
1695 | } | |
7c673cae FG |
1696 | } |
1697 | ||
1698 | void RocksDBStore::RocksDBTransactionImpl::rmkeys_by_prefix(const string &prefix) | |
1699 | { | |
f67539c2 TL |
1700 | auto p_iter = db->cf_handles.find(prefix); |
1701 | if (p_iter == db->cf_handles.end()) { | |
1702 | uint64_t cnt = db->delete_range_threshold; | |
1703 | bat.SetSavePoint(); | |
1704 | auto it = db->get_iterator(prefix); | |
1705 | for (it->seek_to_first(); it->valid() && (--cnt) != 0; it->next()) { | |
1706 | bat.Delete(db->default_cf, combine_strings(prefix, it->key())); | |
1707 | } | |
1708 | if (cnt == 0) { | |
1709 | bat.RollbackToSavePoint(); | |
1710 | string endprefix = prefix; | |
9f95a23c TL |
1711 | endprefix.push_back('\x01'); |
1712 | bat.DeleteRange(db->default_cf, | |
1713 | combine_strings(prefix, string()), | |
1714 | combine_strings(endprefix, string())); | |
11fdf7f2 | 1715 | } else { |
f67539c2 TL |
1716 | bat.PopSavePoint(); |
1717 | } | |
1718 | } else { | |
1719 | ceph_assert(p_iter->second.handles.size() >= 1); | |
1720 | for (auto cf : p_iter->second.handles) { | |
1721 | uint64_t cnt = db->delete_range_threshold; | |
1722 | bat.SetSavePoint(); | |
1723 | auto it = db->new_shard_iterator(cf); | |
1724 | for (it->SeekToFirst(); it->Valid() && (--cnt) != 0; it->Next()) { | |
1725 | bat.Delete(cf, it->key()); | |
1726 | } | |
1727 | if (cnt == 0) { | |
1728 | bat.RollbackToSavePoint(); | |
1729 | string endprefix = "\xff\xff\xff\xff"; // FIXME: this is cheating... | |
1730 | bat.DeleteRange(cf, string(), endprefix); | |
1731 | } else { | |
1732 | bat.PopSavePoint(); | |
1733 | } | |
31f18b77 | 1734 | } |
7c673cae FG |
1735 | } |
1736 | } | |
1737 | ||
1738 | void RocksDBStore::RocksDBTransactionImpl::rm_range_keys(const string &prefix, | |
1739 | const string &start, | |
1740 | const string &end) | |
1741 | { | |
2a845540 TL |
1742 | ldout(db->cct, 10) << __func__ << " enter start=" << start |
1743 | << " end=" << end << dendl; | |
f67539c2 TL |
1744 | auto p_iter = db->cf_handles.find(prefix); |
1745 | if (p_iter == db->cf_handles.end()) { | |
1746 | uint64_t cnt = db->delete_range_threshold; | |
1747 | bat.SetSavePoint(); | |
1748 | auto it = db->get_iterator(prefix); | |
1749 | for (it->lower_bound(start); | |
1750 | it->valid() && db->comparator->Compare(it->key(), end) < 0 && (--cnt) != 0; | |
1751 | it->next()) { | |
1752 | bat.Delete(db->default_cf, combine_strings(prefix, it->key())); | |
11fdf7f2 | 1753 | } |
f67539c2 | 1754 | if (cnt == 0) { |
2a845540 TL |
1755 | ldout(db->cct, 10) << __func__ << " p_iter == end(), resorting to DeleteRange" |
1756 | << dendl; | |
9f95a23c | 1757 | bat.RollbackToSavePoint(); |
f67539c2 TL |
1758 | bat.DeleteRange(db->default_cf, |
1759 | rocksdb::Slice(combine_strings(prefix, start)), | |
1760 | rocksdb::Slice(combine_strings(prefix, end))); | |
1761 | } else { | |
1762 | bat.PopSavePoint(); | |
1763 | } | |
1764 | } else { | |
1765 | ceph_assert(p_iter->second.handles.size() >= 1); | |
1766 | for (auto cf : p_iter->second.handles) { | |
1767 | uint64_t cnt = db->delete_range_threshold; | |
1768 | bat.SetSavePoint(); | |
1769 | rocksdb::Iterator* it = db->new_shard_iterator(cf); | |
1770 | ceph_assert(it != nullptr); | |
1771 | for (it->Seek(start); | |
1772 | it->Valid() && db->comparator->Compare(it->key(), end) < 0 && (--cnt) != 0; | |
1773 | it->Next()) { | |
1774 | bat.Delete(cf, it->key()); | |
1775 | } | |
1776 | if (cnt == 0) { | |
2a845540 TL |
1777 | ldout(db->cct, 10) << __func__ << " p_iter != end(), resorting to DeleteRange" |
1778 | << dendl; | |
f67539c2 TL |
1779 | bat.RollbackToSavePoint(); |
1780 | bat.DeleteRange(cf, rocksdb::Slice(start), rocksdb::Slice(end)); | |
494da23a | 1781 | } else { |
f67539c2 | 1782 | bat.PopSavePoint(); |
494da23a | 1783 | } |
f67539c2 | 1784 | delete it; |
9f95a23c | 1785 | } |
7c673cae | 1786 | } |
2a845540 | 1787 | ldout(db->cct, 10) << __func__ << " end" << dendl; |
7c673cae FG |
1788 | } |
1789 | ||
// Queue a rocksdb Merge of (prefix, k) with to_set_bl.  Mirrors set():
// a sharded prefix uses its own column family with the bare key; anything
// else goes to the default CF under "prefix\0key".  In both cases the
// value is passed without copying (Slice or SliceParts).
void RocksDBStore::RocksDBTransactionImpl::merge(
  const string &prefix,
  const string &k,
  const bufferlist &to_set_bl)
{
  auto cf = db->get_cf_handle(prefix, k);
  if (cf) {
    // bufferlist::c_str() is non-constant, so we can't call c_str()
    if (to_set_bl.is_contiguous() && to_set_bl.length() > 0) {
      // single buffer: hand rocksdb one contiguous slice
      bat.Merge(
        cf,
        rocksdb::Slice(k),
        rocksdb::Slice(to_set_bl.buffers().front().c_str(), to_set_bl.length()));
    } else {
      // make a copy
      rocksdb::Slice key_slice(k);
      vector<rocksdb::Slice> value_slices(to_set_bl.get_num_buffers());
      bat.Merge(cf, rocksdb::SliceParts(&key_slice, 1),
                prepare_sliceparts(to_set_bl, &value_slices));
    }
  } else {
    string key = combine_strings(prefix, k);
    // bufferlist::c_str() is non-constant, so we can't call c_str()
    if (to_set_bl.is_contiguous() && to_set_bl.length() > 0) {
      // single buffer: hand rocksdb one contiguous slice
      bat.Merge(
        db->default_cf,
        rocksdb::Slice(key),
        rocksdb::Slice(to_set_bl.buffers().front().c_str(), to_set_bl.length()));
    } else {
      // make a copy
      rocksdb::Slice key_slice(key);
      vector<rocksdb::Slice> value_slices(to_set_bl.get_num_buffers());
      bat.Merge(
        db->default_cf,
        rocksdb::SliceParts(&key_slice, 1),
        prepare_sliceparts(to_set_bl, &value_slices));
    }
  }
}
1829 | ||
7c673cae FG |
// Batched point lookup: fetch each key in `keys` under `prefix` into
// *out.  Missing keys are simply omitted from *out; only I/O errors are
// fatal (abort).  Always returns 0.
int RocksDBStore::get(
  const string &prefix,
  const std::set<string> &keys,
  std::map<string, bufferlist> *out)
{
  rocksdb::PinnableSlice value;
  utime_t start = ceph_clock_now();
  if (cf_handles.count(prefix) > 0) {
    // sharded prefix: query the per-prefix column family with the bare key
    for (auto& key : keys) {
      auto cf_handle = get_cf_handle(prefix, key);
      auto status = db->Get(rocksdb::ReadOptions(),
                            cf_handle,
                            rocksdb::Slice(key),
                            &value);
      if (status.ok()) {
        (*out)[key].append(value.data(), value.size());
      } else if (status.IsIOError()) {
        ceph_abort_msg(status.getState());
      }
      value.Reset();  // release the pinned block before the next Get
    }
  } else {
    // default CF: keys are stored under the combined "prefix\0key" form
    for (auto& key : keys) {
      string k = combine_strings(prefix, key);
      auto status = db->Get(rocksdb::ReadOptions(),
                            default_cf,
                            rocksdb::Slice(k),
                            &value);
      if (status.ok()) {
        (*out)[key].append(value.data(), value.size());
      } else if (status.IsIOError()) {
        ceph_abort_msg(status.getState());
      }
      value.Reset();  // release the pinned block before the next Get
    }
  }
  utime_t lat = ceph_clock_now() - start;
  logger->tinc(l_rocksdb_get_latency, lat);
  return 0;
}
1870 | ||
1871 | int RocksDBStore::get( | |
1872 | const string &prefix, | |
1873 | const string &key, | |
1874 | bufferlist *out) | |
1875 | { | |
11fdf7f2 | 1876 | ceph_assert(out && (out->length() == 0)); |
7c673cae FG |
1877 | utime_t start = ceph_clock_now(); |
1878 | int r = 0; | |
f67539c2 | 1879 | rocksdb::PinnableSlice value; |
7c673cae | 1880 | rocksdb::Status s; |
f67539c2 | 1881 | auto cf = get_cf_handle(prefix, key); |
11fdf7f2 TL |
1882 | if (cf) { |
1883 | s = db->Get(rocksdb::ReadOptions(), | |
1884 | cf, | |
1885 | rocksdb::Slice(key), | |
1886 | &value); | |
1887 | } else { | |
1888 | string k = combine_strings(prefix, key); | |
1889 | s = db->Get(rocksdb::ReadOptions(), | |
1890 | default_cf, | |
1891 | rocksdb::Slice(k), | |
1892 | &value); | |
1893 | } | |
7c673cae | 1894 | if (s.ok()) { |
f67539c2 | 1895 | out->append(value.data(), value.size()); |
224ce89b | 1896 | } else if (s.IsNotFound()) { |
7c673cae | 1897 | r = -ENOENT; |
224ce89b | 1898 | } else { |
11fdf7f2 | 1899 | ceph_abort_msg(s.getState()); |
7c673cae FG |
1900 | } |
1901 | utime_t lat = ceph_clock_now() - start; | |
7c673cae FG |
1902 | logger->tinc(l_rocksdb_get_latency, lat); |
1903 | return r; | |
1904 | } | |
1905 | ||
// Point lookup with a (ptr,len) key; (char*,len) variant of get() above.
// Returns 0 on success, -ENOENT if absent; aborts on any other error.
int RocksDBStore::get(
  const string& prefix,
  const char *key,
  size_t keylen,
  bufferlist *out)
{
  ceph_assert(out && (out->length() == 0));
  utime_t start = ceph_clock_now();
  int r = 0;
  rocksdb::PinnableSlice value;
  rocksdb::Status s;
  auto cf = get_cf_handle(prefix, key, keylen);
  if (cf) {
    // sharded prefix: bare key against its own column family
    s = db->Get(rocksdb::ReadOptions(),
                cf,
                rocksdb::Slice(key, keylen),
                &value);
  } else {
    // default CF: combined "prefix\0key" form
    string k;
    combine_strings(prefix, key, keylen, &k);
    s = db->Get(rocksdb::ReadOptions(),
                default_cf,
                rocksdb::Slice(k),
                &value);
  }
  if (s.ok()) {
    out->append(value.data(), value.size());
  } else if (s.IsNotFound()) {
    r = -ENOENT;
  } else {
    ceph_abort_msg(s.getState());
  }
  utime_t lat = ceph_clock_now() - start;
  logger->tinc(l_rocksdb_get_latency, lat);
  return r;
}
1942 | ||
1943 | int RocksDBStore::split_key(rocksdb::Slice in, string *prefix, string *key) | |
1944 | { | |
1945 | size_t prefix_len = 0; | |
f67539c2 | 1946 | |
7c673cae FG |
1947 | // Find separator inside Slice |
1948 | char* separator = (char*) memchr(in.data(), 0, in.size()); | |
1949 | if (separator == NULL) | |
1950 | return -EINVAL; | |
1951 | prefix_len = size_t(separator - in.data()); | |
1952 | if (prefix_len >= in.size()) | |
1953 | return -EINVAL; | |
1954 | ||
1955 | // Fetch prefix and/or key directly from Slice | |
1956 | if (prefix) | |
1957 | *prefix = string(in.data(), prefix_len); | |
1958 | if (key) | |
1959 | *key = string(separator+1, in.size()-prefix_len-1); | |
1960 | return 0; | |
1961 | } | |
1962 | ||
1963 | void RocksDBStore::compact() | |
1964 | { | |
1965 | logger->inc(l_rocksdb_compact); | |
1966 | rocksdb::CompactRangeOptions options; | |
11fdf7f2 TL |
1967 | db->CompactRange(options, default_cf, nullptr, nullptr); |
1968 | for (auto cf : cf_handles) { | |
f67539c2 TL |
1969 | for (auto shard_cf : cf.second.handles) { |
1970 | db->CompactRange( | |
1971 | options, | |
1972 | shard_cf, | |
1973 | nullptr, nullptr); | |
1974 | } | |
11fdf7f2 | 1975 | } |
7c673cae FG |
1976 | } |
1977 | ||
7c673cae FG |
// Body of the background compaction thread.  Pops (start,end) ranges off
// compact_queue -- an empty pair means "compact everything" -- and sleeps
// on compact_queue_cond when the queue drains.  Exits once
// compact_queue_stop is set.
void RocksDBStore::compact_thread_entry()
{
  std::unique_lock l{compact_queue_lock};
  dout(10) << __func__ << " enter" << dendl;
  while (!compact_queue_stop) {
    if (!compact_queue.empty()) {
      auto range = compact_queue.front();
      compact_queue.pop_front();
      logger->set(l_rocksdb_compact_queue_len, compact_queue.size());
      // drop the lock for the (potentially long) compaction itself
      l.unlock();
      logger->inc(l_rocksdb_compact_range);
      if (range.first.empty() && range.second.empty()) {
        compact();
      } else {
        compact_range(range.first, range.second);
      }
      l.lock();
      continue;
    }
    dout(10) << __func__ << " waiting" << dendl;
    compact_queue_cond.wait(l);
  }
  dout(10) << __func__ << " exit" << dendl;
}
2002 | ||
// Enqueue an asynchronous compaction of [start, end] for the background
// thread, merging it with an overlapping queued range where possible, and
// lazily starting the compaction thread on first use.
void RocksDBStore::compact_range_async(const string& start, const string& end)
{
  std::lock_guard l(compact_queue_lock);

  // try to merge adjacent ranges.  this is O(n), but the queue should
  // be short.  note that we do not cover all overlap cases and merge
  // opportunities here, but we capture the ones we currently need.
  list< pair<string,string> >::iterator p = compact_queue.begin();
  while (p != compact_queue.end()) {
    if (p->first == start && p->second == end) {
      // dup; no-op
      return;
    }
    if (start <= p->first && p->first <= end) {
      // new region crosses start of existing range
      // select right bound that is bigger
      compact_queue.push_back(make_pair(start, end > p->second ? end : p->second));
      compact_queue.erase(p);
      logger->inc(l_rocksdb_compact_queue_merge);
      break;
    }
    if (start <= p->second && p->second <= end) {
      // new region crosses end of existing range
      //p->first < p->second and p->second <= end, so p->first <= end.
      //But we break if previous condition, so start > p->first.
      compact_queue.push_back(make_pair(p->first, end));
      compact_queue.erase(p);
      logger->inc(l_rocksdb_compact_queue_merge);
      break;
    }
    ++p;
  }
  if (p == compact_queue.end()) {
    // no merge, new entry.
    compact_queue.push_back(make_pair(start, end));
    logger->set(l_rocksdb_compact_queue_len, compact_queue.size());
  }
  compact_queue_cond.notify_all();
  if (!compact_thread.is_started()) {
    // first compaction request ever: spin up the worker thread
    compact_thread.create("rstore_compact");
  }
}
2045 | bool RocksDBStore::check_omap_dir(string &omap_dir) | |
2046 | { | |
2047 | rocksdb::Options options; | |
2048 | options.create_if_missing = true; | |
2049 | rocksdb::DB *db; | |
2050 | rocksdb::Status status = rocksdb::DB::Open(options, omap_dir, &db); | |
2051 | delete db; | |
2052 | db = nullptr; | |
2053 | return status.ok(); | |
2054 | } | |
f67539c2 | 2055 | |
7c673cae FG |
// Synchronously compact the encoded key range [start, end].  The inputs
// use the default-CF encoding ("prefix\0key"); after compacting the
// default CF the range is mapped onto any sharded prefixes it spans and
// each shard CF is compacted over the corresponding sub-range.
void RocksDBStore::compact_range(const string& start, const string& end)
{
  rocksdb::CompactRangeOptions options;
  rocksdb::Slice cstart(start);
  rocksdb::Slice cend(end);
  string prefix_start, key_start;
  string prefix_end, key_end;
  string key_highest = "\xff\xff\xff\xff"; //cheating
  string key_lowest = "";

  // compact every shard CF of one sharded prefix over [start, end]
  auto compact_range = [&] (const decltype(cf_handles)::iterator column_it,
                            const std::string& start,
                            const std::string& end) {
    rocksdb::Slice cstart(start);
    rocksdb::Slice cend(end);
    for (const auto& shard_it : column_it->second.handles) {
      db->CompactRange(options, shard_it, &cstart, &cend);
    }
  };
  db->CompactRange(options, default_cf, &cstart, &cend);
  split_key(cstart, &prefix_start, &key_start);
  split_key(cend, &prefix_end, &key_end);
  if (prefix_start == prefix_end) {
    // the whole range lies within a single prefix
    const auto& column = cf_handles.find(prefix_start);
    if (column != cf_handles.end()) {
      compact_range(column, key_start, key_end);
    }
  } else {
    // first prefix: from key_start to its highest key
    auto column = cf_handles.find(prefix_start);
    if (column != cf_handles.end()) {
      compact_range(column, key_start, key_highest);
      ++column;
    }
    // NOTE(review): if prefix_start has no CF entry, `column` is end()
    // here; should prefix_end then have one, the loop below would
    // increment past end() -- confirm callers cannot hit that case.
    const auto& column_end = cf_handles.find(prefix_end);
    while (column != column_end) {
      // middle prefixes: compact their full key range
      compact_range(column, key_lowest, key_highest);
      column++;
    }
    // last prefix: from its lowest key up to key_end
    if (column != cf_handles.end()) {
      compact_range(column, key_lowest, key_end);
    }
  }
}
91327a77 | 2099 | |
7c673cae FG |
// Release the owned rocksdb iterator.
RocksDBStore::RocksDBWholeSpaceIteratorImpl::~RocksDBWholeSpaceIteratorImpl()
{
  delete dbiter;
}
// Position at the first key of the whole keyspace.
// Returns 0 on success, -1 on iterator error; aborts on I/O error.
int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_first()
{
  dbiter->SeekToFirst();
  ceph_assert(!dbiter->status().IsIOError());
  return dbiter->status().ok() ? 0 : -1;
}
// Position at the first key >= prefix (i.e. the first key of `prefix`,
// given the "prefix\0key" encoding sorts right after the bare prefix).
int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_first(const string &prefix)
{
  rocksdb::Slice slice_prefix(prefix);
  dbiter->Seek(slice_prefix);
  ceph_assert(!dbiter->status().IsIOError());
  return dbiter->status().ok() ? 0 : -1;
}
// Position at the last key of the whole keyspace.
int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_last()
{
  dbiter->SeekToLast();
  ceph_assert(!dbiter->status().IsIOError());
  return dbiter->status().ok() ? 0 : -1;
}
// Position at the last key of `prefix`: seek to the first key past the
// prefix, then step back (or SeekToLast if we ran off the end of the DB).
// NOTE(review): if the DB holds no keys with this prefix the iterator may
// land on an unrelated key; callers presumably re-check the prefix.
int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_last(const string &prefix)
{
  string limit = past_prefix(prefix);
  rocksdb::Slice slice_limit(limit);
  dbiter->Seek(slice_limit);

  if (!dbiter->Valid()) {
    dbiter->SeekToLast();
  } else {
    dbiter->Prev();
  }
  return dbiter->status().ok() ? 0 : -1;
}
// Position at the first key strictly after (prefix, after): lower_bound,
// then skip one entry on an exact match.
int RocksDBStore::RocksDBWholeSpaceIteratorImpl::upper_bound(const string &prefix, const string &after)
{
  lower_bound(prefix, after);
  if (valid()) {
    pair<string,string> key = raw_key();
    if (key.first == prefix && key.second == after)
      next();
  }
  return dbiter->status().ok() ? 0 : -1;
}
// Position at the first key >= (prefix, to) in the encoded keyspace.
int RocksDBStore::RocksDBWholeSpaceIteratorImpl::lower_bound(const string &prefix, const string &to)
{
  string bound = combine_strings(prefix, to);
  rocksdb::Slice slice_bound(bound);
  dbiter->Seek(slice_bound);
  return dbiter->status().ok() ? 0 : -1;
}
// True while the iterator points at an entry.
bool RocksDBStore::RocksDBWholeSpaceIteratorImpl::valid()
{
  return dbiter->Valid();
}
// Advance one entry (no-op when already invalid); aborts on I/O error.
int RocksDBStore::RocksDBWholeSpaceIteratorImpl::next()
{
  if (valid()) {
    dbiter->Next();
  }
  ceph_assert(!dbiter->status().IsIOError());
  return dbiter->status().ok() ? 0 : -1;
}
// Step back one entry (no-op when already invalid); aborts on I/O error.
int RocksDBStore::RocksDBWholeSpaceIteratorImpl::prev()
{
  if (valid()) {
    dbiter->Prev();
  }
  ceph_assert(!dbiter->status().IsIOError());
  return dbiter->status().ok() ? 0 : -1;
}
// Return the key part of the current entry (prefix stripped).
string RocksDBStore::RocksDBWholeSpaceIteratorImpl::key()
{
  string out_key;
  split_key(dbiter->key(), 0, &out_key);
  return out_key;
}
// Return the current entry decoded as (prefix, key).
pair<string,string> RocksDBStore::RocksDBWholeSpaceIteratorImpl::raw_key()
{
  string prefix, key;
  split_key(dbiter->key(), &prefix, &key);
  return make_pair(prefix, key);
}
2185 | ||
// Fast check whether the current entry belongs to `prefix`, without
// decoding the key into strings.
bool RocksDBStore::RocksDBWholeSpaceIteratorImpl::raw_key_is_prefixed(const string &prefix) {
  // Look for "prefix\0" right in rocksb::Slice
  rocksdb::Slice key = dbiter->key();
  if ((key.size() > prefix.length()) && (key[prefix.length()] == '\0')) {
    return memcmp(key.data(), prefix.c_str(), prefix.length()) == 0;
  } else {
    return false;
  }
}
2195 | ||
// Copy the current value into a bufferlist.
bufferlist RocksDBStore::RocksDBWholeSpaceIteratorImpl::value()
{
  return to_bufferlist(dbiter->value());
}
2200 | ||
// Size of the current (encoded) key in bytes.
size_t RocksDBStore::RocksDBWholeSpaceIteratorImpl::key_size()
{
  return dbiter->key().size();
}
2205 | ||
// Size of the current value in bytes.
size_t RocksDBStore::RocksDBWholeSpaceIteratorImpl::value_size()
{
  return dbiter->value().size();
}
2210 | ||
// Copy the current value into a single bufferptr.
bufferptr RocksDBStore::RocksDBWholeSpaceIteratorImpl::value_as_ptr()
{
  rocksdb::Slice val = dbiter->value();
  return bufferptr(val.data(), val.size());
}
2216 | ||
// Map the underlying iterator status to 0 (ok) / -1 (error).
int RocksDBStore::RocksDBWholeSpaceIteratorImpl::status()
{
  return dbiter->status().ok() ? 0 : -1;
}
2221 | ||
2222 | string RocksDBStore::past_prefix(const string &prefix) | |
2223 | { | |
2224 | string limit = prefix; | |
2225 | limit.push_back(1); | |
2226 | return limit; | |
2227 | } | |
2228 | ||
11fdf7f2 TL |
// Iterator over a single (non-sharded) column family.  Keys are returned
// bare; raw_key() re-attaches the owning prefix.  Optionally constrains
// iteration with rocksdb lower/upper bounds for efficiency.
class CFIteratorImpl : public KeyValueDB::IteratorImpl {
protected:
  string prefix;               // logical prefix this CF backs
  rocksdb::Iterator *dbiter;   // owned; freed in the destructor
  const KeyValueDB::IteratorBounds bounds;
  // slices must outlive dbiter: ReadOptions stores pointers to them
  const rocksdb::Slice iterate_lower_bound;
  const rocksdb::Slice iterate_upper_bound;
public:
  explicit CFIteratorImpl(const RocksDBStore* db,
                          const std::string& p,
                          rocksdb::ColumnFamilyHandle* cf,
                          KeyValueDB::IteratorBounds bounds_)
    : prefix(p), bounds(std::move(bounds_)),
      iterate_lower_bound(make_slice(bounds.lower_bound)),
      iterate_upper_bound(make_slice(bounds.upper_bound))
    {
      auto options = rocksdb::ReadOptions();
      if (db->cct->_conf->osd_rocksdb_iterator_bounds_enabled) {
        if (bounds.lower_bound) {
          options.iterate_lower_bound = &iterate_lower_bound;
        }
        if (bounds.upper_bound) {
          options.iterate_upper_bound = &iterate_upper_bound;
        }
      }
      dbiter = db->db->NewIterator(options, cf);
  }
  ~CFIteratorImpl() {
    delete dbiter;
  }

  // Position at the first key of the column family.
  int seek_to_first() override {
    dbiter->SeekToFirst();
    return dbiter->status().ok() ? 0 : -1;
  }
  // Position at the last key of the column family.
  int seek_to_last() override {
    dbiter->SeekToLast();
    return dbiter->status().ok() ? 0 : -1;
  }
  // Position at the first key strictly after `after`.
  int upper_bound(const string &after) override {
    lower_bound(after);
    if (valid() && (key() == after)) {
      next();
    }
    return dbiter->status().ok() ? 0 : -1;
  }
  // Position at the first key >= `to`.
  int lower_bound(const string &to) override {
    rocksdb::Slice slice_bound(to);
    dbiter->Seek(slice_bound);
    return dbiter->status().ok() ? 0 : -1;
  }
  // Advance one entry (no-op when already invalid).
  int next() override {
    if (valid()) {
      dbiter->Next();
    }
    return dbiter->status().ok() ? 0 : -1;
  }
  // Step back one entry (no-op when already invalid).
  int prev() override {
    if (valid()) {
      dbiter->Prev();
    }
    return dbiter->status().ok() ? 0 : -1;
  }
  bool valid() override {
    return dbiter->Valid();
  }
  // Bare key (no prefix encoding in per-prefix column families).
  string key() override {
    return dbiter->key().ToString();
  }
  // (prefix, key) pair matching the whole-space iterator convention.
  std::pair<std::string, std::string> raw_key() override {
    return make_pair(prefix, key());
  }
  bufferlist value() override {
    return to_bufferlist(dbiter->value());
  }
  bufferptr value_as_ptr() override {
    rocksdb::Slice val = dbiter->value();
    return bufferptr(val.data(), val.size());
  }
  // 0 if the underlying iterator is healthy, -1 otherwise.
  int status() override {
    return dbiter->status().ok() ? 0 : -1;
  }
};
2312 | ||
f67539c2 TL |
2313 | |
2314 | //merge column iterators and rest iterator | |
2315 | class WholeMergeIteratorImpl : public KeyValueDB::WholeSpaceIteratorImpl { | |
2316 | private: | |
2317 | RocksDBStore* db; | |
2318 | KeyValueDB::WholeSpaceIterator main; | |
2319 | std::map<std::string, KeyValueDB::Iterator> shards; | |
2320 | std::map<std::string, KeyValueDB::Iterator>::iterator current_shard; | |
2321 | enum {on_main, on_shard} smaller; | |
2322 | ||
2323 | public: | |
2324 | WholeMergeIteratorImpl(RocksDBStore* db) | |
2325 | : db(db) | |
2326 | , main(db->get_default_cf_iterator()) | |
2327 | { | |
2328 | for (auto& e : db->cf_handles) { | |
2329 | shards.emplace(e.first, db->get_iterator(e.first)); | |
2330 | } | |
2331 | } | |
2332 | ||
2333 | // returns true if value in main is smaller then in shards | |
2334 | // invalid is larger then actual value | |
2335 | bool is_main_smaller() { | |
2336 | if (main->valid()) { | |
2337 | if (current_shard != shards.end()) { | |
2338 | auto main_rk = main->raw_key(); | |
2339 | ceph_assert(current_shard->second->valid()); | |
2340 | auto shards_rk = current_shard->second->raw_key(); | |
2341 | if (main_rk.first < shards_rk.first) | |
2342 | return true; | |
2343 | if (main_rk.first > shards_rk.first) | |
2344 | return false; | |
2345 | return main_rk.second < shards_rk.second; | |
2346 | } else { | |
2347 | return true; | |
2348 | } | |
2349 | } else { | |
2350 | if (current_shard != shards.end()) { | |
2351 | return false; | |
2352 | } else { | |
2353 | //this means that neither is valid | |
2354 | //we select main to be smaller, so valid() will signal properly | |
2355 | return true; | |
2356 | } | |
2357 | } | |
2358 | } | |
2359 | ||
2360 | int seek_to_first() override { | |
2361 | int r0 = main->seek_to_first(); | |
2362 | int r1 = 0; | |
2363 | // find first shard that has some data | |
2364 | current_shard = shards.begin(); | |
2365 | while (current_shard != shards.end()) { | |
2366 | r1 = current_shard->second->seek_to_first(); | |
2367 | if (r1 != 0 || current_shard->second->valid()) { | |
2368 | //this is the first shard that will yield some keys | |
2369 | break; | |
2370 | } | |
2371 | ++current_shard; | |
2372 | } | |
2373 | smaller = is_main_smaller() ? on_main : on_shard; | |
2374 | return r0 == 0 && r1 == 0 ? 0 : -1; | |
2375 | } | |
2376 | ||
2377 | int seek_to_first(const std::string &prefix) override { | |
2378 | int r0 = main->seek_to_first(prefix); | |
2379 | int r1 = 0; | |
2380 | // find first shard that has some data | |
2381 | current_shard = shards.lower_bound(prefix); | |
2382 | while (current_shard != shards.end()) { | |
2383 | r1 = current_shard->second->seek_to_first(); | |
2384 | if (r1 != 0 || current_shard->second->valid()) { | |
2385 | //this is the first shard that will yield some keys | |
2386 | break; | |
2387 | } | |
2388 | ++current_shard; | |
2389 | } | |
2390 | smaller = is_main_smaller() ? on_main : on_shard; | |
2391 | return r0 == 0 && r1 == 0 ? 0 : -1; | |
2392 | }; | |
2393 | ||
2394 | int seek_to_last() override { | |
2395 | int r0 = main->seek_to_last(); | |
2396 | int r1 = 0; | |
2397 | r1 = shards_seek_to_last(); | |
2398 | //if we have 2 candidates, we need to select | |
2399 | if (main->valid()) { | |
2400 | if (shards_valid()) { | |
2401 | if (is_main_smaller()) { | |
2402 | smaller = on_shard; | |
2403 | main->next(); | |
2404 | } else { | |
2405 | smaller = on_main; | |
2406 | shards_next(); | |
2407 | } | |
2408 | } else { | |
2409 | smaller = on_main; | |
2410 | } | |
2411 | } else { | |
2412 | if (shards_valid()) { | |
2413 | smaller = on_shard; | |
2414 | } else { | |
2415 | smaller = on_main; | |
2416 | } | |
2417 | } | |
2418 | return r0 == 0 && r1 == 0 ? 0 : -1; | |
2419 | } | |
2420 | ||
2421 | int seek_to_last(const std::string &prefix) override { | |
2422 | int r0 = main->seek_to_last(prefix); | |
2423 | int r1 = 0; | |
2424 | // find last shard that has some data | |
2425 | bool found = false; | |
2426 | current_shard = shards.lower_bound(prefix); | |
2427 | while (current_shard != shards.begin()) { | |
2428 | r1 = current_shard->second->seek_to_last(); | |
2429 | if (r1 != 0) | |
2430 | break; | |
2431 | if (current_shard->second->valid()) { | |
2432 | found = true; | |
2433 | break; | |
2434 | } | |
2435 | } | |
2436 | //if we have 2 candidates, we need to select | |
2437 | if (main->valid() && found) { | |
2438 | if (is_main_smaller()) { | |
2439 | main->next(); | |
2440 | } else { | |
2441 | shards_next(); | |
2442 | } | |
2443 | } | |
2444 | if (!found) { | |
2445 | //set shards state that properly represents eof | |
2446 | current_shard = shards.end(); | |
2447 | } | |
2448 | smaller = is_main_smaller() ? on_main : on_shard; | |
2449 | return r0 == 0 && r1 == 0 ? 0 : -1; | |
2450 | } | |
2451 | ||
2452 | int upper_bound(const std::string &prefix, const std::string &after) override { | |
2453 | int r0 = main->upper_bound(prefix, after); | |
2454 | int r1 = 0; | |
2455 | if (r0 != 0) | |
2456 | return r0; | |
2457 | current_shard = shards.lower_bound(prefix); | |
2458 | if (current_shard != shards.end()) { | |
2459 | bool located = false; | |
2460 | if (current_shard->first == prefix) { | |
2461 | r1 = current_shard->second->upper_bound(after); | |
2462 | if (r1 != 0) | |
2463 | return r1; | |
2464 | if (current_shard->second->valid()) { | |
2465 | located = true; | |
2466 | } | |
2467 | } | |
2468 | if (!located) { | |
2469 | while (current_shard != shards.end()) { | |
2470 | r1 = current_shard->second->seek_to_first(); | |
2471 | if (r1 != 0) | |
2472 | return r1; | |
2473 | if (current_shard->second->valid()) | |
2474 | break; | |
2475 | ++current_shard; | |
2476 | } | |
2477 | } | |
2478 | } | |
2479 | smaller = is_main_smaller() ? on_main : on_shard; | |
2480 | return 0; | |
2481 | } | |
2482 | ||
2483 | int lower_bound(const std::string &prefix, const std::string &to) override { | |
2484 | int r0 = main->lower_bound(prefix, to); | |
2485 | int r1 = 0; | |
2486 | if (r0 != 0) | |
2487 | return r0; | |
2488 | current_shard = shards.lower_bound(prefix); | |
2489 | if (current_shard != shards.end()) { | |
2490 | bool located = false; | |
2491 | if (current_shard->first == prefix) { | |
2492 | r1 = current_shard->second->lower_bound(to); | |
2493 | if (r1 != 0) | |
2494 | return r1; | |
2495 | if (current_shard->second->valid()) { | |
2496 | located = true; | |
2497 | } | |
2498 | } | |
2499 | if (!located) { | |
2500 | while (current_shard != shards.end()) { | |
2501 | r1 = current_shard->second->seek_to_first(); | |
2502 | if (r1 != 0) | |
2503 | return r1; | |
2504 | if (current_shard->second->valid()) | |
2505 | break; | |
2506 | ++current_shard; | |
2507 | } | |
2508 | } | |
2509 | } | |
2510 | smaller = is_main_smaller() ? on_main : on_shard; | |
2511 | return 0; | |
2512 | } | |
2513 | ||
2514 | bool valid() override { | |
2515 | if (smaller == on_main) { | |
2516 | return main->valid(); | |
2517 | } else { | |
2518 | if (current_shard == shards.end()) | |
2519 | return false; | |
2520 | return current_shard->second->valid(); | |
2521 | } | |
2522 | }; | |
2523 | ||
2524 | int next() override { | |
2525 | int r; | |
2526 | if (smaller == on_main) { | |
2527 | r = main->next(); | |
2528 | } else { | |
2529 | r = shards_next(); | |
2530 | } | |
2531 | if (r != 0) | |
2532 | return r; | |
2533 | smaller = is_main_smaller() ? on_main : on_shard; | |
2534 | return 0; | |
2535 | } | |
2536 | ||
2537 | int prev() override { | |
2538 | int r; | |
2539 | bool main_was_valid = false; | |
2540 | if (main->valid()) { | |
2541 | main_was_valid = true; | |
2542 | r = main->prev(); | |
2543 | } else { | |
2544 | r = main->seek_to_last(); | |
2545 | } | |
2546 | if (r != 0) | |
2547 | return r; | |
2548 | ||
2549 | bool shards_was_valid = false; | |
2550 | if (shards_valid()) { | |
2551 | shards_was_valid = true; | |
2552 | r = shards_prev(); | |
2553 | } else { | |
2554 | r = shards_seek_to_last(); | |
2555 | } | |
2556 | if (r != 0) | |
2557 | return r; | |
2558 | ||
2559 | if (!main->valid() && !shards_valid()) { | |
2560 | //end, no previous. set marker so valid() can work | |
2561 | smaller = on_main; | |
2562 | return 0; | |
2563 | } | |
2564 | ||
2565 | //if 1 is valid, select it | |
2566 | //if 2 are valid select larger and advance the other | |
2567 | if (main->valid()) { | |
2568 | if (shards_valid()) { | |
2569 | if (is_main_smaller()) { | |
2570 | smaller = on_shard; | |
2571 | if (main_was_valid) { | |
2572 | if (main->valid()) { | |
2573 | r = main->next(); | |
2574 | } else { | |
2575 | r = main->seek_to_first(); | |
2576 | } | |
2577 | } else { | |
2578 | //if we have resurrected main, kill it | |
2579 | if (main->valid()) { | |
2580 | main->next(); | |
2581 | } | |
2582 | } | |
2583 | } else { | |
2584 | smaller = on_main; | |
2585 | if (shards_was_valid) { | |
2586 | if (shards_valid()) { | |
2587 | r = shards_next(); | |
2588 | } else { | |
2589 | r = shards_seek_to_first(); | |
2590 | } | |
2591 | } else { | |
2592 | //if we have resurected shards, kill it | |
2593 | if (shards_valid()) { | |
2594 | shards_next(); | |
2595 | } | |
2596 | } | |
2597 | } | |
2598 | } else { | |
2599 | smaller = on_main; | |
2600 | r = shards_seek_to_first(); | |
2601 | } | |
2602 | } else { | |
2603 | smaller = on_shard; | |
2604 | r = main->seek_to_first(); | |
2605 | } | |
2606 | return r; | |
2607 | } | |
2608 | ||
2609 | std::string key() override | |
2610 | { | |
2611 | if (smaller == on_main) { | |
2612 | return main->key(); | |
2613 | } else { | |
2614 | return current_shard->second->key(); | |
2615 | } | |
2616 | } | |
2617 | ||
2618 | std::pair<std::string,std::string> raw_key() override | |
2619 | { | |
2620 | if (smaller == on_main) { | |
2621 | return main->raw_key(); | |
2622 | } else { | |
2623 | return { current_shard->first, current_shard->second->key() }; | |
2624 | } | |
2625 | } | |
2626 | ||
2627 | bool raw_key_is_prefixed(const std::string &prefix) override | |
2628 | { | |
2629 | if (smaller == on_main) { | |
2630 | return main->raw_key_is_prefixed(prefix); | |
2631 | } else { | |
2632 | return current_shard->first == prefix; | |
2633 | } | |
2634 | } | |
2635 | ||
2636 | ceph::buffer::list value() override | |
2637 | { | |
2638 | if (smaller == on_main) { | |
2639 | return main->value(); | |
2640 | } else { | |
2641 | return current_shard->second->value(); | |
2642 | } | |
2643 | } | |
2644 | ||
2645 | int status() override | |
2646 | { | |
2647 | //because we already had to inspect key, it must be ok | |
2648 | return 0; | |
2649 | } | |
2650 | ||
2651 | size_t key_size() override | |
2652 | { | |
2653 | if (smaller == on_main) { | |
2654 | return main->key_size(); | |
2655 | } else { | |
2656 | return current_shard->second->key().size(); | |
2657 | } | |
2658 | } | |
2659 | size_t value_size() override | |
2660 | { | |
2661 | if (smaller == on_main) { | |
2662 | return main->value_size(); | |
2663 | } else { | |
2664 | return current_shard->second->value().length(); | |
2665 | } | |
2666 | } | |
2667 | ||
2668 | int shards_valid() { | |
2669 | if (current_shard == shards.end()) | |
2670 | return false; | |
2671 | return current_shard->second->valid(); | |
2672 | } | |
2673 | ||
2674 | int shards_next() { | |
2675 | if (current_shard == shards.end()) { | |
2676 | //illegal to next() on !valid() | |
2677 | return -1; | |
2678 | } | |
2679 | int r = 0; | |
2680 | r = current_shard->second->next(); | |
2681 | if (r != 0) | |
2682 | return r; | |
2683 | if (current_shard->second->valid()) | |
2684 | return 0; | |
2685 | //current shard exhaused, search for key | |
2686 | ++current_shard; | |
2687 | while (current_shard != shards.end()) { | |
2688 | r = current_shard->second->seek_to_first(); | |
2689 | if (r != 0) | |
2690 | return r; | |
2691 | if (current_shard->second->valid()) | |
2692 | break; | |
2693 | ++current_shard; | |
2694 | } | |
2695 | //either we found key or not, but it is success | |
2696 | return 0; | |
2697 | } | |
2698 | ||
2699 | int shards_prev() { | |
2700 | if (current_shard == shards.end()) { | |
2701 | //illegal to prev() on !valid() | |
2702 | return -1; | |
2703 | } | |
2704 | int r = current_shard->second->prev(); | |
2705 | while (r == 0) { | |
2706 | if (current_shard->second->valid()) { | |
2707 | break; | |
2708 | } | |
2709 | if (current_shard == shards.begin()) { | |
2710 | //we have reached pre-first element | |
2711 | //this makes it !valid(), but guarantees next() moves to first element | |
2712 | break; | |
2713 | } | |
2714 | --current_shard; | |
2715 | r = current_shard->second->seek_to_last(); | |
2716 | } | |
2717 | return r; | |
2718 | } | |
2719 | ||
2720 | int shards_seek_to_last() { | |
2721 | int r = 0; | |
2722 | current_shard = shards.end(); | |
2723 | if (current_shard == shards.begin()) { | |
2724 | //no shards at all | |
2725 | return 0; | |
2726 | } | |
2727 | while (current_shard != shards.begin()) { | |
2728 | --current_shard; | |
2729 | r = current_shard->second->seek_to_last(); | |
2730 | if (r != 0) | |
2731 | return r; | |
2732 | if (current_shard->second->valid()) { | |
2733 | return 0; | |
2734 | } | |
2735 | } | |
2736 | //no keys at all | |
2737 | current_shard = shards.end(); | |
2738 | return r; | |
2739 | } | |
2740 | ||
2741 | int shards_seek_to_first() { | |
2742 | int r = 0; | |
2743 | current_shard = shards.begin(); | |
2744 | while (current_shard != shards.end()) { | |
2745 | r = current_shard->second->seek_to_first(); | |
2746 | if (r != 0) | |
2747 | break; | |
2748 | if (current_shard->second->valid()) { | |
2749 | //this is the first shard that will yield some keys | |
2750 | break; | |
2751 | } | |
2752 | ++current_shard; | |
2753 | } | |
2754 | return r; | |
2755 | } | |
2756 | }; | |
2757 | ||
// Merges the shard column families of a single prefix into one in-prefix
// iterator. Invariant: 'iters' is kept ordered by KeyLess, so iters[0] is
// always the shard iterator holding the smallest valid key (invalid
// iterators sort after all valid ones); every accessor reads iters[0].
class ShardMergeIteratorImpl : public KeyValueDB::IteratorImpl {
private:
  // Orders raw RocksDB iterators by current key; an invalid iterator
  // compares greater-than any valid one, and two invalid ones are equal.
  struct KeyLess {
  private:
    const rocksdb::Comparator* comparator;
  public:
    KeyLess(const rocksdb::Comparator* comparator) : comparator(comparator) { };

    bool operator()(rocksdb::Iterator* a, rocksdb::Iterator* b) const
    {
      if (a->Valid()) {
	if (b->Valid()) {
	  return comparator->Compare(a->key(), b->key()) < 0;
	} else {
	  return true;
	}
      } else {
	if (b->Valid()) {
	  return false;
	} else {
	  return false;
	}
      }
    }
  };

  const RocksDBStore* db;  // parent store, not owned
  KeyLess keyless;
  string prefix;           // the single prefix all shards belong to
  // bounds must outlive the ReadOptions that point into the two slices below
  const KeyValueDB::IteratorBounds bounds;
  const rocksdb::Slice iterate_lower_bound;
  const rocksdb::Slice iterate_upper_bound;
  std::vector<rocksdb::Iterator*> iters;  // owned; freed in dtor
public:
  explicit ShardMergeIteratorImpl(const RocksDBStore* db,
				  const std::string& prefix,
				  const std::vector<rocksdb::ColumnFamilyHandle*>& shards,
				  KeyValueDB::IteratorBounds bounds_)
    : db(db), keyless(db->comparator), prefix(prefix), bounds(std::move(bounds_)),
      iterate_lower_bound(make_slice(bounds.lower_bound)),
      iterate_upper_bound(make_slice(bounds.upper_bound))
  {
    iters.reserve(shards.size());
    auto options = rocksdb::ReadOptions();
    // iterator bounds are applied only when enabled by configuration
    if (db->cct->_conf->osd_rocksdb_iterator_bounds_enabled) {
      if (bounds.lower_bound) {
	options.iterate_lower_bound = &iterate_lower_bound;
      }
      if (bounds.upper_bound) {
	options.iterate_upper_bound = &iterate_upper_bound;
      }
    }
    for (auto& s : shards) {
      iters.push_back(db->db->NewIterator(options, s));
    }
  }
  ~ShardMergeIteratorImpl() {
    for (auto& it : iters) {
      delete it;
    }
  }
  // Seek every shard to its first key, then sort so iters[0] is smallest.
  int seek_to_first() override {
    for (auto& it : iters) {
      it->SeekToFirst();
      if (!it->status().ok()) {
	return -1;
      }
    }
    //all iterators seeked, sort
    std::sort(iters.begin(), iters.end(), keyless);
    return 0;
  }
  // Seek every shard to its last key, keep only the largest candidate valid
  // and advance all others past-the-end.
  int seek_to_last() override {
    for (auto& it : iters) {
      it->SeekToLast();
      if (!it->status().ok()) {
	return -1;
      }
    }
    // move the largest candidate into slot 0 ...
    for (size_t i = 1; i < iters.size(); i++) {
      if (iters[0]->Valid()) {
	if (iters[i]->Valid()) {
	  if (keyless(iters[0], iters[i])) {
	    std::swap(iters[0], iters[i]);
	  }
	} else {
	  //iters[i] empty
	}
      } else {
	if (iters[i]->Valid()) {
	  std::swap(iters[0], iters[i]);
	}
      }
      //it might happen that cf was empty
      // ... and invalidate the loser so it cannot resurface
      if (iters[i]->Valid()) {
	iters[i]->Next();
      }
    }
    //no need to sort, as at most 1 iterator is valid now
    return 0;
  }
  // First key strictly greater than 'after' across all shards.
  int upper_bound(const string &after) override {
    rocksdb::Slice slice_bound(after);
    for (auto& it : iters) {
      it->Seek(slice_bound);
      if (it->Valid() && it->key() == after) {
	it->Next();
      }
      if (!it->status().ok()) {
	return -1;
      }
    }
    std::sort(iters.begin(), iters.end(), keyless);
    return 0;
  }
  // First key >= 'to' across all shards.
  int lower_bound(const string &to) override {
    rocksdb::Slice slice_bound(to);
    for (auto& it : iters) {
      it->Seek(slice_bound);
      if (!it->status().ok()) {
	return -1;
      }
    }
    std::sort(iters.begin(), iters.end(), keyless);
    return 0;
  }
  // Advance the smallest iterator and bubble it back into sorted position;
  // only adjacent swaps are needed since the rest stays sorted.
  int next() override {
    int r = -1;
    if (iters[0]->Valid()) {
      iters[0]->Next();
      if (iters[0]->status().ok()) {
	r = 0;
	//bubble up
	for (size_t i = 0; i < iters.size() - 1; i++) {
	  if (keyless(iters[i], iters[i + 1])) {
	    //matches, fixed
	    break;
	  }
	  std::swap(iters[i], iters[i + 1]);
	}
      }
    }
    return r;
  }
  // iters are sorted, so
  // a[0] < b[0] < c[0] < d[0]
  // a[0] > a[-1], a[0] > b[-1], a[0] > c[-1], a[0] > d[-1]
  // so, prev() will be one of:
  // a[-1], b[-1], c[-1], d[-1]
  // prev() will be the one that is *largest* of them
  //
  // alg:
  // 1. go prev() on each iterator we can
  // 2. select largest key from those iterators
  // 3. go next() on all iterators except (2)
  // 4. sort
  int prev() override {
    std::vector<rocksdb::Iterator*> prev_done;
    //1
    for (auto it: iters) {
      if (it->Valid()) {
	it->Prev();
	if (it->Valid()) {
	  prev_done.push_back(it);
	} else {
	  // stepped before the first key; re-park at the first key so a
	  // later next() lands correctly
	  it->SeekToFirst();
	}
      } else {
	// past-the-end iterator: its predecessor is its last key
	it->SeekToLast();
	if (it->Valid()) {
	  prev_done.push_back(it);
	}
      }
    }
    if (prev_done.size() == 0) {
      /* there is no previous element */
      if (iters[0]->Valid()) {
	iters[0]->Prev();
	ceph_assert(!iters[0]->Valid());
      }
      return 0;
    }
    //2,3
    rocksdb::Iterator* highest = prev_done[0];
    for (size_t i = 1; i < prev_done.size(); i++) {
      if (keyless(highest, prev_done[i])) {
	highest->Next();
	highest = prev_done[i];
      } else {
	prev_done[i]->Next();
      }
    }
    //4
    //insert highest in the beginning, and shift values until we pick highest
    //untouched rest is sorted - we just prev()/next() them
    rocksdb::Iterator* hold = highest;
    for (size_t i = 0; i < iters.size(); i++) {
      std::swap(hold, iters[i]);
      if (hold == highest) break;
    }
    ceph_assert(hold == highest);
    return 0;
  }
  // All accessors read iters[0], the smallest-key (or only valid) shard.
  bool valid() override {
    return iters[0]->Valid();
  }
  string key() override {
    return iters[0]->key().ToString();
  }
  std::pair<std::string, std::string> raw_key() override {
    return make_pair(prefix, key());
  }
  bufferlist value() override {
    return to_bufferlist(iters[0]->value());
  }
  bufferptr value_as_ptr() override {
    rocksdb::Slice val = iters[0]->value();
    return bufferptr(val.data(), val.size());
  }
  int status() override {
    return iters[0]->status().ok() ? 0 : -1;
  }
};
2981 | ||
// Return an iterator over 'prefix'.
// Sharded prefix with a single shard -> plain CF iterator; with bounds
// enabled, a bounds-resolved handle may also pin a single shard. Otherwise
// all shards of the prefix are merged. Unsharded prefixes fall back to the
// generic prefix iterator over the whole space.
KeyValueDB::Iterator RocksDBStore::get_iterator(const std::string& prefix, IteratorOpts opts, IteratorBounds bounds)
{
  auto cf_it = cf_handles.find(prefix);
  if (cf_it != cf_handles.end()) {
    rocksdb::ColumnFamilyHandle* cf = nullptr;
    if (cf_it->second.handles.size() == 1) {
      cf = cf_it->second.handles[0];
    } else if (cct->_conf->osd_rocksdb_iterator_bounds_enabled) {
      // both bounds may map into one shard; then a single CF suffices
      cf = get_cf_handle(prefix, bounds);
    }
    if (cf) {
      return std::make_shared<CFIteratorImpl>(
        this,
        prefix,
        cf,
        std::move(bounds));
    } else {
      return std::make_shared<ShardMergeIteratorImpl>(
        this,
        prefix,
        cf_it->second.handles,
        std::move(bounds));
    }
  } else {
    // NOTE(review): 'bounds' is intentionally dropped here; the base-class
    // iterator does not take bounds.
    return KeyValueDB::get_iterator(prefix, opts);
  }
}
3009 | ||
// Raw RocksDB iterator over one shard CF with default read options.
// Caller owns the returned iterator and must delete it.
rocksdb::Iterator* RocksDBStore::new_shard_iterator(rocksdb::ColumnFamilyHandle* cf)
{
  return db->NewIterator(rocksdb::ReadOptions(), cf);
}
3014 | ||
3015 | RocksDBStore::WholeSpaceIterator RocksDBStore::get_wholespace_iterator(IteratorOpts opts) | |
11fdf7f2 | 3016 | { |
f67539c2 | 3017 | if (cf_handles.size() == 0) { |
f67539c2 | 3018 | return std::make_shared<RocksDBWholeSpaceIteratorImpl>( |
33c7a0ef | 3019 | this, default_cf, opts); |
11fdf7f2 | 3020 | } else { |
f67539c2 TL |
3021 | return std::make_shared<WholeMergeIteratorImpl>(this); |
3022 | } | |
3023 | } | |
3024 | ||
// Whole-space iterator restricted to the default column family only
// (used as the "main" leg of WholeMergeIteratorImpl).
RocksDBStore::WholeSpaceIterator RocksDBStore::get_default_cf_iterator()
{
  return std::make_shared<RocksDBWholeSpaceIteratorImpl>(this, default_cf, 0);
}
3029 | ||
// Open the DB in a resharding-safe mode: validate the new sharding spec,
// mark the on-disk sharding definition with a lock token (so a normal open
// refuses to start mid-reshard), open every existing column, create the
// columns the new sharding requires, and hand all per-shard handles back in
// 'to_process_columns' for the copy/cleanup phases.
// Returns 0 on success or a negative errno; on failure the DB may be left
// open with the lock token in place — TODO confirm caller handles cleanup.
int RocksDBStore::prepare_for_reshard(const std::string& new_sharding,
				      RocksDBStore::columns_t& to_process_columns)
{
  //0. lock db from opening
  //1. list existing columns
  //2. apply merge operator to (main + columns) opts
  //3. prepare std::vector<rocksdb::ColumnFamilyDescriptor> existing_cfs
  //4. open db, acquire existing column handles
  //5. calculate missing columns
  //6. create missing columns
  //7. construct cf_handles according to new sharding
  //8. check is all cf_handles are filled

  bool b;
  std::vector<ColumnFamily> new_sharding_def;
  char const* error_position;
  std::string error_msg;
  b = parse_sharding_def(new_sharding, new_sharding_def, &error_position, &error_msg);
  if (!b) {
    // print the spec and a caret pointing at the parse failure position
    dout(1) << __func__ << " bad sharding: " << dendl;
    dout(1) << __func__ << new_sharding << dendl;
    dout(1) << __func__ << std::string(error_position - &new_sharding[0], ' ') << "^" << error_msg << dendl;
    return -EINVAL;
  }

  //0. lock db from opening
  // append the lock token to sharding/def unless it is already present;
  // a missing/unreadable file just yields an empty string here
  std::string stored_sharding_text;
  rocksdb::ReadFileToString(env,
			    sharding_def_file,
			    &stored_sharding_text);
  if (stored_sharding_text.find(resharding_column_lock) == string::npos) {
    rocksdb::Status status;
    if (stored_sharding_text.size() != 0)
      stored_sharding_text += " ";
    stored_sharding_text += resharding_column_lock;
    env->CreateDir(sharding_def_dir);
    status = rocksdb::WriteStringToFile(env, stored_sharding_text,
					sharding_def_file, true);
    if (!status.ok()) {
      derr << __func__ << " cannot write to " << sharding_def_file << dendl;
      return -EIO;
    }
  }

  //1. list existing columns

  rocksdb::Status status;
  std::vector<std::string> existing_columns;
  rocksdb::Options opt;
  int r = load_rocksdb_options(false, opt);
  if (r) {
    dout(1) << __func__ << " load rocksdb options failed" << dendl;
    return r;
  }
  status = rocksdb::DB::ListColumnFamilies(rocksdb::DBOptions(opt), path, &existing_columns);
  if (!status.ok()) {
    derr << "Unable to list column families: " << status.ToString() << dendl;
    return -EINVAL;
  }
  dout(5) << "existing columns = " << existing_columns << dendl;

  //2. apply merge operator to (main + columns) opts
  //3. prepare std::vector<rocksdb::ColumnFamilyDescriptor> cfs_to_open

  std::vector<rocksdb::ColumnFamilyDescriptor> cfs_to_open;
  for (const auto& full_name : existing_columns) {
    //split col_name to <prefix>-<number>
    std::string base_name;
    size_t pos = full_name.find('-');
    if (std::string::npos == pos)
      base_name = full_name;
    else
      base_name = full_name.substr(0,pos);

    rocksdb::ColumnFamilyOptions cf_opt(opt);
    // search if we have options for this column
    std::string options;
    for (const auto& nsd : new_sharding_def) {
      if (nsd.name == base_name) {
	options = nsd.options;
	break;
      }
    }
    // note: this 'r' intentionally shadows the outer one
    int r = update_column_family_options(base_name, options, &cf_opt);
    if (r != 0) {
      return r;
    }
    cfs_to_open.emplace_back(full_name, cf_opt);
  }

  //4. open db, acquire existing column handles
  std::vector<rocksdb::ColumnFamilyHandle*> handles;
  status = rocksdb::DB::Open(rocksdb::DBOptions(opt),
			     path, cfs_to_open, &handles, &db);
  if (!status.ok()) {
    derr << status.ToString() << dendl;
    return -EINVAL;
  }
  for (size_t i = 0; i < cfs_to_open.size(); i++) {
    dout(10) << "column " << cfs_to_open[i].name << " handle " << (void*)handles[i] << dendl;
  }

  //5. calculate missing columns
  std::vector<std::string> new_sharding_columns;
  std::vector<std::string> missing_columns;
  sharding_def_to_columns(new_sharding_def,
			  new_sharding_columns);
  dout(5) << "target columns = " << new_sharding_columns << dendl;
  for (const auto& n : new_sharding_columns) {
    bool found = false;
    for (const auto& e : existing_columns) {
      if (n == e) {
	found = true;
	break;
      }
    }
    if (!found) {
      missing_columns.push_back(n);
    }
  }
  dout(5) << "missing columns = " << missing_columns << dendl;

  //6. create missing columns
  for (const auto& full_name : missing_columns) {
    std::string base_name;
    size_t pos = full_name.find('-');
    if (std::string::npos == pos)
      base_name = full_name;
    else
      base_name = full_name.substr(0,pos);

    rocksdb::ColumnFamilyOptions cf_opt(opt);
    // search if we have options for this column
    std::string options;
    for (const auto& nsd : new_sharding_def) {
      if (nsd.name == base_name) {
	options = nsd.options;
	break;
      }
    }
    int r = update_column_family_options(base_name, options, &cf_opt);
    if (r != 0) {
      return r;
    }
    rocksdb::ColumnFamilyHandle *cf;
    status = db->CreateColumnFamily(cf_opt, full_name, &cf);
    if (!status.ok()) {
      derr << __func__ << " Failed to create rocksdb column family: "
	   << full_name << dendl;
      return -EINVAL;
    }
    dout(10) << "created column " << full_name << " handle = " << (void*)cf << dendl;
    existing_columns.push_back(full_name);
    handles.push_back(cf);
  }

  //7. construct cf_handles according to new sharding
  for (size_t i = 0; i < existing_columns.size(); i++) {
    std::string full_name = existing_columns[i];
    rocksdb::ColumnFamilyHandle *cf = handles[i];
    std::string base_name;
    size_t shard_idx = 0;
    size_t pos = full_name.find('-');
    dout(10) << "processing column " << full_name << dendl;
    if (std::string::npos == pos) {
      base_name = full_name;
    } else {
      base_name = full_name.substr(0,pos);
      shard_idx = atoi(full_name.substr(pos+1).c_str());
    }
    if (rocksdb::kDefaultColumnFamilyName == base_name) {
      // the default CF is closed by the store itself, so its entry in
      // to_process_columns gets a no-op deleter
      default_cf = handles[i];
      must_close_default_cf = true;
      std::unique_ptr<rocksdb::ColumnFamilyHandle, cf_deleter_t> ptr{
	cf, [](rocksdb::ColumnFamilyHandle*) {}};
      to_process_columns.emplace(full_name, std::move(ptr));
    } else {
      for (const auto& nsd : new_sharding_def) {
	if (nsd.name == base_name) {
	  if (shard_idx < nsd.shard_cnt) {
	    add_column_family(base_name, nsd.hash_l, nsd.hash_h, shard_idx, cf);
	  } else {
	    //ignore columns with index larger then shard count
	  }
	  break;
	}
      }
      // non-default handles are destroyed when to_process_columns drops them
      std::unique_ptr<rocksdb::ColumnFamilyHandle, cf_deleter_t> ptr{
	cf, [this](rocksdb::ColumnFamilyHandle* handle) {
	  db->DestroyColumnFamilyHandle(handle);
	}};
      to_process_columns.emplace(full_name, std::move(ptr));
    }
  }

  //8. check if all cf_handles are filled
  for (const auto& col : cf_handles) {
    for (size_t i = 0; i < col.second.handles.size(); i++) {
      if (col.second.handles[i] == nullptr) {
	derr << "missing handle for column " << col.first << " shard " << i << dendl;
	return -EIO;
      }
    }
  }
  return 0;
}
3236 | ||
3237 | int RocksDBStore::reshard_cleanup(const RocksDBStore::columns_t& current_columns) | |
3238 | { | |
3239 | std::vector<std::string> new_sharding_columns; | |
3240 | for (const auto& [name, handle] : cf_handles) { | |
3241 | if (handle.handles.size() == 1) { | |
3242 | new_sharding_columns.push_back(name); | |
3243 | } else { | |
3244 | for (size_t i = 0; i < handle.handles.size(); i++) { | |
20effc67 | 3245 | new_sharding_columns.push_back(name + "-" + std::to_string(i)); |
f67539c2 TL |
3246 | } |
3247 | } | |
3248 | } | |
3249 | ||
3250 | for (auto& [name, handle] : current_columns) { | |
3251 | auto found = std::find(new_sharding_columns.begin(), | |
3252 | new_sharding_columns.end(), | |
3253 | name) != new_sharding_columns.end(); | |
3254 | if (found || name == rocksdb::kDefaultColumnFamilyName) { | |
3255 | dout(5) << "Column " << name << " is part of new sharding." << dendl; | |
3256 | continue; | |
3257 | } | |
3258 | dout(5) << "Column " << name << " not part of new sharding. Deleting." << dendl; | |
3259 | ||
3260 | // verify that column is empty | |
3261 | std::unique_ptr<rocksdb::Iterator> it{ | |
3262 | db->NewIterator(rocksdb::ReadOptions(), handle.get())}; | |
3263 | ceph_assert(it); | |
3264 | it->SeekToFirst(); | |
3265 | ceph_assert(!it->Valid()); | |
3266 | ||
3267 | if (rocksdb::Status status = db->DropColumnFamily(handle.get()); !status.ok()) { | |
3268 | derr << __func__ << " Failed to drop column: " << name << dendl; | |
3269 | return -EINVAL; | |
3270 | } | |
3271 | } | |
3272 | return 0; | |
3273 | } | |
3274 | ||
// Redistribute every key/value pair so the on-disk column-family layout
// matches `new_sharding`, then persist the new sharding definition.
//
// Flow: prepare_for_reshard() opens/creates the target columns and hands
// back every existing column in `to_process_columns`; each column is then
// scanned and its keys re-routed via get_cf_handle(); stale columns are
// dropped by reshard_cleanup(); finally the sharding text is written to
// sharding_def_file. On any return path the scope guard clears cf_handles
// and closes the DB, so the store must be re-opened afterwards.
//
// ctrl_in: optional knobs for batch/iterator sizing and unit-test fault
// injection; nullptr means defaults. Returns 0 on success, a negative
// errno-style code on failure, or -1000/-1001/-1002 for the injected
// test failures.
int RocksDBStore::reshard(const std::string& new_sharding, const RocksDBStore::resharding_ctrl* ctrl_in)
{

  resharding_ctrl ctrl = ctrl_in ? *ctrl_in : resharding_ctrl();
  // Shared progress/threshold counters, captured by reference in the
  // lambdas below; batch counters reset on flush, iterator counters reset
  // on iterator refresh.
  size_t bytes_in_batch = 0;
  size_t keys_in_batch = 0;
  size_t bytes_per_iterator = 0;
  size_t keys_per_iterator = 0;
  size_t keys_processed = 0;
  size_t keys_moved = 0;

  // Synchronously commit the accumulated WriteBatch and reset the batch
  // counters. sync=true so an interrupted reshard never loses moves that
  // were reported as flushed.
  auto flush_batch = [&](rocksdb::WriteBatch* batch) {
    dout(10) << "flushing batch, " << keys_in_batch << " keys, for "
             << bytes_in_batch << " bytes" << dendl;
    rocksdb::WriteOptions woptions;
    woptions.sync = true;
    rocksdb::Status s = db->Write(woptions, batch);
    ceph_assert(s.ok());
    bytes_in_batch = 0;
    keys_in_batch = 0;
    batch->Clear();
  };

  // Scan one source column and move each key to the column the new
  // sharding assigns it. fixed_prefix is empty for the default column
  // (prefix is then encoded in the key itself and recovered by
  // split_key); for sharded columns it is the column's prefix.
  auto process_column = [&](rocksdb::ColumnFamilyHandle* handle,
                            const std::string& fixed_prefix)
  {
    dout(5) << " column=" << (void*)handle << " prefix=" << fixed_prefix << dendl;
    std::unique_ptr<rocksdb::Iterator> it{
      db->NewIterator(rocksdb::ReadOptions(), handle)};
    ceph_assert(it);

    rocksdb::WriteBatch bat;
    for (it->SeekToFirst(); it->Valid(); it->Next()) {
      rocksdb::Slice raw_key = it->key();
      dout(30) << "key=" << pretty_binary_string(raw_key.ToString()) << dendl;
      // Check if we need to refresh the iterator: a long-lived iterator
      // pins resources, so periodically recreate it and re-seek to the
      // current key (which still exists — deletes of moved keys only
      // touch keys already iterated past).
      if (bytes_per_iterator >= ctrl.bytes_per_iterator ||
          keys_per_iterator >= ctrl.keys_per_iterator) {
        dout(8) << "refreshing iterator" << dendl;
        bytes_per_iterator = 0;
        keys_per_iterator = 0;
        // Copy the key out before dropping the iterator that backs the slice.
        std::string raw_key_str = raw_key.ToString();
        it.reset(db->NewIterator(rocksdb::ReadOptions(), handle));
        ceph_assert(it);
        it->Seek(raw_key_str);
        ceph_assert(it->Valid());
        raw_key = it->key();
      }
      rocksdb::Slice value = it->value();
      std::string prefix, key;
      if (fixed_prefix.size() == 0) {
        // Default column: keys are stored as combined "prefix + key".
        split_key(raw_key, &prefix, &key);
      } else {
        prefix = fixed_prefix;
        key = raw_key.ToString();
      }
      keys_processed++;
      if ((keys_processed % 10000) == 0) {
        dout(10) << "processed " << keys_processed << " keys, moved " << keys_moved << dendl;
      }
      // Route the key under the new sharding; keys with no dedicated
      // column land in the default column family.
      rocksdb::ColumnFamilyHandle* new_handle = get_cf_handle(prefix, key);
      if (new_handle == nullptr) {
        new_handle = default_cf;
      }
      // Already in the right place — nothing to move.
      if (handle == new_handle) {
        continue;
      }
      // The default column needs the prefix re-combined into the raw key;
      // dedicated columns store the bare key.
      std::string new_raw_key;
      if (new_handle == default_cf) {
        new_raw_key = combine_strings(prefix, key);
      } else {
        new_raw_key = key;
      }
      bat.Delete(handle, raw_key);
      bat.Put(new_handle, new_raw_key, value);
      dout(25) << "moving " << (void*)handle << "/" << pretty_binary_string(raw_key.ToString()) <<
        " to " << (void*)new_handle << "/" << pretty_binary_string(new_raw_key) <<
        " size " << value.size() << dendl;
      keys_moved++;
      // NOTE(review): key size counted twice — presumably to account for
      // both the Delete and the Put entry in the batch; confirm intent.
      bytes_in_batch += new_raw_key.size() * 2 + value.size();
      keys_in_batch++;
      bytes_per_iterator += new_raw_key.size() * 2 + value.size();
      keys_per_iterator++;

      //check if need to write batch
      if (bytes_in_batch >= ctrl.bytes_per_batch ||
          keys_in_batch >= ctrl.keys_per_batch) {
        flush_batch(&bat);
        if (ctrl.unittest_fail_after_first_batch) {
          return -1000;
        }
      }
    }
    // Commit any remainder that did not reach a threshold.
    if (bat.Count() > 0) {
      flush_batch(&bat);
    }
    return 0;
  };

  // Runs on every exit path: drop cached handles and close the DB so the
  // caller re-opens with a consistent view, successful or not.
  auto close_column_handles = make_scope_guard([this] {
    cf_handles.clear();
    close();
  });
  columns_t to_process_columns;
  int r = prepare_for_reshard(new_sharding, to_process_columns);
  if (r != 0) {
    dout(1) << "failed to prepare db for reshard" << dendl;
    return r;
  }

  for (auto& [name, handle] : to_process_columns) {
    dout(5) << "Processing column=" << name
            << " handle=" << handle.get() << dendl;
    if (name == rocksdb::kDefaultColumnFamilyName) {
      ceph_assert(handle.get() == default_cf);
      r = process_column(default_cf, std::string());
    } else {
      // Sharded column names are "<prefix>-<shard>"; everything before
      // the first '-' is the logical prefix.
      std::string fixed_prefix = name.substr(0, name.find('-'));
      dout(10) << "Prefix: " << fixed_prefix << dendl;
      r = process_column(handle.get(), fixed_prefix);
    }
    if (r != 0) {
      derr << "Error processing column " << name << dendl;
      return r;
    }
    if (ctrl.unittest_fail_after_processing_column) {
      return -1001;
    }
  }

  // All keys moved; drop columns that the new layout no longer uses.
  r = reshard_cleanup(to_process_columns);
  if (r != 0) {
    dout(5) << "failed to cleanup after reshard" << dendl;
    return r;
  }

  if (ctrl.unittest_fail_after_successful_processing) {
    return -1002;
  }
  // Persist the new sharding definition last, so a crash before this
  // point leaves the old definition (and a re-runnable reshard) behind.
  // NOTE(review): CreateDir return value is ignored — presumably fine when
  // the dir already exists, but a real create failure surfaces only via
  // the WriteStringToFile below; confirm.
  env->CreateDir(sharding_def_dir);
  if (auto status = rocksdb::WriteStringToFile(env, new_sharding,
                                               sharding_def_file, true);
      !status.ok()) {
    derr << __func__ << " cannot write to " << sharding_def_file << dendl;
    return -EIO;
  }

  return r;
}
3424 | ||
3425 | bool RocksDBStore::get_sharding(std::string& sharding) { | |
3426 | rocksdb::Status status; | |
3427 | std::string stored_sharding_text; | |
3428 | bool result = false; | |
3429 | sharding.clear(); | |
3430 | ||
3431 | status = env->FileExists(sharding_def_file); | |
3432 | if (status.ok()) { | |
3433 | status = rocksdb::ReadFileToString(env, | |
3434 | sharding_def_file, | |
3435 | &stored_sharding_text); | |
3436 | if(status.ok()) { | |
3437 | result = true; | |
3438 | sharding = stored_sharding_text; | |
3439 | } | |
3440 | } | |
3441 | return result; | |
11fdf7f2 | 3442 | } |