]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
4 | #include <set> | |
5 | #include <map> | |
6 | #include <string> | |
7 | #include <memory> | |
8 | #include <errno.h> | |
9 | #include <unistd.h> | |
10 | #include <sys/types.h> | |
11 | #include <sys/stat.h> | |
12 | ||
13 | #include "rocksdb/db.h" | |
14 | #include "rocksdb/table.h" | |
15 | #include "rocksdb/env.h" | |
16 | #include "rocksdb/slice.h" | |
17 | #include "rocksdb/cache.h" | |
18 | #include "rocksdb/filter_policy.h" | |
19 | #include "rocksdb/utilities/convenience.h" | |
20 | #include "rocksdb/merge_operator.h" | |
91327a77 | 21 | |
7c673cae FG |
22 | using std::string; |
23 | #include "common/perf_counters.h" | |
91327a77 | 24 | #include "common/PriorityCache.h" |
7c673cae FG |
25 | #include "include/str_list.h" |
26 | #include "include/stringify.h" | |
27 | #include "include/str_map.h" | |
28 | #include "KeyValueDB.h" | |
29 | #include "RocksDBStore.h" | |
30 | ||
31 | #include "common/debug.h" | |
32 | ||
33 | #define dout_context cct | |
34 | #define dout_subsys ceph_subsys_rocksdb | |
35 | #undef dout_prefix | |
36 | #define dout_prefix *_dout << "rocksdb: " | |
37 | ||
11fdf7f2 TL |
38 | static bufferlist to_bufferlist(rocksdb::Slice in) { |
39 | bufferlist bl; | |
40 | bl.append(bufferptr(in.data(), in.size())); | |
41 | return bl; | |
42 | } | |
43 | ||
c07f9fc5 FG |
44 | static rocksdb::SliceParts prepare_sliceparts(const bufferlist &bl, |
45 | vector<rocksdb::Slice> *slices) | |
31f18b77 FG |
46 | { |
47 | unsigned n = 0; | |
c07f9fc5 FG |
48 | for (auto& buf : bl.buffers()) { |
49 | (*slices)[n].data_ = buf.c_str(); | |
50 | (*slices)[n].size_ = buf.length(); | |
51 | n++; | |
31f18b77 | 52 | } |
c07f9fc5 | 53 | return rocksdb::SliceParts(slices->data(), slices->size()); |
31f18b77 FG |
54 | } |
55 | ||
11fdf7f2 | 56 | |
7c673cae | 57 | // |
11fdf7f2 TL |
58 | // One of these for the default rocksdb column family, routing each prefix |
59 | // to the appropriate MergeOperator. | |
7c673cae | 60 | // |
11fdf7f2 TL |
61 | class RocksDBStore::MergeOperatorRouter |
62 | : public rocksdb::AssociativeMergeOperator | |
63 | { | |
7c673cae | 64 | RocksDBStore& store; |
11fdf7f2 | 65 | public: |
7c673cae FG |
66 | const char *Name() const override { |
67 | // Construct a name that rocksDB will validate against. We want to | |
68 | // do this in a way that doesn't constrain the ordering of calls | |
69 | // to set_merge_operator, so sort the merge operators and then | |
70 | // construct a name from all of those parts. | |
71 | store.assoc_name.clear(); | |
72 | map<std::string,std::string> names; | |
11fdf7f2 TL |
73 | |
74 | for (auto& p : store.merge_ops) { | |
75 | names[p.first] = p.second->name(); | |
76 | } | |
77 | for (auto& p : store.cf_handles) { | |
78 | names.erase(p.first); | |
79 | } | |
7c673cae FG |
80 | for (auto& p : names) { |
81 | store.assoc_name += '.'; | |
82 | store.assoc_name += p.first; | |
83 | store.assoc_name += ':'; | |
84 | store.assoc_name += p.second; | |
85 | } | |
86 | return store.assoc_name.c_str(); | |
87 | } | |
88 | ||
11fdf7f2 | 89 | explicit MergeOperatorRouter(RocksDBStore &_store) : store(_store) {} |
7c673cae FG |
90 | |
91 | bool Merge(const rocksdb::Slice& key, | |
11fdf7f2 TL |
92 | const rocksdb::Slice* existing_value, |
93 | const rocksdb::Slice& value, | |
94 | std::string* new_value, | |
95 | rocksdb::Logger* logger) const override { | |
96 | // for default column family | |
97 | // extract prefix from key and compare against each registered merge op; | |
98 | // even though merge operator for explicit CF is included in merge_ops, | |
99 | // it won't be picked up, since it won't match. | |
7c673cae FG |
100 | for (auto& p : store.merge_ops) { |
101 | if (p.first.compare(0, p.first.length(), | |
102 | key.data(), p.first.length()) == 0 && | |
103 | key.data()[p.first.length()] == 0) { | |
11fdf7f2 TL |
104 | if (existing_value) { |
105 | p.second->merge(existing_value->data(), existing_value->size(), | |
7c673cae FG |
106 | value.data(), value.size(), |
107 | new_value); | |
11fdf7f2 TL |
108 | } else { |
109 | p.second->merge_nonexistent(value.data(), value.size(), new_value); | |
110 | } | |
111 | break; | |
7c673cae FG |
112 | } |
113 | } | |
114 | return true; // OK :) | |
115 | } | |
11fdf7f2 TL |
116 | }; |
117 | ||
118 | // | |
119 | // One of these per non-default column family, linked directly to the | |
120 | // merge operator for that CF/prefix (if any). | |
121 | // | |
122 | class RocksDBStore::MergeOperatorLinker | |
123 | : public rocksdb::AssociativeMergeOperator | |
124 | { | |
125 | private: | |
126 | std::shared_ptr<KeyValueDB::MergeOperator> mop; | |
127 | public: | |
128 | explicit MergeOperatorLinker(const std::shared_ptr<KeyValueDB::MergeOperator> &o) : mop(o) {} | |
7c673cae | 129 | |
11fdf7f2 TL |
130 | const char *Name() const override { |
131 | return mop->name(); | |
132 | } | |
133 | ||
134 | bool Merge(const rocksdb::Slice& key, | |
135 | const rocksdb::Slice* existing_value, | |
136 | const rocksdb::Slice& value, | |
137 | std::string* new_value, | |
138 | rocksdb::Logger* logger) const override { | |
139 | if (existing_value) { | |
140 | mop->merge(existing_value->data(), existing_value->size(), | |
141 | value.data(), value.size(), | |
142 | new_value); | |
143 | } else { | |
144 | mop->merge_nonexistent(value.data(), value.size(), new_value); | |
145 | } | |
146 | return true; | |
147 | } | |
7c673cae FG |
148 | }; |
149 | ||
150 | int RocksDBStore::set_merge_operator( | |
151 | const string& prefix, | |
152 | std::shared_ptr<KeyValueDB::MergeOperator> mop) | |
153 | { | |
154 | // If you fail here, it's because you can't do this on an open database | |
11fdf7f2 | 155 | ceph_assert(db == nullptr); |
7c673cae FG |
156 | merge_ops.push_back(std::make_pair(prefix,mop)); |
157 | return 0; | |
158 | } | |
159 | ||
160 | class CephRocksdbLogger : public rocksdb::Logger { | |
161 | CephContext *cct; | |
162 | public: | |
163 | explicit CephRocksdbLogger(CephContext *c) : cct(c) { | |
164 | cct->get(); | |
165 | } | |
166 | ~CephRocksdbLogger() override { | |
167 | cct->put(); | |
168 | } | |
169 | ||
170 | // Write an entry to the log file with the specified format. | |
171 | void Logv(const char* format, va_list ap) override { | |
172 | Logv(rocksdb::INFO_LEVEL, format, ap); | |
173 | } | |
174 | ||
175 | // Write an entry to the log file with the specified log level | |
176 | // and format. Any log with level under the internal log level | |
177 | // of *this (see @SetInfoLogLevel and @GetInfoLogLevel) will not be | |
178 | // printed. | |
179 | void Logv(const rocksdb::InfoLogLevel log_level, const char* format, | |
180 | va_list ap) override { | |
181 | int v = rocksdb::NUM_INFO_LOG_LEVELS - log_level - 1; | |
11fdf7f2 | 182 | dout(ceph::dout::need_dynamic(v)); |
7c673cae FG |
183 | char buf[65536]; |
184 | vsnprintf(buf, sizeof(buf), format, ap); | |
185 | *_dout << buf << dendl; | |
186 | } | |
187 | }; | |
188 | ||
189 | rocksdb::Logger *create_rocksdb_ceph_logger() | |
190 | { | |
191 | return new CephRocksdbLogger(g_ceph_context); | |
192 | } | |
193 | ||
c07f9fc5 | 194 | static int string2bool(const string &val, bool &b_val) |
7c673cae FG |
195 | { |
196 | if (strcasecmp(val.c_str(), "false") == 0) { | |
197 | b_val = false; | |
198 | return 0; | |
199 | } else if (strcasecmp(val.c_str(), "true") == 0) { | |
200 | b_val = true; | |
201 | return 0; | |
202 | } else { | |
203 | std::string err; | |
204 | int b = strict_strtol(val.c_str(), 10, &err); | |
205 | if (!err.empty()) | |
206 | return -EINVAL; | |
207 | b_val = !!b; | |
208 | return 0; | |
209 | } | |
210 | } | |
211 | ||
c07f9fc5 | 212 | int RocksDBStore::tryInterpret(const string &key, const string &val, rocksdb::Options &opt) |
7c673cae FG |
213 | { |
214 | if (key == "compaction_threads") { | |
215 | std::string err; | |
1adf2230 | 216 | int f = strict_iecstrtoll(val.c_str(), &err); |
7c673cae FG |
217 | if (!err.empty()) |
218 | return -EINVAL; | |
219 | //Low priority threadpool is used for compaction | |
220 | opt.env->SetBackgroundThreads(f, rocksdb::Env::Priority::LOW); | |
221 | } else if (key == "flusher_threads") { | |
222 | std::string err; | |
1adf2230 | 223 | int f = strict_iecstrtoll(val.c_str(), &err); |
7c673cae FG |
224 | if (!err.empty()) |
225 | return -EINVAL; | |
226 | //High priority threadpool is used for flusher | |
227 | opt.env->SetBackgroundThreads(f, rocksdb::Env::Priority::HIGH); | |
228 | } else if (key == "compact_on_mount") { | |
229 | int ret = string2bool(val, compact_on_mount); | |
230 | if (ret != 0) | |
231 | return ret; | |
232 | } else if (key == "disableWAL") { | |
233 | int ret = string2bool(val, disableWAL); | |
234 | if (ret != 0) | |
235 | return ret; | |
236 | } else { | |
237 | //unrecognize config options. | |
238 | return -EINVAL; | |
239 | } | |
240 | return 0; | |
241 | } | |
242 | ||
c07f9fc5 | 243 | int RocksDBStore::ParseOptionsFromString(const string &opt_str, rocksdb::Options &opt) |
7c673cae FG |
244 | { |
245 | map<string, string> str_map; | |
246 | int r = get_str_map(opt_str, &str_map, ",\n;"); | |
247 | if (r < 0) | |
248 | return r; | |
249 | map<string, string>::iterator it; | |
250 | for(it = str_map.begin(); it != str_map.end(); ++it) { | |
251 | string this_opt = it->first + "=" + it->second; | |
252 | rocksdb::Status status = rocksdb::GetOptionsFromString(opt, this_opt , &opt); | |
253 | if (!status.ok()) { | |
254 | //unrecognized by rocksdb, try to interpret by ourselves. | |
255 | r = tryInterpret(it->first, it->second, opt); | |
256 | if (r < 0) { | |
257 | derr << status.ToString() << dendl; | |
258 | return -EINVAL; | |
259 | } | |
260 | } | |
261 | lgeneric_dout(cct, 0) << " set rocksdb option " << it->first | |
262 | << " = " << it->second << dendl; | |
263 | } | |
264 | return 0; | |
265 | } | |
266 | ||
267 | int RocksDBStore::init(string _options_str) | |
268 | { | |
269 | options_str = _options_str; | |
270 | rocksdb::Options opt; | |
271 | //try parse options | |
272 | if (options_str.length()) { | |
273 | int r = ParseOptionsFromString(options_str, opt); | |
274 | if (r != 0) { | |
275 | return -EINVAL; | |
276 | } | |
277 | } | |
278 | return 0; | |
279 | } | |
280 | ||
11fdf7f2 | 281 | int RocksDBStore::create_db_dir() |
7c673cae FG |
282 | { |
283 | if (env) { | |
284 | unique_ptr<rocksdb::Directory> dir; | |
285 | env->NewDirectory(path, &dir); | |
286 | } else { | |
287 | int r = ::mkdir(path.c_str(), 0755); | |
288 | if (r < 0) | |
289 | r = -errno; | |
290 | if (r < 0 && r != -EEXIST) { | |
291 | derr << __func__ << " failed to create " << path << ": " << cpp_strerror(r) | |
292 | << dendl; | |
293 | return r; | |
294 | } | |
295 | } | |
11fdf7f2 TL |
296 | return 0; |
297 | } | |
298 | ||
299 | int RocksDBStore::install_cf_mergeop( | |
300 | const string &cf_name, | |
301 | rocksdb::ColumnFamilyOptions *cf_opt) | |
302 | { | |
303 | ceph_assert(cf_opt != nullptr); | |
304 | cf_opt->merge_operator.reset(); | |
305 | for (auto& i : merge_ops) { | |
306 | if (i.first == cf_name) { | |
307 | cf_opt->merge_operator.reset(new MergeOperatorLinker(i.second)); | |
308 | } | |
309 | } | |
310 | return 0; | |
7c673cae FG |
311 | } |
312 | ||
11fdf7f2 TL |
313 | int RocksDBStore::create_and_open(ostream &out, |
314 | const vector<ColumnFamily>& cfs) | |
315 | { | |
316 | int r = create_db_dir(); | |
317 | if (r < 0) | |
318 | return r; | |
319 | if (cfs.empty()) { | |
320 | return do_open(out, true, false, nullptr); | |
321 | } else { | |
322 | return do_open(out, true, false, &cfs); | |
323 | } | |
324 | } | |
325 | ||
// Fill @opt with everything needed to open (or repair) this store.
//
// Applies, in order: the user option string, perf statistics, WAL dir and
// extra db_paths from kv_options, Ceph log forwarding, the custom Env, the
// block/row caches, bloom filter and index settings, the block-based table
// factory, and finally the default-CF merge operator router.
//
// @create_if_missing  copied into opt.create_if_missing
// @opt                options object, updated in place
// Returns 0 on success or a negative error code.
int RocksDBStore::load_rocksdb_options(bool create_if_missing, rocksdb::Options& opt)
{
  // NOTE(review): `status` is never used in this function.
  rocksdb::Status status;

  // The user option string is applied first; the settings below are then
  // layered on top of it.
  if (options_str.length()) {
    int r = ParseOptionsFromString(options_str, opt);
    if (r != 0) {
      return -EINVAL;
    }
  }

  if (g_conf()->rocksdb_perf) {
    dbstats = rocksdb::CreateDBStatistics();
    opt.statistics = dbstats;
  }

  opt.create_if_missing = create_if_missing;
  // Optionally keep the WAL in a sibling "<path>.wal" directory.
  if (kv_options.count("separate_wal_dir")) {
    opt.wal_dir = path + ".wal";
  }

  // NOTE(review): this catch presumably guards against exceptions thrown
  // while parsing db_paths, but the code below uses atoll(), which does not
  // throw. Confirm which call (if any) can raise std::system_error here.
  try {
    if (kv_options.count("db_paths")) {
      // db_paths holds "<path>,<size>" items separated by ';', space or tab;
      // each becomes an additional rocksdb DbPath.
      list<string> paths;
      get_str_list(kv_options["db_paths"], "; \t", paths);
      for (auto& p : paths) {
        size_t pos = p.find(',');
        if (pos == std::string::npos) {
          derr << __func__ << " invalid db path item " << p << " in "
               << kv_options["db_paths"] << dendl;
          return -EINVAL;
        }
        // NOTE(review): this local `path` shadows the RocksDBStore::path
        // member for the rest of the loop body.
        string path = p.substr(0, pos);
        string size_str = p.substr(pos + 1);
        // atoll() yields 0 when no number can be parsed, which is rejected
        // just below.
        uint64_t size = atoll(size_str.c_str());
        if (!size) {
          derr << __func__ << " invalid db path item " << p << " in "
               << kv_options["db_paths"] << dendl;
          return -EINVAL;
        }
        opt.db_paths.push_back(rocksdb::DbPath(path, size));
        dout(10) << __func__ << " db_path " << path << " size " << size << dendl;
      }
    }
  } catch (const std::system_error& e) {
    return -e.code().value();
  }

  if (g_conf()->rocksdb_log_to_ceph_log) {
    opt.info_log.reset(new CephRocksdbLogger(g_ceph_context));
  }

  // A custom Env stored in priv replaces rocksdb's default Env.
  if (priv) {
    dout(10) << __func__ << " using custom Env " << priv << dendl;
    opt.env = static_cast<rocksdb::Env*>(priv);
  }

  opt.env->SetAllowNonOwnerAccess(false);

  // caches
  // cache_size is taken from config unless set_cache_flag is set
  // (presumably cache_size was assigned elsewhere in that case). The total
  // is split into a row cache (rocksdb_cache_row_ratio of it) and a block
  // cache (the remainder).
  if (!set_cache_flag) {
    cache_size = g_conf()->rocksdb_cache_size;
  }
  uint64_t row_cache_size = cache_size * g_conf()->rocksdb_cache_row_ratio;
  uint64_t block_cache_size = cache_size - row_cache_size;

  // The block cache implementation is selected by rocksdb_cache_type; any
  // other value is an error.
  if (g_conf()->rocksdb_cache_type == "binned_lru") {
    bbt_opts.block_cache = rocksdb_cache::NewBinnedLRUCache(
      cct,
      block_cache_size,
      g_conf()->rocksdb_cache_shard_bits);
  } else if (g_conf()->rocksdb_cache_type == "lru") {
    bbt_opts.block_cache = rocksdb::NewLRUCache(
      block_cache_size,
      g_conf()->rocksdb_cache_shard_bits);
  } else if (g_conf()->rocksdb_cache_type == "clock") {
    bbt_opts.block_cache = rocksdb::NewClockCache(
      block_cache_size,
      g_conf()->rocksdb_cache_shard_bits);
    // NewClockCache() yields null when the clock cache is unavailable.
    if (!bbt_opts.block_cache) {
      derr << "rocksdb_cache_type '" << g_conf()->rocksdb_cache_type
           << "' chosen, but RocksDB not compiled with LibTBB. "
           << dendl;
      return -EINVAL;
    }
  } else {
    derr << "unrecognized rocksdb_cache_type '" << g_conf()->rocksdb_cache_type
      << "'" << dendl;
    return -EINVAL;
  }
  bbt_opts.block_size = g_conf()->rocksdb_block_size;

  if (row_cache_size > 0)
    opt.row_cache = rocksdb::NewLRUCache(row_cache_size,
                                         g_conf()->rocksdb_cache_shard_bits);
  uint64_t bloom_bits = g_conf().get_val<uint64_t>("rocksdb_bloom_bits_per_key");
  if (bloom_bits > 0) {
    dout(10) << __func__ << " set bloom filter bits per key to "
             << bloom_bits << dendl;
    bbt_opts.filter_policy.reset(rocksdb::NewBloomFilterPolicy(bloom_bits));
  }
  // rocksdb_index_type selects the block index type; a value matching none
  // of these leaves bbt_opts.index_type unchanged.
  using std::placeholders::_1;
  if (g_conf().with_val<std::string>("rocksdb_index_type",
                                     std::bind(std::equal_to<std::string>(), _1,
                                               "binary_search")))
    bbt_opts.index_type = rocksdb::BlockBasedTableOptions::IndexType::kBinarySearch;
  if (g_conf().with_val<std::string>("rocksdb_index_type",
                                     std::bind(std::equal_to<std::string>(), _1,
                                               "hash_search")))
    bbt_opts.index_type = rocksdb::BlockBasedTableOptions::IndexType::kHashSearch;
  if (g_conf().with_val<std::string>("rocksdb_index_type",
                                     std::bind(std::equal_to<std::string>(), _1,
                                               "two_level")))
    bbt_opts.index_type = rocksdb::BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
  // Index/filter caching knobs only matter when a block cache is in use.
  if (!bbt_opts.no_block_cache) {
    bbt_opts.cache_index_and_filter_blocks =
        g_conf().get_val<bool>("rocksdb_cache_index_and_filter_blocks");
    bbt_opts.cache_index_and_filter_blocks_with_high_priority =
        g_conf().get_val<bool>("rocksdb_cache_index_and_filter_blocks_with_high_priority");
    bbt_opts.pin_l0_filter_and_index_blocks_in_cache =
      g_conf().get_val<bool>("rocksdb_pin_l0_filter_and_index_blocks_in_cache");
  }
  bbt_opts.partition_filters = g_conf().get_val<bool>("rocksdb_partition_filters");
  // A metadata block size of 0 keeps rocksdb's default.
  if (g_conf().get_val<Option::size_t>("rocksdb_metadata_block_size") > 0)
    bbt_opts.metadata_block_size = g_conf().get_val<Option::size_t>("rocksdb_metadata_block_size");

  opt.table_factory.reset(rocksdb::NewBlockBasedTableFactory(bbt_opts));
  dout(10) << __func__ << " block size " << g_conf()->rocksdb_block_size
           << ", block_cache size " << byte_u_t(block_cache_size)
           << ", row_cache size " << byte_u_t(row_cache_size)
           << "; shards "
           << (1 << g_conf()->rocksdb_cache_shard_bits)
           << ", type " << g_conf()->rocksdb_cache_type
           << dendl;

  // The default column family routes merges by key prefix.
  opt.merge_operator.reset(new MergeOperatorRouter(*this));

  return 0;
}
467 | ||
468 | int RocksDBStore::do_open(ostream &out, | |
469 | bool create_if_missing, | |
470 | bool open_readonly, | |
471 | const vector<ColumnFamily>* cfs) | |
472 | { | |
473 | ceph_assert(!(create_if_missing && open_readonly)); | |
474 | rocksdb::Options opt; | |
475 | int r = load_rocksdb_options(create_if_missing, opt); | |
476 | if (r) { | |
477 | dout(1) << __func__ << " load rocksdb options failed" << dendl; | |
478 | return r; | |
479 | } | |
480 | rocksdb::Status status; | |
481 | if (create_if_missing) { | |
482 | status = rocksdb::DB::Open(opt, path, &db); | |
483 | if (!status.ok()) { | |
484 | derr << status.ToString() << dendl; | |
485 | return -EINVAL; | |
486 | } | |
487 | // create and open column families | |
488 | if (cfs) { | |
489 | for (auto& p : *cfs) { | |
490 | // copy default CF settings, block cache, merge operators as | |
491 | // the base for new CF | |
492 | rocksdb::ColumnFamilyOptions cf_opt(opt); | |
493 | // user input options will override the base options | |
494 | status = rocksdb::GetColumnFamilyOptionsFromString( | |
495 | cf_opt, p.option, &cf_opt); | |
496 | if (!status.ok()) { | |
497 | derr << __func__ << " invalid db column family option string for CF: " | |
498 | << p.name << dendl; | |
499 | return -EINVAL; | |
500 | } | |
501 | install_cf_mergeop(p.name, &cf_opt); | |
502 | rocksdb::ColumnFamilyHandle *cf; | |
503 | status = db->CreateColumnFamily(cf_opt, p.name, &cf); | |
504 | if (!status.ok()) { | |
505 | derr << __func__ << " Failed to create rocksdb column family: " | |
506 | << p.name << dendl; | |
507 | return -EINVAL; | |
508 | } | |
509 | // store the new CF handle | |
510 | add_column_family(p.name, static_cast<void*>(cf)); | |
511 | } | |
512 | } | |
513 | default_cf = db->DefaultColumnFamily(); | |
514 | } else { | |
515 | std::vector<string> existing_cfs; | |
516 | status = rocksdb::DB::ListColumnFamilies( | |
517 | rocksdb::DBOptions(opt), | |
518 | path, | |
519 | &existing_cfs); | |
520 | dout(1) << __func__ << " column families: " << existing_cfs << dendl; | |
521 | if (existing_cfs.empty()) { | |
522 | // no column families | |
523 | if (open_readonly) { | |
524 | status = rocksdb::DB::Open(opt, path, &db); | |
525 | } else { | |
526 | status = rocksdb::DB::OpenForReadOnly(opt, path, &db); | |
527 | } | |
528 | if (!status.ok()) { | |
529 | derr << status.ToString() << dendl; | |
530 | return -EINVAL; | |
531 | } | |
532 | default_cf = db->DefaultColumnFamily(); | |
533 | } else { | |
534 | // we cannot change column families for a created database. so, map | |
535 | // what options we are given to whatever cf's already exist. | |
536 | std::vector<rocksdb::ColumnFamilyDescriptor> column_families; | |
537 | for (auto& n : existing_cfs) { | |
538 | // copy default CF settings, block cache, merge operators as | |
539 | // the base for new CF | |
540 | rocksdb::ColumnFamilyOptions cf_opt(opt); | |
541 | bool found = false; | |
542 | if (cfs) { | |
543 | for (auto& i : *cfs) { | |
544 | if (i.name == n) { | |
545 | found = true; | |
546 | status = rocksdb::GetColumnFamilyOptionsFromString( | |
547 | cf_opt, i.option, &cf_opt); | |
548 | if (!status.ok()) { | |
549 | derr << __func__ << " invalid db column family options for CF '" | |
550 | << i.name << "': " << i.option << dendl; | |
551 | return -EINVAL; | |
552 | } | |
553 | } | |
554 | } | |
555 | } | |
556 | if (n != rocksdb::kDefaultColumnFamilyName) { | |
557 | install_cf_mergeop(n, &cf_opt); | |
558 | } | |
559 | column_families.push_back(rocksdb::ColumnFamilyDescriptor(n, cf_opt)); | |
560 | if (!found && n != rocksdb::kDefaultColumnFamilyName) { | |
561 | dout(1) << __func__ << " column family '" << n | |
562 | << "' exists but not expected" << dendl; | |
563 | } | |
564 | } | |
565 | std::vector<rocksdb::ColumnFamilyHandle*> handles; | |
566 | if (open_readonly) { | |
567 | status = rocksdb::DB::OpenForReadOnly(rocksdb::DBOptions(opt), | |
568 | path, column_families, | |
569 | &handles, &db); | |
570 | } else { | |
571 | status = rocksdb::DB::Open(rocksdb::DBOptions(opt), | |
572 | path, column_families, &handles, &db); | |
573 | } | |
574 | if (!status.ok()) { | |
575 | derr << status.ToString() << dendl; | |
576 | return -EINVAL; | |
577 | } | |
578 | for (unsigned i = 0; i < existing_cfs.size(); ++i) { | |
579 | if (existing_cfs[i] == rocksdb::kDefaultColumnFamilyName) { | |
580 | default_cf = handles[i]; | |
581 | must_close_default_cf = true; | |
582 | } else { | |
583 | add_column_family(existing_cfs[i], static_cast<void*>(handles[i])); | |
584 | } | |
585 | } | |
586 | } | |
7c673cae | 587 | } |
11fdf7f2 | 588 | ceph_assert(default_cf != nullptr); |
7c673cae FG |
589 | |
590 | PerfCountersBuilder plb(g_ceph_context, "rocksdb", l_rocksdb_first, l_rocksdb_last); | |
591 | plb.add_u64_counter(l_rocksdb_gets, "get", "Gets"); | |
592 | plb.add_u64_counter(l_rocksdb_txns, "submit_transaction", "Submit transactions"); | |
593 | plb.add_u64_counter(l_rocksdb_txns_sync, "submit_transaction_sync", "Submit transactions sync"); | |
594 | plb.add_time_avg(l_rocksdb_get_latency, "get_latency", "Get latency"); | |
595 | plb.add_time_avg(l_rocksdb_submit_latency, "submit_latency", "Submit Latency"); | |
596 | plb.add_time_avg(l_rocksdb_submit_sync_latency, "submit_sync_latency", "Submit Sync Latency"); | |
597 | plb.add_u64_counter(l_rocksdb_compact, "compact", "Compactions"); | |
598 | plb.add_u64_counter(l_rocksdb_compact_range, "compact_range", "Compactions by range"); | |
599 | plb.add_u64_counter(l_rocksdb_compact_queue_merge, "compact_queue_merge", "Mergings of ranges in compaction queue"); | |
600 | plb.add_u64(l_rocksdb_compact_queue_len, "compact_queue_len", "Length of compaction queue"); | |
601 | plb.add_time_avg(l_rocksdb_write_wal_time, "rocksdb_write_wal_time", "Rocksdb write wal time"); | |
602 | plb.add_time_avg(l_rocksdb_write_memtable_time, "rocksdb_write_memtable_time", "Rocksdb write memtable time"); | |
603 | plb.add_time_avg(l_rocksdb_write_delay_time, "rocksdb_write_delay_time", "Rocksdb write delay time"); | |
604 | plb.add_time_avg(l_rocksdb_write_pre_and_post_process_time, | |
605 | "rocksdb_write_pre_and_post_time", "total time spent on writing a record, excluding write process"); | |
606 | logger = plb.create_perf_counters(); | |
607 | cct->get_perfcounters_collection()->add(logger); | |
608 | ||
609 | if (compact_on_mount) { | |
610 | derr << "Compacting rocksdb store..." << dendl; | |
611 | compact(); | |
612 | derr << "Finished compacting rocksdb store" << dendl; | |
613 | } | |
614 | return 0; | |
615 | } | |
616 | ||
617 | int RocksDBStore::_test_init(const string& dir) | |
618 | { | |
619 | rocksdb::Options options; | |
620 | options.create_if_missing = true; | |
621 | rocksdb::DB *db; | |
622 | rocksdb::Status status = rocksdb::DB::Open(options, dir, &db); | |
623 | delete db; | |
624 | db = nullptr; | |
625 | return status.ok() ? 0 : -EIO; | |
626 | } | |
627 | ||
// Tear down the store. Calls close() (stops the compaction thread and
// unregisters the perf counters), then frees the perf counters, releases all
// column family handles through the DB before deleting the DB itself, and
// finally frees the custom Env, if any.
RocksDBStore::~RocksDBStore()
{
  close();
  delete logger;

  // Ensure db is destroyed before dependent db_cache and filterpolicy
  // Column family handles have to be destroyed via the DB before the DB
  // object itself is deleted.
  for (auto& p : cf_handles) {
    db->DestroyColumnFamilyHandle(
      static_cast<rocksdb::ColumnFamilyHandle*>(p.second));
    p.second = nullptr;
  }
  // The default CF handle only needs explicit destruction when do_open()
  // obtained it from the multi-CF open (must_close_default_cf). Otherwise it
  // came from DefaultColumnFamily() and belongs to the DB.
  if (must_close_default_cf) {
    db->DestroyColumnFamilyHandle(default_cf);
    must_close_default_cf = false;
  }
  default_cf = nullptr;
  delete db;
  db = nullptr;

  // priv is the custom rocksdb::Env that load_rocksdb_options() installs.
  if (priv) {
    delete static_cast<rocksdb::Env*>(priv);
  }
}
651 | ||
652 | void RocksDBStore::close() | |
653 | { | |
654 | // stop compaction thread | |
655 | compact_queue_lock.Lock(); | |
656 | if (compact_thread.is_started()) { | |
92f5a8d4 | 657 | dout(1) << __func__ << " waiting for compaction thread to stop" << dendl; |
7c673cae FG |
658 | compact_queue_stop = true; |
659 | compact_queue_cond.Signal(); | |
660 | compact_queue_lock.Unlock(); | |
661 | compact_thread.join(); | |
92f5a8d4 | 662 | dout(1) << __func__ << " compaction thread to stopped" << dendl; |
7c673cae FG |
663 | } else { |
664 | compact_queue_lock.Unlock(); | |
665 | } | |
666 | ||
667 | if (logger) | |
668 | cct->get_perfcounters_collection()->remove(logger); | |
669 | } | |
670 | ||
11fdf7f2 TL |
671 | int RocksDBStore::repair(std::ostream &out) |
672 | { | |
673 | rocksdb::Options opt; | |
674 | int r = load_rocksdb_options(false, opt); | |
675 | if (r) { | |
676 | dout(1) << __func__ << " load rocksdb options failed" << dendl; | |
677 | out << "load rocksdb options failed" << std::endl; | |
678 | return r; | |
679 | } | |
680 | rocksdb::Status status = rocksdb::RepairDB(path, opt); | |
681 | if (status.ok()) { | |
682 | return 0; | |
683 | } else { | |
684 | out << "repair rocksdb failed : " << status.ToString() << std::endl; | |
685 | return 1; | |
686 | } | |
687 | } | |
688 | ||
7c673cae FG |
689 | void RocksDBStore::split_stats(const std::string &s, char delim, std::vector<std::string> &elems) { |
690 | std::stringstream ss; | |
691 | ss.str(s); | |
692 | std::string item; | |
693 | while (std::getline(ss, item, delim)) { | |
694 | elems.push_back(item); | |
695 | } | |
696 | } | |
697 | ||
11fdf7f2 TL |
698 | int64_t RocksDBStore::estimate_prefix_size(const string& prefix) |
699 | { | |
700 | auto cf = get_cf_handle(prefix); | |
701 | uint64_t size = 0; | |
702 | uint8_t flags = | |
703 | //rocksdb::DB::INCLUDE_MEMTABLES | // do not include memtables... | |
704 | rocksdb::DB::INCLUDE_FILES; | |
705 | if (cf) { | |
706 | string start(1, '\x00'); | |
707 | string limit("\xff\xff\xff\xff"); | |
708 | rocksdb::Range r(start, limit); | |
709 | db->GetApproximateSizes(cf, &r, 1, &size, flags); | |
710 | } else { | |
711 | string limit = prefix + "\xff\xff\xff\xff"; | |
712 | rocksdb::Range r(prefix, limit); | |
713 | db->GetApproximateSizes(default_cf, | |
714 | &r, 1, &size, flags); | |
715 | } | |
716 | return size; | |
717 | } | |
718 | ||
7c673cae FG |
719 | void RocksDBStore::get_statistics(Formatter *f) |
720 | { | |
11fdf7f2 TL |
721 | if (!g_conf()->rocksdb_perf) { |
722 | dout(20) << __func__ << " RocksDB perf is disabled, can't probe for stats" | |
7c673cae FG |
723 | << dendl; |
724 | return; | |
725 | } | |
726 | ||
11fdf7f2 | 727 | if (g_conf()->rocksdb_collect_compaction_stats) { |
7c673cae FG |
728 | std::string stat_str; |
729 | bool status = db->GetProperty("rocksdb.stats", &stat_str); | |
730 | if (status) { | |
731 | f->open_object_section("rocksdb_statistics"); | |
732 | f->dump_string("rocksdb_compaction_statistics", ""); | |
733 | vector<string> stats; | |
734 | split_stats(stat_str, '\n', stats); | |
735 | for (auto st :stats) { | |
736 | f->dump_string("", st); | |
737 | } | |
738 | f->close_section(); | |
739 | } | |
740 | } | |
11fdf7f2 | 741 | if (g_conf()->rocksdb_collect_extended_stats) { |
7c673cae FG |
742 | if (dbstats) { |
743 | f->open_object_section("rocksdb_extended_statistics"); | |
744 | string stat_str = dbstats->ToString(); | |
745 | vector<string> stats; | |
746 | split_stats(stat_str, '\n', stats); | |
747 | f->dump_string("rocksdb_extended_statistics", ""); | |
748 | for (auto st :stats) { | |
749 | f->dump_string(".", st); | |
750 | } | |
751 | f->close_section(); | |
752 | } | |
753 | f->open_object_section("rocksdbstore_perf_counters"); | |
754 | logger->dump_formatted(f,0); | |
755 | f->close_section(); | |
756 | } | |
11fdf7f2 | 757 | if (g_conf()->rocksdb_collect_memory_stats) { |
7c673cae | 758 | f->open_object_section("rocksdb_memtable_statistics"); |
11fdf7f2 TL |
759 | std::string str; |
760 | if (!bbt_opts.no_block_cache) { | |
761 | str.append(stringify(bbt_opts.block_cache->GetUsage())); | |
762 | f->dump_string("block_cache_usage", str.data()); | |
763 | str.clear(); | |
764 | str.append(stringify(bbt_opts.block_cache->GetPinnedUsage())); | |
765 | f->dump_string("block_cache_pinned_blocks_usage", str); | |
766 | str.clear(); | |
767 | } | |
7c673cae FG |
768 | db->GetProperty("rocksdb.cur-size-all-mem-tables", &str); |
769 | f->dump_string("rocksdb_memtable_usage", str); | |
11fdf7f2 TL |
770 | str.clear(); |
771 | db->GetProperty("rocksdb.estimate-table-readers-mem", &str); | |
772 | f->dump_string("rocksdb_index_filter_blocks_usage", str); | |
7c673cae FG |
773 | f->close_section(); |
774 | } | |
775 | } | |
776 | ||
// Write a transaction's WriteBatch to rocksdb with the caller's write
// options (the caller decides sync vs. async).
//
// Applies this store's disableWAL setting and dumps the batch at debug
// level 30. On a failed write, it logs the error and the batch contents.
// When rocksdb_perf is enabled, it feeds rocksdb's per-write timing
// breakdown into the perf counters. Returns 0 on success, -1 on failure.
int RocksDBStore::submit_common(rocksdb::WriteOptions& woptions, KeyValueDB::Transaction t)
{
  // enable rocksdb breakdown
  // considering performance overhead, default is disabled
  if (g_conf()->rocksdb_perf) {
    rocksdb::SetPerfLevel(rocksdb::PerfLevel::kEnableTimeExceptForMutex);
    rocksdb::get_perf_context()->Reset();
  }

  RocksDBTransactionImpl * _t =
    static_cast<RocksDBTransactionImpl *>(t.get());
  woptions.disableWAL = disableWAL;
  // The statements up to dendl appear to sit inside the scope opened by the
  // dout macro (note the use of _dout, which that macro declares). The batch
  // is therefore only walked when level 30 output is gathered. Keep them
  // together.
  lgeneric_subdout(cct, rocksdb, 30) << __func__;
  RocksWBHandler bat_txc;
  _t->bat.Iterate(&bat_txc);
  *_dout << " Rocksdb transaction: " << bat_txc.seen << dendl;

  rocksdb::Status s = db->Write(woptions, &_t->bat);
  if (!s.ok()) {
    RocksWBHandler rocks_txc;
    _t->bat.Iterate(&rocks_txc);
    derr << __func__ << " error: " << s.ToString() << " code = " << s.code()
         << " Rocksdb transaction: " << rocks_txc.seen << dendl;
  }

  // rocksdb's perf context reports these times in nanoseconds; convert to
  // seconds for the utime_t-based perf counters.
  if (g_conf()->rocksdb_perf) {
    utime_t write_memtable_time;
    utime_t write_delay_time;
    utime_t write_wal_time;
    utime_t write_pre_and_post_process_time;
    write_wal_time.set_from_double(
      static_cast<double>(rocksdb::get_perf_context()->write_wal_time)/1000000000);
    write_memtable_time.set_from_double(
      static_cast<double>(rocksdb::get_perf_context()->write_memtable_time)/1000000000);
    write_delay_time.set_from_double(
      static_cast<double>(rocksdb::get_perf_context()->write_delay_time)/1000000000);
    write_pre_and_post_process_time.set_from_double(
      static_cast<double>(rocksdb::get_perf_context()->write_pre_and_post_process_time)/1000000000);
    logger->tinc(l_rocksdb_write_memtable_time, write_memtable_time);
    logger->tinc(l_rocksdb_write_delay_time, write_delay_time);
    logger->tinc(l_rocksdb_write_wal_time, write_wal_time);
    logger->tinc(l_rocksdb_write_pre_and_post_process_time, write_pre_and_post_process_time);
  }

  return s.ok() ? 0 : -1;
}
823 | ||
11fdf7f2 | 824 | int RocksDBStore::submit_transaction(KeyValueDB::Transaction t) |
7c673cae FG |
825 | { |
826 | utime_t start = ceph_clock_now(); | |
7c673cae | 827 | rocksdb::WriteOptions woptions; |
11fdf7f2 | 828 | woptions.sync = false; |
7c673cae | 829 | |
11fdf7f2 | 830 | int result = submit_common(woptions, t); |
7c673cae | 831 | |
11fdf7f2 TL |
832 | utime_t lat = ceph_clock_now() - start; |
833 | logger->inc(l_rocksdb_txns); | |
834 | logger->tinc(l_rocksdb_submit_latency, lat); | |
835 | ||
836 | return result; | |
837 | } | |
7c673cae | 838 | |
11fdf7f2 TL |
839 | int RocksDBStore::submit_transaction_sync(KeyValueDB::Transaction t) |
840 | { | |
841 | utime_t start = ceph_clock_now(); | |
842 | rocksdb::WriteOptions woptions; | |
843 | // if disableWAL, sync can't set | |
844 | woptions.sync = !disableWAL; | |
845 | ||
846 | int result = submit_common(woptions, t); | |
847 | ||
848 | utime_t lat = ceph_clock_now() - start; | |
7c673cae FG |
849 | logger->inc(l_rocksdb_txns_sync); |
850 | logger->tinc(l_rocksdb_submit_sync_latency, lat); | |
851 | ||
11fdf7f2 | 852 | return result; |
7c673cae FG |
853 | } |
854 | ||
855 | RocksDBStore::RocksDBTransactionImpl::RocksDBTransactionImpl(RocksDBStore *_db) | |
856 | { | |
857 | db = _db; | |
858 | } | |
859 | ||
11fdf7f2 TL |
860 | void RocksDBStore::RocksDBTransactionImpl::put_bat( |
861 | rocksdb::WriteBatch& bat, | |
862 | rocksdb::ColumnFamilyHandle *cf, | |
863 | const string &key, | |
7c673cae FG |
864 | const bufferlist &to_set_bl) |
865 | { | |
7c673cae FG |
866 | // bufferlist::c_str() is non-constant, so we can't call c_str() |
867 | if (to_set_bl.is_contiguous() && to_set_bl.length() > 0) { | |
11fdf7f2 TL |
868 | bat.Put(cf, |
869 | rocksdb::Slice(key), | |
870 | rocksdb::Slice(to_set_bl.buffers().front().c_str(), | |
871 | to_set_bl.length())); | |
7c673cae | 872 | } else { |
31f18b77 | 873 | rocksdb::Slice key_slice(key); |
c07f9fc5 | 874 | vector<rocksdb::Slice> value_slices(to_set_bl.buffers().size()); |
11fdf7f2 TL |
875 | bat.Put(cf, |
876 | rocksdb::SliceParts(&key_slice, 1), | |
c07f9fc5 | 877 | prepare_sliceparts(to_set_bl, &value_slices)); |
7c673cae FG |
878 | } |
879 | } | |
880 | ||
881 | void RocksDBStore::RocksDBTransactionImpl::set( | |
882 | const string &prefix, | |
11fdf7f2 | 883 | const string &k, |
7c673cae FG |
884 | const bufferlist &to_set_bl) |
885 | { | |
11fdf7f2 TL |
886 | auto cf = db->get_cf_handle(prefix); |
887 | if (cf) { | |
888 | put_bat(bat, cf, k, to_set_bl); | |
889 | } else { | |
890 | string key = combine_strings(prefix, k); | |
891 | put_bat(bat, db->default_cf, key, to_set_bl); | |
892 | } | |
893 | } | |
7c673cae | 894 | |
11fdf7f2 TL |
895 | void RocksDBStore::RocksDBTransactionImpl::set( |
896 | const string &prefix, | |
897 | const char *k, size_t keylen, | |
898 | const bufferlist &to_set_bl) | |
899 | { | |
900 | auto cf = db->get_cf_handle(prefix); | |
901 | if (cf) { | |
902 | string key(k, keylen); // fixme? | |
903 | put_bat(bat, cf, key, to_set_bl); | |
7c673cae | 904 | } else { |
11fdf7f2 TL |
905 | string key; |
906 | combine_strings(prefix, k, keylen, &key); | |
907 | put_bat(bat, cf, key, to_set_bl); | |
7c673cae FG |
908 | } |
909 | } | |
910 | ||
911 | void RocksDBStore::RocksDBTransactionImpl::rmkey(const string &prefix, | |
912 | const string &k) | |
913 | { | |
11fdf7f2 TL |
914 | auto cf = db->get_cf_handle(prefix); |
915 | if (cf) { | |
916 | bat.Delete(cf, rocksdb::Slice(k)); | |
917 | } else { | |
918 | bat.Delete(db->default_cf, combine_strings(prefix, k)); | |
919 | } | |
7c673cae FG |
920 | } |
921 | ||
922 | void RocksDBStore::RocksDBTransactionImpl::rmkey(const string &prefix, | |
923 | const char *k, | |
924 | size_t keylen) | |
925 | { | |
11fdf7f2 TL |
926 | auto cf = db->get_cf_handle(prefix); |
927 | if (cf) { | |
928 | bat.Delete(cf, rocksdb::Slice(k, keylen)); | |
929 | } else { | |
930 | string key; | |
931 | combine_strings(prefix, k, keylen, &key); | |
932 | bat.Delete(db->default_cf, rocksdb::Slice(key)); | |
933 | } | |
7c673cae FG |
934 | } |
935 | ||
936 | void RocksDBStore::RocksDBTransactionImpl::rm_single_key(const string &prefix, | |
937 | const string &k) | |
938 | { | |
11fdf7f2 TL |
939 | auto cf = db->get_cf_handle(prefix); |
940 | if (cf) { | |
941 | bat.SingleDelete(cf, k); | |
942 | } else { | |
943 | bat.SingleDelete(db->default_cf, combine_strings(prefix, k)); | |
944 | } | |
7c673cae FG |
945 | } |
946 | ||
// Queue deletion of every key under `prefix`.
//
// Strategy:
//  * enable_rmrange off: iterate the prefix and queue one Delete per key.
//  * enable_rmrange on, max_items_rmrange == 0: queue a single DeleteRange.
//  * enable_rmrange on, max_items_rmrange > 0: queue individual Deletes for
//    up to max_items_rmrange keys; if more keys exist, roll the batch back to
//    the savepoint taken before the loop (discarding those Deletes) and queue
//    one DeleteRange instead.
void RocksDBStore::RocksDBTransactionImpl::rmkeys_by_prefix(const string &prefix)
{
  auto cf = db->get_cf_handle(prefix);
  if (cf) {
    // Dedicated column family: every key in it belongs to `prefix`.
    if (db->enable_rmrange) {
      // Upper bound for the range tombstone; assumes no key in this CF sorts
      // at or after "\xff\xff\xff\xff".
      string endprefix("\xff\xff\xff\xff"); // FIXME: this is cheating...
      if (db->max_items_rmrange) {
        uint64_t cnt = db->max_items_rmrange;
        bat.SetSavePoint();
        auto it = db->get_iterator(prefix);
        for (it->seek_to_first();
             it->valid();
             it->next()) {
          if (!cnt) {
            // Too many keys: undo the per-key Deletes, use one range delete.
            bat.RollbackToSavePoint();
            bat.DeleteRange(cf, string(), endprefix);
            return;
          }
          bat.Delete(cf, rocksdb::Slice(it->key()));
          --cnt;
        }
        // All keys fit under the limit; keep the Deletes, drop the savepoint.
        bat.PopSavePoint();
      } else {
        bat.DeleteRange(cf, string(), endprefix);
      }
    } else {
      auto it = db->get_iterator(prefix);
      for (it->seek_to_first();
           it->valid();
           it->next()) {
        bat.Delete(cf, rocksdb::Slice(it->key()));
      }
    }
  } else {
    // Default column family: keys are stored prefix-qualified, so the range
    // runs from combine_strings(prefix, "") up to
    // combine_strings(prefix + '\x01', "").
    if (db->enable_rmrange) {
      string endprefix = prefix;
      endprefix.push_back('\x01');
      if (db->max_items_rmrange) {
        uint64_t cnt = db->max_items_rmrange;
        bat.SetSavePoint();
        auto it = db->get_iterator(prefix);
        for (it->seek_to_first();
             it->valid();
             it->next()) {
          if (!cnt) {
            // Too many keys: undo the per-key Deletes, use one range delete.
            bat.RollbackToSavePoint();
            bat.DeleteRange(db->default_cf,
                            combine_strings(prefix, string()),
                            combine_strings(endprefix, string()));
            return;
          }
          bat.Delete(db->default_cf, combine_strings(prefix, it->key()));
          --cnt;
        }
        bat.PopSavePoint();
      } else {
        bat.DeleteRange(db->default_cf,
                        combine_strings(prefix, string()),
                        combine_strings(endprefix, string()));
      }
    } else {
      auto it = db->get_iterator(prefix);
      for (it->seek_to_first();
           it->valid();
           it->next()) {
        bat.Delete(db->default_cf, combine_strings(prefix, it->key()));
      }
    }
  }
}
1017 | ||
// Queue deletion of the keys under `prefix` in [start, end): iteration stops
// at the first key >= end.
//
// Uses the same strategy as rmkeys_by_prefix: per-key Deletes when
// enable_rmrange is off; a single DeleteRange when it is on with no item
// limit; otherwise up to max_items_rmrange per-key Deletes, falling back (via
// RollbackToSavePoint) to one DeleteRange when the limit is exceeded.
void RocksDBStore::RocksDBTransactionImpl::rm_range_keys(const string &prefix,
                                                         const string &start,
                                                         const string &end)
{
  auto cf = db->get_cf_handle(prefix);
  if (cf) {
    // Dedicated column family: start/end are used as bare keys.
    if (db->enable_rmrange) {
      if (db->max_items_rmrange) {
        uint64_t cnt = db->max_items_rmrange;
        auto it = db->get_iterator(prefix);
        bat.SetSavePoint();
        it->lower_bound(start);
        while (it->valid()) {
          if (it->key() >= end) {
            break;
          }
          if (!cnt) {
            // Too many keys: undo the per-key Deletes, use one range delete.
            bat.RollbackToSavePoint();
            bat.DeleteRange(cf, rocksdb::Slice(start), rocksdb::Slice(end));
            return;
          }
          bat.Delete(cf, rocksdb::Slice(it->key()));
          it->next();
          --cnt;
        }
        // Stayed under the limit; keep the Deletes, drop the savepoint.
        bat.PopSavePoint();
      } else {
        bat.DeleteRange(cf, rocksdb::Slice(start), rocksdb::Slice(end));
      }
    } else {
      auto it = db->get_iterator(prefix);
      it->lower_bound(start);
      while (it->valid()) {
        if (it->key() >= end) {
          break;
        }
        bat.Delete(cf, rocksdb::Slice(it->key()));
        it->next();
      }
    }
  } else {
    // Default column family: bounds and keys are prefix-qualified with
    // combine_strings before being handed to RocksDB.
    if (db->enable_rmrange) {
      if (db->max_items_rmrange) {
        uint64_t cnt = db->max_items_rmrange;
        auto it = db->get_iterator(prefix);
        bat.SetSavePoint();
        it->lower_bound(start);
        while (it->valid()) {
          if (it->key() >= end) {
            break;
          }
          if (!cnt) {
            // Too many keys: undo the per-key Deletes, use one range delete.
            bat.RollbackToSavePoint();
            bat.DeleteRange(
              db->default_cf,
              rocksdb::Slice(combine_strings(prefix, start)),
              rocksdb::Slice(combine_strings(prefix, end)));
            return;
          }
          bat.Delete(db->default_cf,
                     combine_strings(prefix, it->key()));
          it->next();
          --cnt;
        }
        bat.PopSavePoint();
      } else {
        bat.DeleteRange(
          db->default_cf,
          rocksdb::Slice(combine_strings(prefix, start)),
          rocksdb::Slice(combine_strings(prefix, end)));
      }
    } else {
      auto it = db->get_iterator(prefix);
      it->lower_bound(start);
      while (it->valid()) {
        if (it->key() >= end) {
          break;
        }
        bat.Delete(db->default_cf,
                   combine_strings(prefix, it->key()));
        it->next();
      }
    }
  }
}
1103 | ||
1104 | void RocksDBStore::RocksDBTransactionImpl::merge( | |
1105 | const string &prefix, | |
1106 | const string &k, | |
1107 | const bufferlist &to_set_bl) | |
1108 | { | |
11fdf7f2 TL |
1109 | auto cf = db->get_cf_handle(prefix); |
1110 | if (cf) { | |
1111 | // bufferlist::c_str() is non-constant, so we can't call c_str() | |
1112 | if (to_set_bl.is_contiguous() && to_set_bl.length() > 0) { | |
1113 | bat.Merge( | |
1114 | cf, | |
1115 | rocksdb::Slice(k), | |
1116 | rocksdb::Slice(to_set_bl.buffers().front().c_str(), to_set_bl.length())); | |
1117 | } else { | |
1118 | // make a copy | |
1119 | rocksdb::Slice key_slice(k); | |
1120 | vector<rocksdb::Slice> value_slices(to_set_bl.buffers().size()); | |
1121 | bat.Merge(cf, rocksdb::SliceParts(&key_slice, 1), | |
1122 | prepare_sliceparts(to_set_bl, &value_slices)); | |
1123 | } | |
7c673cae | 1124 | } else { |
11fdf7f2 TL |
1125 | string key = combine_strings(prefix, k); |
1126 | // bufferlist::c_str() is non-constant, so we can't call c_str() | |
1127 | if (to_set_bl.is_contiguous() && to_set_bl.length() > 0) { | |
1128 | bat.Merge( | |
1129 | db->default_cf, | |
1130 | rocksdb::Slice(key), | |
1131 | rocksdb::Slice(to_set_bl.buffers().front().c_str(), to_set_bl.length())); | |
1132 | } else { | |
1133 | // make a copy | |
1134 | rocksdb::Slice key_slice(key); | |
1135 | vector<rocksdb::Slice> value_slices(to_set_bl.buffers().size()); | |
1136 | bat.Merge( | |
1137 | db->default_cf, | |
1138 | rocksdb::SliceParts(&key_slice, 1), | |
1139 | prepare_sliceparts(to_set_bl, &value_slices)); | |
1140 | } | |
7c673cae FG |
1141 | } |
1142 | } | |
1143 | ||
7c673cae FG |
1144 | int RocksDBStore::get( |
1145 | const string &prefix, | |
1146 | const std::set<string> &keys, | |
1147 | std::map<string, bufferlist> *out) | |
1148 | { | |
1149 | utime_t start = ceph_clock_now(); | |
11fdf7f2 TL |
1150 | auto cf = get_cf_handle(prefix); |
1151 | if (cf) { | |
1152 | for (auto& key : keys) { | |
1153 | std::string value; | |
1154 | auto status = db->Get(rocksdb::ReadOptions(), | |
1155 | cf, | |
1156 | rocksdb::Slice(key), | |
1157 | &value); | |
1158 | if (status.ok()) { | |
1159 | (*out)[key].append(value); | |
1160 | } else if (status.IsIOError()) { | |
1161 | ceph_abort_msg(status.getState()); | |
1162 | } | |
1163 | } | |
1164 | } else { | |
1165 | for (auto& key : keys) { | |
1166 | std::string value; | |
1167 | string k = combine_strings(prefix, key); | |
1168 | auto status = db->Get(rocksdb::ReadOptions(), | |
1169 | default_cf, | |
1170 | rocksdb::Slice(k), | |
1171 | &value); | |
1172 | if (status.ok()) { | |
1173 | (*out)[key].append(value); | |
1174 | } else if (status.IsIOError()) { | |
1175 | ceph_abort_msg(status.getState()); | |
1176 | } | |
224ce89b | 1177 | } |
7c673cae FG |
1178 | } |
1179 | utime_t lat = ceph_clock_now() - start; | |
1180 | logger->inc(l_rocksdb_gets); | |
1181 | logger->tinc(l_rocksdb_get_latency, lat); | |
1182 | return 0; | |
1183 | } | |
1184 | ||
1185 | int RocksDBStore::get( | |
1186 | const string &prefix, | |
1187 | const string &key, | |
1188 | bufferlist *out) | |
1189 | { | |
11fdf7f2 | 1190 | ceph_assert(out && (out->length() == 0)); |
7c673cae FG |
1191 | utime_t start = ceph_clock_now(); |
1192 | int r = 0; | |
11fdf7f2 | 1193 | string value; |
7c673cae | 1194 | rocksdb::Status s; |
11fdf7f2 TL |
1195 | auto cf = get_cf_handle(prefix); |
1196 | if (cf) { | |
1197 | s = db->Get(rocksdb::ReadOptions(), | |
1198 | cf, | |
1199 | rocksdb::Slice(key), | |
1200 | &value); | |
1201 | } else { | |
1202 | string k = combine_strings(prefix, key); | |
1203 | s = db->Get(rocksdb::ReadOptions(), | |
1204 | default_cf, | |
1205 | rocksdb::Slice(k), | |
1206 | &value); | |
1207 | } | |
7c673cae FG |
1208 | if (s.ok()) { |
1209 | out->append(value); | |
224ce89b | 1210 | } else if (s.IsNotFound()) { |
7c673cae | 1211 | r = -ENOENT; |
224ce89b | 1212 | } else { |
11fdf7f2 | 1213 | ceph_abort_msg(s.getState()); |
7c673cae FG |
1214 | } |
1215 | utime_t lat = ceph_clock_now() - start; | |
1216 | logger->inc(l_rocksdb_gets); | |
1217 | logger->tinc(l_rocksdb_get_latency, lat); | |
1218 | return r; | |
1219 | } | |
1220 | ||
1221 | int RocksDBStore::get( | |
1222 | const string& prefix, | |
1223 | const char *key, | |
1224 | size_t keylen, | |
1225 | bufferlist *out) | |
1226 | { | |
11fdf7f2 | 1227 | ceph_assert(out && (out->length() == 0)); |
7c673cae FG |
1228 | utime_t start = ceph_clock_now(); |
1229 | int r = 0; | |
11fdf7f2 | 1230 | string value; |
7c673cae | 1231 | rocksdb::Status s; |
11fdf7f2 TL |
1232 | auto cf = get_cf_handle(prefix); |
1233 | if (cf) { | |
1234 | s = db->Get(rocksdb::ReadOptions(), | |
1235 | cf, | |
1236 | rocksdb::Slice(key, keylen), | |
1237 | &value); | |
1238 | } else { | |
1239 | string k; | |
1240 | combine_strings(prefix, key, keylen, &k); | |
1241 | s = db->Get(rocksdb::ReadOptions(), | |
1242 | default_cf, | |
1243 | rocksdb::Slice(k), | |
1244 | &value); | |
1245 | } | |
7c673cae FG |
1246 | if (s.ok()) { |
1247 | out->append(value); | |
224ce89b | 1248 | } else if (s.IsNotFound()) { |
7c673cae | 1249 | r = -ENOENT; |
224ce89b | 1250 | } else { |
11fdf7f2 | 1251 | ceph_abort_msg(s.getState()); |
7c673cae FG |
1252 | } |
1253 | utime_t lat = ceph_clock_now() - start; | |
1254 | logger->inc(l_rocksdb_gets); | |
1255 | logger->tinc(l_rocksdb_get_latency, lat); | |
1256 | return r; | |
1257 | } | |
1258 | ||
1259 | int RocksDBStore::split_key(rocksdb::Slice in, string *prefix, string *key) | |
1260 | { | |
1261 | size_t prefix_len = 0; | |
1262 | ||
1263 | // Find separator inside Slice | |
1264 | char* separator = (char*) memchr(in.data(), 0, in.size()); | |
1265 | if (separator == NULL) | |
1266 | return -EINVAL; | |
1267 | prefix_len = size_t(separator - in.data()); | |
1268 | if (prefix_len >= in.size()) | |
1269 | return -EINVAL; | |
1270 | ||
1271 | // Fetch prefix and/or key directly from Slice | |
1272 | if (prefix) | |
1273 | *prefix = string(in.data(), prefix_len); | |
1274 | if (key) | |
1275 | *key = string(separator+1, in.size()-prefix_len-1); | |
1276 | return 0; | |
1277 | } | |
1278 | ||
1279 | void RocksDBStore::compact() | |
1280 | { | |
1281 | logger->inc(l_rocksdb_compact); | |
1282 | rocksdb::CompactRangeOptions options; | |
11fdf7f2 TL |
1283 | db->CompactRange(options, default_cf, nullptr, nullptr); |
1284 | for (auto cf : cf_handles) { | |
1285 | db->CompactRange( | |
1286 | options, | |
1287 | static_cast<rocksdb::ColumnFamilyHandle*>(cf.second), | |
1288 | nullptr, nullptr); | |
1289 | } | |
7c673cae FG |
1290 | } |
1291 | ||
1292 | ||
// Body of the background compaction thread. Pops ranges from compact_queue
// until compact_queue_stop is set; an (empty, empty) pair means "compact
// everything" (compact()), anything else is passed to compact_range().
// compact_queue_lock is held while touching the queue and released while a
// compaction runs, so producers can keep enqueueing in the meantime.
void RocksDBStore::compact_thread_entry()
{
  compact_queue_lock.Lock();
  dout(10) << __func__ << " enter" << dendl;
  while (!compact_queue_stop) {
    if (!compact_queue.empty()) {
      pair<string,string> range = compact_queue.front();
      compact_queue.pop_front();
      logger->set(l_rocksdb_compact_queue_len, compact_queue.size());
      // Drop the lock for the (potentially long) compaction itself.
      compact_queue_lock.Unlock();
      logger->inc(l_rocksdb_compact_range);
      if (range.first.empty() && range.second.empty()) {
        compact();
      } else {
        compact_range(range.first, range.second);
      }
      compact_queue_lock.Lock();
      continue;
    }
    // Queue drained: sleep until compact_range_async (or shutdown) signals.
    dout(10) << __func__ << " waiting" << dendl;
    compact_queue_cond.Wait(compact_queue_lock);
  }
  compact_queue_lock.Unlock();
}
1317 | ||
1318 | void RocksDBStore::compact_range_async(const string& start, const string& end) | |
1319 | { | |
11fdf7f2 | 1320 | std::lock_guard l(compact_queue_lock); |
7c673cae FG |
1321 | |
1322 | // try to merge adjacent ranges. this is O(n), but the queue should | |
1323 | // be short. note that we do not cover all overlap cases and merge | |
1324 | // opportunities here, but we capture the ones we currently need. | |
1325 | list< pair<string,string> >::iterator p = compact_queue.begin(); | |
1326 | while (p != compact_queue.end()) { | |
1327 | if (p->first == start && p->second == end) { | |
1328 | // dup; no-op | |
1329 | return; | |
1330 | } | |
1331 | if (p->first <= end && p->first > start) { | |
1332 | // merge with existing range to the right | |
1333 | compact_queue.push_back(make_pair(start, p->second)); | |
1334 | compact_queue.erase(p); | |
1335 | logger->inc(l_rocksdb_compact_queue_merge); | |
1336 | break; | |
1337 | } | |
1338 | if (p->second >= start && p->second < end) { | |
1339 | // merge with existing range to the left | |
1340 | compact_queue.push_back(make_pair(p->first, end)); | |
1341 | compact_queue.erase(p); | |
1342 | logger->inc(l_rocksdb_compact_queue_merge); | |
1343 | break; | |
1344 | } | |
1345 | ++p; | |
1346 | } | |
1347 | if (p == compact_queue.end()) { | |
1348 | // no merge, new entry. | |
1349 | compact_queue.push_back(make_pair(start, end)); | |
1350 | logger->set(l_rocksdb_compact_queue_len, compact_queue.size()); | |
1351 | } | |
1352 | compact_queue_cond.Signal(); | |
1353 | if (!compact_thread.is_started()) { | |
1354 | compact_thread.create("rstore_compact"); | |
1355 | } | |
1356 | } | |
1357 | bool RocksDBStore::check_omap_dir(string &omap_dir) | |
1358 | { | |
1359 | rocksdb::Options options; | |
1360 | options.create_if_missing = true; | |
1361 | rocksdb::DB *db; | |
1362 | rocksdb::Status status = rocksdb::DB::Open(options, omap_dir, &db); | |
1363 | delete db; | |
1364 | db = nullptr; | |
1365 | return status.ok(); | |
1366 | } | |
1367 | void RocksDBStore::compact_range(const string& start, const string& end) | |
1368 | { | |
1369 | rocksdb::CompactRangeOptions options; | |
1370 | rocksdb::Slice cstart(start); | |
1371 | rocksdb::Slice cend(end); | |
1372 | db->CompactRange(options, &cstart, &cend); | |
1373 | } | |
91327a77 | 1374 | |
7c673cae FG |
1375 | RocksDBStore::RocksDBWholeSpaceIteratorImpl::~RocksDBWholeSpaceIteratorImpl() |
1376 | { | |
1377 | delete dbiter; | |
1378 | } | |
1379 | int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_first() | |
1380 | { | |
1381 | dbiter->SeekToFirst(); | |
11fdf7f2 | 1382 | ceph_assert(!dbiter->status().IsIOError()); |
7c673cae FG |
1383 | return dbiter->status().ok() ? 0 : -1; |
1384 | } | |
1385 | int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_first(const string &prefix) | |
1386 | { | |
1387 | rocksdb::Slice slice_prefix(prefix); | |
1388 | dbiter->Seek(slice_prefix); | |
11fdf7f2 | 1389 | ceph_assert(!dbiter->status().IsIOError()); |
7c673cae FG |
1390 | return dbiter->status().ok() ? 0 : -1; |
1391 | } | |
1392 | int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_last() | |
1393 | { | |
1394 | dbiter->SeekToLast(); | |
11fdf7f2 | 1395 | ceph_assert(!dbiter->status().IsIOError()); |
7c673cae FG |
1396 | return dbiter->status().ok() ? 0 : -1; |
1397 | } | |
1398 | int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_last(const string &prefix) | |
1399 | { | |
1400 | string limit = past_prefix(prefix); | |
1401 | rocksdb::Slice slice_limit(limit); | |
1402 | dbiter->Seek(slice_limit); | |
1403 | ||
1404 | if (!dbiter->Valid()) { | |
1405 | dbiter->SeekToLast(); | |
1406 | } else { | |
1407 | dbiter->Prev(); | |
1408 | } | |
1409 | return dbiter->status().ok() ? 0 : -1; | |
1410 | } | |
1411 | int RocksDBStore::RocksDBWholeSpaceIteratorImpl::upper_bound(const string &prefix, const string &after) | |
1412 | { | |
1413 | lower_bound(prefix, after); | |
1414 | if (valid()) { | |
1415 | pair<string,string> key = raw_key(); | |
1416 | if (key.first == prefix && key.second == after) | |
1417 | next(); | |
1418 | } | |
1419 | return dbiter->status().ok() ? 0 : -1; | |
1420 | } | |
1421 | int RocksDBStore::RocksDBWholeSpaceIteratorImpl::lower_bound(const string &prefix, const string &to) | |
1422 | { | |
1423 | string bound = combine_strings(prefix, to); | |
1424 | rocksdb::Slice slice_bound(bound); | |
1425 | dbiter->Seek(slice_bound); | |
1426 | return dbiter->status().ok() ? 0 : -1; | |
1427 | } | |
1428 | bool RocksDBStore::RocksDBWholeSpaceIteratorImpl::valid() | |
1429 | { | |
1430 | return dbiter->Valid(); | |
1431 | } | |
1432 | int RocksDBStore::RocksDBWholeSpaceIteratorImpl::next() | |
1433 | { | |
1434 | if (valid()) { | |
1435 | dbiter->Next(); | |
1436 | } | |
11fdf7f2 | 1437 | ceph_assert(!dbiter->status().IsIOError()); |
7c673cae FG |
1438 | return dbiter->status().ok() ? 0 : -1; |
1439 | } | |
1440 | int RocksDBStore::RocksDBWholeSpaceIteratorImpl::prev() | |
1441 | { | |
1442 | if (valid()) { | |
1443 | dbiter->Prev(); | |
1444 | } | |
11fdf7f2 | 1445 | ceph_assert(!dbiter->status().IsIOError()); |
7c673cae FG |
1446 | return dbiter->status().ok() ? 0 : -1; |
1447 | } | |
1448 | string RocksDBStore::RocksDBWholeSpaceIteratorImpl::key() | |
1449 | { | |
1450 | string out_key; | |
1451 | split_key(dbiter->key(), 0, &out_key); | |
1452 | return out_key; | |
1453 | } | |
1454 | pair<string,string> RocksDBStore::RocksDBWholeSpaceIteratorImpl::raw_key() | |
1455 | { | |
1456 | string prefix, key; | |
1457 | split_key(dbiter->key(), &prefix, &key); | |
1458 | return make_pair(prefix, key); | |
1459 | } | |
1460 | ||
1461 | bool RocksDBStore::RocksDBWholeSpaceIteratorImpl::raw_key_is_prefixed(const string &prefix) { | |
1462 | // Look for "prefix\0" right in rocksb::Slice | |
1463 | rocksdb::Slice key = dbiter->key(); | |
1464 | if ((key.size() > prefix.length()) && (key[prefix.length()] == '\0')) { | |
1465 | return memcmp(key.data(), prefix.c_str(), prefix.length()) == 0; | |
1466 | } else { | |
1467 | return false; | |
1468 | } | |
1469 | } | |
1470 | ||
1471 | bufferlist RocksDBStore::RocksDBWholeSpaceIteratorImpl::value() | |
1472 | { | |
1473 | return to_bufferlist(dbiter->value()); | |
1474 | } | |
1475 | ||
1476 | size_t RocksDBStore::RocksDBWholeSpaceIteratorImpl::key_size() | |
1477 | { | |
1478 | return dbiter->key().size(); | |
1479 | } | |
1480 | ||
1481 | size_t RocksDBStore::RocksDBWholeSpaceIteratorImpl::value_size() | |
1482 | { | |
1483 | return dbiter->value().size(); | |
1484 | } | |
1485 | ||
1486 | bufferptr RocksDBStore::RocksDBWholeSpaceIteratorImpl::value_as_ptr() | |
1487 | { | |
1488 | rocksdb::Slice val = dbiter->value(); | |
1489 | return bufferptr(val.data(), val.size()); | |
1490 | } | |
1491 | ||
1492 | int RocksDBStore::RocksDBWholeSpaceIteratorImpl::status() | |
1493 | { | |
1494 | return dbiter->status().ok() ? 0 : -1; | |
1495 | } | |
1496 | ||
1497 | string RocksDBStore::past_prefix(const string &prefix) | |
1498 | { | |
1499 | string limit = prefix; | |
1500 | limit.push_back(1); | |
1501 | return limit; | |
1502 | } | |
1503 | ||
11fdf7f2 | 1504 | RocksDBStore::WholeSpaceIterator RocksDBStore::get_wholespace_iterator() |
7c673cae FG |
1505 | { |
1506 | return std::make_shared<RocksDBWholeSpaceIteratorImpl>( | |
11fdf7f2 | 1507 | db->NewIterator(rocksdb::ReadOptions(), default_cf)); |
7c673cae FG |
1508 | } |
1509 | ||
11fdf7f2 TL |
1510 | class CFIteratorImpl : public KeyValueDB::IteratorImpl { |
1511 | protected: | |
1512 | string prefix; | |
1513 | rocksdb::Iterator *dbiter; | |
1514 | public: | |
1515 | explicit CFIteratorImpl(const std::string& p, | |
1516 | rocksdb::Iterator *iter) | |
1517 | : prefix(p), dbiter(iter) { } | |
1518 | ~CFIteratorImpl() { | |
1519 | delete dbiter; | |
1520 | } | |
1521 | ||
1522 | int seek_to_first() override { | |
1523 | dbiter->SeekToFirst(); | |
1524 | return dbiter->status().ok() ? 0 : -1; | |
1525 | } | |
1526 | int seek_to_last() override { | |
1527 | dbiter->SeekToLast(); | |
1528 | return dbiter->status().ok() ? 0 : -1; | |
1529 | } | |
1530 | int upper_bound(const string &after) override { | |
1531 | lower_bound(after); | |
1532 | if (valid() && (key() == after)) { | |
1533 | next(); | |
1534 | } | |
1535 | return dbiter->status().ok() ? 0 : -1; | |
1536 | } | |
1537 | int lower_bound(const string &to) override { | |
1538 | rocksdb::Slice slice_bound(to); | |
1539 | dbiter->Seek(slice_bound); | |
1540 | return dbiter->status().ok() ? 0 : -1; | |
1541 | } | |
1542 | int next() override { | |
1543 | if (valid()) { | |
1544 | dbiter->Next(); | |
1545 | } | |
1546 | return dbiter->status().ok() ? 0 : -1; | |
1547 | } | |
1548 | int prev() override { | |
1549 | if (valid()) { | |
1550 | dbiter->Prev(); | |
1551 | } | |
1552 | return dbiter->status().ok() ? 0 : -1; | |
1553 | } | |
1554 | bool valid() override { | |
1555 | return dbiter->Valid(); | |
1556 | } | |
1557 | string key() override { | |
1558 | return dbiter->key().ToString(); | |
1559 | } | |
1560 | std::pair<std::string, std::string> raw_key() override { | |
1561 | return make_pair(prefix, key()); | |
1562 | } | |
1563 | bufferlist value() override { | |
1564 | return to_bufferlist(dbiter->value()); | |
1565 | } | |
1566 | bufferptr value_as_ptr() override { | |
1567 | rocksdb::Slice val = dbiter->value(); | |
1568 | return bufferptr(val.data(), val.size()); | |
1569 | } | |
1570 | int status() override { | |
1571 | return dbiter->status().ok() ? 0 : -1; | |
1572 | } | |
1573 | }; | |
1574 | ||
1575 | KeyValueDB::Iterator RocksDBStore::get_iterator(const std::string& prefix) | |
1576 | { | |
1577 | rocksdb::ColumnFamilyHandle *cf_handle = | |
1578 | static_cast<rocksdb::ColumnFamilyHandle*>(get_cf_handle(prefix)); | |
1579 | if (cf_handle) { | |
1580 | return std::make_shared<CFIteratorImpl>( | |
1581 | prefix, | |
1582 | db->NewIterator(rocksdb::ReadOptions(), cf_handle)); | |
1583 | } else { | |
1584 | return KeyValueDB::get_iterator(prefix); | |
1585 | } | |
1586 | } |