1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 #include "LevelDBStore.h"
12 #include "include/memory.h"
14 #include "common/debug.h"
15 #include "common/perf_counters.h"
17 // re-include our assert to clobber the system one; fix dout:
18 #include "include/assert.h"
20 #define dout_context cct
21 #define dout_subsys ceph_subsys_leveldb
23 #define dout_prefix *_dout << "leveldb: "
// Adapter that forwards leveldb's internal log messages to the Ceph debug
// log (dout subsystem ceph_subsys_leveldb, lines prefixed with "leveldb: ",
// per the dout_* macros defined at the top of this file).
// NOTE(review): the cct member declaration, the `buf` array used by Logv and
// the closing braces of this class are not visible in this view -- confirm
// against the full file.
25 class CephLevelDBLogger
: public leveldb::Logger
{
// Stores the CephContext; dout_context is defined as `cct`, so the dout
// machinery used in Logv reads it.
28 explicit CephLevelDBLogger(CephContext
*c
) : cct(c
) {
31 ~CephLevelDBLogger() override
{
35 // Write an entry to the log file with the specified format.
// The message is formatted with vsnprintf bounded by sizeof(buf), so an
// over-long message is truncated rather than overflowing (assumes buf is a
// fixed-size char array -- its declaration is not visible here).
36 void Logv(const char* format
, va_list ap
) override
{
39 vsnprintf(buf
, sizeof(buf
), format
, ap
);
40 *_dout
<< buf
<< dendl
;
44 leveldb::Logger
*create_leveldb_ceph_logger()
46 return new CephLevelDBLogger(g_ceph_context
);
49 int LevelDBStore::init(string option_str
)
51 // init defaults. caller can override these if they want
52 // prior to calling open.
53 options
.write_buffer_size
= g_conf
->leveldb_write_buffer_size
;
54 options
.cache_size
= g_conf
->leveldb_cache_size
;
55 options
.block_size
= g_conf
->leveldb_block_size
;
56 options
.bloom_size
= g_conf
->leveldb_bloom_size
;
57 options
.compression_enabled
= g_conf
->leveldb_compression
;
58 options
.paranoid_checks
= g_conf
->leveldb_paranoid
;
59 options
.max_open_files
= g_conf
->leveldb_max_open_files
;
60 options
.log_file
= g_conf
->leveldb_log
;
// Open the leveldb database at `path`, translating the tunables in `options`
// (filled in by init() and possibly overridden by the caller) into
// leveldb::Options. Then register this store's "leveldb" perf counters and
// optionally compact the store on mount.
//
// out               - receives status.ToString() when the open status is
//                     reported (the condition guarding that line is not
//                     visible here; presumably the failure path).
// create_if_missing - forwarded to leveldb::Options::create_if_missing.
//
// NOTE(review): several lines of this function are not visible in this view:
// the declaration of _db, the handling of a failed Open, the #else/#endif of
// the bloom-filter block, the compaction call and the return statements.
64 int LevelDBStore::do_open(ostream
&out
, bool create_if_missing
)
66 leveldb::Options ldoptions
;
// A zero/empty setting leaves the corresponding leveldb default in place.
68 if (options
.write_buffer_size
)
69 ldoptions
.write_buffer_size
= options
.write_buffer_size
;
70 if (options
.max_open_files
)
71 ldoptions
.max_open_files
= options
.max_open_files
;
72 if (options
.cache_size
) {
// db_cache owns the LRU cache; ldoptions only borrows the raw pointer, so
// db_cache must outlive the DB handle.
73 leveldb::Cache
*_db_cache
= leveldb::NewLRUCache(options
.cache_size
);
74 db_cache
.reset(_db_cache
);
75 ldoptions
.block_cache
= db_cache
.get();
77 if (options
.block_size
)
78 ldoptions
.block_size
= options
.block_size
;
79 if (options
.bloom_size
) {
// filterpolicy owns the bloom filter policy; ldoptions only borrows it.
// Builds without HAVE_LEVELDB_FILTER_POLICY presumably reach the assert
// below (the #else line is not visible), making a nonzero bloom_size fatal.
80 #ifdef HAVE_LEVELDB_FILTER_POLICY
81 const leveldb::FilterPolicy
*_filterpolicy
=
82 leveldb::NewBloomFilterPolicy(options
.bloom_size
);
83 filterpolicy
.reset(_filterpolicy
);
84 ldoptions
.filter_policy
= filterpolicy
.get();
86 assert(0 == "bloom size set but installed leveldb doesn't support bloom filters");
89 if (options
.compression_enabled
)
90 ldoptions
.compression
= leveldb::kSnappyCompression
;
// NOTE(review): the source line between these two assignments is not
// visible; presumably an `else`, otherwise compression would always end up
// disabled.
92 ldoptions
.compression
= leveldb::kNoCompression
;
93 if (options
.block_restart_interval
)
94 ldoptions
.block_restart_interval
= options
.block_restart_interval
;
96 ldoptions
.error_if_exists
= options
.error_if_exists
;
97 ldoptions
.paranoid_checks
= options
.paranoid_checks
;
98 ldoptions
.create_if_missing
= create_if_missing
;
100 if (g_conf
->leveldb_log_to_ceph_log
) {
// Route leveldb's info log through CephLevelDBLogger. ceph_logger is not
// declared locally (presumably a member), so the object outlives this call.
101 ceph_logger
= new CephLevelDBLogger(g_ceph_context
);
102 ldoptions
.info_log
= ceph_logger
;
105 if (options
.log_file
.length()) {
// NOTE(review): this replaces any info_log set above. The logger created by
// NewLogger is stored only in ldoptions.info_log, and leveldb does not
// delete a caller-supplied info_log, so it appears to be leaked. The Status
// returned by NewLogger is also ignored.
106 leveldb::Env
*env
= leveldb::Env::Default();
107 env
->NewLogger(options
.log_file
, &ldoptions
.info_log
);
111 leveldb::Status status
= leveldb::DB::Open(ldoptions
, path
, &_db
);
114 out
<< status
.ToString() << std::endl
;
// Build the perf counter set covering l_leveldb_first..l_leveldb_last and
// register it with this context's perf counter collection.
118 PerfCountersBuilder
plb(g_ceph_context
, "leveldb", l_leveldb_first
, l_leveldb_last
);
119 plb
.add_u64_counter(l_leveldb_gets
, "leveldb_get", "Gets");
120 plb
.add_u64_counter(l_leveldb_txns
, "leveldb_transaction", "Transactions");
121 plb
.add_time_avg(l_leveldb_get_latency
, "leveldb_get_latency", "Get Latency");
122 plb
.add_time_avg(l_leveldb_submit_latency
, "leveldb_submit_latency", "Submit Latency");
123 plb
.add_time_avg(l_leveldb_submit_sync_latency
, "leveldb_submit_sync_latency", "Submit Sync Latency");
124 plb
.add_u64_counter(l_leveldb_compact
, "leveldb_compact", "Compactions");
125 plb
.add_u64_counter(l_leveldb_compact_range
, "leveldb_compact_range", "Compactions by range");
126 plb
.add_u64_counter(l_leveldb_compact_queue_merge
, "leveldb_compact_queue_merge", "Mergings of ranges in compaction queue");
127 plb
.add_u64(l_leveldb_compact_queue_len
, "leveldb_compact_queue_len", "Length of compaction queue");
128 logger
= plb
.create_perf_counters();
129 cct
->get_perfcounters_collection()->add(logger
);
// Optional full compaction at mount time, bracketed by log messages (the
// compaction call itself is on a line not visible here).
131 if (g_conf
->leveldb_compact_on_mount
) {
132 derr
<< "Compacting leveldb store..." << dendl
;
134 derr
<< "Finished compacting leveldb store" << dendl
;
139 int LevelDBStore::_test_init(const string
& dir
)
141 leveldb::Options options
;
142 options
.create_if_missing
= true;
144 leveldb::Status status
= leveldb::DB::Open(options
, dir
, &db
);
146 return status
.ok() ? 0 : -EIO
;
// Destructor.
// NOTE(review): most of this body is not visible in this view; only the
// comment below survives. The teardown order matters because the leveldb
// options only borrow the raw block-cache and filter-policy pointers taken
// from db_cache / filterpolicy in do_open().
149 LevelDBStore::~LevelDBStore()
155 // Ensure db is destroyed before dependent db_cache and filterpolicy
159 void LevelDBStore::close()
161 // stop compaction thread
162 compact_queue_lock
.Lock();
163 if (compact_thread
.is_started()) {
164 compact_queue_stop
= true;
165 compact_queue_cond
.Signal();
166 compact_queue_lock
.Unlock();
167 compact_thread
.join();
169 compact_queue_lock
.Unlock();
173 cct
->get_perfcounters_collection()->remove(logger
);
176 int LevelDBStore::submit_transaction(KeyValueDB::Transaction t
)
178 utime_t start
= ceph_clock_now();
179 LevelDBTransactionImpl
* _t
=
180 static_cast<LevelDBTransactionImpl
*>(t
.get());
181 leveldb::Status s
= db
->Write(leveldb::WriteOptions(), &(_t
->bat
));
182 utime_t lat
= ceph_clock_now() - start
;
183 logger
->inc(l_leveldb_txns
);
184 logger
->tinc(l_leveldb_submit_latency
, lat
);
185 return s
.ok() ? 0 : -1;
188 int LevelDBStore::submit_transaction_sync(KeyValueDB::Transaction t
)
190 utime_t start
= ceph_clock_now();
191 LevelDBTransactionImpl
* _t
=
192 static_cast<LevelDBTransactionImpl
*>(t
.get());
193 leveldb::WriteOptions options
;
195 leveldb::Status s
= db
->Write(options
, &(_t
->bat
));
196 utime_t lat
= ceph_clock_now() - start
;
197 logger
->inc(l_leveldb_txns
);
198 logger
->tinc(l_leveldb_submit_sync_latency
, lat
);
199 return s
.ok() ? 0 : -1;
// Queue a Put of combine_strings(prefix, k) -> contents of to_set_bl into
// the write batch `bat`. The Slices below may point at temporary/stack
// memory; this relies on leveldb::WriteBatch::Put appending copies of the
// key and value to the batch.
// Three strategies, chosen by the bufferlist's layout and length:
//   * contiguous and non-empty: point a Slice straight at the data;
//   * fragmented and at most 32 KiB: gather into an alloca() stack buffer;
//   * otherwise: copy the bufferlist and let c_str() flatten the copy.
// NOTE(review): the declaration of the key parameter `k` (between prefix and
// to_set_bl) is not visible in this view.
202 void LevelDBStore::LevelDBTransactionImpl::set(
203 const string
&prefix
,
205 const bufferlist
&to_set_bl
)
207 string key
= combine_strings(prefix
, k
);
208 size_t bllen
= to_set_bl
.length();
209 // bufferlist::c_str() is non-constant, so we can't call c_str()
// on the const to_set_bl itself -- hence the copy in the last branch.
210 if (to_set_bl
.is_contiguous() && bllen
> 0) {
211 // bufferlist contains just one ptr or they're contiguous
212 bat
.Put(leveldb::Slice(key
), leveldb::Slice(to_set_bl
.buffers().front().c_str(), bllen
));
213 } else if ((bllen
<= 32 * 1024) && (bllen
> 0)) {
214 // 2+ bufferptrs that are not contiguous
215 // allocate buffer on stack and copy bl contents to that buffer
216 // make sure the buffer isn't too large or we might crash here...
217 char* slicebuf
= (char*) alloca(bllen
);
218 leveldb::Slice
newslice(slicebuf
, bllen
);
219 std::list
<buffer::ptr
>::const_iterator pb
;
220 for (pb
= to_set_bl
.buffers().begin(); pb
!= to_set_bl
.buffers().end(); ++pb
) {
221 size_t ptrlen
= (*pb
).length();
// NOTE(review): each ptr is copied to the current slicebuf position. The
// gather is only correct if the following (not visible) line advances
// slicebuf by ptrlen -- confirm. newslice keeps the original start address.
222 memcpy((void*)slicebuf
, (*pb
).c_str(), ptrlen
);
225 bat
.Put(leveldb::Slice(key
), newslice
);
227 // 2+ bufferptrs that are not contiguous, and enormous in size
// An empty to_set_bl fails both `bllen > 0` tests above, so it presumably
// lands in this branch too (the `else` line itself is not visible).
228 bufferlist val
= to_set_bl
;
229 bat
.Put(leveldb::Slice(key
), leveldb::Slice(val
.c_str(), val
.length()));
// Queue a Delete of the combined (prefix, k) key in the write batch `bat`.
// Nothing is removed from the database until the transaction is submitted.
// NOTE(review): the declaration of the `k` parameter (after prefix) is not
// visible in this view -- confirm against the full file.
233 void LevelDBStore::LevelDBTransactionImpl::rmkey(const string
&prefix
,
236 string key
= combine_strings(prefix
, k
);
237 bat
.Delete(leveldb::Slice(key
));
// Queue a Delete in `bat` for every key the database currently holds under
// `prefix`, found by iterating a fresh db iterator from seek_to_first().
// Keys come from db->get_iterator (the stored data), not from this pending
// batch, so keys added earlier in the same transaction are presumably not
// covered -- verify if that matters to a caller.
// The iterator's key() is re-prefixed with combine_strings before deleting,
// which suggests it yields keys with the prefix stripped -- confirm.
// NOTE(review): the loop condition/increment and closing braces are not
// visible in this view.
240 void LevelDBStore::LevelDBTransactionImpl::rmkeys_by_prefix(const string
&prefix
)
242 KeyValueDB::Iterator it
= db
->get_iterator(prefix
);
243 for (it
->seek_to_first();
246 bat
.Delete(leveldb::Slice(combine_strings(prefix
, it
->key())));
// Queue a Delete in `bat` for keys under `prefix`, starting at
// lower_bound(start) (presumably the first key >= start) and scanning while
// the iterator is valid. A key >= end is tested for; the statement it guards
// is not visible (presumably a break), which would make the range [start, end).
// Keys come from the database via db->get_iterator, not from this pending batch.
// NOTE(review): the iterator advance and closing braces are not visible here.
250 void LevelDBStore::LevelDBTransactionImpl::rm_range_keys(const string
&prefix
, const string
&start
, const string
&end
)
252 KeyValueDB::Iterator it
= db
->get_iterator(prefix
);
253 it
->lower_bound(start
);
254 while (it
->valid()) {
255 if (it
->key() >= end
) {
258 bat
.Delete(combine_strings(prefix
, it
->key()));
263 int LevelDBStore::get(
264 const string
&prefix
,
265 const std::set
<string
> &keys
,
266 std::map
<string
, bufferlist
> *out
)
268 utime_t start
= ceph_clock_now();
269 for (std::set
<string
>::const_iterator i
= keys
.begin();
270 i
!= keys
.end(); ++i
) {
272 std::string bound
= combine_strings(prefix
, *i
);
273 auto status
= db
->Get(leveldb::ReadOptions(), leveldb::Slice(bound
), &value
);
275 (*out
)[*i
].append(value
);
277 utime_t lat
= ceph_clock_now() - start
;
278 logger
->inc(l_leveldb_gets
);
279 logger
->tinc(l_leveldb_get_latency
, lat
);
// Look up the value stored under (prefix, key). The result is presumably
// appended to *out; that code is not visible here.
// Asserts that `out` is non-null and empty on entry, and records one get
// plus its latency in the perf counters.
// NOTE(review): the key/out parameters, the local declarations of k, s and
// value, the handling of the leveldb::Status and the return statement are not
// visible in this view.
283 int LevelDBStore::get(const string
&prefix
,
287 assert(out
&& (out
->length() == 0));
288 utime_t start
= ceph_clock_now();
292 k
= combine_strings(prefix
, key
);
293 s
= db
->Get(leveldb::ReadOptions(), leveldb::Slice(k
), &value
);
299 utime_t lat
= ceph_clock_now() - start
;
300 logger
->inc(l_leveldb_gets
);
301 logger
->tinc(l_leveldb_get_latency
, lat
);
// Build the on-disk leveldb key for (prefix, value).
// NOTE(review): the body is not visible here. split_key() in this file
// separates prefix and key at the first NUL byte, which suggests this joins
// them with a '\0' separator -- confirm.
305 string
LevelDBStore::combine_strings(const string
&prefix
, const string
&value
)
313 bufferlist
LevelDBStore::to_bufferlist(leveldb::Slice in
)
316 bl
.append(bufferptr(in
.data(), in
.size()));
320 int LevelDBStore::split_key(leveldb::Slice in
, string
*prefix
, string
*key
)
322 size_t prefix_len
= 0;
324 // Find separator inside Slice
325 char* separator
= (char*) memchr(in
.data(), 0, in
.size());
326 if (separator
== NULL
)
328 prefix_len
= size_t(separator
- in
.data());
329 if (prefix_len
>= in
.size())
333 *prefix
= string(in
.data(), prefix_len
);
335 *key
= string(separator
+1, in
.size() - prefix_len
- 1);
339 void LevelDBStore::compact()
341 logger
->inc(l_leveldb_compact
);
342 db
->CompactRange(NULL
, NULL
);
346 void LevelDBStore::compact_thread_entry()
348 compact_queue_lock
.Lock();
349 while (!compact_queue_stop
) {
350 while (!compact_queue
.empty()) {
351 pair
<string
,string
> range
= compact_queue
.front();
352 compact_queue
.pop_front();
353 logger
->set(l_leveldb_compact_queue_len
, compact_queue
.size());
354 compact_queue_lock
.Unlock();
355 logger
->inc(l_leveldb_compact_range
);
356 compact_range(range
.first
, range
.second
);
357 compact_queue_lock
.Lock();
360 compact_queue_cond
.Wait(compact_queue_lock
);
362 compact_queue_lock
.Unlock();
// Queue the key range start..end for background compaction by
// compact_thread_entry(). If it touches or overlaps a queued range in one of
// the two simple ways checked below, the two are merged. Then wake the
// compaction thread, starting it on first use. All queue access happens under
// compact_queue_lock via the Locker.
// NOTE(review): this definition continues past the end of the visible source,
// and several lines inside the loop are not visible.
365 void LevelDBStore::compact_range_async(const string
& start
, const string
& end
)
367 Mutex::Locker
l(compact_queue_lock
);
369 // try to merge adjacent ranges. this is O(n), but the queue should
370 // be short. note that we do not cover all overlap cases and merge
371 // opportunities here, but we capture the ones we currently need.
372 list
< pair
<string
,string
> >::iterator p
= compact_queue
.begin();
373 while (p
!= compact_queue
.end()) {
// Exact duplicate of an already-queued range (its handling is not visible).
374 if (p
->first
== start
&& p
->second
== end
) {
378 if (p
->first
<= end
&& p
->first
> start
) {
379 // merge with existing range to the right
// NOTE(review): if the queued range ends before `end` (p->second < end),
// the merged range [start, p->second] drops (p->second, end]. The upper
// bound should probably be the larger of end and p->second.
380 compact_queue
.push_back(make_pair(start
, p
->second
));
// NOTE(review): erase() invalidates p. If the loop then exits (not
// visible), the `p == compact_queue.end()` test below compares an
// invalidated iterator; a separate `merged` flag would be safer.
381 compact_queue
.erase(p
);
382 logger
->inc(l_leveldb_compact_queue_merge
);
385 if (p
->second
>= start
&& p
->second
< end
) {
386 // merge with existing range to the left
387 compact_queue
.push_back(make_pair(p
->first
, end
));
388 compact_queue
.erase(p
);
389 logger
->inc(l_leveldb_compact_queue_merge
);
394 if (p
== compact_queue
.end()) {
395 // no merge, new entry.
396 compact_queue
.push_back(make_pair(start
, end
));
397 logger
->set(l_leveldb_compact_queue_len
, compact_queue
.size());
399 compact_queue_cond
.Signal();
// Start the worker lazily when the first range is queued.
400 if (!compact_thread
.is_started()) {
401 compact_thread
.create("levdbst_compact");