1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 #include "LevelDBStore.h"
8 #include "include/memory.h"
11 #include "common/debug.h"
12 #include "common/perf_counters.h"
14 #define dout_context cct
15 #define dout_subsys ceph_subsys_leveldb
17 #define dout_prefix *_dout << "leveldb: "
19 class CephLevelDBLogger
: public leveldb::Logger
{
22 explicit CephLevelDBLogger(CephContext
*c
) : cct(c
) {
25 ~CephLevelDBLogger() override
{
29 // Write an entry to the log file with the specified format.
30 void Logv(const char* format
, va_list ap
) override
{
33 vsnprintf(buf
, sizeof(buf
), format
, ap
);
34 *_dout
<< buf
<< dendl
;
38 leveldb::Logger
*create_leveldb_ceph_logger()
40 return new CephLevelDBLogger(g_ceph_context
);
43 int LevelDBStore::init(string option_str
)
45 // init defaults. caller can override these if they want
46 // prior to calling open.
47 options
.write_buffer_size
= g_conf
->leveldb_write_buffer_size
;
48 options
.cache_size
= g_conf
->leveldb_cache_size
;
49 options
.block_size
= g_conf
->leveldb_block_size
;
50 options
.bloom_size
= g_conf
->leveldb_bloom_size
;
51 options
.compression_enabled
= g_conf
->leveldb_compression
;
52 options
.paranoid_checks
= g_conf
->leveldb_paranoid
;
53 options
.max_open_files
= g_conf
->leveldb_max_open_files
;
54 options
.log_file
= g_conf
->leveldb_log
;
58 int LevelDBStore::do_open(ostream
&out
, bool create_if_missing
)
60 leveldb::Options ldoptions
;
62 if (options
.write_buffer_size
)
63 ldoptions
.write_buffer_size
= options
.write_buffer_size
;
64 if (options
.max_open_files
)
65 ldoptions
.max_open_files
= options
.max_open_files
;
66 if (options
.cache_size
) {
67 leveldb::Cache
*_db_cache
= leveldb::NewLRUCache(options
.cache_size
);
68 db_cache
.reset(_db_cache
);
69 ldoptions
.block_cache
= db_cache
.get();
71 if (options
.block_size
)
72 ldoptions
.block_size
= options
.block_size
;
73 if (options
.bloom_size
) {
74 #ifdef HAVE_LEVELDB_FILTER_POLICY
75 const leveldb::FilterPolicy
*_filterpolicy
=
76 leveldb::NewBloomFilterPolicy(options
.bloom_size
);
77 filterpolicy
.reset(_filterpolicy
);
78 ldoptions
.filter_policy
= filterpolicy
.get();
80 assert(0 == "bloom size set but installed leveldb doesn't support bloom filters");
83 if (options
.compression_enabled
)
84 ldoptions
.compression
= leveldb::kSnappyCompression
;
86 ldoptions
.compression
= leveldb::kNoCompression
;
87 if (options
.block_restart_interval
)
88 ldoptions
.block_restart_interval
= options
.block_restart_interval
;
90 ldoptions
.error_if_exists
= options
.error_if_exists
;
91 ldoptions
.paranoid_checks
= options
.paranoid_checks
;
92 ldoptions
.create_if_missing
= create_if_missing
;
94 if (g_conf
->leveldb_log_to_ceph_log
) {
95 ceph_logger
= new CephLevelDBLogger(g_ceph_context
);
96 ldoptions
.info_log
= ceph_logger
;
99 if (options
.log_file
.length()) {
100 leveldb::Env
*env
= leveldb::Env::Default();
101 env
->NewLogger(options
.log_file
, &ldoptions
.info_log
);
105 leveldb::Status status
= leveldb::DB::Open(ldoptions
, path
, &_db
);
108 out
<< status
.ToString() << std::endl
;
112 PerfCountersBuilder
plb(g_ceph_context
, "leveldb", l_leveldb_first
, l_leveldb_last
);
113 plb
.add_u64_counter(l_leveldb_gets
, "leveldb_get", "Gets");
114 plb
.add_u64_counter(l_leveldb_txns
, "leveldb_transaction", "Transactions");
115 plb
.add_time_avg(l_leveldb_get_latency
, "leveldb_get_latency", "Get Latency");
116 plb
.add_time_avg(l_leveldb_submit_latency
, "leveldb_submit_latency", "Submit Latency");
117 plb
.add_time_avg(l_leveldb_submit_sync_latency
, "leveldb_submit_sync_latency", "Submit Sync Latency");
118 plb
.add_u64_counter(l_leveldb_compact
, "leveldb_compact", "Compactions");
119 plb
.add_u64_counter(l_leveldb_compact_range
, "leveldb_compact_range", "Compactions by range");
120 plb
.add_u64_counter(l_leveldb_compact_queue_merge
, "leveldb_compact_queue_merge", "Mergings of ranges in compaction queue");
121 plb
.add_u64(l_leveldb_compact_queue_len
, "leveldb_compact_queue_len", "Length of compaction queue");
122 logger
= plb
.create_perf_counters();
123 cct
->get_perfcounters_collection()->add(logger
);
125 if (g_conf
->leveldb_compact_on_mount
) {
126 derr
<< "Compacting leveldb store..." << dendl
;
128 derr
<< "Finished compacting leveldb store" << dendl
;
133 int LevelDBStore::_test_init(const string
& dir
)
135 leveldb::Options options
;
136 options
.create_if_missing
= true;
138 leveldb::Status status
= leveldb::DB::Open(options
, dir
, &db
);
140 return status
.ok() ? 0 : -EIO
;
143 LevelDBStore::~LevelDBStore()
149 // Ensure db is destroyed before dependent db_cache and filterpolicy
153 void LevelDBStore::close()
155 // stop compaction thread
156 compact_queue_lock
.Lock();
157 if (compact_thread
.is_started()) {
158 compact_queue_stop
= true;
159 compact_queue_cond
.Signal();
160 compact_queue_lock
.Unlock();
161 compact_thread
.join();
163 compact_queue_lock
.Unlock();
167 cct
->get_perfcounters_collection()->remove(logger
);
170 int LevelDBStore::submit_transaction(KeyValueDB::Transaction t
)
172 utime_t start
= ceph_clock_now();
173 LevelDBTransactionImpl
* _t
=
174 static_cast<LevelDBTransactionImpl
*>(t
.get());
175 leveldb::Status s
= db
->Write(leveldb::WriteOptions(), &(_t
->bat
));
176 utime_t lat
= ceph_clock_now() - start
;
177 logger
->inc(l_leveldb_txns
);
178 logger
->tinc(l_leveldb_submit_latency
, lat
);
179 return s
.ok() ? 0 : -1;
182 int LevelDBStore::submit_transaction_sync(KeyValueDB::Transaction t
)
184 utime_t start
= ceph_clock_now();
185 LevelDBTransactionImpl
* _t
=
186 static_cast<LevelDBTransactionImpl
*>(t
.get());
187 leveldb::WriteOptions options
;
189 leveldb::Status s
= db
->Write(options
, &(_t
->bat
));
190 utime_t lat
= ceph_clock_now() - start
;
191 logger
->inc(l_leveldb_txns
);
192 logger
->tinc(l_leveldb_submit_sync_latency
, lat
);
193 return s
.ok() ? 0 : -1;
196 void LevelDBStore::LevelDBTransactionImpl::set(
197 const string
&prefix
,
199 const bufferlist
&to_set_bl
)
201 string key
= combine_strings(prefix
, k
);
202 size_t bllen
= to_set_bl
.length();
203 // bufferlist::c_str() is non-constant, so we can't call c_str()
204 if (to_set_bl
.is_contiguous() && bllen
> 0) {
205 // bufferlist contains just one ptr or they're contiguous
206 bat
.Put(leveldb::Slice(key
), leveldb::Slice(to_set_bl
.buffers().front().c_str(), bllen
));
207 } else if ((bllen
<= 32 * 1024) && (bllen
> 0)) {
208 // 2+ bufferptrs that are not contiguopus
209 // allocate buffer on stack and copy bl contents to that buffer
210 // make sure the buffer isn't too large or we might crash here...
211 char* slicebuf
= (char*) alloca(bllen
);
212 leveldb::Slice
newslice(slicebuf
, bllen
);
213 std::list
<buffer::ptr
>::const_iterator pb
;
214 for (pb
= to_set_bl
.buffers().begin(); pb
!= to_set_bl
.buffers().end(); ++pb
) {
215 size_t ptrlen
= (*pb
).length();
216 memcpy((void*)slicebuf
, (*pb
).c_str(), ptrlen
);
219 bat
.Put(leveldb::Slice(key
), newslice
);
221 // 2+ bufferptrs that are not contiguous, and enormous in size
222 bufferlist val
= to_set_bl
;
223 bat
.Put(leveldb::Slice(key
), leveldb::Slice(val
.c_str(), val
.length()));
227 void LevelDBStore::LevelDBTransactionImpl::rmkey(const string
&prefix
,
230 string key
= combine_strings(prefix
, k
);
231 bat
.Delete(leveldb::Slice(key
));
234 void LevelDBStore::LevelDBTransactionImpl::rmkeys_by_prefix(const string
&prefix
)
236 KeyValueDB::Iterator it
= db
->get_iterator(prefix
);
237 for (it
->seek_to_first();
240 bat
.Delete(leveldb::Slice(combine_strings(prefix
, it
->key())));
244 void LevelDBStore::LevelDBTransactionImpl::rm_range_keys(const string
&prefix
, const string
&start
, const string
&end
)
246 KeyValueDB::Iterator it
= db
->get_iterator(prefix
);
247 it
->lower_bound(start
);
248 while (it
->valid()) {
249 if (it
->key() >= end
) {
252 bat
.Delete(combine_strings(prefix
, it
->key()));
257 int LevelDBStore::get(
258 const string
&prefix
,
259 const std::set
<string
> &keys
,
260 std::map
<string
, bufferlist
> *out
)
262 utime_t start
= ceph_clock_now();
263 for (std::set
<string
>::const_iterator i
= keys
.begin();
264 i
!= keys
.end(); ++i
) {
266 std::string bound
= combine_strings(prefix
, *i
);
267 auto status
= db
->Get(leveldb::ReadOptions(), leveldb::Slice(bound
), &value
);
269 (*out
)[*i
].append(value
);
271 utime_t lat
= ceph_clock_now() - start
;
272 logger
->inc(l_leveldb_gets
);
273 logger
->tinc(l_leveldb_get_latency
, lat
);
277 int LevelDBStore::get(const string
&prefix
,
281 assert(out
&& (out
->length() == 0));
282 utime_t start
= ceph_clock_now();
286 k
= combine_strings(prefix
, key
);
287 s
= db
->Get(leveldb::ReadOptions(), leveldb::Slice(k
), &value
);
293 utime_t lat
= ceph_clock_now() - start
;
294 logger
->inc(l_leveldb_gets
);
295 logger
->tinc(l_leveldb_get_latency
, lat
);
299 string
LevelDBStore::combine_strings(const string
&prefix
, const string
&value
)
307 bufferlist
LevelDBStore::to_bufferlist(leveldb::Slice in
)
310 bl
.append(bufferptr(in
.data(), in
.size()));
314 int LevelDBStore::split_key(leveldb::Slice in
, string
*prefix
, string
*key
)
316 size_t prefix_len
= 0;
318 // Find separator inside Slice
319 char* separator
= (char*) memchr(in
.data(), 0, in
.size());
320 if (separator
== NULL
)
322 prefix_len
= size_t(separator
- in
.data());
323 if (prefix_len
>= in
.size())
327 *prefix
= string(in
.data(), prefix_len
);
329 *key
= string(separator
+1, in
.size() - prefix_len
- 1);
333 void LevelDBStore::compact()
335 logger
->inc(l_leveldb_compact
);
336 db
->CompactRange(NULL
, NULL
);
340 void LevelDBStore::compact_thread_entry()
342 compact_queue_lock
.Lock();
343 while (!compact_queue_stop
) {
344 while (!compact_queue
.empty()) {
345 pair
<string
,string
> range
= compact_queue
.front();
346 compact_queue
.pop_front();
347 logger
->set(l_leveldb_compact_queue_len
, compact_queue
.size());
348 compact_queue_lock
.Unlock();
349 logger
->inc(l_leveldb_compact_range
);
350 compact_range(range
.first
, range
.second
);
351 compact_queue_lock
.Lock();
354 compact_queue_cond
.Wait(compact_queue_lock
);
356 compact_queue_lock
.Unlock();
359 void LevelDBStore::compact_range_async(const string
& start
, const string
& end
)
361 Mutex::Locker
l(compact_queue_lock
);
363 // try to merge adjacent ranges. this is O(n), but the queue should
364 // be short. note that we do not cover all overlap cases and merge
365 // opportunities here, but we capture the ones we currently need.
366 list
< pair
<string
,string
> >::iterator p
= compact_queue
.begin();
367 while (p
!= compact_queue
.end()) {
368 if (p
->first
== start
&& p
->second
== end
) {
372 if (p
->first
<= end
&& p
->first
> start
) {
373 // merge with existing range to the right
374 compact_queue
.push_back(make_pair(start
, p
->second
));
375 compact_queue
.erase(p
);
376 logger
->inc(l_leveldb_compact_queue_merge
);
379 if (p
->second
>= start
&& p
->second
< end
) {
380 // merge with existing range to the left
381 compact_queue
.push_back(make_pair(p
->first
, end
));
382 compact_queue
.erase(p
);
383 logger
->inc(l_leveldb_compact_queue_merge
);
388 if (p
== compact_queue
.end()) {
389 // no merge, new entry.
390 compact_queue
.push_back(make_pair(start
, end
));
391 logger
->set(l_leveldb_compact_queue_len
, compact_queue
.size());
393 compact_queue_cond
.Signal();
394 if (!compact_thread
.is_started()) {
395 compact_thread
.create("levdbst_compact");