1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 #ifndef LEVEL_DB_STORE_H
4 #define LEVEL_DB_STORE_H
6 #include "include/types.h"
7 #include "include/buffer_fwd.h"
8 #include "KeyValueDB.h"
12 #include "include/memory.h"
13 #include <boost/scoped_ptr.hpp>
14 #include "leveldb/db.h"
15 #include "leveldb/env.h"
16 #include "leveldb/write_batch.h"
17 #include "leveldb/slice.h"
18 #include "leveldb/cache.h"
19 #ifdef HAVE_LEVELDB_FILTER_POLICY
20 #include "leveldb/filter_policy.h"
24 #include "common/errno.h"
25 #include "common/dout.h"
26 #include "include/assert.h"
27 #include "common/Formatter.h"
28 #include "common/Cond.h"
30 #include "common/ceph_context.h"
// Enumerators for LevelDBStore's counters; the first one is pinned to 34300.
// NOTE(review): the enclosing `enum {` line and its terminator are not visible
// in this view. The l_leveldb_* names suggest perf-counter indices — confirm.
35 l_leveldb_first
= 34300,
38 l_leveldb_get_latency
,
39 l_leveldb_submit_latency
,
40 l_leveldb_submit_sync_latency
,
42 l_leveldb_compact_range
,
43 l_leveldb_compact_queue_merge
,
44 l_leveldb_compact_queue_len
,
48 extern leveldb::Logger
*create_leveldb_ceph_logger();
50 class CephLevelDBLogger
;
53 * Uses LevelDB to implement the KeyValueDB interface
55 class LevelDBStore
: public KeyValueDB
{
// Handles to the LevelDB objects backing this store.
// Members are destroyed in reverse declaration order, so `db` is released
// before `filterpolicy` and `db_cache`.
58 CephLevelDBLogger
*ceph_logger
;
// boost::scoped_ptr deletes the owned object when the store is destroyed.
60 boost::scoped_ptr
<leveldb::Cache
> db_cache
;
// NOTE(review): the matching #endif for this #ifdef is not visible in this view.
61 #ifdef HAVE_LEVELDB_FILTER_POLICY
62 boost::scoped_ptr
<const leveldb::FilterPolicy
> filterpolicy
;
64 boost::scoped_ptr
<leveldb::DB
> db
;
// Shared implementation behind open() (create_if_missing=false) and
// create_and_open() (create_if_missing=true). `out` presumably receives
// diagnostics — confirm in the .cc.
66 int do_open(ostream
&out
, bool create_if_missing
);
68 // manage async compactions
69 Mutex compact_queue_lock
;
70 Cond compact_queue_cond
;
71 list
< pair
<string
,string
> > compact_queue
;
72 bool compact_queue_stop
;
// Background thread whose entry() hands control to
// LevelDBStore::compact_thread_entry().
// NOTE(review): several lines of this class are not visible in this view
// (the `db` member it initializes, the rest of entry(), and the class terminator).
73 class CompactThread
: public Thread
{
76 explicit CompactThread(LevelDBStore
*d
) : db(d
) {}
77 void *entry() override
{
78 db
->compact_thread_entry();
// Lets LevelDBStore reach this class's non-public members.
81 friend class LevelDBStore
;
// Loop run on the compaction thread (defined out of line).
84 void compact_thread_entry();
86 void compact_range(const string
& start
, const string
& end
) {
87 leveldb::Slice
cstart(start
);
88 leveldb::Slice
cend(end
);
89 db
->CompactRange(&cstart
, &cend
);
91 void compact_range_async(const string
& start
, const string
& end
);
94 /// compact the underlying leveldb store
95 void compact() override
;
97 /// compact db for all keys with a given prefix
98 void compact_prefix(const string
& prefix
) override
{
99 compact_range(prefix
, past_prefix(prefix
));
101 void compact_prefix_async(const string
& prefix
) override
{
102 compact_range_async(prefix
, past_prefix(prefix
));
104 void compact_range(const string
& prefix
,
105 const string
& start
, const string
& end
) override
{
106 compact_range(combine_strings(prefix
, start
), combine_strings(prefix
, end
));
108 void compact_range_async(const string
& prefix
,
109 const string
& start
, const string
& end
) override
{
110 compact_range_async(combine_strings(prefix
, start
),
111 combine_strings(prefix
, end
));
116 * options_t: Holds options which are minimally interpreted
117 * on initialization and then passed through to LevelDB.
118 * We transform a couple of these into actual LevelDB
119 * structures, but the rest are simply passed through unchanged. See
120 * leveldb/options.h for more precise details on each.
122 * Set them after constructing the LevelDBStore, but before calling
123 * open() or create_and_open().
// Tunables handed to LevelDB at open time; per the initializer comments,
// 0 means "use LevelDB's default" for most of them.
// NOTE(review): fragment of struct options_t — some lines (including the struct
// header and the constructor signature) are not visible in this view.
// Trailing member docs use `///<` so Doxygen attaches them to the preceding member.
126 uint64_t write_buffer_size
; ///< in-memory write buffer size
127 int max_open_files
; ///< maximum number of files LevelDB can open at once
128 uint64_t cache_size
; ///< size of extra decompressed cache to use
129 uint64_t block_size
; ///< user data per block
130 int bloom_size
; ///< number of bits per entry to put in a bloom filter
131 bool compression_enabled
; ///< whether to use libsnappy compression or not
133 // Expert settings: leave these at their defaults unless you know exactly how LevelDB uses them.
134 int block_restart_interval
;
135 bool error_if_exists
;
136 bool paranoid_checks
;
// Constructor initializer list: the defaults for the fields above.
141 write_buffer_size(0), //< 0 means default
142 max_open_files(0), //< 0 means default
143 cache_size(0), //< 0 means no cache (default)
144 block_size(0), //< 0 means default
145 bloom_size(0), //< 0 means no bloom filter (default)
146 compression_enabled(true), //< set to false for no compression
147 block_restart_interval(0), //< 0 means default
148 error_if_exists(false), //< set to true if you want to check nonexistence
149 paranoid_checks(false) //< set to true if you want paranoid checks
// Constructs a store for the database at `path` under CephContext `c`.
// NOTE(review): several initializer lines (and the #endif for the #ifdef below)
// are not visible in this view.
153 LevelDBStore(CephContext
*c
, const string
&path
) :
159 #ifdef HAVE_LEVELDB_FILTER_POLICY
// NOTE(review): the lock is registered as "...compact_thread_lock" while the
// member is compact_queue_lock; consider aligning the names.
162 compact_queue_lock("LevelDBStore::compact_thread_lock"),
163 compact_queue_stop(false),
// The compaction thread keeps a back-pointer to this store.
164 compact_thread(this),
168 ~LevelDBStore() override
;
170 static int _test_init(const string
& dir
);
171 int init(string option_str
="") override
;
173 /// Opens underlying db
174 int open(ostream
&out
) override
{
175 return do_open(out
, false);
177 /// Creates underlying db if missing and opens it
178 int create_and_open(ostream
&out
) override
{
179 return do_open(out
, true);
182 void close() override
;
// KeyValueDB transaction implementation holding a leveldb::WriteBatch.
// NOTE(review): this class is only partially visible in this view (access
// specifiers, the `db` member, the set/rmkey signature heads, the
// rmkeys_by_prefix parameters and the class terminator are missing).
184 class LevelDBTransactionImpl
: public KeyValueDB::TransactionImpl
{
// Presumably accumulates the transaction's operations until submission — confirm.
186 leveldb::WriteBatch bat
;
188 explicit LevelDBTransactionImpl(LevelDBStore
*db
) : db(db
) {}
190 const string
&prefix
,
192 const bufferlist
&bl
) override
;
// Re-expose base-class set() overloads that the override above would otherwise hide.
193 using KeyValueDB::TransactionImpl::set
;
195 const string
&prefix
,
196 const string
&k
) override
;
197 void rmkeys_by_prefix(
200 virtual void rm_range_keys(
201 const string
&prefix
,
203 const string
&end
) override
;
// Re-expose base-class rmkey() overloads hidden by the override above.
205 using KeyValueDB::TransactionImpl::rmkey
;
208 KeyValueDB::Transaction
get_transaction() override
{
209 return std::make_shared
<LevelDBTransactionImpl
>(this);
212 int submit_transaction(KeyValueDB::Transaction t
) override
;
213 int submit_transaction_sync(KeyValueDB::Transaction t
) override
;
// Point-lookup declarations.
// NOTE(review): the opening `int get(` of the batched overload and one parameter
// line of the single-key overload are not visible in this view.
// Batched lookup: a set of keys under `prefix`, results written into `*out`.
215 const string
&prefix
,
216 const std::set
<string
> &key
,
217 std::map
<string
, bufferlist
> *out
// Single-key lookup under `prefix`, result written into `*value`.
220 int get(const string
&prefix
,
222 bufferlist
*value
) override
;
// Keep the base-class get() overloads visible alongside the overrides.
224 using KeyValueDB::get
;
// WholeSpaceIterator adapter over a leveldb::Iterator.
226 class LevelDBWholeSpaceIteratorImpl
:
227 public KeyValueDB::WholeSpaceIteratorImpl
{
// Owned iterator; boost::scoped_ptr deletes it when this wrapper is destroyed.
229 boost::scoped_ptr
<leveldb::Iterator
> dbiter
;
// NOTE(review): the constructor's initializer (presumably dbiter(iter)) and body
// are not visible in this view.
231 explicit LevelDBWholeSpaceIteratorImpl(leveldb::Iterator
*iter
) :
// Nothing to do explicitly: dbiter's scoped_ptr handles cleanup.
233 ~LevelDBWholeSpaceIteratorImpl() override
{ }
235 int seek_to_first() override
{
236 dbiter
->SeekToFirst();
237 return dbiter
->status().ok() ? 0 : -1;
239 int seek_to_first(const string
&prefix
) override
{
240 leveldb::Slice
slice_prefix(prefix
);
241 dbiter
->Seek(slice_prefix
);
242 return dbiter
->status().ok() ? 0 : -1;
244 int seek_to_last() override
{
245 dbiter
->SeekToLast();
246 return dbiter
->status().ok() ? 0 : -1;
// Position near the end of `prefix`'s key range: seek to the first key >=
// past_prefix(prefix), falling back to SeekToLast() when that seek runs off the end.
// Returns 0 when the iterator status is ok, -1 otherwise.
// NOTE(review): lines between the Seek and the return (presumably an else-branch
// stepping back with Prev()) are not visible in this view.
248 int seek_to_last(const string
&prefix
) override
{
249 string limit
= past_prefix(prefix
);
250 leveldb::Slice
slice_limit(limit
);
251 dbiter
->Seek(slice_limit
);
// No key at or after the limit: the whole prefix range is at the tail of the DB.
253 if (!dbiter
->Valid()) {
254 dbiter
->SeekToLast();
258 return dbiter
->status().ok() ? 0 : -1;
// Position at the first key strictly after (prefix, after): lower_bound() first,
// then, if it landed exactly on (prefix, after), presumably advance — the statement
// under the `if` is not visible in this view.
// NOTE(review): a line between lower_bound() and raw_key() is also missing here;
// confirm that raw_key() is not called on an invalid iterator when lower_bound()
// runs off the end.
260 int upper_bound(const string
&prefix
, const string
&after
) override
{
261 lower_bound(prefix
, after
);
263 pair
<string
,string
> key
= raw_key();
264 if (key
.first
== prefix
&& key
.second
== after
)
267 return dbiter
->status().ok() ? 0 : -1;
269 int lower_bound(const string
&prefix
, const string
&to
) override
{
270 string bound
= combine_strings(prefix
, to
);
271 leveldb::Slice
slice_bound(bound
);
272 dbiter
->Seek(slice_bound
);
273 return dbiter
->status().ok() ? 0 : -1;
275 bool valid() override
{
276 return dbiter
->Valid();
// Step forward / backward; each returns 0 if the iterator status is ok, -1 otherwise.
// NOTE(review): the statements that actually move the iterator are not visible
// in this view; as shown, both bodies only report status.
278 int next() override
{
281 return dbiter
->status().ok() ? 0 : -1;
283 int prev() override
{
286 return dbiter
->status().ok() ? 0 : -1;
// key(): split the current on-disk key and keep only the key part
// (a null prefix out-pointer is passed, so the prefix is not requested).
// NOTE(review): the declaration of out_key and the return are not visible here.
288 string
key() override
{
290 split_key(dbiter
->key(), 0, &out_key
);
// raw_key(): split the current on-disk key into (prefix, key) and return both.
// NOTE(review): the local prefix/key declarations are not visible here.
293 pair
<string
,string
> raw_key() override
{
295 split_key(dbiter
->key(), &prefix
, &key
);
296 return make_pair(prefix
, key
);
// True if the current on-disk key begins with `prefix` followed by a '\0'
// separator byte.
// NOTE(review): the fall-through return for the non-matching case is not visible here.
298 bool raw_key_is_prefixed(const string
&prefix
) override
{
299 leveldb::Slice key
= dbiter
->key();
// The key must be long enough to hold prefix + separator, and the byte right
// after the prefix must be the '\0' separator...
300 if ((key
.size() > prefix
.length()) && (key
[prefix
.length()] == '\0')) {
// ...and the leading bytes must equal the prefix.
301 return memcmp(key
.data(), prefix
.c_str(), prefix
.length()) == 0;
306 bufferlist
value() override
{
307 return to_bufferlist(dbiter
->value());
310 bufferptr
value_as_ptr() override
{
311 leveldb::Slice data
= dbiter
->value();
312 return bufferptr(data
.data(), data
.size());
315 int status() override
{
316 return dbiter
->status().ok() ? 0 : -1;
321 static string
combine_strings(const string
&prefix
, const string
&value
);
322 static int split_key(leveldb::Slice in
, string
*prefix
, string
*key
);
323 static bufferlist
to_bufferlist(leveldb::Slice in
);
// Builds an upper bound for `prefix`'s key range, starting from a copy of the prefix.
// NOTE(review): the rest of the body (how `limit` is modified and returned) is
// not visible in this view.
324 static string
past_prefix(const string
&prefix
) {
325 string limit
= prefix
;
// Estimate on-disk usage by walking the files in `path` and bucketing their sizes.
// Files ending in "log" go to log_size, files with no extension or an unmatched
// extension go to misc_size, and one further extension branch goes to sst_size
// (its condition is not visible here). The buckets and their sum are stored in
// extra["sst"], extra["log"], extra["misc"] and extra["total"].
// NOTE(review): several lines are not visible in this view, including the
// opendir failure check and its return, the `struct stat s` declaration, the
// `continue`s, the sst-branch condition, any closedir(store_dir) and the final
// return. Confirm the DIR* is closed on every path.
330 uint64_t get_estimated_size(map
<string
,uint64_t> &extra
) override
{
331 DIR *store_dir
= opendir(path
.c_str());
333 lderr(cct
) << __func__
<< " something happened opening the store: "
334 << cpp_strerror(errno
) << dendl
;
338 uint64_t total_size
= 0;
339 uint64_t sst_size
= 0;
340 uint64_t log_size
= 0;
341 uint64_t misc_size
= 0;
343 struct dirent
*entry
= NULL
;
344 while ((entry
= readdir(store_dir
)) != NULL
) {
345 string
n(entry
->d_name
);
// Skip the directory self/parent entries.
347 if (n
== "." || n
== "..")
350 string fpath
= path
+ '/' + n
;
// NOTE(review): stat() returns -1 and sets errno; it never returns -ENOENT itself.
// The check below only works if a line not shown converts err to -errno — confirm.
352 int err
= stat(fpath
.c_str(), &s
);
355 // we may race against leveldb while reading files; this should only
356 // happen when those files are being updated, data is being shuffled
357 // and files get removed, in which case there's not much of a problem
358 // as we'll get to them next time around.
359 if (err
== -ENOENT
) {
363 lderr(cct
) << __func__
<< " error obtaining stats for " << fpath
364 << ": " << cpp_strerror(err
) << dendl
;
// Classify by the text after the last '.'; names without one count as misc.
368 size_t pos
= n
.find_last_of('.');
369 if (pos
== string::npos
) {
370 misc_size
+= s
.st_size
;
374 string ext
= n
.substr(pos
+1);
376 sst_size
+= s
.st_size
;
377 } else if (ext
== "log") {
378 log_size
+= s
.st_size
;
380 misc_size
+= s
.st_size
;
384 total_size
= sst_size
+ log_size
+ misc_size
;
386 extra
["sst"] = sst_size
;
387 extra
["log"] = log_size
;
388 extra
["misc"] = misc_size
;
389 extra
["total"] = total_size
;
398 WholeSpaceIterator
_get_iterator() override
{
399 return std::make_shared
<LevelDBWholeSpaceIteratorImpl
>(
400 db
->NewIterator(leveldb::ReadOptions()));