]> git.proxmox.com Git - ceph.git/blame - ceph/src/kv/LevelDBStore.h
buildsys: switch source download to quincy
[ceph.git] / ceph / src / kv / LevelDBStore.h
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3#ifndef LEVEL_DB_STORE_H
4#define LEVEL_DB_STORE_H
5
6#include "include/types.h"
7#include "include/buffer_fwd.h"
8#include "KeyValueDB.h"
9#include <set>
10#include <map>
11#include <string>
7c673cae
FG
12#include <boost/scoped_ptr.hpp>
13#include "leveldb/db.h"
14#include "leveldb/env.h"
15#include "leveldb/write_batch.h"
16#include "leveldb/slice.h"
17#include "leveldb/cache.h"
18#ifdef HAVE_LEVELDB_FILTER_POLICY
19#include "leveldb/filter_policy.h"
20#endif
21
22#include <errno.h>
23#include "common/errno.h"
24#include "common/dout.h"
11fdf7f2 25#include "include/ceph_assert.h"
7c673cae
FG
26#include "common/Formatter.h"
27#include "common/Cond.h"
28
29#include "common/ceph_context.h"
9f95a23c 30#include "include/common_fwd.h"
7c673cae 31
31f18b77 32// reinclude our assert to clobber the system one
11fdf7f2 33# include "include/ceph_assert.h"
31f18b77 34
7c673cae
FG
/// Counter indices for LevelDBStore's PerfCounters (exposed through
/// LevelDBStore::get_perf_counters()). By Ceph naming convention,
/// l_leveldb_first and l_leveldb_last are sentinels bracketing the real
/// counters; the values start at 34300 and must stay stable.
enum {
  l_leveldb_first = 34300,
  l_leveldb_gets,
  l_leveldb_txns,
  l_leveldb_get_latency,
  l_leveldb_submit_latency,
  l_leveldb_submit_sync_latency,
  l_leveldb_compact,
  l_leveldb_compact_range,
  l_leveldb_compact_queue_merge,
  l_leveldb_compact_queue_len,
  l_leveldb_last,
};
48
49extern leveldb::Logger *create_leveldb_ceph_logger();
50
51class CephLevelDBLogger;
52
53/**
54 * Uses LevelDB to implement the KeyValueDB interface
55 */
56class LevelDBStore : public KeyValueDB {
57 CephContext *cct;
58 PerfCounters *logger;
59 CephLevelDBLogger *ceph_logger;
f67539c2 60 std::string path;
7c673cae
FG
61 boost::scoped_ptr<leveldb::Cache> db_cache;
62#ifdef HAVE_LEVELDB_FILTER_POLICY
63 boost::scoped_ptr<const leveldb::FilterPolicy> filterpolicy;
64#endif
65 boost::scoped_ptr<leveldb::DB> db;
66
11fdf7f2 67 int load_leveldb_options(bool create_if_missing, leveldb::Options &opts);
f67539c2 68 int do_open(std::ostream &out, bool create_if_missing);
7c673cae
FG
69
70 // manage async compactions
9f95a23c
TL
71 ceph::mutex compact_queue_lock =
72 ceph::make_mutex("LevelDBStore::compact_thread_lock");
73 ceph::condition_variable compact_queue_cond;
f67539c2 74 std::list<std::pair<std::string, std::string>> compact_queue;
7c673cae
FG
75 bool compact_queue_stop;
76 class CompactThread : public Thread {
77 LevelDBStore *db;
78 public:
79 explicit CompactThread(LevelDBStore *d) : db(d) {}
80 void *entry() override {
81 db->compact_thread_entry();
82 return NULL;
83 }
84 friend class LevelDBStore;
85 } compact_thread;
86
87 void compact_thread_entry();
88
f67539c2 89 void compact_range(const std::string& start, const std::string& end) {
7c673cae
FG
90 leveldb::Slice cstart(start);
91 leveldb::Slice cend(end);
92 db->CompactRange(&cstart, &cend);
93 }
f67539c2 94 void compact_range_async(const std::string& start, const std::string& end);
7c673cae
FG
95
96public:
97 /// compact the underlying leveldb store
98 void compact() override;
99
11fdf7f2 100 void compact_async() override {
f67539c2 101 compact_range_async({}, {});
11fdf7f2
TL
102 }
103
7c673cae 104 /// compact db for all keys with a given prefix
f67539c2 105 void compact_prefix(const std::string& prefix) override {
7c673cae
FG
106 compact_range(prefix, past_prefix(prefix));
107 }
f67539c2 108 void compact_prefix_async(const std::string& prefix) override {
7c673cae
FG
109 compact_range_async(prefix, past_prefix(prefix));
110 }
f67539c2
TL
111 void compact_range(const std::string& prefix,
112 const std::string& start, const std::string& end) override {
7c673cae
FG
113 compact_range(combine_strings(prefix, start), combine_strings(prefix, end));
114 }
f67539c2
TL
115 void compact_range_async(const std::string& prefix,
116 const std::string& start, const std::string& end) override {
7c673cae
FG
117 compact_range_async(combine_strings(prefix, start),
118 combine_strings(prefix, end));
119 }
120
121
122 /**
123 * options_t: Holds options which are minimally interpreted
124 * on initialization and then passed through to LevelDB.
125 * We transform a couple of these into actual LevelDB
126 * structures, but the rest are simply passed through unchanged. See
127 * leveldb/options.h for more precise details on each.
128 *
129 * Set them after constructing the LevelDBStore, but before calling
130 * open() or create_and_open().
131 */
132 struct options_t {
133 uint64_t write_buffer_size; /// in-memory write buffer size
134 int max_open_files; /// maximum number of files LevelDB can open at once
135 uint64_t cache_size; /// size of extra decompressed cache to use
136 uint64_t block_size; /// user data per block
137 int bloom_size; /// number of bits per entry to put in a bloom filter
138 bool compression_enabled; /// whether to use libsnappy compression or not
139
140 // don't change these ones. No, seriously
141 int block_restart_interval;
142 bool error_if_exists;
143 bool paranoid_checks;
144
f67539c2 145 std::string log_file;
7c673cae
FG
146
147 options_t() :
148 write_buffer_size(0), //< 0 means default
149 max_open_files(0), //< 0 means default
150 cache_size(0), //< 0 means no cache (default)
151 block_size(0), //< 0 means default
152 bloom_size(0), //< 0 means no bloom filter (default)
153 compression_enabled(true), //< set to false for no compression
154 block_restart_interval(0), //< 0 means default
155 error_if_exists(false), //< set to true if you want to check nonexistence
156 paranoid_checks(false) //< set to true if you want paranoid checks
157 {}
158 } options;
159
f67539c2 160 LevelDBStore(CephContext *c, const std::string &path) :
7c673cae
FG
161 cct(c),
162 logger(NULL),
163 ceph_logger(NULL),
164 path(path),
165 db_cache(NULL),
166#ifdef HAVE_LEVELDB_FILTER_POLICY
167 filterpolicy(NULL),
168#endif
7c673cae
FG
169 compact_queue_stop(false),
170 compact_thread(this),
171 options()
172 {}
173
174 ~LevelDBStore() override;
175
f67539c2
TL
176 static int _test_init(const std::string& dir);
177 int init(std::string option_str="") override;
7c673cae
FG
178
179 /// Opens underlying db
f67539c2 180 int open(std::ostream &out, const std::string& cfs="") override;
7c673cae 181 /// Creates underlying db if missing and opens it
f67539c2 182 int create_and_open(std::ostream &out, const std::string& cfs="") override;
7c673cae
FG
183
184 void close() override;
185
3efd9988
FG
186 PerfCounters *get_perf_counters() override
187 {
188 return logger;
189 }
11fdf7f2 190 int repair(std::ostream &out) override;
3efd9988 191
7c673cae
FG
192 class LevelDBTransactionImpl : public KeyValueDB::TransactionImpl {
193 public:
194 leveldb::WriteBatch bat;
195 LevelDBStore *db;
196 explicit LevelDBTransactionImpl(LevelDBStore *db) : db(db) {}
197 void set(
f67539c2
TL
198 const std::string &prefix,
199 const std::string &k,
200 const ceph::buffer::list &bl) override;
7c673cae
FG
201 using KeyValueDB::TransactionImpl::set;
202 void rmkey(
f67539c2
TL
203 const std::string &prefix,
204 const std::string &k) override;
7c673cae 205 void rmkeys_by_prefix(
f67539c2 206 const std::string &prefix
7c673cae
FG
207 ) override;
208 virtual void rm_range_keys(
f67539c2
TL
209 const std::string &prefix,
210 const std::string &start,
211 const std::string &end) override;
7c673cae
FG
212
213 using KeyValueDB::TransactionImpl::rmkey;
214 };
215
216 KeyValueDB::Transaction get_transaction() override {
217 return std::make_shared<LevelDBTransactionImpl>(this);
218 }
219
220 int submit_transaction(KeyValueDB::Transaction t) override;
221 int submit_transaction_sync(KeyValueDB::Transaction t) override;
222 int get(
f67539c2
TL
223 const std::string &prefix,
224 const std::set<std::string> &key,
225 std::map<std::string, ceph::buffer::list> *out
7c673cae
FG
226 ) override;
227
f67539c2
TL
228 int get(const std::string &prefix,
229 const std::string &key,
230 ceph::buffer::list *value) override;
7c673cae
FG
231
232 using KeyValueDB::get;
233
234 class LevelDBWholeSpaceIteratorImpl :
235 public KeyValueDB::WholeSpaceIteratorImpl {
236 protected:
237 boost::scoped_ptr<leveldb::Iterator> dbiter;
238 public:
239 explicit LevelDBWholeSpaceIteratorImpl(leveldb::Iterator *iter) :
240 dbiter(iter) { }
241 ~LevelDBWholeSpaceIteratorImpl() override { }
242
243 int seek_to_first() override {
244 dbiter->SeekToFirst();
245 return dbiter->status().ok() ? 0 : -1;
246 }
f67539c2 247 int seek_to_first(const std::string &prefix) override {
7c673cae
FG
248 leveldb::Slice slice_prefix(prefix);
249 dbiter->Seek(slice_prefix);
250 return dbiter->status().ok() ? 0 : -1;
251 }
252 int seek_to_last() override {
253 dbiter->SeekToLast();
254 return dbiter->status().ok() ? 0 : -1;
255 }
f67539c2
TL
256 int seek_to_last(const std::string &prefix) override {
257 std::string limit = past_prefix(prefix);
7c673cae
FG
258 leveldb::Slice slice_limit(limit);
259 dbiter->Seek(slice_limit);
260
261 if (!dbiter->Valid()) {
262 dbiter->SeekToLast();
263 } else {
264 dbiter->Prev();
265 }
266 return dbiter->status().ok() ? 0 : -1;
267 }
f67539c2 268 int upper_bound(const std::string &prefix, const std::string &after) override {
7c673cae
FG
269 lower_bound(prefix, after);
270 if (valid()) {
f67539c2 271 std::pair<std::string,std::string> key = raw_key();
7c673cae
FG
272 if (key.first == prefix && key.second == after)
273 next();
274 }
275 return dbiter->status().ok() ? 0 : -1;
276 }
f67539c2
TL
277 int lower_bound(const std::string &prefix, const std::string &to) override {
278 std::string bound = combine_strings(prefix, to);
7c673cae
FG
279 leveldb::Slice slice_bound(bound);
280 dbiter->Seek(slice_bound);
281 return dbiter->status().ok() ? 0 : -1;
282 }
283 bool valid() override {
284 return dbiter->Valid();
285 }
286 int next() override {
287 if (valid())
288 dbiter->Next();
289 return dbiter->status().ok() ? 0 : -1;
290 }
291 int prev() override {
292 if (valid())
293 dbiter->Prev();
294 return dbiter->status().ok() ? 0 : -1;
295 }
f67539c2
TL
296 std::string key() override {
297 std::string out_key;
7c673cae
FG
298 split_key(dbiter->key(), 0, &out_key);
299 return out_key;
300 }
f67539c2
TL
301 std::pair<std::string,std::string> raw_key() override {
302 std::string prefix, key;
7c673cae 303 split_key(dbiter->key(), &prefix, &key);
f67539c2 304 return std::make_pair(prefix, key);
7c673cae 305 }
f67539c2 306 bool raw_key_is_prefixed(const std::string &prefix) override {
7c673cae
FG
307 leveldb::Slice key = dbiter->key();
308 if ((key.size() > prefix.length()) && (key[prefix.length()] == '\0')) {
309 return memcmp(key.data(), prefix.c_str(), prefix.length()) == 0;
310 } else {
311 return false;
312 }
313 }
f67539c2 314 ceph::buffer::list value() override {
7c673cae
FG
315 return to_bufferlist(dbiter->value());
316 }
317
f67539c2 318 ceph::bufferptr value_as_ptr() override {
7c673cae 319 leveldb::Slice data = dbiter->value();
f67539c2 320 return ceph::bufferptr(data.data(), data.size());
7c673cae
FG
321 }
322
323 int status() override {
324 return dbiter->status().ok() ? 0 : -1;
325 }
326 };
327
328 /// Utility
f67539c2
TL
329 static std::string combine_strings(const std::string &prefix, const std::string &value);
330 static int split_key(leveldb::Slice in, std::string *prefix, std::string *key);
331 static ceph::buffer::list to_bufferlist(leveldb::Slice in);
332 static std::string past_prefix(const std::string &prefix) {
333 std::string limit = prefix;
7c673cae
FG
334 limit.push_back(1);
335 return limit;
336 }
337
f67539c2 338 uint64_t get_estimated_size(std::map<std::string,std::uint64_t> &extra) override {
7c673cae
FG
339 DIR *store_dir = opendir(path.c_str());
340 if (!store_dir) {
341 lderr(cct) << __func__ << " something happened opening the store: "
342 << cpp_strerror(errno) << dendl;
343 return 0;
344 }
345
346 uint64_t total_size = 0;
347 uint64_t sst_size = 0;
348 uint64_t log_size = 0;
349 uint64_t misc_size = 0;
350
351 struct dirent *entry = NULL;
352 while ((entry = readdir(store_dir)) != NULL) {
f67539c2 353 std::string n(entry->d_name);
7c673cae
FG
354
355 if (n == "." || n == "..")
356 continue;
357
f67539c2 358 std::string fpath = path + '/' + n;
7c673cae
FG
359 struct stat s;
360 int err = stat(fpath.c_str(), &s);
361 if (err < 0)
362 err = -errno;
363 // we may race against leveldb while reading files; this should only
364 // happen when those files are being updated, data is being shuffled
365 // and files get removed, in which case there's not much of a problem
366 // as we'll get to them next time around.
367 if (err == -ENOENT) {
368 continue;
369 }
370 if (err < 0) {
371 lderr(cct) << __func__ << " error obtaining stats for " << fpath
372 << ": " << cpp_strerror(err) << dendl;
373 goto err;
374 }
375
376 size_t pos = n.find_last_of('.');
f67539c2 377 if (pos == std::string::npos) {
7c673cae
FG
378 misc_size += s.st_size;
379 continue;
380 }
381
f67539c2 382 std::string ext = n.substr(pos+1);
7c673cae
FG
383 if (ext == "sst") {
384 sst_size += s.st_size;
385 } else if (ext == "log") {
386 log_size += s.st_size;
387 } else {
388 misc_size += s.st_size;
389 }
390 }
391
392 total_size = sst_size + log_size + misc_size;
393
394 extra["sst"] = sst_size;
395 extra["log"] = log_size;
396 extra["misc"] = misc_size;
397 extra["total"] = total_size;
398
399err:
400 closedir(store_dir);
401 return total_size;
402 }
403
404
f67539c2 405 WholeSpaceIterator get_wholespace_iterator(IteratorOpts opts = 0) override {
7c673cae
FG
406 return std::make_shared<LevelDBWholeSpaceIteratorImpl>(
407 db->NewIterator(leveldb::ReadOptions()));
408 }
409
410};
411
412#endif