]> git.proxmox.com Git - ceph.git/blame - ceph/src/kv/LevelDBStore.cc
import 15.2.0 Octopus source
[ceph.git] / ceph / src / kv / LevelDBStore.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3#include "LevelDBStore.h"
4
5#include <set>
6#include <map>
7#include <string>
31f18b77
FG
8#include <cerrno>
9
7c673cae 10using std::string;
31f18b77 11
7c673cae
FG
12#include "common/debug.h"
13#include "common/perf_counters.h"
14
31f18b77 15// re-include our assert to clobber the system one; fix dout:
11fdf7f2 16#include "include/ceph_assert.h"
31f18b77 17
7c673cae
FG
18#define dout_context cct
19#define dout_subsys ceph_subsys_leveldb
20#undef dout_prefix
21#define dout_prefix *_dout << "leveldb: "
22
23class CephLevelDBLogger : public leveldb::Logger {
24 CephContext *cct;
25public:
26 explicit CephLevelDBLogger(CephContext *c) : cct(c) {
27 cct->get();
28 }
29 ~CephLevelDBLogger() override {
30 cct->put();
31 }
32
33 // Write an entry to the log file with the specified format.
34 void Logv(const char* format, va_list ap) override {
35 dout(1);
36 char buf[65536];
37 vsnprintf(buf, sizeof(buf), format, ap);
38 *_dout << buf << dendl;
39 }
40};
41
42leveldb::Logger *create_leveldb_ceph_logger()
43{
44 return new CephLevelDBLogger(g_ceph_context);
45}
46
47int LevelDBStore::init(string option_str)
48{
49 // init defaults. caller can override these if they want
50 // prior to calling open.
11fdf7f2
TL
51 options.write_buffer_size = g_conf()->leveldb_write_buffer_size;
52 options.cache_size = g_conf()->leveldb_cache_size;
53 options.block_size = g_conf()->leveldb_block_size;
54 options.bloom_size = g_conf()->leveldb_bloom_size;
55 options.compression_enabled = g_conf()->leveldb_compression;
56 options.paranoid_checks = g_conf()->leveldb_paranoid;
57 options.max_open_files = g_conf()->leveldb_max_open_files;
58 options.log_file = g_conf()->leveldb_log;
7c673cae
FG
59 return 0;
60}
61
11fdf7f2
TL
62int LevelDBStore::open(ostream &out, const vector<ColumnFamily>& cfs) {
63 if (!cfs.empty()) {
64 ceph_abort_msg("Not implemented");
65 }
66 return do_open(out, false);
67}
7c673cae 68
11fdf7f2
TL
69int LevelDBStore::create_and_open(ostream &out, const vector<ColumnFamily>& cfs) {
70 if (!cfs.empty()) {
71 ceph_abort_msg("Not implemented");
72 }
73 return do_open(out, true);
74}
75
76int LevelDBStore::load_leveldb_options(bool create_if_missing, leveldb::Options &ldoptions)
77{
7c673cae
FG
78 if (options.write_buffer_size)
79 ldoptions.write_buffer_size = options.write_buffer_size;
80 if (options.max_open_files)
81 ldoptions.max_open_files = options.max_open_files;
82 if (options.cache_size) {
83 leveldb::Cache *_db_cache = leveldb::NewLRUCache(options.cache_size);
84 db_cache.reset(_db_cache);
85 ldoptions.block_cache = db_cache.get();
86 }
87 if (options.block_size)
88 ldoptions.block_size = options.block_size;
89 if (options.bloom_size) {
90#ifdef HAVE_LEVELDB_FILTER_POLICY
91 const leveldb::FilterPolicy *_filterpolicy =
92 leveldb::NewBloomFilterPolicy(options.bloom_size);
93 filterpolicy.reset(_filterpolicy);
94 ldoptions.filter_policy = filterpolicy.get();
95#else
11fdf7f2 96 ceph_abort_msg(0 == "bloom size set but installed leveldb doesn't support bloom filters");
7c673cae
FG
97#endif
98 }
99 if (options.compression_enabled)
100 ldoptions.compression = leveldb::kSnappyCompression;
101 else
102 ldoptions.compression = leveldb::kNoCompression;
103 if (options.block_restart_interval)
104 ldoptions.block_restart_interval = options.block_restart_interval;
105
106 ldoptions.error_if_exists = options.error_if_exists;
107 ldoptions.paranoid_checks = options.paranoid_checks;
108 ldoptions.create_if_missing = create_if_missing;
109
11fdf7f2 110 if (g_conf()->leveldb_log_to_ceph_log) {
7c673cae
FG
111 ceph_logger = new CephLevelDBLogger(g_ceph_context);
112 ldoptions.info_log = ceph_logger;
113 }
114
115 if (options.log_file.length()) {
116 leveldb::Env *env = leveldb::Env::Default();
117 env->NewLogger(options.log_file, &ldoptions.info_log);
118 }
11fdf7f2
TL
119 return 0;
120}
121
122int LevelDBStore::do_open(ostream &out, bool create_if_missing)
123{
124 leveldb::Options ldoptions;
125 int r = load_leveldb_options(create_if_missing, ldoptions);
126 if (r) {
127 dout(1) << "load leveldb options failed" << dendl;
128 return r;
129 }
7c673cae
FG
130
131 leveldb::DB *_db;
132 leveldb::Status status = leveldb::DB::Open(ldoptions, path, &_db);
133 db.reset(_db);
134 if (!status.ok()) {
135 out << status.ToString() << std::endl;
136 return -EINVAL;
137 }
138
139 PerfCountersBuilder plb(g_ceph_context, "leveldb", l_leveldb_first, l_leveldb_last);
140 plb.add_u64_counter(l_leveldb_gets, "leveldb_get", "Gets");
141 plb.add_u64_counter(l_leveldb_txns, "leveldb_transaction", "Transactions");
142 plb.add_time_avg(l_leveldb_get_latency, "leveldb_get_latency", "Get Latency");
143 plb.add_time_avg(l_leveldb_submit_latency, "leveldb_submit_latency", "Submit Latency");
144 plb.add_time_avg(l_leveldb_submit_sync_latency, "leveldb_submit_sync_latency", "Submit Sync Latency");
145 plb.add_u64_counter(l_leveldb_compact, "leveldb_compact", "Compactions");
146 plb.add_u64_counter(l_leveldb_compact_range, "leveldb_compact_range", "Compactions by range");
147 plb.add_u64_counter(l_leveldb_compact_queue_merge, "leveldb_compact_queue_merge", "Mergings of ranges in compaction queue");
148 plb.add_u64(l_leveldb_compact_queue_len, "leveldb_compact_queue_len", "Length of compaction queue");
149 logger = plb.create_perf_counters();
150 cct->get_perfcounters_collection()->add(logger);
151
11fdf7f2 152 if (g_conf()->leveldb_compact_on_mount) {
7c673cae
FG
153 derr << "Compacting leveldb store..." << dendl;
154 compact();
155 derr << "Finished compacting leveldb store" << dendl;
156 }
157 return 0;
158}
159
160int LevelDBStore::_test_init(const string& dir)
161{
162 leveldb::Options options;
163 options.create_if_missing = true;
164 leveldb::DB *db;
165 leveldb::Status status = leveldb::DB::Open(options, dir, &db);
166 delete db;
167 return status.ok() ? 0 : -EIO;
168}
169
170LevelDBStore::~LevelDBStore()
171{
172 close();
173 delete logger;
7c673cae
FG
174
175 // Ensure db is destroyed before dependent db_cache and filterpolicy
176 db.reset();
c07f9fc5 177 delete ceph_logger;
7c673cae
FG
178}
179
180void LevelDBStore::close()
181{
182 // stop compaction thread
9f95a23c 183 compact_queue_lock.lock();
7c673cae
FG
184 if (compact_thread.is_started()) {
185 compact_queue_stop = true;
9f95a23c
TL
186 compact_queue_cond.notify_all();
187 compact_queue_lock.unlock();
7c673cae
FG
188 compact_thread.join();
189 } else {
9f95a23c 190 compact_queue_lock.unlock();
7c673cae
FG
191 }
192
193 if (logger)
194 cct->get_perfcounters_collection()->remove(logger);
195}
196
11fdf7f2
TL
197int LevelDBStore::repair(std::ostream &out)
198{
199 leveldb::Options ldoptions;
200 int r = load_leveldb_options(false, ldoptions);
201 if (r) {
202 dout(1) << "load leveldb options failed" << dendl;
203 out << "load leveldb options failed" << std::endl;
204 return r;
205 }
206 leveldb::Status status = leveldb::RepairDB(path, ldoptions);
207 if (status.ok()) {
208 return 0;
209 } else {
210 out << "repair leveldb failed : " << status.ToString() << std::endl;
211 return 1;
212 }
213}
214
7c673cae
FG
215int LevelDBStore::submit_transaction(KeyValueDB::Transaction t)
216{
217 utime_t start = ceph_clock_now();
218 LevelDBTransactionImpl * _t =
219 static_cast<LevelDBTransactionImpl *>(t.get());
220 leveldb::Status s = db->Write(leveldb::WriteOptions(), &(_t->bat));
221 utime_t lat = ceph_clock_now() - start;
222 logger->inc(l_leveldb_txns);
223 logger->tinc(l_leveldb_submit_latency, lat);
224 return s.ok() ? 0 : -1;
225}
226
227int LevelDBStore::submit_transaction_sync(KeyValueDB::Transaction t)
228{
229 utime_t start = ceph_clock_now();
230 LevelDBTransactionImpl * _t =
231 static_cast<LevelDBTransactionImpl *>(t.get());
232 leveldb::WriteOptions options;
233 options.sync = true;
234 leveldb::Status s = db->Write(options, &(_t->bat));
235 utime_t lat = ceph_clock_now() - start;
236 logger->inc(l_leveldb_txns);
237 logger->tinc(l_leveldb_submit_sync_latency, lat);
238 return s.ok() ? 0 : -1;
239}
240
241void LevelDBStore::LevelDBTransactionImpl::set(
242 const string &prefix,
243 const string &k,
244 const bufferlist &to_set_bl)
245{
246 string key = combine_strings(prefix, k);
247 size_t bllen = to_set_bl.length();
248 // bufferlist::c_str() is non-constant, so we can't call c_str()
249 if (to_set_bl.is_contiguous() && bllen > 0) {
250 // bufferlist contains just one ptr or they're contiguous
251 bat.Put(leveldb::Slice(key), leveldb::Slice(to_set_bl.buffers().front().c_str(), bllen));
252 } else if ((bllen <= 32 * 1024) && (bllen > 0)) {
253 // 2+ bufferptrs that are not contiguopus
254 // allocate buffer on stack and copy bl contents to that buffer
255 // make sure the buffer isn't too large or we might crash here...
256 char* slicebuf = (char*) alloca(bllen);
257 leveldb::Slice newslice(slicebuf, bllen);
11fdf7f2
TL
258 for (const auto& node : to_set_bl.buffers()) {
259 const size_t ptrlen = node.length();
260 memcpy(static_cast<void*>(slicebuf), node.c_str(), ptrlen);
7c673cae
FG
261 slicebuf += ptrlen;
262 }
263 bat.Put(leveldb::Slice(key), newslice);
264 } else {
265 // 2+ bufferptrs that are not contiguous, and enormous in size
266 bufferlist val = to_set_bl;
267 bat.Put(leveldb::Slice(key), leveldb::Slice(val.c_str(), val.length()));
268 }
269}
270
271void LevelDBStore::LevelDBTransactionImpl::rmkey(const string &prefix,
272 const string &k)
273{
274 string key = combine_strings(prefix, k);
275 bat.Delete(leveldb::Slice(key));
276}
277
278void LevelDBStore::LevelDBTransactionImpl::rmkeys_by_prefix(const string &prefix)
279{
280 KeyValueDB::Iterator it = db->get_iterator(prefix);
281 for (it->seek_to_first();
282 it->valid();
283 it->next()) {
284 bat.Delete(leveldb::Slice(combine_strings(prefix, it->key())));
285 }
286}
287
288void LevelDBStore::LevelDBTransactionImpl::rm_range_keys(const string &prefix, const string &start, const string &end)
289{
290 KeyValueDB::Iterator it = db->get_iterator(prefix);
291 it->lower_bound(start);
292 while (it->valid()) {
293 if (it->key() >= end) {
294 break;
295 }
296 bat.Delete(combine_strings(prefix, it->key()));
297 it->next();
298 }
299}
300
301int LevelDBStore::get(
302 const string &prefix,
303 const std::set<string> &keys,
304 std::map<string, bufferlist> *out)
305{
306 utime_t start = ceph_clock_now();
307 for (std::set<string>::const_iterator i = keys.begin();
308 i != keys.end(); ++i) {
309 std::string value;
310 std::string bound = combine_strings(prefix, *i);
311 auto status = db->Get(leveldb::ReadOptions(), leveldb::Slice(bound), &value);
312 if (status.ok())
313 (*out)[*i].append(value);
314 }
315 utime_t lat = ceph_clock_now() - start;
316 logger->inc(l_leveldb_gets);
317 logger->tinc(l_leveldb_get_latency, lat);
318 return 0;
319}
320
321int LevelDBStore::get(const string &prefix,
322 const string &key,
323 bufferlist *out)
324{
11fdf7f2 325 ceph_assert(out && (out->length() == 0));
7c673cae
FG
326 utime_t start = ceph_clock_now();
327 int r = 0;
328 string value, k;
329 leveldb::Status s;
330 k = combine_strings(prefix, key);
331 s = db->Get(leveldb::ReadOptions(), leveldb::Slice(k), &value);
332 if (s.ok()) {
333 out->append(value);
334 } else {
335 r = -ENOENT;
336 }
337 utime_t lat = ceph_clock_now() - start;
338 logger->inc(l_leveldb_gets);
339 logger->tinc(l_leveldb_get_latency, lat);
340 return r;
341}
342
343string LevelDBStore::combine_strings(const string &prefix, const string &value)
344{
345 string out = prefix;
346 out.push_back(0);
347 out.append(value);
348 return out;
349}
350
351bufferlist LevelDBStore::to_bufferlist(leveldb::Slice in)
352{
353 bufferlist bl;
354 bl.append(bufferptr(in.data(), in.size()));
355 return bl;
356}
357
358int LevelDBStore::split_key(leveldb::Slice in, string *prefix, string *key)
359{
360 size_t prefix_len = 0;
361
362 // Find separator inside Slice
363 char* separator = (char*) memchr(in.data(), 0, in.size());
364 if (separator == NULL)
365 return -EINVAL;
366 prefix_len = size_t(separator - in.data());
367 if (prefix_len >= in.size())
368 return -EINVAL;
369
370 if (prefix)
371 *prefix = string(in.data(), prefix_len);
372 if (key)
373 *key = string(separator+1, in.size() - prefix_len - 1);
374 return 0;
375}
376
377void LevelDBStore::compact()
378{
379 logger->inc(l_leveldb_compact);
380 db->CompactRange(NULL, NULL);
381}
382
383
384void LevelDBStore::compact_thread_entry()
385{
9f95a23c 386 std::unique_lock l{compact_queue_lock};
7c673cae
FG
387 while (!compact_queue_stop) {
388 while (!compact_queue.empty()) {
389 pair<string,string> range = compact_queue.front();
390 compact_queue.pop_front();
391 logger->set(l_leveldb_compact_queue_len, compact_queue.size());
9f95a23c 392 l.unlock();
7c673cae 393 logger->inc(l_leveldb_compact_range);
11fdf7f2
TL
394 if (range.first.empty() && range.second.empty()) {
395 compact();
396 } else {
397 compact_range(range.first, range.second);
398 }
9f95a23c 399 l.lock();
7c673cae
FG
400 continue;
401 }
11fdf7f2
TL
402 if (compact_queue_stop)
403 break;
9f95a23c 404 compact_queue_cond.wait(l);
7c673cae 405 }
7c673cae
FG
406}
407
408void LevelDBStore::compact_range_async(const string& start, const string& end)
409{
11fdf7f2 410 std::lock_guard l(compact_queue_lock);
7c673cae
FG
411
412 // try to merge adjacent ranges. this is O(n), but the queue should
413 // be short. note that we do not cover all overlap cases and merge
414 // opportunities here, but we capture the ones we currently need.
415 list< pair<string,string> >::iterator p = compact_queue.begin();
416 while (p != compact_queue.end()) {
417 if (p->first == start && p->second == end) {
418 // dup; no-op
419 return;
420 }
421 if (p->first <= end && p->first > start) {
422 // merge with existing range to the right
423 compact_queue.push_back(make_pair(start, p->second));
424 compact_queue.erase(p);
425 logger->inc(l_leveldb_compact_queue_merge);
426 break;
427 }
428 if (p->second >= start && p->second < end) {
429 // merge with existing range to the left
430 compact_queue.push_back(make_pair(p->first, end));
431 compact_queue.erase(p);
432 logger->inc(l_leveldb_compact_queue_merge);
433 break;
434 }
435 ++p;
436 }
437 if (p == compact_queue.end()) {
438 // no merge, new entry.
439 compact_queue.push_back(make_pair(start, end));
440 logger->set(l_leveldb_compact_queue_len, compact_queue.size());
441 }
9f95a23c 442 compact_queue_cond.notify_all();
7c673cae
FG
443 if (!compact_thread.is_started()) {
444 compact_thread.create("levdbst_compact");
445 }
446}