]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | #include "LevelDBStore.h" | |
4 | ||
5 | #include <set> | |
6 | #include <map> | |
7 | #include <string> | |
31f18b77 FG |
8 | #include <cerrno> |
9 | ||
7c673cae | 10 | using std::string; |
31f18b77 | 11 | |
7c673cae FG |
12 | #include "common/debug.h" |
13 | #include "common/perf_counters.h" | |
14 | ||
31f18b77 | 15 | // re-include our assert to clobber the system one; fix dout: |
11fdf7f2 | 16 | #include "include/ceph_assert.h" |
31f18b77 | 17 | |
7c673cae FG |
18 | #define dout_context cct |
19 | #define dout_subsys ceph_subsys_leveldb | |
20 | #undef dout_prefix | |
21 | #define dout_prefix *_dout << "leveldb: " | |
22 | ||
23 | class CephLevelDBLogger : public leveldb::Logger { | |
24 | CephContext *cct; | |
25 | public: | |
26 | explicit CephLevelDBLogger(CephContext *c) : cct(c) { | |
27 | cct->get(); | |
28 | } | |
29 | ~CephLevelDBLogger() override { | |
30 | cct->put(); | |
31 | } | |
32 | ||
33 | // Write an entry to the log file with the specified format. | |
34 | void Logv(const char* format, va_list ap) override { | |
35 | dout(1); | |
36 | char buf[65536]; | |
37 | vsnprintf(buf, sizeof(buf), format, ap); | |
38 | *_dout << buf << dendl; | |
39 | } | |
40 | }; | |
41 | ||
42 | leveldb::Logger *create_leveldb_ceph_logger() | |
43 | { | |
44 | return new CephLevelDBLogger(g_ceph_context); | |
45 | } | |
46 | ||
47 | int LevelDBStore::init(string option_str) | |
48 | { | |
49 | // init defaults. caller can override these if they want | |
50 | // prior to calling open. | |
11fdf7f2 TL |
51 | options.write_buffer_size = g_conf()->leveldb_write_buffer_size; |
52 | options.cache_size = g_conf()->leveldb_cache_size; | |
53 | options.block_size = g_conf()->leveldb_block_size; | |
54 | options.bloom_size = g_conf()->leveldb_bloom_size; | |
55 | options.compression_enabled = g_conf()->leveldb_compression; | |
56 | options.paranoid_checks = g_conf()->leveldb_paranoid; | |
57 | options.max_open_files = g_conf()->leveldb_max_open_files; | |
58 | options.log_file = g_conf()->leveldb_log; | |
7c673cae FG |
59 | return 0; |
60 | } | |
61 | ||
11fdf7f2 TL |
62 | int LevelDBStore::open(ostream &out, const vector<ColumnFamily>& cfs) { |
63 | if (!cfs.empty()) { | |
64 | ceph_abort_msg("Not implemented"); | |
65 | } | |
66 | return do_open(out, false); | |
67 | } | |
7c673cae | 68 | |
11fdf7f2 TL |
69 | int LevelDBStore::create_and_open(ostream &out, const vector<ColumnFamily>& cfs) { |
70 | if (!cfs.empty()) { | |
71 | ceph_abort_msg("Not implemented"); | |
72 | } | |
73 | return do_open(out, true); | |
74 | } | |
75 | ||
76 | int LevelDBStore::load_leveldb_options(bool create_if_missing, leveldb::Options &ldoptions) | |
77 | { | |
7c673cae FG |
78 | if (options.write_buffer_size) |
79 | ldoptions.write_buffer_size = options.write_buffer_size; | |
80 | if (options.max_open_files) | |
81 | ldoptions.max_open_files = options.max_open_files; | |
82 | if (options.cache_size) { | |
83 | leveldb::Cache *_db_cache = leveldb::NewLRUCache(options.cache_size); | |
84 | db_cache.reset(_db_cache); | |
85 | ldoptions.block_cache = db_cache.get(); | |
86 | } | |
87 | if (options.block_size) | |
88 | ldoptions.block_size = options.block_size; | |
89 | if (options.bloom_size) { | |
90 | #ifdef HAVE_LEVELDB_FILTER_POLICY | |
91 | const leveldb::FilterPolicy *_filterpolicy = | |
92 | leveldb::NewBloomFilterPolicy(options.bloom_size); | |
93 | filterpolicy.reset(_filterpolicy); | |
94 | ldoptions.filter_policy = filterpolicy.get(); | |
95 | #else | |
11fdf7f2 | 96 | ceph_abort_msg(0 == "bloom size set but installed leveldb doesn't support bloom filters"); |
7c673cae FG |
97 | #endif |
98 | } | |
99 | if (options.compression_enabled) | |
100 | ldoptions.compression = leveldb::kSnappyCompression; | |
101 | else | |
102 | ldoptions.compression = leveldb::kNoCompression; | |
103 | if (options.block_restart_interval) | |
104 | ldoptions.block_restart_interval = options.block_restart_interval; | |
105 | ||
106 | ldoptions.error_if_exists = options.error_if_exists; | |
107 | ldoptions.paranoid_checks = options.paranoid_checks; | |
108 | ldoptions.create_if_missing = create_if_missing; | |
109 | ||
11fdf7f2 | 110 | if (g_conf()->leveldb_log_to_ceph_log) { |
7c673cae FG |
111 | ceph_logger = new CephLevelDBLogger(g_ceph_context); |
112 | ldoptions.info_log = ceph_logger; | |
113 | } | |
114 | ||
115 | if (options.log_file.length()) { | |
116 | leveldb::Env *env = leveldb::Env::Default(); | |
117 | env->NewLogger(options.log_file, &ldoptions.info_log); | |
118 | } | |
11fdf7f2 TL |
119 | return 0; |
120 | } | |
121 | ||
122 | int LevelDBStore::do_open(ostream &out, bool create_if_missing) | |
123 | { | |
124 | leveldb::Options ldoptions; | |
125 | int r = load_leveldb_options(create_if_missing, ldoptions); | |
126 | if (r) { | |
127 | dout(1) << "load leveldb options failed" << dendl; | |
128 | return r; | |
129 | } | |
7c673cae FG |
130 | |
131 | leveldb::DB *_db; | |
132 | leveldb::Status status = leveldb::DB::Open(ldoptions, path, &_db); | |
133 | db.reset(_db); | |
134 | if (!status.ok()) { | |
135 | out << status.ToString() << std::endl; | |
136 | return -EINVAL; | |
137 | } | |
138 | ||
139 | PerfCountersBuilder plb(g_ceph_context, "leveldb", l_leveldb_first, l_leveldb_last); | |
140 | plb.add_u64_counter(l_leveldb_gets, "leveldb_get", "Gets"); | |
141 | plb.add_u64_counter(l_leveldb_txns, "leveldb_transaction", "Transactions"); | |
142 | plb.add_time_avg(l_leveldb_get_latency, "leveldb_get_latency", "Get Latency"); | |
143 | plb.add_time_avg(l_leveldb_submit_latency, "leveldb_submit_latency", "Submit Latency"); | |
144 | plb.add_time_avg(l_leveldb_submit_sync_latency, "leveldb_submit_sync_latency", "Submit Sync Latency"); | |
145 | plb.add_u64_counter(l_leveldb_compact, "leveldb_compact", "Compactions"); | |
146 | plb.add_u64_counter(l_leveldb_compact_range, "leveldb_compact_range", "Compactions by range"); | |
147 | plb.add_u64_counter(l_leveldb_compact_queue_merge, "leveldb_compact_queue_merge", "Mergings of ranges in compaction queue"); | |
148 | plb.add_u64(l_leveldb_compact_queue_len, "leveldb_compact_queue_len", "Length of compaction queue"); | |
149 | logger = plb.create_perf_counters(); | |
150 | cct->get_perfcounters_collection()->add(logger); | |
151 | ||
11fdf7f2 | 152 | if (g_conf()->leveldb_compact_on_mount) { |
7c673cae FG |
153 | derr << "Compacting leveldb store..." << dendl; |
154 | compact(); | |
155 | derr << "Finished compacting leveldb store" << dendl; | |
156 | } | |
157 | return 0; | |
158 | } | |
159 | ||
160 | int LevelDBStore::_test_init(const string& dir) | |
161 | { | |
162 | leveldb::Options options; | |
163 | options.create_if_missing = true; | |
164 | leveldb::DB *db; | |
165 | leveldb::Status status = leveldb::DB::Open(options, dir, &db); | |
166 | delete db; | |
167 | return status.ok() ? 0 : -EIO; | |
168 | } | |
169 | ||
170 | LevelDBStore::~LevelDBStore() | |
171 | { | |
172 | close(); | |
173 | delete logger; | |
7c673cae FG |
174 | |
175 | // Ensure db is destroyed before dependent db_cache and filterpolicy | |
176 | db.reset(); | |
c07f9fc5 | 177 | delete ceph_logger; |
7c673cae FG |
178 | } |
179 | ||
180 | void LevelDBStore::close() | |
181 | { | |
182 | // stop compaction thread | |
9f95a23c | 183 | compact_queue_lock.lock(); |
7c673cae FG |
184 | if (compact_thread.is_started()) { |
185 | compact_queue_stop = true; | |
9f95a23c TL |
186 | compact_queue_cond.notify_all(); |
187 | compact_queue_lock.unlock(); | |
7c673cae FG |
188 | compact_thread.join(); |
189 | } else { | |
9f95a23c | 190 | compact_queue_lock.unlock(); |
7c673cae FG |
191 | } |
192 | ||
193 | if (logger) | |
194 | cct->get_perfcounters_collection()->remove(logger); | |
195 | } | |
196 | ||
11fdf7f2 TL |
197 | int LevelDBStore::repair(std::ostream &out) |
198 | { | |
199 | leveldb::Options ldoptions; | |
200 | int r = load_leveldb_options(false, ldoptions); | |
201 | if (r) { | |
202 | dout(1) << "load leveldb options failed" << dendl; | |
203 | out << "load leveldb options failed" << std::endl; | |
204 | return r; | |
205 | } | |
206 | leveldb::Status status = leveldb::RepairDB(path, ldoptions); | |
207 | if (status.ok()) { | |
208 | return 0; | |
209 | } else { | |
210 | out << "repair leveldb failed : " << status.ToString() << std::endl; | |
211 | return 1; | |
212 | } | |
213 | } | |
214 | ||
7c673cae FG |
215 | int LevelDBStore::submit_transaction(KeyValueDB::Transaction t) |
216 | { | |
217 | utime_t start = ceph_clock_now(); | |
218 | LevelDBTransactionImpl * _t = | |
219 | static_cast<LevelDBTransactionImpl *>(t.get()); | |
220 | leveldb::Status s = db->Write(leveldb::WriteOptions(), &(_t->bat)); | |
221 | utime_t lat = ceph_clock_now() - start; | |
222 | logger->inc(l_leveldb_txns); | |
223 | logger->tinc(l_leveldb_submit_latency, lat); | |
224 | return s.ok() ? 0 : -1; | |
225 | } | |
226 | ||
227 | int LevelDBStore::submit_transaction_sync(KeyValueDB::Transaction t) | |
228 | { | |
229 | utime_t start = ceph_clock_now(); | |
230 | LevelDBTransactionImpl * _t = | |
231 | static_cast<LevelDBTransactionImpl *>(t.get()); | |
232 | leveldb::WriteOptions options; | |
233 | options.sync = true; | |
234 | leveldb::Status s = db->Write(options, &(_t->bat)); | |
235 | utime_t lat = ceph_clock_now() - start; | |
236 | logger->inc(l_leveldb_txns); | |
237 | logger->tinc(l_leveldb_submit_sync_latency, lat); | |
238 | return s.ok() ? 0 : -1; | |
239 | } | |
240 | ||
241 | void LevelDBStore::LevelDBTransactionImpl::set( | |
242 | const string &prefix, | |
243 | const string &k, | |
244 | const bufferlist &to_set_bl) | |
245 | { | |
246 | string key = combine_strings(prefix, k); | |
247 | size_t bllen = to_set_bl.length(); | |
248 | // bufferlist::c_str() is non-constant, so we can't call c_str() | |
249 | if (to_set_bl.is_contiguous() && bllen > 0) { | |
250 | // bufferlist contains just one ptr or they're contiguous | |
251 | bat.Put(leveldb::Slice(key), leveldb::Slice(to_set_bl.buffers().front().c_str(), bllen)); | |
252 | } else if ((bllen <= 32 * 1024) && (bllen > 0)) { | |
253 | // 2+ bufferptrs that are not contiguopus | |
254 | // allocate buffer on stack and copy bl contents to that buffer | |
255 | // make sure the buffer isn't too large or we might crash here... | |
256 | char* slicebuf = (char*) alloca(bllen); | |
257 | leveldb::Slice newslice(slicebuf, bllen); | |
11fdf7f2 TL |
258 | for (const auto& node : to_set_bl.buffers()) { |
259 | const size_t ptrlen = node.length(); | |
260 | memcpy(static_cast<void*>(slicebuf), node.c_str(), ptrlen); | |
7c673cae FG |
261 | slicebuf += ptrlen; |
262 | } | |
263 | bat.Put(leveldb::Slice(key), newslice); | |
264 | } else { | |
265 | // 2+ bufferptrs that are not contiguous, and enormous in size | |
266 | bufferlist val = to_set_bl; | |
267 | bat.Put(leveldb::Slice(key), leveldb::Slice(val.c_str(), val.length())); | |
268 | } | |
269 | } | |
270 | ||
271 | void LevelDBStore::LevelDBTransactionImpl::rmkey(const string &prefix, | |
272 | const string &k) | |
273 | { | |
274 | string key = combine_strings(prefix, k); | |
275 | bat.Delete(leveldb::Slice(key)); | |
276 | } | |
277 | ||
278 | void LevelDBStore::LevelDBTransactionImpl::rmkeys_by_prefix(const string &prefix) | |
279 | { | |
280 | KeyValueDB::Iterator it = db->get_iterator(prefix); | |
281 | for (it->seek_to_first(); | |
282 | it->valid(); | |
283 | it->next()) { | |
284 | bat.Delete(leveldb::Slice(combine_strings(prefix, it->key()))); | |
285 | } | |
286 | } | |
287 | ||
288 | void LevelDBStore::LevelDBTransactionImpl::rm_range_keys(const string &prefix, const string &start, const string &end) | |
289 | { | |
290 | KeyValueDB::Iterator it = db->get_iterator(prefix); | |
291 | it->lower_bound(start); | |
292 | while (it->valid()) { | |
293 | if (it->key() >= end) { | |
294 | break; | |
295 | } | |
296 | bat.Delete(combine_strings(prefix, it->key())); | |
297 | it->next(); | |
298 | } | |
299 | } | |
300 | ||
301 | int LevelDBStore::get( | |
302 | const string &prefix, | |
303 | const std::set<string> &keys, | |
304 | std::map<string, bufferlist> *out) | |
305 | { | |
306 | utime_t start = ceph_clock_now(); | |
307 | for (std::set<string>::const_iterator i = keys.begin(); | |
308 | i != keys.end(); ++i) { | |
309 | std::string value; | |
310 | std::string bound = combine_strings(prefix, *i); | |
311 | auto status = db->Get(leveldb::ReadOptions(), leveldb::Slice(bound), &value); | |
312 | if (status.ok()) | |
313 | (*out)[*i].append(value); | |
314 | } | |
315 | utime_t lat = ceph_clock_now() - start; | |
316 | logger->inc(l_leveldb_gets); | |
317 | logger->tinc(l_leveldb_get_latency, lat); | |
318 | return 0; | |
319 | } | |
320 | ||
321 | int LevelDBStore::get(const string &prefix, | |
322 | const string &key, | |
323 | bufferlist *out) | |
324 | { | |
11fdf7f2 | 325 | ceph_assert(out && (out->length() == 0)); |
7c673cae FG |
326 | utime_t start = ceph_clock_now(); |
327 | int r = 0; | |
328 | string value, k; | |
329 | leveldb::Status s; | |
330 | k = combine_strings(prefix, key); | |
331 | s = db->Get(leveldb::ReadOptions(), leveldb::Slice(k), &value); | |
332 | if (s.ok()) { | |
333 | out->append(value); | |
334 | } else { | |
335 | r = -ENOENT; | |
336 | } | |
337 | utime_t lat = ceph_clock_now() - start; | |
338 | logger->inc(l_leveldb_gets); | |
339 | logger->tinc(l_leveldb_get_latency, lat); | |
340 | return r; | |
341 | } | |
342 | ||
343 | string LevelDBStore::combine_strings(const string &prefix, const string &value) | |
344 | { | |
345 | string out = prefix; | |
346 | out.push_back(0); | |
347 | out.append(value); | |
348 | return out; | |
349 | } | |
350 | ||
351 | bufferlist LevelDBStore::to_bufferlist(leveldb::Slice in) | |
352 | { | |
353 | bufferlist bl; | |
354 | bl.append(bufferptr(in.data(), in.size())); | |
355 | return bl; | |
356 | } | |
357 | ||
358 | int LevelDBStore::split_key(leveldb::Slice in, string *prefix, string *key) | |
359 | { | |
360 | size_t prefix_len = 0; | |
361 | ||
362 | // Find separator inside Slice | |
363 | char* separator = (char*) memchr(in.data(), 0, in.size()); | |
364 | if (separator == NULL) | |
365 | return -EINVAL; | |
366 | prefix_len = size_t(separator - in.data()); | |
367 | if (prefix_len >= in.size()) | |
368 | return -EINVAL; | |
369 | ||
370 | if (prefix) | |
371 | *prefix = string(in.data(), prefix_len); | |
372 | if (key) | |
373 | *key = string(separator+1, in.size() - prefix_len - 1); | |
374 | return 0; | |
375 | } | |
376 | ||
377 | void LevelDBStore::compact() | |
378 | { | |
379 | logger->inc(l_leveldb_compact); | |
380 | db->CompactRange(NULL, NULL); | |
381 | } | |
382 | ||
383 | ||
384 | void LevelDBStore::compact_thread_entry() | |
385 | { | |
9f95a23c | 386 | std::unique_lock l{compact_queue_lock}; |
7c673cae FG |
387 | while (!compact_queue_stop) { |
388 | while (!compact_queue.empty()) { | |
389 | pair<string,string> range = compact_queue.front(); | |
390 | compact_queue.pop_front(); | |
391 | logger->set(l_leveldb_compact_queue_len, compact_queue.size()); | |
9f95a23c | 392 | l.unlock(); |
7c673cae | 393 | logger->inc(l_leveldb_compact_range); |
11fdf7f2 TL |
394 | if (range.first.empty() && range.second.empty()) { |
395 | compact(); | |
396 | } else { | |
397 | compact_range(range.first, range.second); | |
398 | } | |
9f95a23c | 399 | l.lock(); |
7c673cae FG |
400 | continue; |
401 | } | |
11fdf7f2 TL |
402 | if (compact_queue_stop) |
403 | break; | |
9f95a23c | 404 | compact_queue_cond.wait(l); |
7c673cae | 405 | } |
7c673cae FG |
406 | } |
407 | ||
408 | void LevelDBStore::compact_range_async(const string& start, const string& end) | |
409 | { | |
11fdf7f2 | 410 | std::lock_guard l(compact_queue_lock); |
7c673cae FG |
411 | |
412 | // try to merge adjacent ranges. this is O(n), but the queue should | |
413 | // be short. note that we do not cover all overlap cases and merge | |
414 | // opportunities here, but we capture the ones we currently need. | |
415 | list< pair<string,string> >::iterator p = compact_queue.begin(); | |
416 | while (p != compact_queue.end()) { | |
417 | if (p->first == start && p->second == end) { | |
418 | // dup; no-op | |
419 | return; | |
420 | } | |
421 | if (p->first <= end && p->first > start) { | |
422 | // merge with existing range to the right | |
423 | compact_queue.push_back(make_pair(start, p->second)); | |
424 | compact_queue.erase(p); | |
425 | logger->inc(l_leveldb_compact_queue_merge); | |
426 | break; | |
427 | } | |
428 | if (p->second >= start && p->second < end) { | |
429 | // merge with existing range to the left | |
430 | compact_queue.push_back(make_pair(p->first, end)); | |
431 | compact_queue.erase(p); | |
432 | logger->inc(l_leveldb_compact_queue_merge); | |
433 | break; | |
434 | } | |
435 | ++p; | |
436 | } | |
437 | if (p == compact_queue.end()) { | |
438 | // no merge, new entry. | |
439 | compact_queue.push_back(make_pair(start, end)); | |
440 | logger->set(l_leveldb_compact_queue_len, compact_queue.size()); | |
441 | } | |
9f95a23c | 442 | compact_queue_cond.notify_all(); |
7c673cae FG |
443 | if (!compact_thread.is_started()) { |
444 | compact_thread.create("levdbst_compact"); | |
445 | } | |
446 | } |