]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | #ifndef ROCKS_DB_STORE_H | |
4 | #define ROCKS_DB_STORE_H | |
5 | ||
6 | #include "include/types.h" | |
7 | #include "include/buffer_fwd.h" | |
8 | #include "KeyValueDB.h" | |
9 | #include <set> | |
10 | #include <map> | |
11 | #include <string> | |
12 | #include <memory> | |
13 | #include <boost/scoped_ptr.hpp> | |
14 | #include "rocksdb/write_batch.h" | |
15 | #include "rocksdb/perf_context.h" | |
16 | #include "rocksdb/iostats_context.h" | |
17 | #include "rocksdb/statistics.h" | |
18 | #include "rocksdb/table.h" | |
f67539c2 | 19 | #include "rocksdb/db.h" |
11fdf7f2 | 20 | #include "kv/rocksdb_cache/BinnedLRUCache.h" |
7c673cae FG |
21 | #include <errno.h> |
22 | #include "common/errno.h" | |
23 | #include "common/dout.h" | |
11fdf7f2 | 24 | #include "include/ceph_assert.h" |
9f95a23c | 25 | #include "include/common_fwd.h" |
7c673cae FG |
26 | #include "common/Formatter.h" |
27 | #include "common/Cond.h" | |
7c673cae | 28 | #include "common/ceph_context.h" |
91327a77 | 29 | #include "common/PriorityCache.h" |
f67539c2 | 30 | #include "common/pretty_binary.h" |
7c673cae FG |
31 | |
// Numeric ids of the rocksdb counters.  l_rocksdb_first and l_rocksdb_last
// bracket the range; every other enumerator takes the next consecutive value.
enum {
  l_rocksdb_first = 34300,  // lower bound of the id range
  l_rocksdb_gets,
  l_rocksdb_get_latency,
  l_rocksdb_submit_latency,
  l_rocksdb_submit_sync_latency,
  l_rocksdb_compact,
  l_rocksdb_compact_range,
  l_rocksdb_compact_queue_merge,
  l_rocksdb_compact_queue_len,
  l_rocksdb_write_wal_time,
  l_rocksdb_write_memtable_time,
  l_rocksdb_write_delay_time,
  l_rocksdb_write_pre_and_post_process_time,
  l_rocksdb_last,  // upper bound of the id range
};
48 | ||
49 | namespace rocksdb{ | |
50 | class DB; | |
51 | class Env; | |
52 | class Cache; | |
53 | class FilterPolicy; | |
54 | class Snapshot; | |
55 | class Slice; | |
56 | class WriteBatch; | |
57 | class Iterator; | |
58 | class Logger; | |
11fdf7f2 | 59 | class ColumnFamilyHandle; |
7c673cae FG |
60 | struct Options; |
61 | struct BlockBasedTableOptions; | |
11fdf7f2 TL |
62 | struct DBOptions; |
63 | struct ColumnFamilyOptions; | |
7c673cae FG |
64 | } |
65 | ||
66 | extern rocksdb::Logger *create_rocksdb_ceph_logger(); | |
67 | ||
68 | /** | |
69 | * Uses RocksDB to implement the KeyValueDB interface | |
70 | */ | |
71 | class RocksDBStore : public KeyValueDB { | |
72 | CephContext *cct; | |
73 | PerfCounters *logger; | |
f67539c2 TL |
74 | std::string path; |
75 | std::map<std::string,std::string> kv_options; | |
7c673cae FG |
76 | void *priv; |
77 | rocksdb::DB *db; | |
78 | rocksdb::Env *env; | |
f67539c2 | 79 | const rocksdb::Comparator* comparator; |
7c673cae FG |
80 | std::shared_ptr<rocksdb::Statistics> dbstats; |
81 | rocksdb::BlockBasedTableOptions bbt_opts; | |
f67539c2 | 82 | std::string options_str; |
7c673cae | 83 | |
31f18b77 | 84 | uint64_t cache_size = 0; |
224ce89b | 85 | bool set_cache_flag = false; |
f67539c2 TL |
86 | friend class ShardMergeIteratorImpl; |
87 | friend class WholeMergeIteratorImpl; | |
88 | /* | |
89 | * See RocksDB's definition of a column family(CF) and how to use it. | |
90 | * The interface of KeyValueDB is extended when a column family is created. | |
91 | * Prefix will be the name of column family to use. | |
92 | */ | |
93 | public: | |
/// One column family entry of a sharding definition.
///
/// All string members are spelled std::string so that this header does not
/// depend on a using-directive being in effect where it is included.
struct ColumnFamily {
  std::string name;     //< name of this individual column family
  size_t shard_cnt;     //< count of shards
  std::string options;  //< configure option string for this CF
  uint32_t hash_l;      //< first character to take for hash calc.
  uint32_t hash_h;      //< last character to take for hash calc.

  /// Store the given values verbatim in the members of the same name.
  ColumnFamily(const std::string &name, size_t shard_cnt, const std::string &options,
               uint32_t hash_l, uint32_t hash_h)
    : name(name), shard_cnt(shard_cnt), options(options), hash_l(hash_l), hash_h(hash_h) {}
};
104 | private: | |
105 | friend std::ostream& operator<<(std::ostream& out, const ColumnFamily& cf); | |
31f18b77 | 106 | |
11fdf7f2 TL |
107 | bool must_close_default_cf = false; |
108 | rocksdb::ColumnFamilyHandle *default_cf = nullptr; | |
109 | ||
f67539c2 TL |
110 | /// column families in use, name->handles |
111 | struct prefix_shards { | |
112 | uint32_t hash_l; //< first character to take for hash calc. | |
113 | uint32_t hash_h; //< last character to take for hash calc. | |
114 | std::vector<rocksdb::ColumnFamilyHandle *> handles; | |
115 | }; | |
116 | std::unordered_map<std::string, prefix_shards> cf_handles; | |
117 | std::unordered_map<uint32_t, std::string> cf_ids_to_prefix; | |
118 | std::unordered_map<std::string, rocksdb::BlockBasedTableOptions> cf_bbt_opts; | |
119 | ||
120 | void add_column_family(const std::string& cf_name, uint32_t hash_l, uint32_t hash_h, | |
121 | size_t shard_idx, rocksdb::ColumnFamilyHandle *handle); | |
122 | bool is_column_family(const std::string& prefix); | |
123 | rocksdb::ColumnFamilyHandle *get_cf_handle(const std::string& prefix, const std::string& key); | |
124 | rocksdb::ColumnFamilyHandle *get_cf_handle(const std::string& prefix, const char* key, size_t keylen); | |
125 | ||
11fdf7f2 | 126 | int submit_common(rocksdb::WriteOptions& woptions, KeyValueDB::Transaction t); |
f67539c2 | 127 | int install_cf_mergeop(const std::string &cf_name, rocksdb::ColumnFamilyOptions *cf_opt); |
11fdf7f2 | 128 | int create_db_dir(); |
f67539c2 TL |
129 | int do_open(std::ostream &out, bool create_if_missing, bool open_readonly, |
130 | const std::string& cfs=""); | |
11fdf7f2 | 131 | int load_rocksdb_options(bool create_if_missing, rocksdb::Options& opt); |
f67539c2 TL |
132 | public: |
133 | static bool parse_sharding_def(const std::string_view text_def, | |
134 | std::vector<ColumnFamily>& sharding_def, | |
135 | char const* *error_position = nullptr, | |
136 | std::string *error_msg = nullptr); | |
137 | const rocksdb::Comparator* get_comparator() const { | |
138 | return comparator; | |
139 | } | |
7c673cae | 140 | |
f67539c2 TL |
141 | private: |
142 | static void sharding_def_to_columns(const std::vector<ColumnFamily>& sharding_def, | |
143 | std::vector<std::string>& columns); | |
144 | int create_shards(const rocksdb::Options& opt, | |
145 | const vector<ColumnFamily>& sharding_def); | |
146 | int apply_sharding(const rocksdb::Options& opt, | |
147 | const std::string& sharding_text); | |
148 | int verify_sharding(const rocksdb::Options& opt, | |
149 | std::vector<rocksdb::ColumnFamilyDescriptor>& existing_cfs, | |
150 | std::vector<std::pair<size_t, RocksDBStore::ColumnFamily> >& existing_cfs_shard, | |
151 | std::vector<rocksdb::ColumnFamilyDescriptor>& missing_cfs, | |
152 | std::vector<std::pair<size_t, RocksDBStore::ColumnFamily> >& missing_cfs_shard); | |
153 | std::shared_ptr<rocksdb::Cache> create_block_cache(const std::string& cache_type, size_t cache_size, double cache_prio_high = 0.0); | |
522d829b | 154 | int split_column_family_options(const std::string& opts_str, |
f67539c2 TL |
155 | std::unordered_map<std::string, std::string>* column_opts_map, |
156 | std::string* block_cache_opt); | |
522d829b TL |
157 | int apply_block_cache_options(const std::string& column_name, |
158 | const std::string& block_cache_opt, | |
159 | rocksdb::ColumnFamilyOptions* cf_opt); | |
160 | int update_column_family_options(const std::string& base_name, | |
161 | const std::string& more_options, | |
162 | rocksdb::ColumnFamilyOptions* cf_opt); | |
7c673cae | 163 | // manage async compactions |
9f95a23c TL |
164 | ceph::mutex compact_queue_lock = |
165 | ceph::make_mutex("RocksDBStore::compact_thread_lock"); | |
166 | ceph::condition_variable compact_queue_cond; | |
f67539c2 | 167 | std::list<std::pair<std::string,std::string>> compact_queue; |
7c673cae FG |
168 | bool compact_queue_stop; |
169 | class CompactThread : public Thread { | |
170 | RocksDBStore *db; | |
171 | public: | |
172 | explicit CompactThread(RocksDBStore *d) : db(d) {} | |
173 | void *entry() override { | |
174 | db->compact_thread_entry(); | |
175 | return NULL; | |
176 | } | |
177 | friend class RocksDBStore; | |
178 | } compact_thread; | |
179 | ||
180 | void compact_thread_entry(); | |
181 | ||
f67539c2 TL |
182 | void compact_range(const std::string& start, const std::string& end); |
183 | void compact_range_async(const std::string& start, const std::string& end); | |
184 | int tryInterpret(const std::string& key, const std::string& val, | |
185 | rocksdb::Options& opt); | |
7c673cae FG |
186 | |
187 | public: | |
188 | /// compact the underlying rocksdb store | |
189 | bool compact_on_mount; | |
190 | bool disableWAL; | |
9f95a23c | 191 | const uint64_t delete_range_threshold; |
7c673cae | 192 | void compact() override; |
11fdf7f2 TL |
193 | |
194 | void compact_async() override { | |
f67539c2 | 195 | compact_range_async({}, {}); |
11fdf7f2 | 196 | } |
7c673cae | 197 | |
f67539c2 | 198 | int ParseOptionsFromString(const std::string& opt_str, rocksdb::Options& opt); |
9f95a23c TL |
199 | static int ParseOptionsFromStringStatic( |
200 | CephContext* cct, | |
f67539c2 | 201 | const std::string& opt_str, |
9f95a23c | 202 | rocksdb::Options &opt, |
f67539c2 TL |
203 | std::function<int(const std::string&, const std::string&, rocksdb::Options&)> interp); |
204 | static int _test_init(const std::string& dir); | |
205 | int init(std::string options_str) override; | |
7c673cae | 206 | /// compact rocksdb for all keys with a given prefix |
f67539c2 | 207 | void compact_prefix(const std::string& prefix) override { |
7c673cae FG |
208 | compact_range(prefix, past_prefix(prefix)); |
209 | } | |
f67539c2 | 210 | void compact_prefix_async(const std::string& prefix) override { |
7c673cae FG |
211 | compact_range_async(prefix, past_prefix(prefix)); |
212 | } | |
213 | ||
f67539c2 TL |
214 | void compact_range(const std::string& prefix, const std::string& start, |
215 | const std::string& end) override { | |
7c673cae FG |
216 | compact_range(combine_strings(prefix, start), combine_strings(prefix, end)); |
217 | } | |
f67539c2 TL |
218 | void compact_range_async(const std::string& prefix, const std::string& start, |
219 | const std::string& end) override { | |
7c673cae FG |
220 | compact_range_async(combine_strings(prefix, start), combine_strings(prefix, end)); |
221 | } | |
222 | ||
f67539c2 | 223 | RocksDBStore(CephContext *c, const std::string &path, std::map<std::string,std::string> opt, void *p) : |
7c673cae FG |
224 | cct(c), |
225 | logger(NULL), | |
226 | path(path), | |
11fdf7f2 | 227 | kv_options(opt), |
7c673cae FG |
228 | priv(p), |
229 | db(NULL), | |
230 | env(static_cast<rocksdb::Env*>(p)), | |
f67539c2 | 231 | comparator(nullptr), |
7c673cae | 232 | dbstats(NULL), |
7c673cae FG |
233 | compact_queue_stop(false), |
234 | compact_thread(this), | |
235 | compact_on_mount(false), | |
236 | disableWAL(false), | |
9f95a23c | 237 | delete_range_threshold(cct->_conf.get_val<uint64_t>("rocksdb_delete_range_threshold")) |
7c673cae FG |
238 | {} |
239 | ||
240 | ~RocksDBStore() override; | |
241 | ||
f67539c2 | 242 | static bool check_omap_dir(std::string &omap_dir); |
7c673cae | 243 | /// Opens underlying db |
f67539c2 TL |
244 | int open(std::ostream &out, const std::string& cfs="") override { |
245 | return do_open(out, false, false, cfs); | |
7c673cae FG |
246 | } |
247 | /// Creates underlying db if missing and opens it | |
f67539c2 TL |
248 | int create_and_open(std::ostream &out, |
249 | const std::string& cfs="") override; | |
11fdf7f2 | 250 | |
f67539c2 TL |
251 | int open_read_only(std::ostream &out, const std::string& cfs="") override { |
252 | return do_open(out, false, true, cfs); | |
11fdf7f2 | 253 | } |
7c673cae FG |
254 | |
255 | void close() override; | |
256 | ||
11fdf7f2 | 257 | int repair(std::ostream &out) override; |
7c673cae | 258 | void split_stats(const std::string &s, char delim, std::vector<std::string> &elems); |
f67539c2 | 259 | void get_statistics(ceph::Formatter *f) override; |
7c673cae | 260 | |
3efd9988 FG |
261 | PerfCounters *get_perf_counters() override |
262 | { | |
263 | return logger; | |
264 | } | |
265 | ||
9f95a23c TL |
266 | bool get_property( |
267 | const std::string &property, | |
268 | uint64_t *out) final; | |
269 | ||
f67539c2 TL |
270 | int64_t estimate_prefix_size(const std::string& prefix, |
271 | const std::string& key_prefix) override; | |
272 | struct RocksWBHandler; | |
7c673cae FG |
273 | class RocksDBTransactionImpl : public KeyValueDB::TransactionImpl { |
274 | public: | |
275 | rocksdb::WriteBatch bat; | |
276 | RocksDBStore *db; | |
277 | ||
278 | explicit RocksDBTransactionImpl(RocksDBStore *_db); | |
11fdf7f2 TL |
279 | private: |
280 | void put_bat( | |
281 | rocksdb::WriteBatch& bat, | |
282 | rocksdb::ColumnFamilyHandle *cf, | |
f67539c2 TL |
283 | const std::string &k, |
284 | const ceph::bufferlist &to_set_bl); | |
11fdf7f2 | 285 | public: |
7c673cae | 286 | void set( |
f67539c2 TL |
287 | const std::string &prefix, |
288 | const std::string &k, | |
289 | const ceph::bufferlist &bl) override; | |
7c673cae | 290 | void set( |
f67539c2 | 291 | const std::string &prefix, |
7c673cae FG |
292 | const char *k, |
293 | size_t keylen, | |
f67539c2 | 294 | const ceph::bufferlist &bl) override; |
7c673cae | 295 | void rmkey( |
f67539c2 TL |
296 | const std::string &prefix, |
297 | const std::string &k) override; | |
7c673cae | 298 | void rmkey( |
f67539c2 | 299 | const std::string &prefix, |
7c673cae FG |
300 | const char *k, |
301 | size_t keylen) override; | |
302 | void rm_single_key( | |
f67539c2 TL |
303 | const std::string &prefix, |
304 | const std::string &k) override; | |
7c673cae | 305 | void rmkeys_by_prefix( |
f67539c2 | 306 | const std::string &prefix |
7c673cae FG |
307 | ) override; |
308 | void rm_range_keys( | |
f67539c2 TL |
309 | const std::string &prefix, |
310 | const std::string &start, | |
311 | const std::string &end) override; | |
7c673cae | 312 | void merge( |
f67539c2 TL |
313 | const std::string& prefix, |
314 | const std::string& k, | |
315 | const ceph::bufferlist &bl) override; | |
7c673cae FG |
316 | }; |
317 | ||
318 | KeyValueDB::Transaction get_transaction() override { | |
319 | return std::make_shared<RocksDBTransactionImpl>(this); | |
320 | } | |
321 | ||
322 | int submit_transaction(KeyValueDB::Transaction t) override; | |
323 | int submit_transaction_sync(KeyValueDB::Transaction t) override; | |
324 | int get( | |
f67539c2 TL |
325 | const std::string &prefix, |
326 | const std::set<std::string> &key, | |
327 | std::map<std::string, ceph::bufferlist> *out | |
7c673cae FG |
328 | ) override; |
329 | int get( | |
f67539c2 TL |
330 | const std::string &prefix, |
331 | const std::string &key, | |
332 | ceph::bufferlist *out | |
7c673cae FG |
333 | ) override; |
334 | int get( | |
f67539c2 | 335 | const std::string &prefix, |
7c673cae FG |
336 | const char *key, |
337 | size_t keylen, | |
f67539c2 | 338 | ceph::bufferlist *out) override; |
7c673cae FG |
339 | |
340 | ||
341 | class RocksDBWholeSpaceIteratorImpl : | |
342 | public KeyValueDB::WholeSpaceIteratorImpl { | |
343 | protected: | |
344 | rocksdb::Iterator *dbiter; | |
345 | public: | |
346 | explicit RocksDBWholeSpaceIteratorImpl(rocksdb::Iterator *iter) : | |
347 | dbiter(iter) { } | |
348 | //virtual ~RocksDBWholeSpaceIteratorImpl() { } | |
349 | ~RocksDBWholeSpaceIteratorImpl() override; | |
350 | ||
351 | int seek_to_first() override; | |
f67539c2 | 352 | int seek_to_first(const std::string &prefix) override; |
7c673cae | 353 | int seek_to_last() override; |
f67539c2 TL |
354 | int seek_to_last(const std::string &prefix) override; |
355 | int upper_bound(const std::string &prefix, const std::string &after) override; | |
356 | int lower_bound(const std::string &prefix, const std::string &to) override; | |
7c673cae FG |
357 | bool valid() override; |
358 | int next() override; | |
359 | int prev() override; | |
f67539c2 TL |
360 | std::string key() override; |
361 | std::pair<std::string,std::string> raw_key() override; | |
362 | bool raw_key_is_prefixed(const std::string &prefix) override; | |
363 | ceph::bufferlist value() override; | |
364 | ceph::bufferptr value_as_ptr() override; | |
7c673cae FG |
365 | int status() override; |
366 | size_t key_size() override; | |
367 | size_t value_size() override; | |
368 | }; | |
369 | ||
f67539c2 TL |
370 | Iterator get_iterator(const std::string& prefix, IteratorOpts opts = 0) override; |
371 | private: | |
372 | /// this iterator spans single cf | |
373 | rocksdb::Iterator* new_shard_iterator(rocksdb::ColumnFamilyHandle* cf); | |
374 | public: | |
7c673cae | 375 | /// Utility |
f67539c2 TL |
/// Build "<prefix>\0<value>": the prefix, a single NUL byte, then the value.
static std::string combine_strings(const std::string &prefix, const std::string &value) {
  std::string combined(prefix);
  combined += '\0';
  combined += value;
  return combined;
}
/// Overwrite *out with "<prefix>\0<key>", where the key is the `keylen` bytes
/// starting at `key` (it may contain NUL bytes itself).
/// Any previous content of *out is discarded.
static void combine_strings(const std::string &prefix,
                            const char *key, size_t keylen,
                            std::string *out) {
  const size_t needed = prefix.size() + 1 + keylen;
  out->reserve(needed);
  out->assign(prefix);
  *out += '\0';
  out->append(key, keylen);
}
390 | ||
f67539c2 | 391 | static int split_key(rocksdb::Slice in, std::string *prefix, std::string *key); |
7c673cae | 392 | |
f67539c2 | 393 | static std::string past_prefix(const std::string &prefix); |
7c673cae FG |
394 | |
395 | class MergeOperatorRouter; | |
11fdf7f2 | 396 | class MergeOperatorLinker; |
7c673cae | 397 | friend class MergeOperatorRouter; |
11fdf7f2 TL |
398 | int set_merge_operator( |
399 | const std::string& prefix, | |
400 | std::shared_ptr<KeyValueDB::MergeOperator> mop) override; | |
f67539c2 | 401 | std::string assoc_name; ///< Name of associative operator |
7c673cae | 402 | |
f67539c2 | 403 | uint64_t get_estimated_size(std::map<std::string,uint64_t> &extra) override { |
7c673cae FG |
404 | DIR *store_dir = opendir(path.c_str()); |
405 | if (!store_dir) { | |
406 | lderr(cct) << __func__ << " something happened opening the store: " | |
407 | << cpp_strerror(errno) << dendl; | |
408 | return 0; | |
409 | } | |
410 | ||
411 | uint64_t total_size = 0; | |
412 | uint64_t sst_size = 0; | |
413 | uint64_t log_size = 0; | |
414 | uint64_t misc_size = 0; | |
415 | ||
416 | struct dirent *entry = NULL; | |
417 | while ((entry = readdir(store_dir)) != NULL) { | |
f67539c2 | 418 | std::string n(entry->d_name); |
7c673cae FG |
419 | |
420 | if (n == "." || n == "..") | |
421 | continue; | |
422 | ||
f67539c2 | 423 | std::string fpath = path + '/' + n; |
7c673cae FG |
424 | struct stat s; |
425 | int err = stat(fpath.c_str(), &s); | |
426 | if (err < 0) | |
427 | err = -errno; | |
428 | // we may race against rocksdb while reading files; this should only | |
429 | // happen when those files are being updated, data is being shuffled | |
430 | // and files get removed, in which case there's not much of a problem | |
431 | // as we'll get to them next time around. | |
432 | if (err == -ENOENT) { | |
433 | continue; | |
434 | } | |
435 | if (err < 0) { | |
436 | lderr(cct) << __func__ << " error obtaining stats for " << fpath | |
437 | << ": " << cpp_strerror(err) << dendl; | |
438 | goto err; | |
439 | } | |
440 | ||
441 | size_t pos = n.find_last_of('.'); | |
f67539c2 | 442 | if (pos == std::string::npos) { |
7c673cae FG |
443 | misc_size += s.st_size; |
444 | continue; | |
445 | } | |
446 | ||
f67539c2 | 447 | std::string ext = n.substr(pos+1); |
7c673cae FG |
448 | if (ext == "sst") { |
449 | sst_size += s.st_size; | |
450 | } else if (ext == "log") { | |
451 | log_size += s.st_size; | |
452 | } else { | |
453 | misc_size += s.st_size; | |
454 | } | |
455 | } | |
456 | ||
457 | total_size = sst_size + log_size + misc_size; | |
458 | ||
459 | extra["sst"] = sst_size; | |
460 | extra["log"] = log_size; | |
461 | extra["misc"] = misc_size; | |
462 | extra["total"] = total_size; | |
463 | ||
464 | err: | |
465 | closedir(store_dir); | |
466 | return total_size; | |
467 | } | |
468 | ||
11fdf7f2 TL |
469 | virtual int64_t get_cache_usage() const override { |
470 | return static_cast<int64_t>(bbt_opts.block_cache->GetUsage()); | |
91327a77 | 471 | } |
91327a77 | 472 | |
f67539c2 TL |
473 | virtual int64_t get_cache_usage(string prefix) const override { |
474 | auto it = cf_bbt_opts.find(prefix); | |
475 | if (it != cf_bbt_opts.end() && it->second.block_cache) { | |
476 | return static_cast<int64_t>(it->second.block_cache->GetUsage()); | |
477 | } | |
478 | return -EINVAL; | |
479 | } | |
480 | ||
31f18b77 FG |
481 | int set_cache_size(uint64_t s) override { |
482 | cache_size = s; | |
224ce89b | 483 | set_cache_flag = true; |
31f18b77 FG |
484 | return 0; |
485 | } | |
7c673cae | 486 | |
f67539c2 TL |
487 | virtual std::shared_ptr<PriorityCache::PriCache> |
488 | get_priority_cache() const override { | |
11fdf7f2 TL |
489 | return dynamic_pointer_cast<PriorityCache::PriCache>( |
490 | bbt_opts.block_cache); | |
491 | } | |
492 | ||
f67539c2 TL |
493 | virtual std::shared_ptr<PriorityCache::PriCache> |
494 | get_priority_cache(string prefix) const override { | |
495 | auto it = cf_bbt_opts.find(prefix); | |
496 | if (it != cf_bbt_opts.end()) { | |
497 | return dynamic_pointer_cast<PriorityCache::PriCache>( | |
498 | it->second.block_cache); | |
499 | } | |
500 | return nullptr; | |
501 | } | |
7c673cae | 502 | |
f67539c2 TL |
503 | WholeSpaceIterator get_wholespace_iterator(IteratorOpts opts = 0) override; |
504 | private: | |
505 | WholeSpaceIterator get_default_cf_iterator(); | |
506 | ||
507 | using cf_deleter_t = std::function<void(rocksdb::ColumnFamilyHandle*)>; | |
508 | using columns_t = std::map<std::string, | |
509 | std::unique_ptr<rocksdb::ColumnFamilyHandle, | |
510 | cf_deleter_t>>; | |
511 | int prepare_for_reshard(const std::string& new_sharding, | |
512 | columns_t& to_process_columns); | |
513 | int reshard_cleanup(const columns_t& current_columns); | |
514 | public: | |
/// Knobs accepted by reshard(); every member has a default.
struct resharding_ctrl {
  /// amount of data to process before refreshing iterator
  size_t bytes_per_iterator = 10'000'000;
  size_t keys_per_iterator = 10'000;
  /// amount of data before submitting batch
  size_t bytes_per_batch = 1'000'000;
  size_t keys_per_batch = 1'000;
  // the unittest_* switches are all off by default
  bool unittest_fail_after_first_batch = false;
  bool unittest_fail_after_processing_column = false;
  bool unittest_fail_after_successful_processing = false;
};
524 | int reshard(const std::string& new_sharding, const resharding_ctrl* ctrl = nullptr); | |
525 | bool get_sharding(std::string& sharding); | |
7c673cae | 526 | |
f67539c2 | 527 | }; |
7c673cae FG |
528 | |
529 | #endif |