]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | #ifndef LEVEL_DB_STORE_H | |
4 | #define LEVEL_DB_STORE_H | |
5 | ||
6 | #include "include/types.h" | |
7 | #include "include/buffer_fwd.h" | |
8 | #include "KeyValueDB.h" | |
9 | #include <set> | |
10 | #include <map> | |
11 | #include <string> | |
7c673cae FG |
12 | #include <boost/scoped_ptr.hpp> |
13 | #include "leveldb/db.h" | |
14 | #include "leveldb/env.h" | |
15 | #include "leveldb/write_batch.h" | |
16 | #include "leveldb/slice.h" | |
17 | #include "leveldb/cache.h" | |
18 | #ifdef HAVE_LEVELDB_FILTER_POLICY | |
19 | #include "leveldb/filter_policy.h" | |
20 | #endif | |
21 | ||
22 | #include <errno.h> | |
23 | #include "common/errno.h" | |
24 | #include "common/dout.h" | |
11fdf7f2 | 25 | #include "include/ceph_assert.h" |
7c673cae FG |
26 | #include "common/Formatter.h" |
27 | #include "common/Cond.h" | |
28 | ||
29 | #include "common/ceph_context.h" | |
9f95a23c | 30 | #include "include/common_fwd.h" |
7c673cae | 31 | |
31f18b77 | 32 | // reinclude our assert to clobber the system one |
11fdf7f2 | 33 | # include "include/ceph_assert.h" |
31f18b77 | 34 | |
7c673cae FG |
// Perf-counter indices for LevelDBStore.
// l_leveldb_first / l_leveldb_last are sentinels bracketing the real counters;
// presumably they are passed as the range bounds when the PerfCounters
// instance is built (NOTE(review): confirm in LevelDBStore.cc).
enum {
  l_leveldb_first = 34300,
  l_leveldb_gets,                 // number of get operations
  l_leveldb_txns,                 // number of submitted transactions
  l_leveldb_get_latency,
  l_leveldb_submit_latency,
  l_leveldb_submit_sync_latency,
  l_leveldb_compact,
  l_leveldb_compact_range,
  l_leveldb_compact_queue_merge,
  l_leveldb_compact_queue_len,
  l_leveldb_last,
};
48 | ||
49 | extern leveldb::Logger *create_leveldb_ceph_logger(); | |
50 | ||
51 | class CephLevelDBLogger; | |
52 | ||
53 | /** | |
54 | * Uses LevelDB to implement the KeyValueDB interface | |
55 | */ | |
56 | class LevelDBStore : public KeyValueDB { | |
57 | CephContext *cct; | |
58 | PerfCounters *logger; | |
59 | CephLevelDBLogger *ceph_logger; | |
f67539c2 | 60 | std::string path; |
7c673cae FG |
61 | boost::scoped_ptr<leveldb::Cache> db_cache; |
62 | #ifdef HAVE_LEVELDB_FILTER_POLICY | |
63 | boost::scoped_ptr<const leveldb::FilterPolicy> filterpolicy; | |
64 | #endif | |
65 | boost::scoped_ptr<leveldb::DB> db; | |
66 | ||
11fdf7f2 | 67 | int load_leveldb_options(bool create_if_missing, leveldb::Options &opts); |
f67539c2 | 68 | int do_open(std::ostream &out, bool create_if_missing); |
7c673cae FG |
69 | |
70 | // manage async compactions | |
9f95a23c TL |
71 | ceph::mutex compact_queue_lock = |
72 | ceph::make_mutex("LevelDBStore::compact_thread_lock"); | |
73 | ceph::condition_variable compact_queue_cond; | |
f67539c2 | 74 | std::list<std::pair<std::string, std::string>> compact_queue; |
7c673cae FG |
75 | bool compact_queue_stop; |
76 | class CompactThread : public Thread { | |
77 | LevelDBStore *db; | |
78 | public: | |
79 | explicit CompactThread(LevelDBStore *d) : db(d) {} | |
80 | void *entry() override { | |
81 | db->compact_thread_entry(); | |
82 | return NULL; | |
83 | } | |
84 | friend class LevelDBStore; | |
85 | } compact_thread; | |
86 | ||
87 | void compact_thread_entry(); | |
88 | ||
f67539c2 | 89 | void compact_range(const std::string& start, const std::string& end) { |
7c673cae FG |
90 | leveldb::Slice cstart(start); |
91 | leveldb::Slice cend(end); | |
92 | db->CompactRange(&cstart, &cend); | |
93 | } | |
f67539c2 | 94 | void compact_range_async(const std::string& start, const std::string& end); |
7c673cae FG |
95 | |
96 | public: | |
97 | /// compact the underlying leveldb store | |
98 | void compact() override; | |
99 | ||
11fdf7f2 | 100 | void compact_async() override { |
f67539c2 | 101 | compact_range_async({}, {}); |
11fdf7f2 TL |
102 | } |
103 | ||
7c673cae | 104 | /// compact db for all keys with a given prefix |
f67539c2 | 105 | void compact_prefix(const std::string& prefix) override { |
7c673cae FG |
106 | compact_range(prefix, past_prefix(prefix)); |
107 | } | |
f67539c2 | 108 | void compact_prefix_async(const std::string& prefix) override { |
7c673cae FG |
109 | compact_range_async(prefix, past_prefix(prefix)); |
110 | } | |
f67539c2 TL |
111 | void compact_range(const std::string& prefix, |
112 | const std::string& start, const std::string& end) override { | |
7c673cae FG |
113 | compact_range(combine_strings(prefix, start), combine_strings(prefix, end)); |
114 | } | |
f67539c2 TL |
115 | void compact_range_async(const std::string& prefix, |
116 | const std::string& start, const std::string& end) override { | |
7c673cae FG |
117 | compact_range_async(combine_strings(prefix, start), |
118 | combine_strings(prefix, end)); | |
119 | } | |
120 | ||
121 | ||
122 | /** | |
123 | * options_t: Holds options which are minimally interpreted | |
124 | * on initialization and then passed through to LevelDB. | |
125 | * We transform a couple of these into actual LevelDB | |
126 | * structures, but the rest are simply passed through unchanged. See | |
127 | * leveldb/options.h for more precise details on each. | |
128 | * | |
129 | * Set them after constructing the LevelDBStore, but before calling | |
130 | * open() or create_and_open(). | |
131 | */ | |
132 | struct options_t { | |
133 | uint64_t write_buffer_size; /// in-memory write buffer size | |
134 | int max_open_files; /// maximum number of files LevelDB can open at once | |
135 | uint64_t cache_size; /// size of extra decompressed cache to use | |
136 | uint64_t block_size; /// user data per block | |
137 | int bloom_size; /// number of bits per entry to put in a bloom filter | |
138 | bool compression_enabled; /// whether to use libsnappy compression or not | |
139 | ||
140 | // don't change these ones. No, seriously | |
141 | int block_restart_interval; | |
142 | bool error_if_exists; | |
143 | bool paranoid_checks; | |
144 | ||
f67539c2 | 145 | std::string log_file; |
7c673cae FG |
146 | |
147 | options_t() : | |
148 | write_buffer_size(0), //< 0 means default | |
149 | max_open_files(0), //< 0 means default | |
150 | cache_size(0), //< 0 means no cache (default) | |
151 | block_size(0), //< 0 means default | |
152 | bloom_size(0), //< 0 means no bloom filter (default) | |
153 | compression_enabled(true), //< set to false for no compression | |
154 | block_restart_interval(0), //< 0 means default | |
155 | error_if_exists(false), //< set to true if you want to check nonexistence | |
156 | paranoid_checks(false) //< set to true if you want paranoid checks | |
157 | {} | |
158 | } options; | |
159 | ||
f67539c2 | 160 | LevelDBStore(CephContext *c, const std::string &path) : |
7c673cae FG |
161 | cct(c), |
162 | logger(NULL), | |
163 | ceph_logger(NULL), | |
164 | path(path), | |
165 | db_cache(NULL), | |
166 | #ifdef HAVE_LEVELDB_FILTER_POLICY | |
167 | filterpolicy(NULL), | |
168 | #endif | |
7c673cae FG |
169 | compact_queue_stop(false), |
170 | compact_thread(this), | |
171 | options() | |
172 | {} | |
173 | ||
174 | ~LevelDBStore() override; | |
175 | ||
f67539c2 TL |
176 | static int _test_init(const std::string& dir); |
177 | int init(std::string option_str="") override; | |
7c673cae FG |
178 | |
179 | /// Opens underlying db | |
f67539c2 | 180 | int open(std::ostream &out, const std::string& cfs="") override; |
7c673cae | 181 | /// Creates underlying db if missing and opens it |
f67539c2 | 182 | int create_and_open(std::ostream &out, const std::string& cfs="") override; |
7c673cae FG |
183 | |
184 | void close() override; | |
185 | ||
3efd9988 FG |
186 | PerfCounters *get_perf_counters() override |
187 | { | |
188 | return logger; | |
189 | } | |
11fdf7f2 | 190 | int repair(std::ostream &out) override; |
3efd9988 | 191 | |
7c673cae FG |
192 | class LevelDBTransactionImpl : public KeyValueDB::TransactionImpl { |
193 | public: | |
194 | leveldb::WriteBatch bat; | |
195 | LevelDBStore *db; | |
196 | explicit LevelDBTransactionImpl(LevelDBStore *db) : db(db) {} | |
197 | void set( | |
f67539c2 TL |
198 | const std::string &prefix, |
199 | const std::string &k, | |
200 | const ceph::buffer::list &bl) override; | |
7c673cae FG |
201 | using KeyValueDB::TransactionImpl::set; |
202 | void rmkey( | |
f67539c2 TL |
203 | const std::string &prefix, |
204 | const std::string &k) override; | |
7c673cae | 205 | void rmkeys_by_prefix( |
f67539c2 | 206 | const std::string &prefix |
7c673cae FG |
207 | ) override; |
208 | virtual void rm_range_keys( | |
f67539c2 TL |
209 | const std::string &prefix, |
210 | const std::string &start, | |
211 | const std::string &end) override; | |
7c673cae FG |
212 | |
213 | using KeyValueDB::TransactionImpl::rmkey; | |
214 | }; | |
215 | ||
216 | KeyValueDB::Transaction get_transaction() override { | |
217 | return std::make_shared<LevelDBTransactionImpl>(this); | |
218 | } | |
219 | ||
220 | int submit_transaction(KeyValueDB::Transaction t) override; | |
221 | int submit_transaction_sync(KeyValueDB::Transaction t) override; | |
222 | int get( | |
f67539c2 TL |
223 | const std::string &prefix, |
224 | const std::set<std::string> &key, | |
225 | std::map<std::string, ceph::buffer::list> *out | |
7c673cae FG |
226 | ) override; |
227 | ||
f67539c2 TL |
228 | int get(const std::string &prefix, |
229 | const std::string &key, | |
230 | ceph::buffer::list *value) override; | |
7c673cae FG |
231 | |
232 | using KeyValueDB::get; | |
233 | ||
234 | class LevelDBWholeSpaceIteratorImpl : | |
235 | public KeyValueDB::WholeSpaceIteratorImpl { | |
236 | protected: | |
237 | boost::scoped_ptr<leveldb::Iterator> dbiter; | |
238 | public: | |
239 | explicit LevelDBWholeSpaceIteratorImpl(leveldb::Iterator *iter) : | |
240 | dbiter(iter) { } | |
241 | ~LevelDBWholeSpaceIteratorImpl() override { } | |
242 | ||
243 | int seek_to_first() override { | |
244 | dbiter->SeekToFirst(); | |
245 | return dbiter->status().ok() ? 0 : -1; | |
246 | } | |
f67539c2 | 247 | int seek_to_first(const std::string &prefix) override { |
7c673cae FG |
248 | leveldb::Slice slice_prefix(prefix); |
249 | dbiter->Seek(slice_prefix); | |
250 | return dbiter->status().ok() ? 0 : -1; | |
251 | } | |
252 | int seek_to_last() override { | |
253 | dbiter->SeekToLast(); | |
254 | return dbiter->status().ok() ? 0 : -1; | |
255 | } | |
f67539c2 TL |
256 | int seek_to_last(const std::string &prefix) override { |
257 | std::string limit = past_prefix(prefix); | |
7c673cae FG |
258 | leveldb::Slice slice_limit(limit); |
259 | dbiter->Seek(slice_limit); | |
260 | ||
261 | if (!dbiter->Valid()) { | |
262 | dbiter->SeekToLast(); | |
263 | } else { | |
264 | dbiter->Prev(); | |
265 | } | |
266 | return dbiter->status().ok() ? 0 : -1; | |
267 | } | |
f67539c2 | 268 | int upper_bound(const std::string &prefix, const std::string &after) override { |
7c673cae FG |
269 | lower_bound(prefix, after); |
270 | if (valid()) { | |
f67539c2 | 271 | std::pair<std::string,std::string> key = raw_key(); |
7c673cae FG |
272 | if (key.first == prefix && key.second == after) |
273 | next(); | |
274 | } | |
275 | return dbiter->status().ok() ? 0 : -1; | |
276 | } | |
f67539c2 TL |
277 | int lower_bound(const std::string &prefix, const std::string &to) override { |
278 | std::string bound = combine_strings(prefix, to); | |
7c673cae FG |
279 | leveldb::Slice slice_bound(bound); |
280 | dbiter->Seek(slice_bound); | |
281 | return dbiter->status().ok() ? 0 : -1; | |
282 | } | |
283 | bool valid() override { | |
284 | return dbiter->Valid(); | |
285 | } | |
286 | int next() override { | |
287 | if (valid()) | |
288 | dbiter->Next(); | |
289 | return dbiter->status().ok() ? 0 : -1; | |
290 | } | |
291 | int prev() override { | |
292 | if (valid()) | |
293 | dbiter->Prev(); | |
294 | return dbiter->status().ok() ? 0 : -1; | |
295 | } | |
f67539c2 TL |
296 | std::string key() override { |
297 | std::string out_key; | |
7c673cae FG |
298 | split_key(dbiter->key(), 0, &out_key); |
299 | return out_key; | |
300 | } | |
f67539c2 TL |
301 | std::pair<std::string,std::string> raw_key() override { |
302 | std::string prefix, key; | |
7c673cae | 303 | split_key(dbiter->key(), &prefix, &key); |
f67539c2 | 304 | return std::make_pair(prefix, key); |
7c673cae | 305 | } |
f67539c2 | 306 | bool raw_key_is_prefixed(const std::string &prefix) override { |
7c673cae FG |
307 | leveldb::Slice key = dbiter->key(); |
308 | if ((key.size() > prefix.length()) && (key[prefix.length()] == '\0')) { | |
309 | return memcmp(key.data(), prefix.c_str(), prefix.length()) == 0; | |
310 | } else { | |
311 | return false; | |
312 | } | |
313 | } | |
f67539c2 | 314 | ceph::buffer::list value() override { |
7c673cae FG |
315 | return to_bufferlist(dbiter->value()); |
316 | } | |
317 | ||
f67539c2 | 318 | ceph::bufferptr value_as_ptr() override { |
7c673cae | 319 | leveldb::Slice data = dbiter->value(); |
f67539c2 | 320 | return ceph::bufferptr(data.data(), data.size()); |
7c673cae FG |
321 | } |
322 | ||
323 | int status() override { | |
324 | return dbiter->status().ok() ? 0 : -1; | |
325 | } | |
326 | }; | |
327 | ||
328 | /// Utility | |
f67539c2 TL |
329 | static std::string combine_strings(const std::string &prefix, const std::string &value); |
330 | static int split_key(leveldb::Slice in, std::string *prefix, std::string *key); | |
331 | static ceph::buffer::list to_bufferlist(leveldb::Slice in); | |
332 | static std::string past_prefix(const std::string &prefix) { | |
333 | std::string limit = prefix; | |
7c673cae FG |
334 | limit.push_back(1); |
335 | return limit; | |
336 | } | |
337 | ||
f67539c2 | 338 | uint64_t get_estimated_size(std::map<std::string,std::uint64_t> &extra) override { |
7c673cae FG |
339 | DIR *store_dir = opendir(path.c_str()); |
340 | if (!store_dir) { | |
341 | lderr(cct) << __func__ << " something happened opening the store: " | |
342 | << cpp_strerror(errno) << dendl; | |
343 | return 0; | |
344 | } | |
345 | ||
346 | uint64_t total_size = 0; | |
347 | uint64_t sst_size = 0; | |
348 | uint64_t log_size = 0; | |
349 | uint64_t misc_size = 0; | |
350 | ||
351 | struct dirent *entry = NULL; | |
352 | while ((entry = readdir(store_dir)) != NULL) { | |
f67539c2 | 353 | std::string n(entry->d_name); |
7c673cae FG |
354 | |
355 | if (n == "." || n == "..") | |
356 | continue; | |
357 | ||
f67539c2 | 358 | std::string fpath = path + '/' + n; |
7c673cae FG |
359 | struct stat s; |
360 | int err = stat(fpath.c_str(), &s); | |
361 | if (err < 0) | |
362 | err = -errno; | |
363 | // we may race against leveldb while reading files; this should only | |
364 | // happen when those files are being updated, data is being shuffled | |
365 | // and files get removed, in which case there's not much of a problem | |
366 | // as we'll get to them next time around. | |
367 | if (err == -ENOENT) { | |
368 | continue; | |
369 | } | |
370 | if (err < 0) { | |
371 | lderr(cct) << __func__ << " error obtaining stats for " << fpath | |
372 | << ": " << cpp_strerror(err) << dendl; | |
373 | goto err; | |
374 | } | |
375 | ||
376 | size_t pos = n.find_last_of('.'); | |
f67539c2 | 377 | if (pos == std::string::npos) { |
7c673cae FG |
378 | misc_size += s.st_size; |
379 | continue; | |
380 | } | |
381 | ||
f67539c2 | 382 | std::string ext = n.substr(pos+1); |
7c673cae FG |
383 | if (ext == "sst") { |
384 | sst_size += s.st_size; | |
385 | } else if (ext == "log") { | |
386 | log_size += s.st_size; | |
387 | } else { | |
388 | misc_size += s.st_size; | |
389 | } | |
390 | } | |
391 | ||
392 | total_size = sst_size + log_size + misc_size; | |
393 | ||
394 | extra["sst"] = sst_size; | |
395 | extra["log"] = log_size; | |
396 | extra["misc"] = misc_size; | |
397 | extra["total"] = total_size; | |
398 | ||
399 | err: | |
400 | closedir(store_dir); | |
401 | return total_size; | |
402 | } | |
403 | ||
404 | ||
f67539c2 | 405 | WholeSpaceIterator get_wholespace_iterator(IteratorOpts opts = 0) override { |
7c673cae FG |
406 | return std::make_shared<LevelDBWholeSpaceIteratorImpl>( |
407 | db->NewIterator(leveldb::ReadOptions())); | |
408 | } | |
409 | ||
410 | }; | |
411 | ||
412 | #endif |