]> git.proxmox.com Git - ceph.git/blob - ceph/src/kv/LevelDBStore.h
add subtree-ish sources for 12.0.3
[ceph.git] / ceph / src / kv / LevelDBStore.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 #ifndef LEVEL_DB_STORE_H
4 #define LEVEL_DB_STORE_H
5
6 #include "include/types.h"
7 #include "include/buffer_fwd.h"
8 #include "KeyValueDB.h"
9 #include <set>
10 #include <map>
11 #include <string>
12 #include "include/memory.h"
13 #include <boost/scoped_ptr.hpp>
14 #include "leveldb/db.h"
15 #include "leveldb/env.h"
16 #include "leveldb/write_batch.h"
17 #include "leveldb/slice.h"
18 #include "leveldb/cache.h"
19 #ifdef HAVE_LEVELDB_FILTER_POLICY
20 #include "leveldb/filter_policy.h"
21 #endif
22
23 #include <errno.h>
24 #include "common/errno.h"
25 #include "common/dout.h"
26 #include "include/assert.h"
27 #include "common/Formatter.h"
28 #include "common/Cond.h"
29
30 #include "common/ceph_context.h"
31
32 class PerfCounters;
33
// Perf-counter indices used by the LevelDB backend.  l_leveldb_first and
// l_leveldb_last bracket the range; every entry in between takes the next
// consecutive value, so the order of the entries below is significant.
enum {
  l_leveldb_first = 34300,          // start-of-range marker
  l_leveldb_gets,                   // 34301
  l_leveldb_txns,                   // 34302
  l_leveldb_get_latency,            // 34303
  l_leveldb_submit_latency,         // 34304
  l_leveldb_submit_sync_latency,    // 34305
  l_leveldb_compact,                // 34306
  l_leveldb_compact_range,          // 34307
  l_leveldb_compact_queue_merge,    // 34308
  l_leveldb_compact_queue_len,      // 34309
  l_leveldb_last,                   // end-of-range marker (34310)
};
47
48 extern leveldb::Logger *create_leveldb_ceph_logger();
49
50 class CephLevelDBLogger;
51
52 /**
53 * Uses LevelDB to implement the KeyValueDB interface
54 */
55 class LevelDBStore : public KeyValueDB {
56 CephContext *cct;
57 PerfCounters *logger;
58 CephLevelDBLogger *ceph_logger;
59 string path;
60 boost::scoped_ptr<leveldb::Cache> db_cache;
61 #ifdef HAVE_LEVELDB_FILTER_POLICY
62 boost::scoped_ptr<const leveldb::FilterPolicy> filterpolicy;
63 #endif
64 boost::scoped_ptr<leveldb::DB> db;
65
66 int do_open(ostream &out, bool create_if_missing);
67
68 // manage async compactions
69 Mutex compact_queue_lock;
70 Cond compact_queue_cond;
71 list< pair<string,string> > compact_queue;
72 bool compact_queue_stop;
73 class CompactThread : public Thread {
74 LevelDBStore *db;
75 public:
76 explicit CompactThread(LevelDBStore *d) : db(d) {}
77 void *entry() override {
78 db->compact_thread_entry();
79 return NULL;
80 }
81 friend class LevelDBStore;
82 } compact_thread;
83
84 void compact_thread_entry();
85
86 void compact_range(const string& start, const string& end) {
87 leveldb::Slice cstart(start);
88 leveldb::Slice cend(end);
89 db->CompactRange(&cstart, &cend);
90 }
91 void compact_range_async(const string& start, const string& end);
92
93 public:
94 /// compact the underlying leveldb store
95 void compact() override;
96
97 /// compact db for all keys with a given prefix
98 void compact_prefix(const string& prefix) override {
99 compact_range(prefix, past_prefix(prefix));
100 }
101 void compact_prefix_async(const string& prefix) override {
102 compact_range_async(prefix, past_prefix(prefix));
103 }
104 void compact_range(const string& prefix,
105 const string& start, const string& end) override {
106 compact_range(combine_strings(prefix, start), combine_strings(prefix, end));
107 }
108 void compact_range_async(const string& prefix,
109 const string& start, const string& end) override {
110 compact_range_async(combine_strings(prefix, start),
111 combine_strings(prefix, end));
112 }
113
114
115 /**
116 * options_t: Holds options which are minimally interpreted
117 * on initialization and then passed through to LevelDB.
118 * We transform a couple of these into actual LevelDB
119 * structures, but the rest are simply passed through unchanged. See
120 * leveldb/options.h for more precise details on each.
121 *
122 * Set them after constructing the LevelDBStore, but before calling
123 * open() or create_and_open().
124 */
struct options_t {
  /// in-memory write buffer size; 0 means default
  uint64_t write_buffer_size = 0;
  /// maximum number of files LevelDB can open at once; 0 means default
  int max_open_files = 0;
  /// size of extra decompressed cache to use; 0 means no cache (default)
  uint64_t cache_size = 0;
  /// user data per block; 0 means default
  uint64_t block_size = 0;
  /// number of bits per entry to put in a bloom filter;
  /// 0 means no bloom filter (default)
  int bloom_size = 0;
  /// whether to use libsnappy compression or not; set to false for
  /// no compression
  bool compression_enabled = true;

  // don't change these ones. No, seriously
  /// 0 means default
  int block_restart_interval = 0;
  /// set to true if you want to check nonexistence
  bool error_if_exists = false;
  /// set to true if you want paranoid checks
  bool paranoid_checks = false;

  /// starts out empty
  string log_file;

  /// Every field starts at the default written next to its declaration.
  options_t() = default;
} options;
152
153 LevelDBStore(CephContext *c, const string &path) :
154 cct(c),
155 logger(NULL),
156 ceph_logger(NULL),
157 path(path),
158 db_cache(NULL),
159 #ifdef HAVE_LEVELDB_FILTER_POLICY
160 filterpolicy(NULL),
161 #endif
162 compact_queue_lock("LevelDBStore::compact_thread_lock"),
163 compact_queue_stop(false),
164 compact_thread(this),
165 options()
166 {}
167
168 ~LevelDBStore() override;
169
170 static int _test_init(const string& dir);
171 int init(string option_str="") override;
172
173 /// Opens underlying db
174 int open(ostream &out) override {
175 return do_open(out, false);
176 }
177 /// Creates underlying db if missing and opens it
178 int create_and_open(ostream &out) override {
179 return do_open(out, true);
180 }
181
182 void close() override;
183
184 class LevelDBTransactionImpl : public KeyValueDB::TransactionImpl {
185 public:
186 leveldb::WriteBatch bat;
187 LevelDBStore *db;
188 explicit LevelDBTransactionImpl(LevelDBStore *db) : db(db) {}
189 void set(
190 const string &prefix,
191 const string &k,
192 const bufferlist &bl) override;
193 using KeyValueDB::TransactionImpl::set;
194 void rmkey(
195 const string &prefix,
196 const string &k) override;
197 void rmkeys_by_prefix(
198 const string &prefix
199 ) override;
200 virtual void rm_range_keys(
201 const string &prefix,
202 const string &start,
203 const string &end) override;
204
205 using KeyValueDB::TransactionImpl::rmkey;
206 };
207
208 KeyValueDB::Transaction get_transaction() override {
209 return std::make_shared<LevelDBTransactionImpl>(this);
210 }
211
212 int submit_transaction(KeyValueDB::Transaction t) override;
213 int submit_transaction_sync(KeyValueDB::Transaction t) override;
214 int get(
215 const string &prefix,
216 const std::set<string> &key,
217 std::map<string, bufferlist> *out
218 ) override;
219
220 int get(const string &prefix,
221 const string &key,
222 bufferlist *value) override;
223
224 using KeyValueDB::get;
225
226 class LevelDBWholeSpaceIteratorImpl :
227 public KeyValueDB::WholeSpaceIteratorImpl {
228 protected:
229 boost::scoped_ptr<leveldb::Iterator> dbiter;
230 public:
231 explicit LevelDBWholeSpaceIteratorImpl(leveldb::Iterator *iter) :
232 dbiter(iter) { }
233 ~LevelDBWholeSpaceIteratorImpl() override { }
234
235 int seek_to_first() override {
236 dbiter->SeekToFirst();
237 return dbiter->status().ok() ? 0 : -1;
238 }
239 int seek_to_first(const string &prefix) override {
240 leveldb::Slice slice_prefix(prefix);
241 dbiter->Seek(slice_prefix);
242 return dbiter->status().ok() ? 0 : -1;
243 }
244 int seek_to_last() override {
245 dbiter->SeekToLast();
246 return dbiter->status().ok() ? 0 : -1;
247 }
248 int seek_to_last(const string &prefix) override {
249 string limit = past_prefix(prefix);
250 leveldb::Slice slice_limit(limit);
251 dbiter->Seek(slice_limit);
252
253 if (!dbiter->Valid()) {
254 dbiter->SeekToLast();
255 } else {
256 dbiter->Prev();
257 }
258 return dbiter->status().ok() ? 0 : -1;
259 }
260 int upper_bound(const string &prefix, const string &after) override {
261 lower_bound(prefix, after);
262 if (valid()) {
263 pair<string,string> key = raw_key();
264 if (key.first == prefix && key.second == after)
265 next();
266 }
267 return dbiter->status().ok() ? 0 : -1;
268 }
269 int lower_bound(const string &prefix, const string &to) override {
270 string bound = combine_strings(prefix, to);
271 leveldb::Slice slice_bound(bound);
272 dbiter->Seek(slice_bound);
273 return dbiter->status().ok() ? 0 : -1;
274 }
275 bool valid() override {
276 return dbiter->Valid();
277 }
278 int next() override {
279 if (valid())
280 dbiter->Next();
281 return dbiter->status().ok() ? 0 : -1;
282 }
283 int prev() override {
284 if (valid())
285 dbiter->Prev();
286 return dbiter->status().ok() ? 0 : -1;
287 }
288 string key() override {
289 string out_key;
290 split_key(dbiter->key(), 0, &out_key);
291 return out_key;
292 }
293 pair<string,string> raw_key() override {
294 string prefix, key;
295 split_key(dbiter->key(), &prefix, &key);
296 return make_pair(prefix, key);
297 }
298 bool raw_key_is_prefixed(const string &prefix) override {
299 leveldb::Slice key = dbiter->key();
300 if ((key.size() > prefix.length()) && (key[prefix.length()] == '\0')) {
301 return memcmp(key.data(), prefix.c_str(), prefix.length()) == 0;
302 } else {
303 return false;
304 }
305 }
306 bufferlist value() override {
307 return to_bufferlist(dbiter->value());
308 }
309
310 bufferptr value_as_ptr() override {
311 leveldb::Slice data = dbiter->value();
312 return bufferptr(data.data(), data.size());
313 }
314
315 int status() override {
316 return dbiter->status().ok() ? 0 : -1;
317 }
318 };
319
320 /// Utility
321 static string combine_strings(const string &prefix, const string &value);
322 static int split_key(leveldb::Slice in, string *prefix, string *key);
323 static bufferlist to_bufferlist(leveldb::Slice in);
/// Return @p prefix with a single byte of value 1 appended.
/// raw_key_is_prefixed() in this file expects a NUL byte right after the
/// prefix of a raw key, so this presumably serves as an exclusive upper
/// bound for every key under @p prefix (assuming bytewise key order).
static string past_prefix(const string &prefix) {
  string bound(prefix);
  bound.append(1, '\x01');
  return bound;
}
329
330 uint64_t get_estimated_size(map<string,uint64_t> &extra) override {
331 DIR *store_dir = opendir(path.c_str());
332 if (!store_dir) {
333 lderr(cct) << __func__ << " something happened opening the store: "
334 << cpp_strerror(errno) << dendl;
335 return 0;
336 }
337
338 uint64_t total_size = 0;
339 uint64_t sst_size = 0;
340 uint64_t log_size = 0;
341 uint64_t misc_size = 0;
342
343 struct dirent *entry = NULL;
344 while ((entry = readdir(store_dir)) != NULL) {
345 string n(entry->d_name);
346
347 if (n == "." || n == "..")
348 continue;
349
350 string fpath = path + '/' + n;
351 struct stat s;
352 int err = stat(fpath.c_str(), &s);
353 if (err < 0)
354 err = -errno;
355 // we may race against leveldb while reading files; this should only
356 // happen when those files are being updated, data is being shuffled
357 // and files get removed, in which case there's not much of a problem
358 // as we'll get to them next time around.
359 if (err == -ENOENT) {
360 continue;
361 }
362 if (err < 0) {
363 lderr(cct) << __func__ << " error obtaining stats for " << fpath
364 << ": " << cpp_strerror(err) << dendl;
365 goto err;
366 }
367
368 size_t pos = n.find_last_of('.');
369 if (pos == string::npos) {
370 misc_size += s.st_size;
371 continue;
372 }
373
374 string ext = n.substr(pos+1);
375 if (ext == "sst") {
376 sst_size += s.st_size;
377 } else if (ext == "log") {
378 log_size += s.st_size;
379 } else {
380 misc_size += s.st_size;
381 }
382 }
383
384 total_size = sst_size + log_size + misc_size;
385
386 extra["sst"] = sst_size;
387 extra["log"] = log_size;
388 extra["misc"] = misc_size;
389 extra["total"] = total_size;
390
391 err:
392 closedir(store_dir);
393 return total_size;
394 }
395
396
397 protected:
398 WholeSpaceIterator _get_iterator() override {
399 return std::make_shared<LevelDBWholeSpaceIteratorImpl>(
400 db->NewIterator(leveldb::ReadOptions()));
401 }
402
403 };
404
405 #endif