ceph/src/kv/LevelDBStore.h
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
#ifndef LEVEL_DB_STORE_H
#define LEVEL_DB_STORE_H

#include "include/types.h"
#include "include/buffer_fwd.h"
#include "KeyValueDB.h"
#include <set>
#include <map>
#include <string>
#include "include/memory.h"
#include <boost/scoped_ptr.hpp>
#include "leveldb/db.h"
#include "leveldb/env.h"
#include "leveldb/write_batch.h"
#include "leveldb/slice.h"
#include "leveldb/cache.h"
#ifdef HAVE_LEVELDB_FILTER_POLICY
#include "leveldb/filter_policy.h"
#endif

#include <errno.h>
#include "common/errno.h"
#include "common/dout.h"
#include "include/assert.h"
#include "common/Formatter.h"
#include "common/Cond.h"

#include "common/ceph_context.h"

// reinclude our assert to clobber the system one
# include "include/assert.h"

class PerfCounters;

enum {
  l_leveldb_first = 34300,
  l_leveldb_gets,
  l_leveldb_txns,
  l_leveldb_get_latency,
  l_leveldb_submit_latency,
  l_leveldb_submit_sync_latency,
  l_leveldb_compact,
  l_leveldb_compact_range,
  l_leveldb_compact_queue_merge,
  l_leveldb_compact_queue_len,
  l_leveldb_last,
};

extern leveldb::Logger *create_leveldb_ceph_logger();

class CephLevelDBLogger;

/**
 * Uses LevelDB to implement the KeyValueDB interface
 */
class LevelDBStore : public KeyValueDB {
  CephContext *cct;
  PerfCounters *logger;
  CephLevelDBLogger *ceph_logger;
  string path;
  boost::scoped_ptr<leveldb::Cache> db_cache;
#ifdef HAVE_LEVELDB_FILTER_POLICY
  boost::scoped_ptr<const leveldb::FilterPolicy> filterpolicy;
#endif
  boost::scoped_ptr<leveldb::DB> db;

  int do_open(ostream &out, bool create_if_missing);

  // manage async compactions
  Mutex compact_queue_lock;
  Cond compact_queue_cond;
  list< pair<string,string> > compact_queue;
  bool compact_queue_stop;
  class CompactThread : public Thread {
    LevelDBStore *db;
  public:
    explicit CompactThread(LevelDBStore *d) : db(d) {}
    void *entry() override {
      db->compact_thread_entry();
      return NULL;
    }
    friend class LevelDBStore;
  } compact_thread;

  void compact_thread_entry();

  void compact_range(const string& start, const string& end) {
    leveldb::Slice cstart(start);
    leveldb::Slice cend(end);
    db->CompactRange(&cstart, &cend);
  }
  void compact_range_async(const string& start, const string& end);

public:
  /// compact the underlying leveldb store
  void compact() override;

  /// compact db for all keys with a given prefix
  void compact_prefix(const string& prefix) override {
    compact_range(prefix, past_prefix(prefix));
  }
  void compact_prefix_async(const string& prefix) override {
    compact_range_async(prefix, past_prefix(prefix));
  }
  void compact_range(const string& prefix,
                     const string& start, const string& end) override {
    compact_range(combine_strings(prefix, start), combine_strings(prefix, end));
  }
  void compact_range_async(const string& prefix,
                           const string& start, const string& end) override {
    compact_range_async(combine_strings(prefix, start),
                        combine_strings(prefix, end));
  }


  /**
   * options_t: Holds options which are minimally interpreted
   * on initialization and then passed through to LevelDB.
   * We transform a couple of these into actual LevelDB
   * structures, but the rest are simply passed through unchanged. See
   * leveldb/options.h for more precise details on each.
   *
   * Set them after constructing the LevelDBStore, but before calling
   * open() or create_and_open().
   */
  struct options_t {
    uint64_t write_buffer_size; /// in-memory write buffer size
    int max_open_files; /// maximum number of files LevelDB can open at once
    uint64_t cache_size; /// size of extra decompressed cache to use
    uint64_t block_size; /// user data per block
    int bloom_size; /// number of bits per entry to put in a bloom filter
    bool compression_enabled; /// whether to use libsnappy compression or not

    // don't change these ones. No, seriously
    int block_restart_interval;
    bool error_if_exists;
    bool paranoid_checks;

    string log_file;

    options_t() :
      write_buffer_size(0), //< 0 means default
      max_open_files(0), //< 0 means default
      cache_size(0), //< 0 means no cache (default)
      block_size(0), //< 0 means default
      bloom_size(0), //< 0 means no bloom filter (default)
      compression_enabled(true), //< set to false for no compression
      block_restart_interval(0), //< 0 means default
      error_if_exists(false), //< set to true if you want to check nonexistence
      paranoid_checks(false) //< set to true if you want paranoid checks
    {}
  } options;
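
  // Usage sketch (illustrative only, not part of the upstream header):
  // the options above are plain values that are only read when the store is
  // opened, so they must be filled in after construction but before open()
  // or create_and_open(). The context, path and sizes below are hypothetical.
  //
  //   LevelDBStore store(cct, "/var/lib/foo/kv");
  //   store.options.write_buffer_size = 32 * 1024 * 1024;
  //   store.options.cache_size = 64 * 1024 * 1024;
  //   store.options.compression_enabled = false;
  //   ostringstream err;
  //   int r = store.create_and_open(err);  // < 0 on failure, details in err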

  LevelDBStore(CephContext *c, const string &path) :
    cct(c),
    logger(NULL),
    ceph_logger(NULL),
    path(path),
    db_cache(NULL),
#ifdef HAVE_LEVELDB_FILTER_POLICY
    filterpolicy(NULL),
#endif
    compact_queue_lock("LevelDBStore::compact_thread_lock"),
    compact_queue_stop(false),
    compact_thread(this),
    options()
  {}

  ~LevelDBStore() override;

  static int _test_init(const string& dir);
  int init(string option_str="") override;

  /// Opens underlying db
  int open(ostream &out) override {
    return do_open(out, false);
  }
  /// Creates underlying db if missing and opens it
  int create_and_open(ostream &out) override {
    return do_open(out, true);
  }

  void close() override;

  class LevelDBTransactionImpl : public KeyValueDB::TransactionImpl {
  public:
    leveldb::WriteBatch bat;
    LevelDBStore *db;
    explicit LevelDBTransactionImpl(LevelDBStore *db) : db(db) {}
    void set(
      const string &prefix,
      const string &k,
      const bufferlist &bl) override;
    using KeyValueDB::TransactionImpl::set;
    void rmkey(
      const string &prefix,
      const string &k) override;
    void rmkeys_by_prefix(
      const string &prefix
      ) override;
    virtual void rm_range_keys(
      const string &prefix,
      const string &start,
      const string &end) override;

    using KeyValueDB::TransactionImpl::rmkey;
  };

  KeyValueDB::Transaction get_transaction() override {
    return std::make_shared<LevelDBTransactionImpl>(this);
  }

  int submit_transaction(KeyValueDB::Transaction t) override;
  int submit_transaction_sync(KeyValueDB::Transaction t) override;
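
  // Usage sketch (illustrative only): writes go through a Transaction, which
  // buffers mutations in a leveldb::WriteBatch and applies them together on
  // submit. The prefix, key and value below are hypothetical.
  //
  //   KeyValueDB::Transaction t = store.get_transaction();
  //   bufferlist bl;
  //   bl.append("value");
  //   t->set("P", "key", bl);
  //   t->rmkey("P", "old_key");
  //   int r = store.submit_transaction_sync(t);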
  int get(
    const string &prefix,
    const std::set<string> &key,
    std::map<string, bufferlist> *out
    ) override;

  int get(const string &prefix,
          const string &key,
          bufferlist *value) override;

  using KeyValueDB::get;

  class LevelDBWholeSpaceIteratorImpl :
    public KeyValueDB::WholeSpaceIteratorImpl {
  protected:
    boost::scoped_ptr<leveldb::Iterator> dbiter;
  public:
    explicit LevelDBWholeSpaceIteratorImpl(leveldb::Iterator *iter) :
      dbiter(iter) { }
    ~LevelDBWholeSpaceIteratorImpl() override { }

    int seek_to_first() override {
      dbiter->SeekToFirst();
      return dbiter->status().ok() ? 0 : -1;
    }
    int seek_to_first(const string &prefix) override {
      leveldb::Slice slice_prefix(prefix);
      dbiter->Seek(slice_prefix);
      return dbiter->status().ok() ? 0 : -1;
    }
    int seek_to_last() override {
      dbiter->SeekToLast();
      return dbiter->status().ok() ? 0 : -1;
    }
    int seek_to_last(const string &prefix) override {
      string limit = past_prefix(prefix);
      leveldb::Slice slice_limit(limit);
      dbiter->Seek(slice_limit);

      if (!dbiter->Valid()) {
        dbiter->SeekToLast();
      } else {
        dbiter->Prev();
      }
      return dbiter->status().ok() ? 0 : -1;
    }
    int upper_bound(const string &prefix, const string &after) override {
      lower_bound(prefix, after);
      if (valid()) {
        pair<string,string> key = raw_key();
        if (key.first == prefix && key.second == after)
          next();
      }
      return dbiter->status().ok() ? 0 : -1;
    }
    int lower_bound(const string &prefix, const string &to) override {
      string bound = combine_strings(prefix, to);
      leveldb::Slice slice_bound(bound);
      dbiter->Seek(slice_bound);
      return dbiter->status().ok() ? 0 : -1;
    }
    bool valid() override {
      return dbiter->Valid();
    }
    int next() override {
      if (valid())
        dbiter->Next();
      return dbiter->status().ok() ? 0 : -1;
    }
    int prev() override {
      if (valid())
        dbiter->Prev();
      return dbiter->status().ok() ? 0 : -1;
    }
    string key() override {
      string out_key;
      split_key(dbiter->key(), 0, &out_key);
      return out_key;
    }
    pair<string,string> raw_key() override {
      string prefix, key;
      split_key(dbiter->key(), &prefix, &key);
      return make_pair(prefix, key);
    }
    bool raw_key_is_prefixed(const string &prefix) override {
      leveldb::Slice key = dbiter->key();
      if ((key.size() > prefix.length()) && (key[prefix.length()] == '\0')) {
        return memcmp(key.data(), prefix.c_str(), prefix.length()) == 0;
      } else {
        return false;
      }
    }
    bufferlist value() override {
      return to_bufferlist(dbiter->value());
    }

    bufferptr value_as_ptr() override {
      leveldb::Slice data = dbiter->value();
      return bufferptr(data.data(), data.size());
    }

    int status() override {
      return dbiter->status().ok() ? 0 : -1;
    }
  };

  /// Utility
  static string combine_strings(const string &prefix, const string &value);
  static int split_key(leveldb::Slice in, string *prefix, string *key);
  static bufferlist to_bufferlist(leveldb::Slice in);
  static string past_prefix(const string &prefix) {
    string limit = prefix;
    limit.push_back(1);
    return limit;
  }
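
  // Key encoding note (inferred from the helpers above, not stated in this
  // header): combine_strings() appears to join prefix and key with a '\0'
  // separator, which is exactly what raw_key_is_prefixed() checks for, while
  // past_prefix() appends '\1' so the result sorts after every "prefix\0..."
  // key. Assuming that encoding:
  //
  //   combine_strings("P", "abc")  ->  "P\0abc"
  //   past_prefix("P")             ->  "P\1"   (exclusive upper bound used by
  //                                             compact_prefix() and
  //                                             seek_to_last(prefix))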

  uint64_t get_estimated_size(map<string,uint64_t> &extra) override {
    DIR *store_dir = opendir(path.c_str());
    if (!store_dir) {
      lderr(cct) << __func__ << " something happened opening the store: "
                 << cpp_strerror(errno) << dendl;
      return 0;
    }

    uint64_t total_size = 0;
    uint64_t sst_size = 0;
    uint64_t log_size = 0;
    uint64_t misc_size = 0;

    struct dirent *entry = NULL;
    while ((entry = readdir(store_dir)) != NULL) {
      string n(entry->d_name);

      if (n == "." || n == "..")
        continue;

      string fpath = path + '/' + n;
      struct stat s;
      int err = stat(fpath.c_str(), &s);
      if (err < 0)
        err = -errno;
      // we may race against leveldb while reading files; this should only
      // happen when those files are being updated, data is being shuffled
      // and files get removed, in which case there's not much of a problem
      // as we'll get to them next time around.
      if (err == -ENOENT) {
        continue;
      }
      if (err < 0) {
        lderr(cct) << __func__ << " error obtaining stats for " << fpath
                   << ": " << cpp_strerror(err) << dendl;
        goto err;
      }

      size_t pos = n.find_last_of('.');
      if (pos == string::npos) {
        misc_size += s.st_size;
        continue;
      }

      string ext = n.substr(pos+1);
      if (ext == "sst") {
        sst_size += s.st_size;
      } else if (ext == "log") {
        log_size += s.st_size;
      } else {
        misc_size += s.st_size;
      }
    }

    total_size = sst_size + log_size + misc_size;

    extra["sst"] = sst_size;
    extra["log"] = log_size;
    extra["misc"] = misc_size;
    extra["total"] = total_size;

err:
    closedir(store_dir);
    return total_size;
  }


protected:
  WholeSpaceIterator _get_iterator() override {
    return std::make_shared<LevelDBWholeSpaceIteratorImpl>(
      db->NewIterator(leveldb::ReadOptions()));
  }

};

#endif