1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 #include "LevelDBStore.h"
12 #include "common/debug.h"
13 #include "common/perf_counters.h"
15 // re-include our assert to clobber the system one; fix dout:
16 #include "include/ceph_assert.h"
18 #define dout_context cct
19 #define dout_subsys ceph_subsys_leveldb
21 #define dout_prefix *_dout << "leveldb: "
// leveldb::Logger subclass that forwards leveldb's log messages to the Ceph
// debug log stream.
// NOTE(review): the embedded line numbering skips 24-25, 27-28, 30-32, 35-36
// and 39-41, so the member declarations, the constructor/destructor bodies
// and the declaration of `buf` are not visible in this view.
23 class CephLevelDBLogger
: public leveldb::Logger
{
// Keeps the supplied CephContext in `cct`; `cct` is also what the file-level
// dout_context macro expands to.
26 explicit CephLevelDBLogger(CephContext
*c
) : cct(c
) {
29 ~CephLevelDBLogger() override
{
33 // Write an entry to the log file with the specified format.
// `format`/`ap` are a printf-style format string and its argument list.
34 void Logv(const char* format
, va_list ap
) override
{
// Render the message into buf; vsnprintf writes at most sizeof(buf) bytes.
37 vsnprintf(buf
, sizeof(buf
), format
, ap
);
// Emit the rendered text as one entry on the dout stream.
38 *_dout
<< buf
<< dendl
;
42 leveldb::Logger
*create_leveldb_ceph_logger()
44 return new CephLevelDBLogger(g_ceph_context
);
// Seed `options` from the global configuration (g_conf()) so that a later
// open starts from the configured defaults.
// `option_str` is not referenced in the lines visible here.
// NOTE(review): embedded line 59 (presumably the return statement) is not
// visible in this view.
47 int LevelDBStore::init(string option_str
)
49 // init defaults. caller can override these if they want
50 // prior to calling open.
51 options
.write_buffer_size
= g_conf()->leveldb_write_buffer_size
;
52 options
.cache_size
= g_conf()->leveldb_cache_size
;
53 options
.block_size
= g_conf()->leveldb_block_size
;
54 options
.bloom_size
= g_conf()->leveldb_bloom_size
;
55 options
.compression_enabled
= g_conf()->leveldb_compression
;
56 options
.paranoid_checks
= g_conf()->leveldb_paranoid
;
57 options
.max_open_files
= g_conf()->leveldb_max_open_files
;
58 options
.log_file
= g_conf()->leveldb_log
;
// Open an existing store: delegates to do_open(out, false), i.e. without
// create_if_missing. `out` receives any textual error report from do_open.
// NOTE(review): embedded lines 63 and 65 are not visible, so whatever
// condition guards the "Not implemented" abort (presumably a check on `cfs`)
// cannot be confirmed from this view.
62 int LevelDBStore::open(ostream
&out
, const vector
<ColumnFamily
>& cfs
) {
64 ceph_abort_msg("Not implemented");
66 return do_open(out
, false);
// Open the store, creating it if absent: delegates to do_open(out, true).
// `out` receives any textual error report from do_open.
// NOTE(review): embedded lines 70 and 72 are not visible, so whatever
// condition guards the "Not implemented" abort (presumably a check on `cfs`)
// cannot be confirmed from this view.
69 int LevelDBStore::create_and_open(ostream
&out
, const vector
<ColumnFamily
>& cfs
) {
71 ceph_abort_msg("Not implemented");
73 return do_open(out
, true);
// Translate the store-level `options` into the leveldb::Options structure
// `ldoptions` that is handed to leveldb.
// Numeric settings are copied only when non-zero; error_if_exists,
// paranoid_checks and create_if_missing are always copied.
// NOTE(review): the embedded numbering skips 86, 95, 97-98, 101, 105, 109,
// 113-114 and 118-121, so closing braces, the preprocessor #else/#endif, the
// `else` of the compression choice and the return statement are not visible.
76 int LevelDBStore::load_leveldb_options(bool create_if_missing
, leveldb::Options
&ldoptions
)
78 if (options
.write_buffer_size
)
79 ldoptions
.write_buffer_size
= options
.write_buffer_size
;
80 if (options
.max_open_files
)
81 ldoptions
.max_open_files
= options
.max_open_files
;
// Block cache: db_cache takes over the newly created LRU cache via reset();
// ldoptions only receives the raw pointer obtained from get().
82 if (options
.cache_size
) {
83 leveldb::Cache
*_db_cache
= leveldb::NewLRUCache(options
.cache_size
);
84 db_cache
.reset(_db_cache
);
85 ldoptions
.block_cache
= db_cache
.get();
87 if (options
.block_size
)
88 ldoptions
.block_size
= options
.block_size
;
// Bloom filter: only available when HAVE_LEVELDB_FILTER_POLICY is defined.
// filterpolicy holds the policy object; ldoptions gets the raw pointer.
89 if (options
.bloom_size
) {
90 #ifdef HAVE_LEVELDB_FILTER_POLICY
91 const leveldb::FilterPolicy
*_filterpolicy
=
92 leveldb::NewBloomFilterPolicy(options
.bloom_size
);
93 filterpolicy
.reset(_filterpolicy
);
94 ldoptions
.filter_policy
= filterpolicy
.get();
// NOTE(review): presumably the #else branch (embedded line 95 is not
// visible): abort when a bloom size is configured without library support.
96 ceph_abort_msg(0 == "bloom size set but installed leveldb doesn't support bloom filters");
// Compression: Snappy when enabled, otherwise none.
// NOTE(review): the `else` between the two assignments (embedded line 101)
// is not visible here.
99 if (options
.compression_enabled
)
100 ldoptions
.compression
= leveldb::kSnappyCompression
;
102 ldoptions
.compression
= leveldb::kNoCompression
;
103 if (options
.block_restart_interval
)
104 ldoptions
.block_restart_interval
= options
.block_restart_interval
;
106 ldoptions
.error_if_exists
= options
.error_if_exists
;
107 ldoptions
.paranoid_checks
= options
.paranoid_checks
;
108 ldoptions
.create_if_missing
= create_if_missing
;
// Optionally route leveldb's info log into the Ceph log through a newly
// allocated CephLevelDBLogger kept in ceph_logger.
110 if (g_conf()->leveldb_log_to_ceph_log
) {
111 ceph_logger
= new CephLevelDBLogger(g_ceph_context
);
112 ldoptions
.info_log
= ceph_logger
;
// Optionally log to a file instead: NewLogger stores the new logger into
// ldoptions.info_log.
// NOTE(review): the status returned by NewLogger is not checked, and unless
// an unseen `else` separates the two branches this call replaces the
// info_log pointer set just above — confirm.
115 if (options
.log_file
.length()) {
116 leveldb::Env
*env
= leveldb::Env::Default();
117 env
->NewLogger(options
.log_file
, &ldoptions
.info_log
);
// Shared implementation behind open()/create_and_open(): builds the leveldb
// options, opens the database at `path`, registers the "leveldb" perf
// counters and optionally compacts on mount.
// `out` receives the leveldb status text; `create_if_missing` is forwarded
// to load_leveldb_options().
// NOTE(review): the embedded numbering skips 126, 128-131, 133-134, 136-138,
// 151, 154 and 156+, so the error checks, the declaration and hand-over of
// `_db`, the compaction call itself and the return statements are not
// visible in this view.
122 int LevelDBStore::do_open(ostream
&out
, bool create_if_missing
)
124 leveldb::Options ldoptions
;
125 int r
= load_leveldb_options(create_if_missing
, ldoptions
);
127 dout(1) << "load leveldb options failed" << dendl
;
132 leveldb::Status status
= leveldb::DB::Open(ldoptions
, path
, &_db
);
// Report the leveldb status text to the caller-supplied stream.
135 out
<< status
.ToString() << std::endl
;
// Declare the perf counters for the index range
// l_leveldb_first..l_leveldb_last and register them with the context's
// collection.
139 PerfCountersBuilder
plb(g_ceph_context
, "leveldb", l_leveldb_first
, l_leveldb_last
);
140 plb
.add_u64_counter(l_leveldb_gets
, "leveldb_get", "Gets");
141 plb
.add_u64_counter(l_leveldb_txns
, "leveldb_transaction", "Transactions");
142 plb
.add_time_avg(l_leveldb_get_latency
, "leveldb_get_latency", "Get Latency");
143 plb
.add_time_avg(l_leveldb_submit_latency
, "leveldb_submit_latency", "Submit Latency");
144 plb
.add_time_avg(l_leveldb_submit_sync_latency
, "leveldb_submit_sync_latency", "Submit Sync Latency");
145 plb
.add_u64_counter(l_leveldb_compact
, "leveldb_compact", "Compactions");
146 plb
.add_u64_counter(l_leveldb_compact_range
, "leveldb_compact_range", "Compactions by range");
147 plb
.add_u64_counter(l_leveldb_compact_queue_merge
, "leveldb_compact_queue_merge", "Mergings of ranges in compaction queue");
148 plb
.add_u64(l_leveldb_compact_queue_len
, "leveldb_compact_queue_len", "Length of compaction queue");
149 logger
= plb
.create_perf_counters();
150 cct
->get_perfcounters_collection()->add(logger
);
// Compaction on mount is announced through derr before and after.
152 if (g_conf()->leveldb_compact_on_mount
) {
153 derr
<< "Compacting leveldb store..." << dendl
;
155 derr
<< "Finished compacting leveldb store" << dendl
;
// Probe whether a leveldb database can be opened (and created if missing)
// in `dir`. Returns 0 when leveldb reports success, -EIO otherwise.
// NOTE(review): embedded lines 164 and 166 are not visible, so the
// declaration of `db` and what happens to the opened handle are not shown.
160 int LevelDBStore::_test_init(const string
& dir
)
// Local leveldb options with only create_if_missing set.
162 leveldb::Options options
;
163 options
.create_if_missing
= true;
165 leveldb::Status status
= leveldb::DB::Open(options
, dir
, &db
);
167 return status
.ok() ? 0 : -EIO
;
// Destructor.
// NOTE(review): embedded lines 171-174 and 176-179 are not visible; only the
// ordering remark below survives in this view.
170 LevelDBStore::~LevelDBStore()
175 // Ensure db is destroyed before dependent db_cache and filterpolicy
// Shut the store down: stop the background compaction thread if it was
// started, then remove the perf counters from the context's collection.
// NOTE(review): embedded lines 189 and 191-193 are not visible; presumably
// 189 holds the `else` that pairs the second unlock() with the
// "thread not started" case.
180 void LevelDBStore::close()
182 // stop compaction thread
183 compact_queue_lock
.lock();
184 if (compact_thread
.is_started()) {
// Set the stop flag and wake the thread while the lock is held, then release
// the lock before join().
185 compact_queue_stop
= true;
186 compact_queue_cond
.notify_all();
187 compact_queue_lock
.unlock();
188 compact_thread
.join();
190 compact_queue_lock
.unlock();
194 cct
->get_perfcounters_collection()->remove(logger
);
// Run leveldb::RepairDB on the database at `path`, using options built with
// create_if_missing = false. Diagnostic text is written to `out`.
// NOTE(review): embedded lines 201, 204-205, 207-209 and 211+ are not
// visible, so the conditions around the messages and the returned values
// are not shown in this view.
197 int LevelDBStore::repair(std::ostream
&out
)
199 leveldb::Options ldoptions
;
200 int r
= load_leveldb_options(false, ldoptions
);
// Option-loading failure is reported both to the Ceph log and to `out`.
202 dout(1) << "load leveldb options failed" << dendl
;
203 out
<< "load leveldb options failed" << std::endl
;
206 leveldb::Status status
= leveldb::RepairDB(path
, ldoptions
);
210 out
<< "repair leveldb failed : " << status
.ToString() << std::endl
;
// Apply the write batch carried by transaction `t` using default
// leveldb::WriteOptions, and record the transaction count and latency.
// Returns 0 when leveldb reports success, -1 otherwise.
215 int LevelDBStore::submit_transaction(KeyValueDB::Transaction t
)
217 utime_t start
= ceph_clock_now();
// The transaction is treated as a LevelDBTransactionImpl; its `bat` member
// is the batch handed to leveldb.
218 LevelDBTransactionImpl
* _t
=
219 static_cast<LevelDBTransactionImpl
*>(t
.get());
220 leveldb::Status s
= db
->Write(leveldb::WriteOptions(), &(_t
->bat
));
221 utime_t lat
= ceph_clock_now() - start
;
222 logger
->inc(l_leveldb_txns
);
223 logger
->tinc(l_leveldb_submit_latency
, lat
);
224 return s
.ok() ? 0 : -1;
// Variant of submit_transaction() that writes with an explicit local
// leveldb::WriteOptions object and records latency under
// l_leveldb_submit_sync_latency.
// Returns 0 when leveldb reports success, -1 otherwise.
// NOTE(review): embedded line 233 is not visible; presumably it configures
// `options` (e.g. enables sync) — confirm against the full file.
227 int LevelDBStore::submit_transaction_sync(KeyValueDB::Transaction t
)
229 utime_t start
= ceph_clock_now();
230 LevelDBTransactionImpl
* _t
=
231 static_cast<LevelDBTransactionImpl
*>(t
.get());
// Local write options for this call (a different object from the `options`
// that init() fills in).
232 leveldb::WriteOptions options
;
234 leveldb::Status s
= db
->Write(options
, &(_t
->bat
));
235 utime_t lat
= ceph_clock_now() - start
;
236 logger
->inc(l_leveldb_txns
);
237 logger
->tinc(l_leveldb_submit_sync_latency
, lat
);
238 return s
.ok() ? 0 : -1;
// Queue a Put of `to_set_bl` under the combined (prefix, k) key into the
// write batch `bat`. Three paths, chosen by the layout and size of the value:
//   1. contiguous and non-empty: reference the existing bytes directly;
//   2. non-contiguous, 1..32 KiB: gather into a stack buffer;
//   3. otherwise: copy the bufferlist so c_str() can be called on it.
// NOTE(review): embedded lines 243, 245, 261-262, 264 and 268+ are not
// visible (the `k` parameter, the end of the copy loop and the final `else`).
241 void LevelDBStore::LevelDBTransactionImpl::set(
242 const string
&prefix
,
244 const bufferlist
&to_set_bl
)
246 string key
= combine_strings(prefix
, k
);
247 size_t bllen
= to_set_bl
.length();
248 // bufferlist::c_str() is non-constant, so we can't call c_str()
249 if (to_set_bl
.is_contiguous() && bllen
> 0) {
250 // bufferlist contains just one ptr or they're contiguous
251 bat
.Put(leveldb::Slice(key
), leveldb::Slice(to_set_bl
.buffers().front().c_str(), bllen
));
252 } else if ((bllen
<= 32 * 1024) && (bllen
> 0)) {
253 // 2+ bufferptrs that are not contiguous
254 // allocate buffer on stack and copy bl contents to that buffer
255 // make sure the buffer isn't too large or we might crash here...
256 char* slicebuf
= (char*) alloca(bllen
);
// newslice captures the start of the stack buffer before the copy loop runs.
257 leveldb::Slice
newslice(slicebuf
, bllen
);
// NOTE(review): the statement advancing slicebuf by ptrlen after each memcpy
// is presumably on the non-visible lines 261-262; without it every node
// would be copied to the same offset — confirm.
258 for (const auto& node
: to_set_bl
.buffers()) {
259 const size_t ptrlen
= node
.length();
260 memcpy(static_cast<void*>(slicebuf
), node
.c_str(), ptrlen
);
263 bat
.Put(leveldb::Slice(key
), newslice
);
265 // 2+ bufferptrs that are not contiguous, and enormous in size
// Copy into a non-const bufferlist because c_str() cannot be called on the
// const argument (see the remark near the top of the function).
266 bufferlist val
= to_set_bl
;
267 bat
.Put(leveldb::Slice(key
), leveldb::Slice(val
.c_str(), val
.length()));
// Queue deletion of the single key formed by combine_strings(prefix, k)
// into the write batch `bat`.
// NOTE(review): embedded line 272 (the declaration of parameter `k`) is not
// visible in this view.
271 void LevelDBStore::LevelDBTransactionImpl::rmkey(const string
&prefix
,
274 string key
= combine_strings(prefix
, k
);
275 bat
.Delete(leveldb::Slice(key
));
// Queue deletion of the keys yielded by an iterator over `prefix`: each
// visited key is re-combined with the prefix and added to `bat` as a Delete.
// NOTE(review): embedded lines 282-283 (the loop condition and increment)
// are not visible in this view.
278 void LevelDBStore::LevelDBTransactionImpl::rmkeys_by_prefix(const string
&prefix
)
280 KeyValueDB::Iterator it
= db
->get_iterator(prefix
);
281 for (it
->seek_to_first();
284 bat
.Delete(leveldb::Slice(combine_strings(prefix
, it
->key())));
// Queue deletion of keys under `prefix` starting at lower_bound(start); the
// walk tests each key against `end` with >=, so `end` acts as an exclusive
// upper bound if that test terminates the loop.
// NOTE(review): embedded lines 294-295 and 297+ are not visible, so the
// action taken when key >= end (presumably leaving the loop) and the
// iterator advance are not shown.
288 void LevelDBStore::LevelDBTransactionImpl::rm_range_keys(const string
&prefix
, const string
&start
, const string
&end
)
290 KeyValueDB::Iterator it
= db
->get_iterator(prefix
);
291 it
->lower_bound(start
);
292 while (it
->valid()) {
293 if (it
->key() >= end
) {
296 bat
.Delete(combine_strings(prefix
, it
->key()));
// Multi-key lookup: for each entry of `keys`, read the value stored under
// combine_strings(prefix, key) and append it to (*out)[key].
// Updates the l_leveldb_gets counter and the l_leveldb_get_latency average.
// NOTE(review): embedded lines 305, 309, 312, 314 and 318+ are not visible,
// so the declaration of `value`, any check of `status` before the append,
// and the return value are not shown.
301 int LevelDBStore::get(
302 const string
&prefix
,
303 const std::set
<string
> &keys
,
304 std::map
<string
, bufferlist
> *out
)
306 utime_t start
= ceph_clock_now();
307 for (std::set
<string
>::const_iterator i
= keys
.begin();
308 i
!= keys
.end(); ++i
) {
// `bound` is the full on-disk key for this entry.
310 std::string bound
= combine_strings(prefix
, *i
);
311 auto status
= db
->Get(leveldb::ReadOptions(), leveldb::Slice(bound
), &value
);
313 (*out
)[*i
].append(value
);
// Latency is measured from the `start` timestamp taken before the loop.
315 utime_t lat
= ceph_clock_now() - start
;
316 logger
->inc(l_leveldb_gets
);
317 logger
->tinc(l_leveldb_get_latency
, lat
);
// Single-key lookup: reads the value stored under
// combine_strings(prefix, key) via leveldb Get.
// `out` must be non-null and empty on entry (enforced by ceph_assert).
// Updates the l_leveldb_gets counter and the l_leveldb_get_latency average.
// NOTE(review): embedded lines 322-324, 327-329, 332-336 and 340+ are not
// visible, so the remaining parameters, the local declarations, the handling
// of the Get status and the return value are not shown.
321 int LevelDBStore::get(const string
&prefix
,
325 ceph_assert(out
&& (out
->length() == 0));
326 utime_t start
= ceph_clock_now();
330 k
= combine_strings(prefix
, key
);
331 s
= db
->Get(leveldb::ReadOptions(), leveldb::Slice(k
), &value
);
337 utime_t lat
= ceph_clock_now() - start
;
338 logger
->inc(l_leveldb_gets
);
339 logger
->tinc(l_leveldb_get_latency
, lat
);
// Builds the stored key string from a prefix and a value.
// NOTE(review): only the signature is visible here (embedded lines 344-350
// are missing). split_key() looks for a 0 byte between prefix and key, so
// presumably this joins the two with that byte — confirm against the full
// file.
343 string
LevelDBStore::combine_strings(const string
&prefix
, const string
&value
)
// Wrap the bytes referenced by the leveldb::Slice `in` into a bufferlist.
// NOTE(review): embedded lines 352-353 and 355+ are not visible, so the
// declaration of `bl` and the return statement are not shown.
351 bufferlist
LevelDBStore::to_bufferlist(leveldb::Slice in
)
// A bufferptr is built from (data, size) and appended to bl.
354 bl
.append(bufferptr(in
.data(), in
.size()));
// Split a stored key into its prefix and key parts at the first 0 byte:
// *prefix receives the bytes before the separator, *key the bytes after it.
// NOTE(review): embedded lines 365, 368-370, 372 and 374+ are not visible,
// so the values returned on the error paths, any null checks on the output
// pointers and the final return are not shown.
358 int LevelDBStore::split_key(leveldb::Slice in
, string
*prefix
, string
*key
)
360 size_t prefix_len
= 0;
362 // Find separator inside Slice
// The C-style cast drops the const-ness of the memchr() result.
363 char* separator
= (char*) memchr(in
.data(), 0, in
.size());
364 if (separator
== NULL
)
// Prefix length is the distance from the start of the slice to the separator.
366 prefix_len
= size_t(separator
- in
.data());
367 if (prefix_len
>= in
.size())
371 *prefix
= string(in
.data(), prefix_len
);
// Remaining bytes after the separator: total size minus prefix minus the
// separator byte itself.
373 *key
= string(separator
+1, in
.size() - prefix_len
- 1);
377 void LevelDBStore::compact()
379 logger
->inc(l_leveldb_compact
);
380 db
->CompactRange(NULL
, NULL
);
// Body of the background compaction thread: repeatedly drains compact_queue,
// compacting each queued (start, end) range, and waits on compact_queue_cond
// until compact_queue_stop is set.
// NOTE(review): embedded lines 385, 392, 395-396, 398-401, 403 and 405+ are
// not visible, so any unlock/relock around the compaction work, the action
// for an empty range (presumably a full compact) and the loop exits are not
// shown.
384 void LevelDBStore::compact_thread_entry()
// `l` holds compact_queue_lock and is the lock passed to the condition wait.
386 std::unique_lock l
{compact_queue_lock
};
387 while (!compact_queue_stop
) {
388 while (!compact_queue
.empty()) {
// Take the front range and publish the new queue length.
389 pair
<string
,string
> range
= compact_queue
.front();
390 compact_queue
.pop_front();
391 logger
->set(l_leveldb_compact_queue_len
, compact_queue
.size());
393 logger
->inc(l_leveldb_compact_range
);
// A range whose two ends are both empty strings is special-cased.
394 if (range
.first
.empty() && range
.second
.empty()) {
397 compact_range(range
.first
, range
.second
);
402 if (compact_queue_stop
)
404 compact_queue_cond
.wait(l
);
// Queue the range [start, end] for background compaction, merging it with an
// already queued range where possible, then wake (and if necessary start)
// the compaction thread. Runs under compact_queue_lock.
// NOTE(review): embedded lines 418-420, 426-427, 433-436, 441 and 445+ are
// not visible, so the action for an exact duplicate, the statements after
// each erase() and the iterator advance are not shown.
408 void LevelDBStore::compact_range_async(const string
& start
, const string
& end
)
410 std::lock_guard
l(compact_queue_lock
);
412 // try to merge adjacent ranges. this is O(n), but the queue should
413 // be short. note that we do not cover all overlap cases and merge
414 // opportunities here, but we capture the ones we currently need.
415 list
< pair
<string
,string
> >::iterator p
= compact_queue
.begin();
416 while (p
!= compact_queue
.end()) {
// Identical range already queued.
417 if (p
->first
== start
&& p
->second
== end
) {
421 if (p
->first
<= end
&& p
->first
> start
) {
422 // merge with existing range to the right
// The merged range is appended before the old entry is erased, so p->second
// is read while p is still valid.
423 compact_queue
.push_back(make_pair(start
, p
->second
));
424 compact_queue
.erase(p
);
425 logger
->inc(l_leveldb_compact_queue_merge
);
428 if (p
->second
>= start
&& p
->second
< end
) {
429 // merge with existing range to the left
430 compact_queue
.push_back(make_pair(p
->first
, end
));
431 compact_queue
.erase(p
);
432 logger
->inc(l_leveldb_compact_queue_merge
);
// NOTE(review): p is compared again here after the loop; after an erase(p)
// above p no longer refers to a live element, so confirm the non-visible
// lines leave the loop in a way that keeps this test meaningful.
437 if (p
== compact_queue
.end()) {
438 // no merge, new entry.
439 compact_queue
.push_back(make_pair(start
, end
));
440 logger
->set(l_leveldb_compact_queue_len
, compact_queue
.size());
// Wake the compaction thread; create it if it has not been started.
442 compact_queue_cond
.notify_all();
443 if (!compact_thread
.is_started()) {
444 compact_thread
.create("levdbst_compact");