1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 #ifndef ROCKS_DB_STORE_H
4 #define ROCKS_DB_STORE_H
6 #include "include/types.h"
7 #include "include/buffer_fwd.h"
8 #include "KeyValueDB.h"
13 #include <boost/scoped_ptr.hpp>
14 #include "rocksdb/write_batch.h"
15 #include "rocksdb/perf_context.h"
16 #include "rocksdb/iostats_context.h"
17 #include "rocksdb/statistics.h"
18 #include "rocksdb/table.h"
19 #include "rocksdb/db.h"
20 #include "kv/rocksdb_cache/BinnedLRUCache.h"
22 #include "common/errno.h"
23 #include "common/dout.h"
24 #include "include/ceph_assert.h"
25 #include "include/common_fwd.h"
26 #include "common/Formatter.h"
27 #include "common/Cond.h"
28 #include "common/ceph_context.h"
29 #include "common/PriorityCache.h"
30 #include "common/pretty_binary.h"
33 l_rocksdb_first
= 34300,
34 l_rocksdb_get_latency
,
35 l_rocksdb_submit_latency
,
36 l_rocksdb_submit_sync_latency
,
38 l_rocksdb_compact_range
,
39 l_rocksdb_compact_queue_merge
,
40 l_rocksdb_compact_queue_len
,
41 l_rocksdb_write_wal_time
,
42 l_rocksdb_write_memtable_time
,
43 l_rocksdb_write_delay_time
,
44 l_rocksdb_write_pre_and_post_process_time
,
58 class ColumnFamilyHandle
;
60 struct BlockBasedTableOptions
;
62 struct ColumnFamilyOptions
;
65 extern rocksdb::Logger
*create_rocksdb_ceph_logger();
67 inline rocksdb::Slice
make_slice(const std::optional
<std::string
>& bound
) {
76 * Uses RocksDB to implement the KeyValueDB interface
78 class RocksDBStore
: public KeyValueDB
{
82 std::map
<std::string
,std::string
> kv_options
;
86 const rocksdb::Comparator
* comparator
;
87 std::shared_ptr
<rocksdb::Statistics
> dbstats
;
88 rocksdb::BlockBasedTableOptions bbt_opts
;
89 std::string options_str
;
91 uint64_t cache_size
= 0;
92 bool set_cache_flag
= false;
93 friend class ShardMergeIteratorImpl
;
94 friend class CFIteratorImpl
;
95 friend class WholeMergeIteratorImpl
;
97 * See RocksDB's definition of a column family (CF) and how to use it.
98 * The KeyValueDB interface is extended when a column family is created:
99 * the prefix will be the name of the column family to use.
/// Describes one column family definition: its name, the number of shards it
/// is split into, its option string and the hash_l/hash_h character range.
102 struct ColumnFamily
{
103 std::string name
; //< name of this individual column family
104 size_t shard_cnt
; //< count of shards
105 std::string options
; //< configure option string for this CF
106 uint32_t hash_l
; //< first character to take for hash calc.
107 uint32_t hash_h
; //< last character to take for hash calc.
/// Member-wise constructor: each argument is copied into the member of the
/// same name by the initializer list; the constructor body is empty.
108 ColumnFamily(const std::string
&name
, size_t shard_cnt
, const std::string
&options
,
109 uint32_t hash_l
, uint32_t hash_h
)
110 : name(name
), shard_cnt(shard_cnt
), options(options
), hash_l(hash_l
), hash_h(hash_h
) {}
113 friend std::ostream
& operator<<(std::ostream
& out
, const ColumnFamily
& cf
);
115 bool must_close_default_cf
= false;
116 rocksdb::ColumnFamilyHandle
*default_cf
= nullptr;
118 /// column families in use, name->handles
// Value type stored in cf_handles (keyed by std::string): the hash_l/hash_h
// character range plus a vector of rocksdb::ColumnFamilyHandle pointers.
// NOTE(review): presumably one handle per shard of the prefix — confirm
// against add_column_family().
119 struct prefix_shards
{
120 uint32_t hash_l
; //< first character to take for hash calc.
121 uint32_t hash_h
; //< last character to take for hash calc.
122 std::vector
<rocksdb::ColumnFamilyHandle
*> handles
;
124 std::unordered_map
<std::string
, prefix_shards
> cf_handles
;
125 std::unordered_map
<uint32_t, std::string
> cf_ids_to_prefix
;
126 std::unordered_map
<std::string
, rocksdb::BlockBasedTableOptions
> cf_bbt_opts
;
128 void add_column_family(const std::string
& cf_name
, uint32_t hash_l
, uint32_t hash_h
,
129 size_t shard_idx
, rocksdb::ColumnFamilyHandle
*handle
);
130 bool is_column_family(const std::string
& prefix
);
131 std::string_view
get_key_hash_view(const prefix_shards
& shards
, const char* key
, const size_t keylen
);
132 rocksdb::ColumnFamilyHandle
*get_key_cf(const prefix_shards
& shards
, const char* key
, const size_t keylen
);
133 rocksdb::ColumnFamilyHandle
*get_cf_handle(const std::string
& prefix
, const std::string
& key
);
134 rocksdb::ColumnFamilyHandle
*get_cf_handle(const std::string
& prefix
, const char* key
, size_t keylen
);
135 rocksdb::ColumnFamilyHandle
*get_cf_handle(const std::string
& prefix
, const IteratorBounds
& bounds
);
137 int submit_common(rocksdb::WriteOptions
& woptions
, KeyValueDB::Transaction t
);
138 int install_cf_mergeop(const std::string
&cf_name
, rocksdb::ColumnFamilyOptions
*cf_opt
);
140 int do_open(std::ostream
&out
, bool create_if_missing
, bool open_readonly
,
141 const std::string
& cfs
="");
142 int load_rocksdb_options(bool create_if_missing
, rocksdb::Options
& opt
);
144 static bool parse_sharding_def(const std::string_view text_def
,
145 std::vector
<ColumnFamily
>& sharding_def
,
146 char const* *error_position
= nullptr,
147 std::string
*error_msg
= nullptr);
148 const rocksdb::Comparator
* get_comparator() const {
153 static void sharding_def_to_columns(const std::vector
<ColumnFamily
>& sharding_def
,
154 std::vector
<std::string
>& columns
);
155 int create_shards(const rocksdb::Options
& opt
,
156 const std::vector
<ColumnFamily
>& sharding_def
);
157 int apply_sharding(const rocksdb::Options
& opt
,
158 const std::string
& sharding_text
);
159 int verify_sharding(const rocksdb::Options
& opt
,
160 std::vector
<rocksdb::ColumnFamilyDescriptor
>& existing_cfs
,
161 std::vector
<std::pair
<size_t, RocksDBStore::ColumnFamily
> >& existing_cfs_shard
,
162 std::vector
<rocksdb::ColumnFamilyDescriptor
>& missing_cfs
,
163 std::vector
<std::pair
<size_t, RocksDBStore::ColumnFamily
> >& missing_cfs_shard
);
164 std::shared_ptr
<rocksdb::Cache
> create_block_cache(const std::string
& cache_type
, size_t cache_size
, double cache_prio_high
= 0.0);
165 int split_column_family_options(const std::string
& opts_str
,
166 std::unordered_map
<std::string
, std::string
>* column_opts_map
,
167 std::string
* block_cache_opt
);
168 int apply_block_cache_options(const std::string
& column_name
,
169 const std::string
& block_cache_opt
,
170 rocksdb::ColumnFamilyOptions
* cf_opt
);
171 int update_column_family_options(const std::string
& base_name
,
172 const std::string
& more_options
,
173 rocksdb::ColumnFamilyOptions
* cf_opt
);
174 // manage async compactions
175 ceph::mutex compact_queue_lock
=
176 ceph::make_mutex("RocksDBStore::compact_thread_lock");
177 ceph::condition_variable compact_queue_cond
;
178 std::list
<std::pair
<std::string
,std::string
>> compact_queue
;
179 bool compact_queue_stop
;
// Thread subclass tied to a RocksDBStore: the constructor keeps the store
// pointer in `db`, and entry() calls back into the store's
// compact_thread_entry().
180 class CompactThread
: public Thread
{
// Remembers the store this thread works for.
183 explicit CompactThread(RocksDBStore
*d
) : db(d
) {}
// Overrides Thread::entry(); the visible work is the single call to
// db->compact_thread_entry().
184 void *entry() override
{
185 db
->compact_thread_entry();
// Grants RocksDBStore access to this class's members.
188 friend class RocksDBStore
;
191 void compact_thread_entry();
193 void compact_range(const std::string
& start
, const std::string
& end
);
194 void compact_range_async(const std::string
& start
, const std::string
& end
);
195 int tryInterpret(const std::string
& key
, const std::string
& val
,
196 rocksdb::Options
& opt
);
199 /// compact the underlying rocksdb store
200 bool compact_on_mount
;
202 const uint64_t delete_range_threshold
;
203 void compact() override
;
// Delegates to compact_range_async() with two empty (value-initialized)
// std::string bounds.
// NOTE(review): empty start/end presumably mean "the whole key space" —
// confirm against the two-argument compact_range_async().
205 void compact_async() override
{
206 compact_range_async({}, {});
209 int ParseOptionsFromString(const std::string
& opt_str
, rocksdb::Options
& opt
);
210 static int ParseOptionsFromStringStatic(
212 const std::string
& opt_str
,
213 rocksdb::Options
&opt
,
214 std::function
<int(const std::string
&, const std::string
&, rocksdb::Options
&)> interp
);
215 static int _test_init(const std::string
& dir
);
216 int init(std::string options_str
) override
;
217 /// compact rocksdb for all keys with a given prefix
// Calls the two-argument compact_range() with `prefix` as the start bound
// and past_prefix(prefix) as the end bound.
218 void compact_prefix(const std::string
& prefix
) override
{
219 compact_range(prefix
, past_prefix(prefix
));
// Same bounds as compact_prefix() — `prefix` up to past_prefix(prefix) — but
// handed to compact_range_async() instead of compact_range().
221 void compact_prefix_async(const std::string
& prefix
) override
{
222 compact_range_async(prefix
, past_prefix(prefix
));
// Prefix-aware overload: builds both bounds with combine_strings(prefix, ...)
// and forwards them to the two-argument compact_range().
// @param prefix  prefix combined with both bounds
// @param start   start key within the prefix
// @param end     end key within the prefix
225 void compact_range(const std::string
& prefix
, const std::string
& start
,
226 const std::string
& end
) override
{
227 compact_range(combine_strings(prefix
, start
), combine_strings(prefix
, end
));
// Prefix-aware overload: builds both bounds with combine_strings(prefix, ...)
// and forwards them to the two-argument compact_range_async().
// @param prefix  prefix combined with both bounds
// @param start   start key within the prefix
// @param end     end key within the prefix
229 void compact_range_async(const std::string
& prefix
, const std::string
& start
,
230 const std::string
& end
) override
{
231 compact_range_async(combine_strings(prefix
, start
), combine_strings(prefix
, end
));
// Constructor. Only part of the initializer list is visible in this excerpt;
// the comments below cover the visible initializers only.
// @param c     CephContext pointer
// @param path  store path
// @param opt   string-to-string option map, taken by value
// @param p     opaque pointer that is cast to rocksdb::Env* for `env`
234 RocksDBStore(CephContext
*c
, const std::string
&path
, std::map
<std::string
,std::string
> opt
, void *p
) :
// `p` is reinterpreted as a rocksdb::Env pointer; no check is visible here.
241 env(static_cast<rocksdb::Env
*>(p
)),
244 compact_queue_stop(false),
// The compaction thread object is given `this` as its store pointer.
245 compact_thread(this),
246 compact_on_mount(false),
// The const threshold is read once, at construction, from the
// "rocksdb_delete_range_threshold" config value.
248 delete_range_threshold(cct
->_conf
.get_val
<uint64_t>("rocksdb_delete_range_threshold"))
251 ~RocksDBStore() override
;
253 static bool check_omap_dir(std::string
&omap_dir
);
254 /// Opens underlying db
// Forwards to do_open() with create_if_missing=false and open_readonly=false.
// @param out  stream handed through to do_open()
// @param cfs  column family string handed through to do_open(); defaults to ""
// @return whatever do_open() returns
255 int open(std::ostream
&out
, const std::string
& cfs
="") override
{
256 return do_open(out
, false, false, cfs
);
258 /// Creates underlying db if missing and opens it
259 int create_and_open(std::ostream
&out
,
260 const std::string
& cfs
="") override
;
// Forwards to do_open() with create_if_missing=false and open_readonly=true.
// @param out  stream handed through to do_open()
// @param cfs  column family string handed through to do_open(); defaults to ""
// @return whatever do_open() returns
262 int open_read_only(std::ostream
&out
, const std::string
& cfs
="") override
{
263 return do_open(out
, false, true, cfs
);
266 void close() override
;
268 int repair(std::ostream
&out
) override
;
269 void split_stats(const std::string
&s
, char delim
, std::vector
<std::string
> &elems
);
270 void get_statistics(ceph::Formatter
*f
) override
;
272 PerfCounters
*get_perf_counters() override
278 const std::string
&property
,
279 uint64_t *out
) final
;
281 int64_t estimate_prefix_size(const std::string
& prefix
,
282 const std::string
& key_prefix
) override
;
283 struct RocksWBHandler
;
284 class RocksDBTransactionImpl
: public KeyValueDB::TransactionImpl
{
286 rocksdb::WriteBatch bat
;
289 explicit RocksDBTransactionImpl(RocksDBStore
*_db
);
292 rocksdb::WriteBatch
& bat
,
293 rocksdb::ColumnFamilyHandle
*cf
,
294 const std::string
&k
,
295 const ceph::bufferlist
&to_set_bl
);
298 const std::string
&prefix
,
299 const std::string
&k
,
300 const ceph::bufferlist
&bl
) override
;
302 const std::string
&prefix
,
305 const ceph::bufferlist
&bl
) override
;
307 const std::string
&prefix
,
308 const std::string
&k
) override
;
310 const std::string
&prefix
,
312 size_t keylen
) override
;
314 const std::string
&prefix
,
315 const std::string
&k
) override
;
316 void rmkeys_by_prefix(
317 const std::string
&prefix
320 const std::string
&prefix
,
321 const std::string
&start
,
322 const std::string
&end
) override
;
324 const std::string
& prefix
,
325 const std::string
& k
,
326 const ceph::bufferlist
&bl
) override
;
// Creates a new RocksDBTransactionImpl bound to this store (constructed with
// `this`) and returns it as a shared KeyValueDB::Transaction.
329 KeyValueDB::Transaction
get_transaction() override
{
330 return std::make_shared
<RocksDBTransactionImpl
>(this);
333 int submit_transaction(KeyValueDB::Transaction t
) override
;
334 int submit_transaction_sync(KeyValueDB::Transaction t
) override
;
336 const std::string
&prefix
,
337 const std::set
<std::string
> &key
,
338 std::map
<std::string
, ceph::bufferlist
> *out
341 const std::string
&prefix
,
342 const std::string
&key
,
343 ceph::bufferlist
*out
346 const std::string
&prefix
,
349 ceph::bufferlist
*out
) override
;
352 class RocksDBWholeSpaceIteratorImpl
:
353 public KeyValueDB::WholeSpaceIteratorImpl
{
355 rocksdb::Iterator
*dbiter
;
// Builds the wrapped rocksdb iterator for one column family.
// @param db    store whose underlying rocksdb handle (db->db) creates the iterator
// @param cf    column family passed to NewIterator()
// @param opts  iterator option bits; only ITERATOR_NOCACHE is examined here
357 explicit RocksDBWholeSpaceIteratorImpl(const RocksDBStore
* db
,
358 rocksdb::ColumnFamilyHandle
* cf
,
359 const KeyValueDB::IteratorOpts opts
)
// Start from default read options.
361 rocksdb::ReadOptions options
= rocksdb::ReadOptions();
// ITERATOR_NOCACHE turns off fill_cache for reads made through this iterator.
362 if (opts
& ITERATOR_NOCACHE
)
363 options
.fill_cache
=false;
// The raw iterator pointer is stored in dbiter.
364 dbiter
= db
->db
->NewIterator(options
, cf
);
366 ~RocksDBWholeSpaceIteratorImpl() override
;
368 int seek_to_first() override
;
369 int seek_to_first(const std::string
&prefix
) override
;
370 int seek_to_last() override
;
371 int seek_to_last(const std::string
&prefix
) override
;
372 int upper_bound(const std::string
&prefix
, const std::string
&after
) override
;
373 int lower_bound(const std::string
&prefix
, const std::string
&to
) override
;
374 bool valid() override
;
377 std::string
key() override
;
378 std::pair
<std::string
,std::string
> raw_key() override
;
379 bool raw_key_is_prefixed(const std::string
&prefix
) override
;
380 ceph::bufferlist
value() override
;
381 ceph::bufferptr
value_as_ptr() override
;
382 int status() override
;
383 size_t key_size() override
;
384 size_t value_size() override
;
387 Iterator
get_iterator(const std::string
& prefix
, IteratorOpts opts
= 0, IteratorBounds
= IteratorBounds()) override
;
389 /// this iterator spans single cf
390 rocksdb::Iterator
* new_shard_iterator(rocksdb::ColumnFamilyHandle
* cf
);
393 static std::string
combine_strings(const std::string
&prefix
, const std::string
&value
) {
394 std::string out
= prefix
;
399 static void combine_strings(const std::string
&prefix
,
400 const char *key
, size_t keylen
,
402 out
->reserve(prefix
.size() + 1 + keylen
);
405 out
->append(key
, keylen
);
408 static int split_key(rocksdb::Slice in
, std::string
*prefix
, std::string
*key
);
410 static std::string
past_prefix(const std::string
&prefix
);
412 class MergeOperatorRouter
;
413 class MergeOperatorLinker
;
414 friend class MergeOperatorRouter
;
415 int set_merge_operator(
416 const std::string
& prefix
,
417 std::shared_ptr
<KeyValueDB::MergeOperator
> mop
) override
;
418 std::string assoc_name
; ///< Name of associative operator
// Scans the directory `path` and sums file sizes (st_size from stat()) into
// three buckets, then reports them through `extra` under the keys "sst",
// "log", "misc" and "total".
// @param extra  output map; the four keys above are assigned at the end
// NOTE(review): several lines of this function (guards, early exits, the
// return statement) are not visible in this excerpt.
420 uint64_t get_estimated_size(std::map
<std::string
,uint64_t> &extra
) override
{
421 DIR *store_dir
= opendir(path
.c_str());
// Error report built from errno; presumably emitted only when opendir()
// failed — the guarding condition is not visible here.
423 lderr(cct
) << __func__
<< " something happened opening the store: "
424 << cpp_strerror(errno
) << dendl
;
428 uint64_t total_size
= 0;
429 uint64_t sst_size
= 0;
430 uint64_t log_size
= 0;
431 uint64_t misc_size
= 0;
433 struct dirent
*entry
= NULL
;
// Visit every directory entry until readdir() returns NULL.
434 while ((entry
= readdir(store_dir
)) != NULL
) {
435 std::string
n(entry
->d_name
);
// "." and ".." are singled out (presumably skipped — the statement that
// follows the condition is not visible).
437 if (n
== "." || n
== "..")
440 std::string fpath
= path
+ '/' + n
;
// NOTE(review): stat() returns -1 and reports the cause via errno, so the
// "-ENOENT" comparison below relies on err being converted to -errno first;
// that conversion is not visible in this excerpt — confirm it exists.
442 int err
= stat(fpath
.c_str(), &s
);
445 // we may race against rocksdb while reading files; this should only
446 // happen when those files are being updated, data is being shuffled
447 // and files get removed, in which case there's not much of a problem
448 // as we'll get to them next time around.
449 if (err
== -ENOENT
) {
453 lderr(cct
) << __func__
<< " error obtaining stats for " << fpath
454 << ": " << cpp_strerror(err
) << dendl
;
// Classify by the text after the last '.' in the file name.
458 size_t pos
= n
.find_last_of('.');
// No '.' at all: counted as "misc".
459 if (pos
== std::string::npos
) {
460 misc_size
+= s
.st_size
;
464 std::string ext
= n
.substr(pos
+1);
// First extension bucket; its condition is not visible (presumably
// ext == "sst" — confirm).
466 sst_size
+= s
.st_size
;
467 } else if (ext
== "log") {
468 log_size
+= s
.st_size
;
// Any other extension is counted as "misc".
470 misc_size
+= s
.st_size
;
// The total is simply the sum of the three buckets.
474 total_size
= sst_size
+ log_size
+ misc_size
;
476 extra
["sst"] = sst_size
;
477 extra
["log"] = log_size
;
478 extra
["misc"] = misc_size
;
479 extra
["total"] = total_size
;
// Returns GetUsage() of the store-wide block cache (bbt_opts.block_cache),
// cast to int64_t.
// NOTE(review): block_cache is dereferenced without a null check here, while
// the per-prefix overload tests it first — confirm it is always set before
// this is called.
486 virtual int64_t get_cache_usage() const override
{
487 return static_cast<int64_t>(bbt_opts
.block_cache
->GetUsage());
// Per-prefix variant: looks `prefix` up in cf_bbt_opts and, when an entry
// exists and its block_cache is non-null, returns that cache's GetUsage()
// cast to int64_t.
// NOTE(review): the value returned when the lookup fails is not visible in
// this excerpt.
490 virtual int64_t get_cache_usage(std::string prefix
) const override
{
491 auto it
= cf_bbt_opts
.find(prefix
);
// Both conditions must hold: the prefix is known and it has a block cache.
492 if (it
!= cf_bbt_opts
.end() && it
->second
.block_cache
) {
493 return static_cast<int64_t>(it
->second
.block_cache
->GetUsage());
// Records that a cache size was explicitly requested by raising
// set_cache_flag.
// NOTE(review): the statements that use `s` and the return value are not
// visible in this excerpt (presumably `s` is stored in cache_size — confirm).
498 int set_cache_size(uint64_t s
) override
{
500 set_cache_flag
= true;
// Returns the store-wide block cache (bbt_opts.block_cache) viewed as a
// PriorityCache::PriCache. std::dynamic_pointer_cast yields an empty
// shared_ptr when the cache object is not a PriCache (or is null).
504 virtual std::shared_ptr
<PriorityCache::PriCache
>
505 get_priority_cache() const override
{
506 return std::dynamic_pointer_cast
<PriorityCache::PriCache
>(
507 bbt_opts
.block_cache
);
// Per-prefix variant: looks `prefix` up in cf_bbt_opts and, when found,
// returns that entry's block_cache viewed as a PriorityCache::PriCache
// (empty shared_ptr if the cast does not apply).
// NOTE(review): the value returned for an unknown prefix is not visible in
// this excerpt.
510 virtual std::shared_ptr
<PriorityCache::PriCache
>
511 get_priority_cache(std::string prefix
) const override
{
512 auto it
= cf_bbt_opts
.find(prefix
);
513 if (it
!= cf_bbt_opts
.end()) {
514 return std::dynamic_pointer_cast
<PriorityCache::PriCache
>(
515 it
->second
.block_cache
);
520 WholeSpaceIterator
get_wholespace_iterator(IteratorOpts opts
= 0) override
;
522 WholeSpaceIterator
get_default_cf_iterator();
524 using cf_deleter_t
= std::function
<void(rocksdb::ColumnFamilyHandle
*)>;
525 using columns_t
= std::map
<std::string
,
526 std::unique_ptr
<rocksdb::ColumnFamilyHandle
,
528 int prepare_for_reshard(const std::string
& new_sharding
,
529 columns_t
& to_process_columns
);
530 int reshard_cleanup(const columns_t
& current_columns
);
// Control block accepted by reshard() through its optional `ctrl` pointer.
// Every field has an in-class default, so a default-constructed instance is
// fully initialized.
532 struct resharding_ctrl
{
533 size_t bytes_per_iterator
= 10000000; /// amount of data to process before refreshing iterator
// NOTE(review): presumably the key-count counterpart of bytes_per_iterator — confirm.
534 size_t keys_per_iterator
= 10000;
535 size_t bytes_per_batch
= 1000000; /// amount of data before submitting batch
// NOTE(review): presumably the key-count counterpart of bytes_per_batch — confirm.
536 size_t keys_per_batch
= 1000;
// NOTE(review): the three flags below default to false; their names suggest
// fault-injection points for unit tests — confirm against reshard().
537 bool unittest_fail_after_first_batch
= false;
538 bool unittest_fail_after_processing_column
= false;
539 bool unittest_fail_after_successful_processing
= false;
541 int reshard(const std::string
& new_sharding
, const resharding_ctrl
* ctrl
= nullptr);
542 bool get_sharding(std::string
& sharding
);